environment: solve one spec per child process (#40876)

Looking at the memory profiles of concurrent solves
for environment with unify:false, it seems memory
is only ramping up.

This exchange in the potassco mailing list:
 https://sourceforge.net/p/potassco/mailman/potassco-users/thread/b55b5b8c2e8945409abb3fa3c935c27e%40lohn.at/#msg36517698

Seems to suggest that clingo doesn't release memory
until end of the application.

Since when unify:false we distribute work to processes,
here we give a maxtaskperchild=1, so we clean memory
after each solve.
This commit is contained in:
Massimiliano Culpo 2023-11-04 00:10:42 +01:00 committed by GitHub
parent 8e96d3a051
commit f50377de7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 3 deletions

View file

@ -1525,7 +1525,11 @@ def _concretize_separately(self, tests=False):
batch = [] batch = []
for j, (i, concrete, duration) in enumerate( for j, (i, concrete, duration) in enumerate(
spack.util.parallel.imap_unordered( spack.util.parallel.imap_unordered(
_concretize_task, args, processes=num_procs, debug=tty.is_debug() _concretize_task,
args,
processes=num_procs,
debug=tty.is_debug(),
maxtaskperchild=1,
) )
): ):
batch.append((i, concrete)) batch.append((i, concrete))

View file

@ -6,6 +6,7 @@
import os import os
import sys import sys
import traceback import traceback
from typing import Optional
class ErrorFromWorker: class ErrorFromWorker:
@ -53,7 +54,9 @@ def __call__(self, *args, **kwargs):
return value return value
def imap_unordered(f, list_of_args, *, processes: int, debug=False): def imap_unordered(
f, list_of_args, *, processes: int, maxtaskperchild: Optional[int] = None, debug=False
):
"""Wrapper around multiprocessing.Pool.imap_unordered. """Wrapper around multiprocessing.Pool.imap_unordered.
Args: Args:
@ -62,6 +65,8 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False):
processes: maximum number of processes allowed processes: maximum number of processes allowed
debug: if False, raise an exception containing just the error messages debug: if False, raise an exception containing just the error messages
from workers, if True an exception with complete stacktraces from workers, if True an exception with complete stacktraces
maxtaskperchild: number of tasks to be executed by a child before being
killed and substituted
Raises: Raises:
RuntimeError: if any error occurred in the worker processes RuntimeError: if any error occurred in the worker processes
@ -70,7 +75,7 @@ def imap_unordered(f, list_of_args, *, processes: int, debug=False):
yield from map(f, list_of_args) yield from map(f, list_of_args)
return return
with multiprocessing.Pool(processes) as p: with multiprocessing.Pool(processes, maxtasksperchild=maxtaskperchild) as p:
for result in p.imap_unordered(Task(f), list_of_args): for result in p.imap_unordered(Task(f), list_of_args):
if isinstance(result, ErrorFromWorker): if isinstance(result, ErrorFromWorker):
raise RuntimeError(result.stacktrace if debug else str(result)) raise RuntimeError(result.stacktrace if debug else str(result))