Extract prefix locks and failure markers from Database (#39024)

This PR extracts two responsibilities from the `Database` class:
1. Managing locks for prefixes during an installation
2. Marking installation failures

and pushes them into their own classes (`SpecLocker` and `FailureTracker`). These responsibilities are also pushed up into the `Store`, leaving `Database` with only the duty to manage `index.json` files.

`SpecLocker` objects no longer share a global list of locks; locks are now per instance. Their identifier is simply `(dag hash, package name)`, rather than the spec prefix path, to avoid circular dependencies across Store / Database / Spec.
This commit is contained in:
Massimiliano Culpo 2023-08-07 12:47:52 +02:00 committed by GitHub
parent 27f04b3544
commit ba1d295023
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 358 additions and 384 deletions

View file

@ -17,6 +17,7 @@
import spack.config
import spack.repo
import spack.stage
import spack.store
import spack.util.path
from spack.paths import lib_path, var_path
@ -121,7 +122,7 @@ def clean(parser, args):
if args.failures:
tty.msg("Removing install failure marks")
spack.installer.clear_failures()
spack.store.STORE.failure_tracker.clear_all()
if args.misc_cache:
tty.msg("Removing cached information on repositories")

View file

@ -21,10 +21,11 @@
import contextlib
import datetime
import os
import pathlib
import socket
import sys
import time
from typing import Dict, List, NamedTuple, Set, Type, Union
from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional, Set, Tuple, Type, Union
try:
import uuid
@ -141,22 +142,23 @@ class InstallStatuses:
def canonicalize(cls, query_arg):
    """Turn a query argument into a list of InstallStatus values.

    Args:
        query_arg: ``True`` (installed only), ``False`` (missing only),
            the builtin ``any`` (all statuses), a single ``InstallStatus``,
            or an iterable of ``InstallStatus`` values

    Returns:
        List of ``InstallStatus`` values equivalent to the argument.

    Raises:
        TypeError: if the argument is none of the accepted forms
    """
    if query_arg is True:
        return [cls.INSTALLED]
    if query_arg is False:
        return [cls.MISSING]
    if query_arg is any:
        return [cls.INSTALLED, cls.DEPRECATED, cls.MISSING]
    if isinstance(query_arg, InstallStatus):
        return [query_arg]
    try:
        # Try block catches the case where query_arg is not iterable at all
        statuses = list(query_arg)
        if all(isinstance(x, InstallStatus) for x in statuses):
            return statuses
    except TypeError:
        pass
    raise TypeError(
        "installation query must be `any`, boolean, "
        "InstallStatus, or iterable of InstallStatus"
    )
class InstallRecord:
@ -306,15 +308,16 @@ def __reduce__(self):
"""
#: Data class to configure locks in Database objects
#:
#: Args:
#: enable (bool): whether to enable locks or not.
#: database_timeout (int or None): timeout for the database lock
#: package_timeout (int or None): timeout for the package lock
class LockConfiguration(NamedTuple):
    """Data class to configure locks in Database objects

    Args:
        enable: whether to enable locks or not.
        database_timeout: timeout for the database lock
        package_timeout: timeout for the package lock
    """

    # Whether locking is enabled at all
    enable: bool
    # Timeout in seconds for the database lock; None appears to mean no timeout
    # (elsewhere a falsy package timeout is reported as "No timeout")
    database_timeout: Optional[int]
    # Timeout in seconds for per-package (prefix/failure) locks; None means no timeout
    package_timeout: Optional[int]
@ -348,13 +351,230 @@ def lock_configuration(configuration):
)
def prefix_lock_path(root_dir: Union[str, pathlib.Path]) -> pathlib.Path:
    """Returns the path of the prefix lock file, given the root directory.

    Args:
        root_dir: root directory containing the database directory
    """
    base = pathlib.Path(root_dir)
    return base.joinpath(_DB_DIRNAME, "prefix_lock")
def failures_lock_path(root_dir: Union[str, pathlib.Path]) -> pathlib.Path:
    """Returns the path of the failures lock file, given the root directory.

    Args:
        root_dir: root directory containing the database directory
    """
    base = pathlib.Path(root_dir)
    return base.joinpath(_DB_DIRNAME, "prefix_failures")
class SpecLocker:
    """Manages acquiring and releasing read or write locks on concrete specs."""

    def __init__(self, lock_path: Union[str, pathlib.Path], default_timeout: Optional[float]):
        #: Single file used for all byte-range locks handed out by this locker
        self.lock_path = pathlib.Path(lock_path)
        #: Timeout used when a caller does not pass one explicitly
        self.default_timeout = default_timeout

        # Maps (spec.dag_hash(), spec.name) to the corresponding lock object
        self.locks: Dict[Tuple[str, str], lk.Lock] = {}

    def lock(self, spec: "spack.spec.Spec", timeout: Optional[float] = None) -> lk.Lock:
        """Returns a lock on a concrete spec.

        The lock is a byte range lock on the nth byte of a file.

        The lock file is ``self.lock_path``.

        n is the sys.maxsize-bit prefix of the DAG hash. This makes the likelihood of
        collision very low AND it gives us readers-writer lock semantics with just a single
        lockfile, so no cleanup required.
        """
        # A non-concrete spec has no stable DAG hash to key the lock on
        assert spec.concrete, "cannot lock a non-concrete spec"
        timeout = timeout or self.default_timeout
        key = self._lock_key(spec)

        if key not in self.locks:
            self.locks[key] = self.raw_lock(spec, timeout=timeout)
        else:
            # Reuse the cached lock, but honor the most recent timeout request
            self.locks[key].default_timeout = timeout

        return self.locks[key]

    def raw_lock(self, spec: "spack.spec.Spec", timeout: Optional[float] = None) -> lk.Lock:
        """Returns a raw lock for a Spec, but doesn't keep track of it."""
        return lk.Lock(
            str(self.lock_path),
            start=spec.dag_hash_bit_prefix(bit_length(sys.maxsize)),
            length=1,
            default_timeout=timeout,
            desc=spec.name,
        )

    def has_lock(self, spec: "spack.spec.Spec") -> bool:
        """Returns True if the spec is already managed by this spec locker"""
        return self._lock_key(spec) in self.locks

    def _lock_key(self, spec: "spack.spec.Spec") -> Tuple[str, str]:
        # Specs are identified by (dag hash, name) rather than by prefix path,
        # to avoid circular dependencies across Store / Database / Spec
        return (spec.dag_hash(), spec.name)

    @contextlib.contextmanager
    def write_lock(self, spec: "spack.spec.Spec") -> Generator["SpecLocker", None, None]:
        """Context manager that holds a write lock on ``spec`` for the duration of the block."""
        lock = self.lock(spec)
        lock.acquire_write()

        try:
            yield self
        except lk.LockError:
            # This addresses the case where a nested lock attempt fails inside
            # of this context manager
            raise
        except (Exception, KeyboardInterrupt):
            lock.release_write()
            raise
        else:
            lock.release_write()

    def clear(self, spec: "spack.spec.Spec") -> Tuple[bool, Optional[lk.Lock]]:
        """Drops the cached lock for ``spec``, if any.

        Returns:
            Tuple of (True if a lock was being tracked, the popped lock object or None)
        """
        key = self._lock_key(spec)
        lock = self.locks.pop(key, None)
        return bool(lock), lock

    def clear_all(self, clear_fn: Optional[Callable[[lk.Lock], Any]] = None) -> None:
        """Drops all cached locks, optionally calling ``clear_fn`` on each one first."""
        if clear_fn is not None:
            for lock in self.locks.values():
                clear_fn(lock)
        self.locks.clear()
class FailureTracker:
    """Tracks installation failures.

    Prefix failure marking takes the form of a byte range lock on the nth
    byte of a file for coordinating between concurrent parallel build
    processes and a persistent file, named with the full hash and
    containing the spec, in a subdirectory of the database to enable
    persistence across overlapping but separate related build processes.

    The failure lock file lives alongside the install DB.

    ``n`` is the sys.maxsize-bit prefix of the associated DAG hash to make
    the likelihood of collision very low with no cleanup required.
    """

    def __init__(self, root_dir: Union[str, pathlib.Path], default_timeout: Optional[float]):
        #: Ensure a persistent location for dealing with parallel installation
        #: failures (e.g., across near-concurrent processes).
        self.dir = pathlib.Path(root_dir) / _DB_DIRNAME / "failures"
        self.dir.mkdir(parents=True, exist_ok=True)

        #: Locker used to take byte-range write locks that mark in-flight failures
        self.locker = SpecLocker(failures_lock_path(root_dir), default_timeout=default_timeout)

    def clear(self, spec: "spack.spec.Spec", force: bool = False) -> None:
        """Removes any persistent and cached failure tracking for the spec.

        see `mark()`.

        Args:
            spec: the spec whose failure indicators are being removed
            force: True if the failure information should be cleared when a failure lock
                exists for the file, or False if the failure should not be cleared (e.g.,
                it may be associated with a concurrent build)
        """
        locked = self.lock_taken(spec)
        if locked and not force:
            tty.msg(f"Retaining failure marking for {spec.name} due to lock")
            return

        if locked:
            tty.warn(f"Removing failure marking despite lock for {spec.name}")

        # Drop our cached lock (if any) and release it so the byte range is freed
        succeeded, lock = self.locker.clear(spec)
        if succeeded and lock is not None:
            lock.release_write()

        if self.persistent_mark(spec):
            path = self._path(spec)
            tty.debug(f"Removing failure marking for {spec.name}")
            try:
                path.unlink()
            except OSError as err:
                # Best effort: warn but do not raise if the marker cannot be removed
                tty.warn(
                    f"Unable to remove failure marking for {spec.name} ({str(path)}): {str(err)}"
                )

    def clear_all(self) -> None:
        """Force remove install failure tracking files."""
        tty.debug("Releasing prefix failure locks")
        # Release any write locks we hold, then drop the whole lock cache
        self.locker.clear_all(
            clear_fn=lambda x: x.release_write() if x.is_write_locked() else True
        )

        tty.debug("Removing prefix failure tracking files")
        try:
            for fail_mark in os.listdir(str(self.dir)):
                try:
                    (self.dir / fail_mark).unlink()
                except OSError as exc:
                    tty.warn(f"Unable to remove failure marking file {fail_mark}: {str(exc)}")
        except OSError as exc:
            tty.warn(f"Unable to remove failure marking files: {str(exc)}")

    def mark(self, spec: "spack.spec.Spec") -> lk.Lock:
        """Marks a spec as failing to install.

        Args:
            spec: spec that failed to install
        """
        # Dump the spec to the failure file for (manual) debugging purposes
        path = self._path(spec)
        path.write_text(spec.to_json())

        # Also ensure a failure lock is taken to prevent cleanup removal
        # of failure status information during a concurrent parallel build.
        if not self.locker.has_lock(spec):
            try:
                mark = self.locker.lock(spec)
                mark.acquire_write()
            except lk.LockTimeoutError:
                # Unlikely that another process failed to install at the same
                # time but log it anyway.
                tty.debug(f"PID {os.getpid()} failed to mark install failure for {spec.name}")
                tty.warn(f"Unable to mark {spec.name} as failed.")

        return self.locker.lock(spec)

    def has_failed(self, spec: "spack.spec.Spec") -> bool:
        """Return True if the spec is marked as failed."""
        # The failure was detected in this process.
        if self.locker.has_lock(spec):
            return True

        # The failure was detected by a concurrent process (e.g., an srun),
        # which is expected to be holding a write lock if that is the case.
        if self.lock_taken(spec):
            return True

        # Determine if the spec may have been marked as failed by a separate
        # spack build process running concurrently.
        return self.persistent_mark(spec)

    def lock_taken(self, spec: "spack.spec.Spec") -> bool:
        """Return True if another process has a failure lock on the spec."""
        # Probe with an untracked lock so we do not disturb our own cached locks
        check = self.locker.raw_lock(spec)
        return check.is_write_locked()

    def persistent_mark(self, spec: "spack.spec.Spec") -> bool:
        """Determine if the spec has a persistent failure marking."""
        return self._path(spec).exists()

    def _path(self, spec: "spack.spec.Spec") -> pathlib.Path:
        """Return the path to the spec's failure file, which may not exist."""
        assert spec.concrete, "concrete spec required for failure path"
        # Failure files are named <name>-<dag hash> inside the failures directory
        return self.dir / f"{spec.name}-{spec.dag_hash()}"
class Database:
#: Per-process lock objects for each install prefix
_prefix_locks: Dict[str, lk.Lock] = {}
#: Per-process failure (lock) objects for each install prefix
_prefix_failures: Dict[str, lk.Lock] = {}
#: Fields written for each install record
record_fields: Tuple[str, ...] = DEFAULT_INSTALL_RECORD_FIELDS
@ -392,24 +612,10 @@ def __init__(
self._verifier_path = os.path.join(self.database_directory, "index_verifier")
self._lock_path = os.path.join(self.database_directory, "lock")
# This is for other classes to use to lock prefix directories.
self.prefix_lock_path = os.path.join(self.database_directory, "prefix_lock")
# Ensure a persistent location for dealing with parallel installation
# failures (e.g., across near-concurrent processes).
self._failure_dir = os.path.join(self.database_directory, "failures")
# Support special locks for handling parallel installation failures
# of a spec.
self.prefix_fail_path = os.path.join(self.database_directory, "prefix_failures")
# Create needed directories and files
if not is_upstream and not os.path.exists(self.database_directory):
fs.mkdirp(self.database_directory)
if not is_upstream and not os.path.exists(self._failure_dir):
fs.mkdirp(self._failure_dir)
self.is_upstream = is_upstream
self.last_seen_verifier = ""
# Failed write transactions (interrupted by exceptions) will alert
@ -423,15 +629,7 @@ def __init__(
# initialize rest of state.
self.db_lock_timeout = lock_cfg.database_timeout
self.package_lock_timeout = lock_cfg.package_timeout
tty.debug("DATABASE LOCK TIMEOUT: {0}s".format(str(self.db_lock_timeout)))
timeout_format_str = (
"{0}s".format(str(self.package_lock_timeout))
if self.package_lock_timeout
else "No timeout"
)
tty.debug("PACKAGE LOCK TIMEOUT: {0}".format(str(timeout_format_str)))
self.lock: Union[ForbiddenLock, lk.Lock]
if self.is_upstream:
@ -471,212 +669,6 @@ def read_transaction(self):
"""Get a read lock context manager for use in a `with` block."""
return self._read_transaction_impl(self.lock, acquire=self._read)
def _failed_spec_path(self, spec):
    """Return the path to the spec's failure file, which may not exist.

    Raises:
        ValueError: if the spec is not concrete (no stable DAG hash to name the file)
    """
    if not spec.concrete:
        raise ValueError("Concrete spec required for failure path for {0}".format(spec.name))

    # Failure files are named <name>-<dag hash> inside the DB failures directory
    return os.path.join(self._failure_dir, "{0}-{1}".format(spec.name, spec.dag_hash()))
def clear_all_failures(self) -> None:
    """Force remove install failure tracking files."""
    tty.debug("Releasing prefix failure locks")
    # Release any per-process write locks before removing the marker files
    for pkg_id in list(self._prefix_failures.keys()):
        lock = self._prefix_failures.pop(pkg_id, None)
        if lock:
            lock.release_write()

    # Remove all failure markings (aka files)
    tty.debug("Removing prefix failure tracking files")
    for fail_mark in os.listdir(self._failure_dir):
        try:
            os.remove(os.path.join(self._failure_dir, fail_mark))
        except OSError as exc:
            # Best effort: warn but keep removing the remaining markers
            tty.warn(
                "Unable to remove failure marking file {0}: {1}".format(fail_mark, str(exc))
            )
def clear_failure(self, spec: "spack.spec.Spec", force: bool = False) -> None:
    """
    Remove any persistent and cached failure tracking for the spec.

    see `mark_failed()`.

    Args:
        spec: the spec whose failure indicators are being removed
        force: True if the failure information should be cleared when a prefix failure
            lock exists for the file, or False if the failure should not be cleared (e.g.,
            it may be associated with a concurrent build)
    """
    failure_locked = self.prefix_failure_locked(spec)
    if failure_locked and not force:
        tty.msg("Retaining failure marking for {0} due to lock".format(spec.name))
        return

    if failure_locked:
        tty.warn("Removing failure marking despite lock for {0}".format(spec.name))

    # Release our own cached failure lock for this prefix, if any
    lock = self._prefix_failures.pop(spec.prefix, None)
    if lock:
        lock.release_write()

    if self.prefix_failure_marked(spec):
        try:
            path = self._failed_spec_path(spec)
            tty.debug("Removing failure marking for {0}".format(spec.name))
            os.remove(path)
        except OSError as err:
            # Best effort: warn but do not raise if the marker cannot be removed
            tty.warn(
                "Unable to remove failure marking for {0} ({1}): {2}".format(
                    spec.name, path, str(err)
                )
            )
def mark_failed(self, spec: "spack.spec.Spec") -> lk.Lock:
    """
    Mark a spec as failing to install.

    Prefix failure marking takes the form of a byte range lock on the nth
    byte of a file for coordinating between concurrent parallel build
    processes and a persistent file, named with the full hash and
    containing the spec, in a subdirectory of the database to enable
    persistence across overlapping but separate related build processes.

    The failure lock file, ``spack.store.STORE.db.prefix_failures``, lives
    alongside the install DB. ``n`` is the sys.maxsize-bit prefix of the
    associated DAG hash to make the likelihood of collision very low with
    no cleanup required.
    """
    # Dump the spec to the failure file for (manual) debugging purposes
    path = self._failed_spec_path(spec)
    with open(path, "w") as f:
        spec.to_json(f)

    # Also ensure a failure lock is taken to prevent cleanup removal
    # of failure status information during a concurrent parallel build.
    err = "Unable to mark {0.name} as failed."

    prefix = spec.prefix
    if prefix not in self._prefix_failures:
        mark = lk.Lock(
            self.prefix_fail_path,
            start=spec.dag_hash_bit_prefix(bit_length(sys.maxsize)),
            length=1,
            default_timeout=self.package_lock_timeout,
            desc=spec.name,
        )

        try:
            mark.acquire_write()
        except lk.LockTimeoutError:
            # Unlikely that another process failed to install at the same
            # time but log it anyway.
            tty.debug(
                "PID {0} failed to mark install failure for {1}".format(os.getpid(), spec.name)
            )
            tty.warn(err.format(spec))

        # Whether we or another process marked it as a failure, track it
        # as such locally.
        self._prefix_failures[prefix] = mark

    return self._prefix_failures[prefix]
def prefix_failed(self, spec: "spack.spec.Spec") -> bool:
    """Return True if the prefix (installation) is marked as failed."""
    # The failure was detected in this process.
    if spec.prefix in self._prefix_failures:
        return True

    # The failure was detected by a concurrent process (e.g., an srun),
    # which is expected to be holding a write lock if that is the case.
    if self.prefix_failure_locked(spec):
        return True

    # Determine if the spec may have been marked as failed by a separate
    # spack build process running concurrently.
    return self.prefix_failure_marked(spec)
def prefix_failure_locked(self, spec: "spack.spec.Spec") -> bool:
    """Return True if a process has a failure lock on the spec."""
    # Probe with a fresh, untracked lock on the spec's byte range of the
    # failure lock file; do not cache it
    check = lk.Lock(
        self.prefix_fail_path,
        start=spec.dag_hash_bit_prefix(bit_length(sys.maxsize)),
        length=1,
        default_timeout=self.package_lock_timeout,
        desc=spec.name,
    )

    return check.is_write_locked()
def prefix_failure_marked(self, spec: "spack.spec.Spec") -> bool:
    """Determine if the spec has a persistent failure marking."""
    return os.path.exists(self._failed_spec_path(spec))
def prefix_lock(self, spec: "spack.spec.Spec", timeout: Optional[float] = None) -> lk.Lock:
    """Get a lock on a particular spec's installation directory.

    NOTE: The installation directory **does not** need to exist.

    Prefix lock is a byte range lock on the nth byte of a file.

    The lock file is ``spack.store.STORE.db.prefix_lock`` -- the DB
    tells us what to call it and it lives alongside the install DB.

    n is the sys.maxsize-bit prefix of the DAG hash. This makes the
    likelihood of collision very low AND it gives us
    readers-writer lock semantics with just a single lockfile, so no
    cleanup required.
    """
    timeout = timeout or self.package_lock_timeout
    prefix = spec.prefix
    if prefix not in self._prefix_locks:
        self._prefix_locks[prefix] = lk.Lock(
            self.prefix_lock_path,
            start=spec.dag_hash_bit_prefix(bit_length(sys.maxsize)),
            length=1,
            default_timeout=timeout,
            desc=spec.name,
        )
    elif timeout != self._prefix_locks[prefix].default_timeout:
        # Reuse the cached lock, but honor the most recent timeout request
        self._prefix_locks[prefix].default_timeout = timeout

    return self._prefix_locks[prefix]
@contextlib.contextmanager
def prefix_read_lock(self, spec):
    """Context manager that holds a read lock on the spec's prefix for the block."""
    prefix_lock = self.prefix_lock(spec)
    prefix_lock.acquire_read()

    try:
        yield self
    except lk.LockError:
        # This addresses the case where a nested lock attempt fails inside
        # of this context manager
        raise
    except (Exception, KeyboardInterrupt):
        prefix_lock.release_read()
        raise
    else:
        prefix_lock.release_read()
@contextlib.contextmanager
def prefix_write_lock(self, spec):
    """Context manager that holds a write lock on the spec's prefix for the block."""
    prefix_lock = self.prefix_lock(spec)
    prefix_lock.acquire_write()

    try:
        yield self
    except lk.LockError:
        # This addresses the case where a nested lock attempt fails inside
        # of this context manager
        raise
    except (Exception, KeyboardInterrupt):
        prefix_lock.release_write()
        raise
    else:
        prefix_lock.release_write()
def _write_to_file(self, stream):
"""Write out the database in JSON format to the stream passed
as argument.

View file

@ -519,13 +519,6 @@ def _try_install_from_binary_cache(
)
def clear_failures() -> None:
    """
    Remove all failure tracking markers for the Spack instance.
    """
    # Delegates to the database, which owns the failure marker files and locks
    spack.store.STORE.db.clear_all_failures()
def combine_phase_logs(phase_log_files: List[str], log_path: str) -> None:
"""
Read set or list of logs and combine them into one file.
@ -1126,15 +1119,13 @@ class PackageInstaller:
instance.
"""
def __init__(self, installs: List[Tuple["spack.package_base.PackageBase", dict]] = []):
def __init__(self, installs: List[Tuple["spack.package_base.PackageBase", dict]] = []) -> None:
"""Initialize the installer.
Args:
installs (list): list of tuples, where each
tuple consists of a package (PackageBase) and its associated
install arguments (dict)
Return:
PackageInstaller: instance
"""
# List of build requests
self.build_requests = [BuildRequest(pkg, install_args) for pkg, install_args in installs]
@ -1287,7 +1278,7 @@ def _check_deps_status(self, request: BuildRequest) -> None:
dep_id = package_id(dep_pkg)
# Check for failure since a prefix lock is not required
if spack.store.STORE.db.prefix_failed(dep):
if spack.store.STORE.failure_tracker.has_failed(dep):
action = "'spack install' the dependency"
msg = "{0} is marked as an install failure: {1}".format(dep_id, action)
raise InstallError(err.format(request.pkg_id, msg), pkg=dep_pkg)
@ -1502,7 +1493,7 @@ def _ensure_locked(
if lock is None:
tty.debug(msg.format("Acquiring", desc, pkg_id, pretty_seconds(timeout or 0)))
op = "acquire"
lock = spack.store.STORE.db.prefix_lock(pkg.spec, timeout)
lock = spack.store.STORE.prefix_locker.lock(pkg.spec, timeout)
if timeout != lock.default_timeout:
tty.warn(
"Expected prefix lock timeout {0}, not {1}".format(
@ -1627,12 +1618,12 @@ def _add_tasks(self, request: BuildRequest, all_deps):
# Clear any persistent failure markings _unless_ they are
# associated with another process in this parallel build
# of the spec.
spack.store.STORE.db.clear_failure(dep, force=False)
spack.store.STORE.failure_tracker.clear(dep, force=False)
install_package = request.install_args.get("install_package")
if install_package and request.pkg_id not in self.build_tasks:
# Be sure to clear any previous failure
spack.store.STORE.db.clear_failure(request.spec, force=True)
spack.store.STORE.failure_tracker.clear(request.spec, force=True)
# If not installing dependencies, then determine their
# installation status before proceeding
@ -1888,7 +1879,7 @@ def _update_failed(
err = "" if exc is None else ": {0}".format(str(exc))
tty.debug("Flagging {0} as failed{1}".format(pkg_id, err))
if mark:
self.failed[pkg_id] = spack.store.STORE.db.mark_failed(task.pkg.spec)
self.failed[pkg_id] = spack.store.STORE.failure_tracker.mark(task.pkg.spec)
else:
self.failed[pkg_id] = None
task.status = STATUS_FAILED
@ -2074,7 +2065,7 @@ def install(self) -> None:
# Flag a failed spec. Do not need an (install) prefix lock since
# assume using a separate (failed) prefix lock file.
if pkg_id in self.failed or spack.store.STORE.db.prefix_failed(spec):
if pkg_id in self.failed or spack.store.STORE.failure_tracker.has_failed(spec):
term_status.clear()
tty.warn("{0} failed to install".format(pkg_id))
self._update_failed(task)

View file

@ -2209,7 +2209,7 @@ def uninstall_by_spec(spec, force=False, deprecator=None):
pkg = None
# Pre-uninstall hook runs first.
with spack.store.STORE.db.prefix_write_lock(spec):
with spack.store.STORE.prefix_locker.write_lock(spec):
if pkg is not None:
try:
spack.hooks.pre_uninstall(spec)

View file

@ -326,7 +326,7 @@ def __init__(
self.keep = keep
# File lock for the stage directory. We use one file for all
# stage locks. See spack.database.Database.prefix_lock for
# stage locks. See spack.database.Database.prefix_locker.lock for
# details on this approach.
self._lock = None
if lock:

View file

@ -25,13 +25,14 @@
from typing import Any, Callable, Dict, Generator, List, Optional, Union
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util import tty
import spack.config
import spack.database
import spack.directory_layout
import spack.error
import spack.paths
import spack.spec
import spack.util.path
#: default installation root, relative to the Spack install path
@ -134,18 +135,21 @@ def parse_install_tree(config_dict):
class Store:
"""A store is a path full of installed Spack packages.
Stores consist of packages installed according to a
``DirectoryLayout``, along with an index, or _database_ of their
contents. The directory layout controls what paths look like and how
Spack ensures that each unique spec gets its own unique directory (or
not, though we don't recommend that). The database is a single file
that caches metadata for the entire Spack installation. It prevents
us from having to spider the install tree to figure out what's there.
Stores consist of packages installed according to a ``DirectoryLayout``, along with a database
of their contents.
The directory layout controls what paths look like and how Spack ensures that each unique spec
gets its own unique directory (or not, though we don't recommend that).
The database is a single file that caches metadata for the entire Spack installation. It
prevents us from having to spider the install tree to figure out what's there.
The store is also able to lock installation prefixes, and to mark installation failures.
Args:
root: path to the root of the install tree
unpadded_root: path to the root of the install tree without padding.
The sbang script has to be installed here to work with padded roots
unpadded_root: path to the root of the install tree without padding. The sbang script has
to be installed here to work with padded roots
projections: expression according to guidelines that describes how to construct a path to
a package prefix in this store
hash_length: length of the hashes used in the directory layout. Spec hash suffixes will be
@ -170,6 +174,19 @@ def __init__(
self.upstreams = upstreams
self.lock_cfg = lock_cfg
self.db = spack.database.Database(root, upstream_dbs=upstreams, lock_cfg=lock_cfg)
timeout_format_str = (
f"{str(lock_cfg.package_timeout)}s" if lock_cfg.package_timeout else "No timeout"
)
tty.debug("PACKAGE LOCK TIMEOUT: {0}".format(str(timeout_format_str)))
self.prefix_locker = spack.database.SpecLocker(
spack.database.prefix_lock_path(root), default_timeout=lock_cfg.package_timeout
)
self.failure_tracker = spack.database.FailureTracker(
self.root, default_timeout=lock_cfg.package_timeout
)
self.layout = spack.directory_layout.DirectoryLayout(
root, projections=projections, hash_length=hash_length
)

View file

@ -10,9 +10,11 @@
import llnl.util.filesystem as fs
import spack.caches
import spack.cmd.clean
import spack.main
import spack.package_base
import spack.stage
import spack.store
clean = spack.main.SpackCommand("clean")
@ -33,7 +35,7 @@ def __call__(self, *args, **kwargs):
monkeypatch.setattr(spack.stage, "purge", Counter("stages"))
monkeypatch.setattr(spack.caches.fetch_cache, "destroy", Counter("downloads"), raising=False)
monkeypatch.setattr(spack.caches.misc_cache, "destroy", Counter("caches"))
monkeypatch.setattr(spack.installer, "clear_failures", Counter("failures"))
monkeypatch.setattr(spack.store.STORE.failure_tracker, "clear_all", Counter("failures"))
monkeypatch.setattr(spack.cmd.clean, "remove_python_cache", Counter("python_cache"))
yield counts

View file

@ -23,6 +23,7 @@
import spack.environment as ev
import spack.hash_types as ht
import spack.package_base
import spack.store
import spack.util.executable
from spack.error import SpackError
from spack.main import SpackCommand
@ -705,9 +706,11 @@ def test_cache_only_fails(tmpdir, mock_fetch, install_mockery, capfd):
assert "was not installed" in out
# Check that failure prefix locks are still cached
failure_lock_prefixes = ",".join(spack.store.STORE.db._prefix_failures.keys())
assert "libelf" in failure_lock_prefixes
assert "libdwarf" in failure_lock_prefixes
failed_packages = [
pkg_name for dag_hash, pkg_name in spack.store.STORE.failure_tracker.locker.locks.keys()
]
assert "libelf" in failed_packages
assert "libdwarf" in failed_packages
def test_install_only_dependencies(tmpdir, mock_fetch, install_mockery):

View file

@ -950,21 +950,14 @@ def disable_compiler_execution(monkeypatch, request):
@pytest.fixture(scope="function")
def install_mockery(temporary_store, mutable_config, mock_packages):
def install_mockery(temporary_store: spack.store.Store, mutable_config, mock_packages):
"""Hooks a fake install directory, DB, and stage directory into Spack."""
# We use a fake package, so temporarily disable checksumming
with spack.config.override("config:checksum", False):
yield
# Also wipe out any cached prefix failure locks (associated with
# the session-scoped mock archive).
for pkg_id in list(temporary_store.db._prefix_failures.keys()):
lock = spack.store.STORE.db._prefix_failures.pop(pkg_id, None)
if lock:
try:
lock.release_write()
except Exception:
pass
# Wipe out any cached prefix failure locks (associated with the session-scoped mock archive)
temporary_store.failure_tracker.clear_all()
@pytest.fixture(scope="function")

View file

@ -807,22 +807,22 @@ def test_query_spec_with_non_conditional_virtual_dependency(database):
def test_failed_spec_path_error(database):
"""Ensure spec not concrete check is covered."""
s = spack.spec.Spec("a")
with pytest.raises(ValueError, match="Concrete spec required"):
spack.store.STORE.db._failed_spec_path(s)
with pytest.raises(AssertionError, match="concrete spec required"):
spack.store.STORE.failure_tracker.mark(s)
@pytest.mark.db
def test_clear_failure_keep(mutable_database, monkeypatch, capfd):
"""Add test coverage for clear_failure operation when to be retained."""
def _is(db, spec):
def _is(self, spec):
return True
# Pretend the spec has been failure locked
monkeypatch.setattr(spack.database.Database, "prefix_failure_locked", _is)
monkeypatch.setattr(spack.database.FailureTracker, "lock_taken", _is)
s = spack.spec.Spec("a")
spack.store.STORE.db.clear_failure(s)
s = spack.spec.Spec("a").concretized()
spack.store.STORE.failure_tracker.clear(s)
out = capfd.readouterr()[0]
assert "Retaining failure marking" in out
@ -831,16 +831,16 @@ def _is(db, spec):
def test_clear_failure_forced(default_mock_concretization, mutable_database, monkeypatch, capfd):
"""Add test coverage for clear_failure operation when force."""
def _is(db, spec):
def _is(self, spec):
return True
# Pretend the spec has been failure locked
monkeypatch.setattr(spack.database.Database, "prefix_failure_locked", _is)
monkeypatch.setattr(spack.database.FailureTracker, "lock_taken", _is)
# Ensure raise OSError when try to remove the non-existent marking
monkeypatch.setattr(spack.database.Database, "prefix_failure_marked", _is)
monkeypatch.setattr(spack.database.FailureTracker, "persistent_mark", _is)
s = default_mock_concretization("a")
spack.store.STORE.db.clear_failure(s, force=True)
spack.store.STORE.failure_tracker.clear(s, force=True)
out = capfd.readouterr()[1]
assert "Removing failure marking despite lock" in out
assert "Unable to remove failure marking" in out
@ -858,55 +858,34 @@ def _raise_exc(lock):
with tmpdir.as_cwd():
s = default_mock_concretization("a")
spack.store.STORE.db.mark_failed(s)
spack.store.STORE.failure_tracker.mark(s)
out = str(capsys.readouterr()[1])
assert "Unable to mark a as failed" in out
# Clean up the failure mark to ensure it does not interfere with other
# tests using the same spec.
del spack.store.STORE.db._prefix_failures[s.prefix]
spack.store.STORE.failure_tracker.clear_all()
@pytest.mark.db
def test_prefix_failed(default_mock_concretization, mutable_database, monkeypatch):
"""Add coverage to prefix_failed operation."""
def _is(db, spec):
return True
"""Add coverage to failed operation."""
s = default_mock_concretization("a")
# Confirm the spec is not already marked as failed
assert not spack.store.STORE.db.prefix_failed(s)
assert not spack.store.STORE.failure_tracker.has_failed(s)
# Check that a failure entry is sufficient
spack.store.STORE.db._prefix_failures[s.prefix] = None
assert spack.store.STORE.db.prefix_failed(s)
spack.store.STORE.failure_tracker.mark(s)
assert spack.store.STORE.failure_tracker.has_failed(s)
# Remove the entry and check again
del spack.store.STORE.db._prefix_failures[s.prefix]
assert not spack.store.STORE.db.prefix_failed(s)
spack.store.STORE.failure_tracker.clear(s)
assert not spack.store.STORE.failure_tracker.has_failed(s)
# Now pretend that the prefix failure is locked
monkeypatch.setattr(spack.database.Database, "prefix_failure_locked", _is)
assert spack.store.STORE.db.prefix_failed(s)
def test_prefix_read_lock_error(default_mock_concretization, mutable_database, monkeypatch):
"""Cover the prefix read lock exception."""
def _raise(db, spec):
raise lk.LockError("Mock lock error")
s = default_mock_concretization("a")
# Ensure subsequent lock operations fail
monkeypatch.setattr(lk.Lock, "acquire_read", _raise)
with pytest.raises(Exception):
with spack.store.STORE.db.prefix_read_lock(s):
assert False
monkeypatch.setattr(spack.database.FailureTracker, "lock_taken", lambda self, spec: True)
assert spack.store.STORE.failure_tracker.has_failed(s)
def test_prefix_write_lock_error(default_mock_concretization, mutable_database, monkeypatch):
    """Cover the prefix write lock exception."""

    def _raise(db, spec):
        raise lk.LockError("Mock lock error")

    s = default_mock_concretization("a")

    # Ensure subsequent lock operations fail
    monkeypatch.setattr(lk.Lock, "acquire_write", _raise)
    with pytest.raises(Exception):
        with spack.store.STORE.prefix_locker.write_lock(s):
            assert False

View file

@ -159,7 +159,7 @@ def test_partial_install_delete_prefix_and_stage(install_mockery, mock_fetch, wo
s.package.remove_prefix = rm_prefix_checker.remove_prefix
# must clear failure markings for the package before re-installing it
spack.store.STORE.db.clear_failure(s, True)
spack.store.STORE.failure_tracker.clear(s, True)
s.package.set_install_succeed()
s.package.stage = MockStage(s.package.stage)
@ -354,7 +354,7 @@ def test_partial_install_keep_prefix(install_mockery, mock_fetch, monkeypatch, w
assert os.path.exists(s.package.prefix)
# must clear failure markings for the package before re-installing it
spack.store.STORE.db.clear_failure(s, True)
spack.store.STORE.failure_tracker.clear(s, True)
s.package.set_install_succeed()
s.package.stage = MockStage(s.package.stage)

View file

@ -19,6 +19,7 @@
import spack.compilers
import spack.concretize
import spack.config
import spack.database
import spack.installer as inst
import spack.package_base
import spack.package_prefs as prefs
@ -364,7 +365,7 @@ def test_ensure_locked_err(install_mockery, monkeypatch, tmpdir, capsys):
"""Test _ensure_locked when a non-lock exception is raised."""
mock_err_msg = "Mock exception error"
def _raise(lock, timeout):
def _raise(lock, timeout=None):
raise RuntimeError(mock_err_msg)
const_arg = installer_args(["trivial-install-test-package"], {})
@ -432,7 +433,7 @@ def test_ensure_locked_new_lock(install_mockery, tmpdir, lock_type, reads, write
def test_ensure_locked_new_warn(install_mockery, monkeypatch, tmpdir, capsys):
orig_pl = spack.database.Database.prefix_lock
orig_pl = spack.database.SpecLocker.lock
def _pl(db, spec, timeout):
lock = orig_pl(db, spec, timeout)
@ -444,7 +445,7 @@ def _pl(db, spec, timeout):
installer = create_installer(const_arg)
spec = installer.build_requests[0].pkg.spec
monkeypatch.setattr(spack.database.Database, "prefix_lock", _pl)
monkeypatch.setattr(spack.database.SpecLocker, "lock", _pl)
lock_type = "read"
ltype, lock = installer._ensure_locked(lock_type, spec.package)
@ -597,59 +598,50 @@ def _repoerr(repo, name):
assert "Couldn't copy in provenance for cmake" in out
def test_clear_failures_success(tmpdir):
    """Test the clear_failures happy path."""
    failures = spack.database.FailureTracker(str(tmpdir), default_timeout=0.1)

    spec = spack.spec.Spec("a")
    spec._mark_concrete()

    # Set up a fake failure mark (or file)
    failures.mark(spec)
    assert failures.has_failed(spec)

    # Now clear failure tracking
    failures.clear_all()

    # Ensure there are no cached failure locks or failure marks
    assert len(failures.locker.locks) == 0
    assert len(os.listdir(failures.dir)) == 0

    # Ensure the core directory and failure lock file still exist
    assert os.path.isdir(failures.dir)

    # Locks on windows are a no-op
    if sys.platform != "win32":
        assert os.path.isfile(failures.locker.lock_path)
@pytest.mark.xfail(sys.platform == "win32", reason="chmod does not prevent removal on Win")
def test_clear_failures_errs(tmpdir, capsys):
    """Test the clear_failures exception paths."""
    failures = spack.database.FailureTracker(str(tmpdir), default_timeout=0.1)
    spec = spack.spec.Spec("a")
    spec._mark_concrete()
    failures.mark(spec)

    # Make the file marker not writeable, so that clearing_failures fails
    failures.dir.chmod(0o000)

    # Clear failure tracking
    failures.clear_all()

    # Ensure expected warning generated
    out = str(capsys.readouterr()[1])
    assert "Unable to remove failure" in out

    # Restore permissions so that pytest can clean up the directory
    failures.dir.chmod(0o750)
def test_combine_phase_logs(tmpdir):
@ -694,14 +686,18 @@ def test_combine_phase_logs_does_not_care_about_encoding(tmpdir):
assert f.read() == data * 2
def test_check_deps_status_install_failure(install_mockery):
    """Tests that checking the dependency status on a request to install
    'a' fails, if we mark the dependency as failed.
    """
    s = spack.spec.Spec("a").concretized()
    for dep in s.traverse(root=False):
        spack.store.STORE.failure_tracker.mark(dep)

    const_arg = installer_args(["a"], {})
    installer = create_installer(const_arg)
    request = installer.build_requests[0]

    with pytest.raises(inst.InstallError, match="install failure"):
        installer._check_deps_status(request)
@ -1006,7 +1002,7 @@ def test_install_failed(install_mockery, monkeypatch, capsys):
installer = create_installer(const_arg)
# Make sure the package is identified as failed
monkeypatch.setattr(spack.database.Database, "prefix_failed", _true)
monkeypatch.setattr(spack.database.FailureTracker, "has_failed", _true)
with pytest.raises(inst.InstallError, match="request failed"):
installer.install()
@ -1022,7 +1018,7 @@ def test_install_failed_not_fast(install_mockery, monkeypatch, capsys):
installer = create_installer(const_arg)
# Make sure the package is identified as failed
monkeypatch.setattr(spack.database.Database, "prefix_failed", _true)
monkeypatch.setattr(spack.database.FailureTracker, "has_failed", _true)
with pytest.raises(inst.InstallError, match="request failed"):
installer.install()
@ -1121,7 +1117,7 @@ def test_install_fail_fast_on_detect(install_mockery, monkeypatch, capsys):
#
# This will prevent b from installing, which will cause the build of a
# to be skipped.
monkeypatch.setattr(spack.database.Database, "prefix_failed", _true)
monkeypatch.setattr(spack.database.FailureTracker, "has_failed", _true)
with pytest.raises(inst.InstallError, match="after first install failure"):
installer.install()