From dd58e922e7dc8d808f5f8bd1ad41d7c407ce2d90 Mon Sep 17 00:00:00 2001 From: Harmen Stoppels Date: Thu, 18 Jan 2024 17:28:50 +0100 Subject: [PATCH] oci: only push in parallel when forking (#42143) --- lib/spack/spack/cmd/buildcache.py | 35 ++++++++++++++++++++++++------- lib/spack/spack/test/conftest.py | 16 ++------------ 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/lib/spack/spack/cmd/buildcache.py b/lib/spack/spack/cmd/buildcache.py index 94cce16030..ddfdee2907 100644 --- a/lib/spack/spack/cmd/buildcache.py +++ b/lib/spack/spack/cmd/buildcache.py @@ -7,13 +7,14 @@ import glob import hashlib import json +import multiprocessing import multiprocessing.pool import os import shutil import sys import tempfile import urllib.request -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union import llnl.util.tty as tty from llnl.string import plural @@ -307,8 +308,30 @@ def _progress(i: int, total: int): return "" -def _make_pool(): - return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True)) +class NoPool: + def map(self, func, args): + return [func(a) for a in args] + + def starmap(self, func, args): + return [func(*a) for a in args] + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + +MaybePool = Union[multiprocessing.pool.Pool, NoPool] + + +def _make_pool() -> MaybePool: + """Can't use threading because it's unsafe, and can't use spawned processes because of globals. + That leaves only forking""" + if multiprocessing.get_start_method() == "fork": + return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True)) + else: + return NoPool() def push_fn(args): @@ -591,7 +614,7 @@ def _push_oci( image_ref: ImageReference, installed_specs_with_deps: List[Spec], tmpdir: str, - pool: multiprocessing.pool.Pool, + pool: MaybePool, ) -> List[str]: """Push specs to an OCI registry @@ -692,9 +715,7 @@ def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]: return config if "spec" in config else None -def _update_index_oci( - image_ref: ImageReference, tmpdir: str, pool: multiprocessing.pool.Pool -) -> None: +def _update_index_oci(image_ref: ImageReference, tmpdir: str, pool: MaybePool) -> None: response = spack.oci.opener.urlopen(urllib.request.Request(url=image_ref.tags_url())) spack.oci.opener.ensure_status(response, 200) tags = json.load(response)["tags"] diff --git a/lib/spack/spack/test/conftest.py b/lib/spack/spack/test/conftest.py index 73c0891219..2a1cacf2fa 100644 --- a/lib/spack/spack/test/conftest.py +++ b/lib/spack/spack/test/conftest.py @@ -1950,17 +1950,5 @@ def pytest_runtest_setup(item): @pytest.fixture(scope="function") def disable_parallel_buildcache_push(monkeypatch): - class MockPool: - def map(self, func, args): - return [func(a) for a in args] - - def starmap(self, func, args): - return [func(*a) for a in args] - - def __enter__(self): - return self - - def __exit__(self, *args): - pass - - monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", MockPool) + """Disable process pools in tests.""" + monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", spack.cmd.buildcache.NoPool)