From f50377de7f087868dd481b4129694b85e3594ba6 Mon Sep 17 00:00:00 2001 From: Massimiliano Culpo Date: Sat, 4 Nov 2023 00:10:42 +0100 Subject: [PATCH] environment: solve one spec per child process (#40876) Looking at the memory profiles of concurrent solves for environments with unify:false, it seems that memory usage only ramps up. This exchange on the potassco mailing list: https://sourceforge.net/p/potassco/mailman/potassco-users/thread/b55b5b8c2e8945409abb3fa3c935c27e%40lohn.at/#msg36517698 seems to suggest that clingo doesn't release memory until the end of the application. Since we distribute work to child processes when unify:false, here we pass maxtaskperchild=1, so that memory is released after each solve. --- lib/spack/spack/environment/environment.py | 6 +++++- lib/spack/spack/util/parallel.py | 9 +++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/spack/spack/environment/environment.py b/lib/spack/spack/environment/environment.py index 3fd75f3d70..85c10e366b 100644 --- a/lib/spack/spack/environment/environment.py +++ b/lib/spack/spack/environment/environment.py @@ -1525,7 +1525,11 @@ def _concretize_separately(self, tests=False): batch = [] for j, (i, concrete, duration) in enumerate( spack.util.parallel.imap_unordered( - _concretize_task, args, processes=num_procs, debug=tty.is_debug() + _concretize_task, + args, + processes=num_procs, + debug=tty.is_debug(), + maxtaskperchild=1, ) ): batch.append((i, concrete)) diff --git a/lib/spack/spack/util/parallel.py b/lib/spack/spack/util/parallel.py index 683835641a..c8e6ef7907 100644 --- a/lib/spack/spack/util/parallel.py +++ b/lib/spack/spack/util/parallel.py @@ -6,6 +6,7 @@ import os import sys import traceback +from typing import Optional class ErrorFromWorker: @@ -53,7 +54,9 @@ def __call__(self, *args, **kwargs): return value -def imap_unordered(f, list_of_args, *, processes: int, debug=False): +def imap_unordered( + f, list_of_args, *, processes: int, maxtaskperchild: Optional[int] = None, debug=False 
+): """Wrapper around multiprocessing.Pool.imap_unordered. Args: @@ -62,6 +65,8 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False): processes: maximum number of processes allowed debug: if False, raise an exception containing just the error messages from workers, if True an exception with complete stacktraces + maxtaskperchild: number of tasks to be executed by a child before being + killed and substituted Raises: RuntimeError: if any error occurred in the worker processes @@ -70,7 +75,7 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False): yield from map(f, list_of_args) return - with multiprocessing.Pool(processes) as p: + with multiprocessing.Pool(processes, maxtasksperchild=maxtaskperchild) as p: for result in p.imap_unordered(Task(f), list_of_args): if isinstance(result, ErrorFromWorker): raise RuntimeError(result.stacktrace if debug else str(result))