concretize separately: show concretization time per spec as they concretize when verbose (#40634)
This commit is contained in:
parent
4bade7ef96
commit
27a0425e5d
4 changed files with 47 additions and 98 deletions
|
@ -1480,11 +1480,12 @@ def _concretize_separately(self, tests=False):
|
|||
self._add_concrete_spec(s, concrete, new=False)
|
||||
|
||||
# Concretize any new user specs that we haven't concretized yet
|
||||
arguments, root_specs = [], []
|
||||
args, root_specs, i = [], [], 0
|
||||
for uspec, uspec_constraints in zip(self.user_specs, self.user_specs.specs_as_constraints):
|
||||
if uspec not in old_concretized_user_specs:
|
||||
root_specs.append(uspec)
|
||||
arguments.append((uspec_constraints, tests))
|
||||
args.append((i, uspec_constraints, tests))
|
||||
i += 1
|
||||
|
||||
# Ensure we don't try to bootstrap clingo in parallel
|
||||
if spack.config.get("config:concretizer", "clingo") == "clingo":
|
||||
|
@ -1503,34 +1504,36 @@ def _concretize_separately(self, tests=False):
|
|||
_ = spack.compilers.get_compiler_config()
|
||||
|
||||
# Early return if there is nothing to do
|
||||
if len(arguments) == 0:
|
||||
if len(args) == 0:
|
||||
return []
|
||||
|
||||
# Solve the environment in parallel on Linux
|
||||
start = time.time()
|
||||
max_processes = min(
|
||||
len(arguments), # Number of specs
|
||||
spack.util.cpus.determine_number_of_jobs(parallel=True),
|
||||
)
|
||||
num_procs = min(len(args), spack.util.cpus.determine_number_of_jobs(parallel=True))
|
||||
|
||||
# TODO: revisit this print as soon as darwin is parallel too
|
||||
# TODO: support parallel concretization on macOS and Windows
|
||||
msg = "Starting concretization"
|
||||
if sys.platform != "darwin":
|
||||
pool_size = spack.util.parallel.num_processes(max_processes=max_processes)
|
||||
if pool_size > 1:
|
||||
msg = msg + " pool with {0} processes".format(pool_size)
|
||||
if sys.platform not in ("darwin", "win32") and num_procs > 1:
|
||||
msg += f" pool with {num_procs} processes"
|
||||
tty.msg(msg)
|
||||
|
||||
concretized_root_specs = spack.util.parallel.parallel_map(
|
||||
_concretize_task, arguments, max_processes=max_processes, debug=tty.is_debug()
|
||||
)
|
||||
batch = []
|
||||
for i, concrete, duration in spack.util.parallel.imap_unordered(
|
||||
_concretize_task, args, processes=num_procs, debug=tty.is_debug()
|
||||
):
|
||||
batch.append((i, concrete))
|
||||
tty.verbose(f"[{duration:7.2f}s] {root_specs[i]}")
|
||||
sys.stdout.flush()
|
||||
|
||||
# Add specs in original order
|
||||
batch.sort(key=lambda x: x[0])
|
||||
by_hash = {} # for attaching information on test dependencies
|
||||
for root, (_, concrete) in zip(root_specs, batch):
|
||||
self._add_concrete_spec(root, concrete)
|
||||
by_hash[concrete.dag_hash()] = concrete
|
||||
|
||||
finish = time.time()
|
||||
tty.msg("Environment concretized in %.2f seconds." % (finish - start))
|
||||
by_hash = {}
|
||||
for abstract, concrete in zip(root_specs, concretized_root_specs):
|
||||
self._add_concrete_spec(abstract, concrete)
|
||||
by_hash[concrete.dag_hash()] = concrete
|
||||
tty.msg(f"Environment concretized in {finish - start:.2f} seconds")
|
||||
|
||||
# Unify the specs objects, so we get correct references to all parents
|
||||
self._read_lockfile_dict(self._to_lockfile_dict())
|
||||
|
@ -2392,10 +2395,12 @@ def _concretize_from_constraints(spec_constraints, tests=False):
|
|||
invalid_constraints.extend(inv_variant_constraints)
|
||||
|
||||
|
||||
def _concretize_task(packed_arguments):
|
||||
spec_constraints, tests = packed_arguments
|
||||
def _concretize_task(packed_arguments) -> Tuple[int, Spec, float]:
|
||||
index, spec_constraints, tests = packed_arguments
|
||||
with tty.SuppressOutput(msg_enabled=False):
|
||||
return _concretize_from_constraints(spec_constraints, tests)
|
||||
start = time.time()
|
||||
spec = _concretize_from_constraints(spec_constraints, tests)
|
||||
return index, spec, time.time() - start
|
||||
|
||||
|
||||
def make_repo_path(root):
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
import spack.build_environment
|
||||
import spack.environment as ev
|
||||
import spack.error
|
||||
import spack.spec
|
||||
import spack.store
|
||||
from spack.main import SpackCommand
|
||||
|
@ -237,7 +238,7 @@ def test_dev_build_env_version_mismatch(tmpdir, install_mockery, mutable_mock_en
|
|||
|
||||
env("create", "test", "./spack.yaml")
|
||||
with ev.read("test"):
|
||||
with pytest.raises(RuntimeError):
|
||||
with pytest.raises((RuntimeError, spack.error.UnsatisfiableSpecError)):
|
||||
install()
|
||||
|
||||
|
||||
|
|
|
@ -2,14 +2,11 @@
|
|||
# Spack Project Developers. See the top-level COPYRIGHT file for details.
|
||||
#
|
||||
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
|
||||
import contextlib
|
||||
import multiprocessing
|
||||
import os
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
from .cpus import cpus_available
|
||||
|
||||
|
||||
class ErrorFromWorker:
|
||||
"""Wrapper class to report an error from a worker process"""
|
||||
|
@ -56,79 +53,25 @@ def __call__(self, *args, **kwargs):
|
|||
return value
|
||||
|
||||
|
||||
def raise_if_errors(*results, **kwargs):
|
||||
"""Analyze results from worker Processes to search for ErrorFromWorker
|
||||
objects. If found print all of them and raise an exception.
|
||||
def imap_unordered(f, list_of_args, *, processes: int, debug=False):
|
||||
"""Wrapper around multiprocessing.Pool.imap_unordered.
|
||||
|
||||
Args:
|
||||
*results: results from worker processes
|
||||
debug: if True show complete stacktraces
|
||||
|
||||
Raise:
|
||||
RuntimeError: if ErrorFromWorker objects are in the results
|
||||
"""
|
||||
debug = kwargs.get("debug", False) # This can be a keyword only arg in Python 3
|
||||
errors = [x for x in results if isinstance(x, ErrorFromWorker)]
|
||||
if not errors:
|
||||
return
|
||||
|
||||
msg = "\n".join([error.stacktrace if debug else str(error) for error in errors])
|
||||
|
||||
error_fmt = "{0}"
|
||||
if len(errors) > 1 and not debug:
|
||||
error_fmt = "errors occurred during concretization of the environment:\n{0}"
|
||||
|
||||
raise RuntimeError(error_fmt.format(msg))
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def pool(*args, **kwargs):
|
||||
"""Context manager to start and terminate a pool of processes, similar to the
|
||||
default one provided in Python 3.X
|
||||
|
||||
Arguments are forwarded to the multiprocessing.Pool.__init__ method.
|
||||
"""
|
||||
try:
|
||||
p = multiprocessing.Pool(*args, **kwargs)
|
||||
yield p
|
||||
finally:
|
||||
p.terminate()
|
||||
p.join()
|
||||
|
||||
|
||||
def num_processes(max_processes=None):
|
||||
"""Return the number of processes in a pool.
|
||||
|
||||
Currently the function return the minimum between the maximum number
|
||||
of processes and the cpus available.
|
||||
|
||||
When a maximum number of processes is not specified return the cpus available.
|
||||
|
||||
Args:
|
||||
max_processes (int or None): maximum number of processes allowed
|
||||
"""
|
||||
max_processes or cpus_available()
|
||||
return min(cpus_available(), max_processes)
|
||||
|
||||
|
||||
def parallel_map(func, arguments, max_processes=None, debug=False):
|
||||
"""Map a task object to the list of arguments, return the list of results.
|
||||
|
||||
Args:
|
||||
func (Task): user defined task object
|
||||
arguments (list): list of arguments for the task
|
||||
max_processes (int or None): maximum number of processes allowed
|
||||
debug (bool): if False, raise an exception containing just the error messages
|
||||
f: function to apply
|
||||
list_of_args: list of tuples of args for the task
|
||||
processes: maximum number of processes allowed
|
||||
debug: if False, raise an exception containing just the error messages
|
||||
from workers, if True an exception with complete stacktraces
|
||||
|
||||
Raises:
|
||||
RuntimeError: if any error occurred in the worker processes
|
||||
"""
|
||||
task_wrapper = Task(func)
|
||||
if sys.platform != "darwin" and sys.platform != "win32":
|
||||
with pool(processes=num_processes(max_processes=max_processes)) as p:
|
||||
results = p.map(task_wrapper, arguments)
|
||||
else:
|
||||
results = list(map(task_wrapper, arguments))
|
||||
raise_if_errors(*results, debug=debug)
|
||||
return results
|
||||
if sys.platform in ("darwin", "win32") or len(list_of_args) == 1:
|
||||
yield from map(f, list_of_args)
|
||||
return
|
||||
|
||||
with multiprocessing.Pool(processes) as p:
|
||||
for result in p.imap_unordered(Task(f), list_of_args):
|
||||
if isinstance(result, ErrorFromWorker):
|
||||
raise RuntimeError(result.stacktrace if debug else str(result))
|
||||
yield result
|
||||
|
|
|
@ -144,7 +144,7 @@ default:
|
|||
- spack python -c "import os,sys; print(os.path.expandvars(sys.stdin.read()))"
|
||||
< "${SPACK_CI_CONFIG_ROOT}/${PIPELINE_MIRROR_TEMPLATE}" > "${SPACK_CI_CONFIG_ROOT}/mirrors.yaml"
|
||||
- spack config add -f "${SPACK_CI_CONFIG_ROOT}/mirrors.yaml"
|
||||
- spack
|
||||
- spack -v
|
||||
--config-scope "${SPACK_CI_CONFIG_ROOT}"
|
||||
--config-scope "${SPACK_CI_CONFIG_ROOT}/${SPACK_TARGET_PLATFORM}"
|
||||
--config-scope "${SPACK_CI_CONFIG_ROOT}/${SPACK_TARGET_PLATFORM}/${SPACK_TARGET_ARCH}"
|
||||
|
@ -197,7 +197,7 @@ default:
|
|||
- spack --version
|
||||
- cd share/spack/gitlab/cloud_pipelines/stacks/${SPACK_CI_STACK_NAME}
|
||||
- spack env activate --without-view .
|
||||
- spack
|
||||
- spack -v
|
||||
ci generate --check-index-only
|
||||
--buildcache-destination "${PUSH_BUILDCACHE_DEPRECATED}"
|
||||
--artifacts-root "${CI_PROJECT_DIR}/jobs_scratch_dir"
|
||||
|
|
Loading…
Reference in a new issue