oci: only push in parallel when forking (#42143)

This commit is contained in:
Harmen Stoppels 2024-01-18 17:28:50 +01:00 committed by Massimiliano Culpo
parent b23a829c4c
commit dd58e922e7
2 changed files with 30 additions and 21 deletions

View file

@ -7,13 +7,14 @@
import glob import glob
import hashlib import hashlib
import json import json
import multiprocessing
import multiprocessing.pool import multiprocessing.pool
import os import os
import shutil import shutil
import sys import sys
import tempfile import tempfile
import urllib.request import urllib.request
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple, Union
import llnl.util.tty as tty import llnl.util.tty as tty
from llnl.string import plural from llnl.string import plural
@ -307,8 +308,30 @@ def _progress(i: int, total: int):
return "" return ""
def _make_pool(): class NoPool:
def map(self, func, args):
return [func(a) for a in args]
def starmap(self, func, args):
return [func(*a) for a in args]
def __enter__(self):
return self
def __exit__(self, *args):
pass
MaybePool = Union[multiprocessing.pool.Pool, NoPool]
def _make_pool() -> MaybePool:
"""Can't use threading because it's unsafe, and can't use spawned processes because of globals.
That leaves only forking"""
if multiprocessing.get_start_method() == "fork":
return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True)) return multiprocessing.pool.Pool(determine_number_of_jobs(parallel=True))
else:
return NoPool()
def push_fn(args): def push_fn(args):
@ -591,7 +614,7 @@ def _push_oci(
image_ref: ImageReference, image_ref: ImageReference,
installed_specs_with_deps: List[Spec], installed_specs_with_deps: List[Spec],
tmpdir: str, tmpdir: str,
pool: multiprocessing.pool.Pool, pool: MaybePool,
) -> List[str]: ) -> List[str]:
"""Push specs to an OCI registry """Push specs to an OCI registry
@ -692,9 +715,7 @@ def _config_from_tag(image_ref: ImageReference, tag: str) -> Optional[dict]:
return config if "spec" in config else None return config if "spec" in config else None
def _update_index_oci( def _update_index_oci(image_ref: ImageReference, tmpdir: str, pool: MaybePool) -> None:
image_ref: ImageReference, tmpdir: str, pool: multiprocessing.pool.Pool
) -> None:
response = spack.oci.opener.urlopen(urllib.request.Request(url=image_ref.tags_url())) response = spack.oci.opener.urlopen(urllib.request.Request(url=image_ref.tags_url()))
spack.oci.opener.ensure_status(response, 200) spack.oci.opener.ensure_status(response, 200)
tags = json.load(response)["tags"] tags = json.load(response)["tags"]

View file

@ -1950,17 +1950,5 @@ def pytest_runtest_setup(item):
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def disable_parallel_buildcache_push(monkeypatch): def disable_parallel_buildcache_push(monkeypatch):
class MockPool: """Disable process pools in tests."""
def map(self, func, args): monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", spack.cmd.buildcache.NoPool)
return [func(a) for a in args]
def starmap(self, func, args):
return [func(*a) for a in args]
def __enter__(self):
return self
def __exit__(self, *args):
pass
monkeypatch.setattr(spack.cmd.buildcache, "_make_pool", MockPool)