Speed-up environment concretization on linux with a process pool (#26264)

* Speed-up environment concretization with a process pool

We can exploit the fact that the environment is concretized
separately and use a pool of processes to concretize it.

* Add module spack.util.parallel

Module includes `pool` and `parallel_map` abstractions,
along with implementation details for both.

* Add a new hash type to pass specs across processes

* Add tty msg with concretization time
This commit is contained in:
Massimiliano Culpo 2021-10-19 17:09:34 +02:00 committed by GitHub
parent 64a323b22d
commit 2d45a9d617
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 233 additions and 28 deletions

View file

@ -9,6 +9,7 @@
import re
import shutil
import sys
import time
import ruamel.yaml as yaml
import six
@ -17,6 +18,8 @@
import llnl.util.filesystem as fs
import llnl.util.tty as tty
import spack.bootstrap
import spack.compilers
import spack.concretize
import spack.config
import spack.error
@ -28,10 +31,13 @@
import spack.spec
import spack.stage
import spack.store
import spack.subprocess_context
import spack.user_environment as uenv
import spack.util.cpus
import spack.util.environment
import spack.util.hash
import spack.util.lock as lk
import spack.util.parallel
import spack.util.path
import spack.util.spack_json as sjson
import spack.util.spack_yaml as syaml
@ -1111,14 +1117,57 @@ def _concretize_separately(self, tests=False):
self._add_concrete_spec(s, concrete, new=False)
# Concretize any new user specs that we haven't concretized yet
concretized_specs = []
arguments, root_specs = [], []
for uspec, uspec_constraints in zip(
self.user_specs, self.user_specs.specs_as_constraints):
self.user_specs, self.user_specs.specs_as_constraints
):
if uspec not in old_concretized_user_specs:
concrete = _concretize_from_constraints(uspec_constraints, tests=tests)
self._add_concrete_spec(uspec, concrete)
concretized_specs.append((uspec, concrete))
return concretized_specs
root_specs.append(uspec)
arguments.append((uspec_constraints, tests))
# Ensure we don't try to bootstrap clingo in parallel
if spack.config.get('config:concretizer') == 'clingo':
with spack.bootstrap.ensure_bootstrap_configuration():
spack.bootstrap.ensure_clingo_importable_or_raise()
# Ensure all the indexes have been built or updated, since
# otherwise the processes in the pool may timeout on waiting
# for a write lock. We do this indirectly by retrieving the
# provider index, which should in turn trigger the update of
# all the indexes if there's any need for that.
_ = spack.repo.path.provider_index
# Ensure we have compilers in compilers.yaml to avoid that
# processes try to write the config file in parallel
_ = spack.compilers.get_compiler_config()
# Solve the environment in parallel on Linux
start = time.time()
max_processes = min(
max(len(arguments), 1), # Number of specs
16 # Cap on 16 cores
)
# TODO: revisit this print as soon as darwin is parallel too
msg = 'Starting concretization'
if sys.platform != 'darwin':
msg = msg + ' pool with {0} processes'.format(
spack.util.parallel.num_processes(max_processes=max_processes)
)
tty.msg(msg)
concretized_root_specs = spack.util.parallel.parallel_map(
_concretize_task, arguments, max_processes=max_processes
)
finish = time.time()
tty.msg('Environment concretized in {0} sec.'.format(finish - start))
results = []
for abstract, concrete in zip(root_specs, concretized_root_specs):
self._add_concrete_spec(abstract, concrete)
results.append((abstract, concrete))
return results
def concretize_and_add(self, user_spec, concrete_spec=None, tests=False):
"""Concretize and add a single spec to the environment.
@ -1962,6 +2011,12 @@ def _concretize_from_constraints(spec_constraints, tests=False):
invalid_constraints.extend(inv_variant_constraints)
def _concretize_task(packed_arguments):
spec_constraints, tests = packed_arguments
with tty.SuppressOutput(msg_enabled=False):
return _concretize_from_constraints(spec_constraints, tests)
def make_repo_path(root):
"""Make a RepoPath from the repo subdirectories in an environment."""
path = spack.repo.RepoPath()

View file

@ -48,7 +48,7 @@
import spack.util.crypto as crypto
import spack.util.pattern as pattern
import spack.util.url as url_util
import spack.util.web as web_util
import spack.util.web
import spack.version
from spack.util.compression import decompressor_for, extension
from spack.util.executable import CommandNotFoundError, which
@ -350,8 +350,8 @@ def _existing_url(self, url):
else:
# Telling urllib to check if url is accessible
try:
url, headers, response = web_util.read_from_url(url)
except web_util.SpackWebError:
url, headers, response = spack.util.web.read_from_url(url)
except spack.util.web.SpackWebError:
msg = "Urllib fetch failed to verify url {0}".format(url)
raise FailedDownloadError(url, msg)
return (response.getcode() is None or response.getcode() == 200)
@ -380,8 +380,8 @@ def _fetch_urllib(self, url):
# Run urllib but grab the mime type from the http headers
try:
url, headers, response = web_util.read_from_url(url)
except web_util.SpackWebError as e:
url, headers, response = spack.util.web.read_from_url(url)
except spack.util.web.SpackWebError as e:
# clean up archive on failure.
if self.archive_file:
os.remove(self.archive_file)
@ -571,7 +571,7 @@ def archive(self, destination):
if not self.archive_file:
raise NoArchiveFileError("Cannot call archive() before fetching.")
web_util.push_to_url(
spack.util.web.push_to_url(
self.archive_file,
destination,
keep_original=True)
@ -1388,12 +1388,12 @@ def fetch(self):
basename = os.path.basename(parsed_url.path)
with working_dir(self.stage.path):
_, headers, stream = web_util.read_from_url(self.url)
_, headers, stream = spack.util.web.read_from_url(self.url)
with open(basename, 'wb') as f:
shutil.copyfileobj(stream, f)
content_type = web_util.get_header(headers, 'Content-type')
content_type = spack.util.web.get_header(headers, 'Content-type')
if content_type == 'text/html':
warn_content_type_mismatch(self.archive_file or "the archive")

View file

@ -44,6 +44,13 @@ def attr(self):
deptype=('build', 'link', 'run'), package_hash=False, name='build_hash')
#: Hash descriptor used only to transfer a DAG, as is, across processes
process_hash = SpecHashDescriptor(
deptype=('build', 'link', 'run', 'test'),
package_hash=False,
name='process_hash'
)
#: Full hash used in build pipelines to determine when to rebuild packages.
full_hash = SpecHashDescriptor(
deptype=('build', 'link', 'run'), package_hash=True, name='full_hash')

View file

@ -2,7 +2,6 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""Here we consolidate the logic for creating an abstract description
of the information that module systems need.
@ -43,7 +42,7 @@
import spack.build_environment as build_environment
import spack.config
import spack.environment as ev
import spack.environment
import spack.error
import spack.paths
import spack.projections as proj
@ -698,12 +697,13 @@ def environment_modifications(self):
spec = self.spec.copy() # defensive copy before setting prefix
if use_view:
if use_view is True:
use_view = ev.default_view_name
use_view = spack.environment.default_view_name
env = ev.active_environment()
env = spack.environment.active_environment()
if not env:
raise ev.SpackEnvironmentViewError("Module generation with views "
"requires active environment")
raise spack.environment.SpackEnvironmentViewError(
"Module generation with views requires active environment"
)
view = env.views[use_view]

View file

@ -334,7 +334,7 @@ def solve(
self.control.configuration.asp.trans_ext = 'all'
self.control.configuration.asp.eq = '5'
self.control.configuration.configuration = 'tweety'
self.control.configuration.solve.parallel_mode = '2'
self.control.configuration.solve.parallel_mode = '1'
self.control.configuration.solver.opt_strategy = "usc,one"
# set up the problem -- this generates facts and rules

View file

@ -1567,6 +1567,14 @@ def build_hash(self, length=None):
"""
return self._cached_hash(ht.build_hash, length)
def process_hash(self, length=None):
"""Hash used to store specs in environments.
This hash includes build and test dependencies and is only used to
serialize a spec and pass it around among processes.
"""
return self._cached_hash(ht.process_hash, length)
def full_hash(self, length=None):
"""Hash to determine when to rebuild packages in the build pipeline.
@ -1832,6 +1840,7 @@ def node_dict_with_hashes(self, hash=ht.dag_hash):
not self._hashes_final) # lazily compute
if write_full_hash:
node[ht.full_hash.name] = self.full_hash()
write_build_hash = 'build' in hash.deptype and (
self._hashes_final and self._build_hash or # cached and final
not self._hashes_final) # lazily compute
@ -1839,8 +1848,12 @@ def node_dict_with_hashes(self, hash=ht.dag_hash):
node[ht.build_hash.name] = self.build_hash()
else:
node['concrete'] = False
if hash.name == 'build_hash':
node[hash.name] = self.build_hash()
elif hash.name == 'process_hash':
node[hash.name] = self.process_hash()
return node
def to_yaml(self, stream=None, hash=ht.dag_hash):
@ -1974,7 +1987,8 @@ def read_yaml_dep_specs(deps, hash_type=ht.dag_hash.name):
# new format: elements of dependency spec are keyed.
for key in (ht.full_hash.name,
ht.build_hash.name,
ht.dag_hash.name):
ht.dag_hash.name,
ht.process_hash.name):
if key in elt:
dep_hash, deptypes = elt[key], elt['type']
hash_type = key
@ -4430,7 +4444,7 @@ def __hash__(self):
return hash(lang.tuplify(self._cmp_iter))
def __reduce__(self):
return _spec_from_dict, (self.to_dict(hash=ht.build_hash),)
return _spec_from_dict, (self.to_dict(hash=ht.process_hash),)
def merge_abstract_anonymous_specs(*abstract_specs):

View file

@ -245,7 +245,7 @@ def test_dev_build_env_version_mismatch(tmpdir, mock_packages, install_mockery,
env('create', 'test', './spack.yaml')
with ev.read('test'):
with pytest.raises(spack.spec.UnsatisfiableVersionSpecError):
with pytest.raises(RuntimeError):
install()

View file

@ -5,7 +5,7 @@
import os
import sys
import spack.build_environment as build_env
import spack.build_environment
import spack.config
import spack.util.environment as environment
import spack.util.prefix as prefix
@ -85,13 +85,13 @@ def environment_modifications_for_spec(spec, view=None):
# Let the extendee/dependency modify their extensions/dependents
# before asking for package-specific modifications
env.extend(
build_env.modifications_from_dependencies(
spack.build_environment.modifications_from_dependencies(
spec, context='run'
)
)
# Package specific modifications
build_env.set_module_variables_for_package(spec.package)
spack.build_environment.set_module_variables_for_package(spec.package)
spec.package.setup_run_environment(env)
return env

View file

@ -0,0 +1,129 @@
# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
from __future__ import print_function
import contextlib
import multiprocessing
import os
import sys
import traceback
import six
from .cpus import cpus_available
class ErrorFromWorker(object):
"""Wrapper class to report an error from a worker process"""
def __init__(self, exc_cls, exc, tb):
"""Create an error object from an exception raised from
the worker process.
The attributes of the process error objects are all strings
as they are easy to send over a pipe.
Args:
exc: exception raised from the worker process
"""
self.pid = os.getpid()
self.error_message = ''.join(traceback.format_exception(exc_cls, exc, tb))
def __str__(self):
msg = "[PID={0.pid}] {0.error_message}"
return msg.format(self)
class Task(object):
"""Wrapped task that trap every Exception and return it as an
ErrorFromWorker object.
We are using a wrapper class instead of a decorator since the class
is pickleable, while a decorator with an inner closure is not.
"""
def __init__(self, func):
self.func = func
def __call__(self, *args, **kwargs):
try:
value = self.func(*args, **kwargs)
except Exception:
value = ErrorFromWorker(*sys.exc_info())
return value
def raise_if_errors(*results):
"""Analyze results from worker Processes to search for ErrorFromWorker
objects. If found print all of them and raise an exception.
Args:
*results: results from worker processes
Raise:
RuntimeError: if ErrorFromWorker objects are in the results
"""
err_stream = six.StringIO() # sys.stderr
errors = [x for x in results if isinstance(x, ErrorFromWorker)]
if not errors:
return
# Report the errors and then raise
for error in errors:
print(error, file=err_stream)
print('[PARENT PROCESS]:', file=err_stream)
traceback.print_stack(file=err_stream)
error_msg = 'errors occurred in worker processes:\n{0}'
raise RuntimeError(error_msg.format(err_stream.getvalue()))
@contextlib.contextmanager
def pool(*args, **kwargs):
"""Context manager to start and terminate a pool of processes, similar to the
default one provided in Python 3.X
Arguments are forwarded to the multiprocessing.Pool.__init__ method.
"""
try:
p = multiprocessing.Pool(*args, **kwargs)
yield p
finally:
p.terminate()
p.join()
def num_processes(max_processes=None):
"""Return the number of processes in a pool.
Currently the function return the minimum between the maximum number
of processes and the cpus available.
When a maximum number of processes is not specified return the cpus available.
Args:
max_processes (int or None): maximum number of processes allowed
"""
max_processes or cpus_available()
return min(cpus_available(), max_processes)
def parallel_map(func, arguments, max_processes=None):
"""Map a task object to the list of arguments, return the list of results.
Args:
func (Task): user defined task object
arguments (list): list of arguments for the task
max_processes (int or None): maximum number of processes allowed
Raises:
RuntimeError: if any error occurred in the worker processes
"""
task_wrapper = Task(func)
if sys.platform != 'darwin':
with pool(processes=num_processes(max_processes=max_processes)) as p:
results = p.map(task_wrapper, arguments)
else:
results = list(map(task_wrapper, arguments))
raise_if_errors(*results)
return results

View file

@ -39,7 +39,7 @@ class Flake8(Package):
variant('super-awesome-feature', default=True, description='Enable super awesome feature')
variant('somewhat-awesome-feature', default=False, description='Enable somewhat awesome feature')
provides('lapack', when='@2.0+super-awesome-feature+somewhat-awesome-feature')
provides('somevirt', when='@2.0+super-awesome-feature+somewhat-awesome-feature')
extends('python', ignore='bin/(why|does|every|package|that|depends|on|numpy|need|to|copy|f2py3?)')