diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py index 2b9d2dfbf2..a533c57176 100644 --- a/lib/spack/llnl/util/lock.py +++ b/lib/spack/llnl/util/lock.py @@ -9,9 +9,10 @@ import sys import time from datetime import datetime +from types import TracebackType +from typing import IO, Any, Callable, ContextManager, Dict, Generator, Optional, Tuple, Type, Union -import llnl.util.tty as tty -from llnl.util.lang import pretty_seconds +from llnl.util import lang, tty import spack.util.string @@ -34,9 +35,12 @@ ] -#: A useful replacement for functions that should return True when not provided -#: for example. -true_fn = lambda: True +ReleaseFnType = Optional[Callable[[], bool]] + + +def true_fn() -> bool: + """A function that always returns True.""" + return True class OpenFile: @@ -48,7 +52,7 @@ class OpenFile: file descriptors as well in the future. """ - def __init__(self, fh): + def __init__(self, fh: IO) -> None: self.fh = fh self.refs = 0 @@ -78,11 +82,11 @@ class OpenFileTracker: work in Python and assume the GIL. """ - def __init__(self): + def __init__(self) -> None: """Create a new ``OpenFileTracker``.""" - self._descriptors = {} + self._descriptors: Dict[Any, OpenFile] = {} - def get_fh(self, path): + def get_fh(self, path: str) -> IO: """Get a filehandle for a lockfile. This routine will open writable files for read/write even if you're asking @@ -90,7 +94,7 @@ def get_fh(self, path): (write) lock later if requested. Arguments: - path (str): path to lock file we want a filehandle for + path: path to lock file we want a filehandle for """ # Open writable files as 'r+' so we can upgrade to write later os_mode, fh_mode = (os.O_RDWR | os.O_CREAT), "r+" @@ -157,7 +161,7 @@ def purge(self): #: Open file descriptors for locks in this process. Used to prevent one process #: from opening the sam file many times for different byte range locks -file_tracker = OpenFileTracker() +FILE_TRACKER = OpenFileTracker() def _attempts_str(wait_time, nattempts): @@ -166,7 +170,7 @@ def _attempts_str(wait_time, nattempts): return "" attempts = spack.util.string.plural(nattempts, "attempt") - return " after {} and {}".format(pretty_seconds(wait_time), attempts) + return " after {} and {}".format(lang.pretty_seconds(wait_time), attempts) class LockType: @@ -188,7 +192,7 @@ def to_module(tid): return lock @staticmethod - def is_valid(op): + def is_valid(op: int) -> bool: return op == LockType.READ or op == LockType.WRITE @@ -207,7 +211,15 @@ class Lock: overlapping byte ranges in the same file). """ - def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, desc=""): + def __init__( + self, + path: str, + start: int = 0, + length: int = 0, + default_timeout: Optional[float] = None, + debug: bool = False, + desc: str = "", + ) -> None: """Construct a new lock on the file at ``path``. By default, the lock applies to the whole file. Optionally, @@ -220,17 +232,17 @@ def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, d beginning of the file. Args: - path (str): path to the lock - start (int): optional byte offset at which the lock starts - length (int): optional number of bytes to lock - default_timeout (int): number of seconds to wait for lock attempts, + path: path to the lock + start: optional byte offset at which the lock starts + length: optional number of bytes to lock + default_timeout: seconds to wait for lock attempts, where None means to wait indefinitely - debug (bool): debug mode specific to locking - desc (str): optional debug message lock description, which is + debug: debug mode specific to locking + desc: optional debug message lock description, which is helpful for distinguishing between different Spack locks. """ self.path = path - self._file = None + self._file: Optional[IO] = None self._reads = 0 self._writes = 0 @@ -242,7 +254,7 @@ def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, d self.debug = debug # optional debug description - self.desc = " ({0})".format(desc) if desc else "" + self.desc = f" ({desc})" if desc else "" # If the user doesn't set a default timeout, or if they choose # None, 0, etc. then lock attempts will not time out (unless the @@ -250,11 +262,15 @@ def __init__(self, path, start=0, length=0, default_timeout=None, debug=False, d self.default_timeout = default_timeout or None # PID and host of lock holder (only used in debug mode) - self.pid = self.old_pid = None - self.host = self.old_host = None + self.pid: Optional[int] = None + self.old_pid: Optional[int] = None + self.host: Optional[str] = None + self.old_host: Optional[str] = None @staticmethod - def _poll_interval_generator(_wait_times=None): + def _poll_interval_generator( + _wait_times: Optional[Tuple[float, float, float]] = None + ) -> Generator[float, None, None]: """This implements a backoff scheme for polling a contended resource by suggesting a succession of wait times between polls. @@ -277,21 +293,21 @@ def _poll_interval_generator(_wait_times=None): num_requests += 1 yield wait_time - def __repr__(self): + def __repr__(self) -> str: """Formal representation of the lock.""" rep = "{0}(".format(self.__class__.__name__) for attr, value in self.__dict__.items(): rep += "{0}={1}, ".format(attr, value.__repr__()) return "{0})".format(rep.strip(", ")) - def __str__(self): + def __str__(self) -> str: """Readable string (with key fields) of the lock.""" location = "{0}[{1}:{2}]".format(self.path, self._start, self._length) timeout = "timeout={0}".format(self.default_timeout) activity = "#reads={0}, #writes={1}".format(self._reads, self._writes) return "({0}, {1}, {2})".format(location, timeout, activity) - def _lock(self, op, timeout=None): + def _lock(self, op: int, timeout: Optional[float] = None) -> Tuple[float, int]: """This takes a lock using POSIX locks (``fcntl.lockf``). The lock is implemented as a spin lock using a nonblocking call @@ -310,7 +326,7 @@ def _lock(self, op, timeout=None): # Create file and parent directories if they don't exist. if self._file is None: self._ensure_parent_directory() - self._file = file_tracker.get_fh(self.path) + self._file = FILE_TRACKER.get_fh(self.path) if LockType.to_module(op) == fcntl.LOCK_EX and self._file.mode == "r": # Attempt to upgrade to write lock w/a read-only file. @@ -319,7 +335,7 @@ def _lock(self, op, timeout=None): self._log_debug( "{} locking [{}:{}]: timeout {}".format( - op_str.lower(), self._start, self._length, pretty_seconds(timeout or 0) + op_str.lower(), self._start, self._length, lang.pretty_seconds(timeout or 0) ) ) @@ -343,15 +359,20 @@ def _lock(self, op, timeout=None): total_wait_time = time.time() - start_time raise LockTimeoutError(op_str.lower(), self.path, total_wait_time, num_attempts) - def _poll_lock(self, op): + def _poll_lock(self, op: int) -> bool: """Attempt to acquire the lock in a non-blocking manner. Return whether the locking attempt succeeds """ + assert self._file is not None, "cannot poll a lock without the file being set" module_op = LockType.to_module(op) try: # Try to get the lock (will raise if not available.) fcntl.lockf( - self._file, module_op | fcntl.LOCK_NB, self._length, self._start, os.SEEK_SET + self._file.fileno(), + module_op | fcntl.LOCK_NB, + self._length, + self._start, + os.SEEK_SET, ) # help for debugging distributed locking @@ -377,7 +398,7 @@ def _poll_lock(self, op): return False - def _ensure_parent_directory(self): + def _ensure_parent_directory(self) -> str: parent = os.path.dirname(self.path) # relative paths to lockfiles in the current directory have no parent @@ -396,20 +417,22 @@ def _ensure_parent_directory(self): raise return parent - def _read_log_debug_data(self): + def _read_log_debug_data(self) -> None: """Read PID and host data out of the file if it is there.""" + assert self._file is not None, "cannot read debug log without the file being set" self.old_pid = self.pid self.old_host = self.host line = self._file.read() if line: pid, host = line.strip().split(",") - _, _, self.pid = pid.rpartition("=") + _, _, pid = pid.rpartition("=") _, _, self.host = host.rpartition("=") - self.pid = int(self.pid) + self.pid = int(pid) - def _write_log_debug_data(self): + def _write_log_debug_data(self) -> None: """Write PID and host data to the file, recording old values.""" + assert self._file is not None, "cannot write debug log without the file being set" self.old_pid = self.pid self.old_host = self.host @@ -423,20 +446,21 @@ def _write_log_debug_data(self): self._file.flush() os.fsync(self._file.fileno()) - def _unlock(self): + def _unlock(self) -> None: """Releases a lock using POSIX locks (``fcntl.lockf``) Releases the lock regardless of mode. Note that read locks may be masquerading as write locks, but this removes either. """ - fcntl.lockf(self._file, fcntl.LOCK_UN, self._length, self._start, os.SEEK_SET) - file_tracker.release_by_fh(self._file) + assert self._file is not None, "cannot unlock without the file being set" + fcntl.lockf(self._file.fileno(), fcntl.LOCK_UN, self._length, self._start, os.SEEK_SET) + FILE_TRACKER.release_by_fh(self._file) self._file = None self._reads = 0 self._writes = 0 - def acquire_read(self, timeout=None): + def acquire_read(self, timeout: Optional[float] = None) -> bool: """Acquires a recursive, shared lock for reading. Read and write locks can be acquired and released in arbitrary @@ -461,7 +485,7 @@ def acquire_read(self, timeout=None): self._reads += 1 return False - def acquire_write(self, timeout=None): + def acquire_write(self, timeout: Optional[float] = None) -> bool: """Acquires a recursive, exclusive lock for writing. Read and write locks can be acquired and released in arbitrary @@ -491,7 +515,7 @@ def acquire_write(self, timeout=None): self._writes += 1 return False - def is_write_locked(self): + def is_write_locked(self) -> bool: """Check if the file is write locked Return: @@ -508,7 +532,7 @@ def is_write_locked(self): return False - def downgrade_write_to_read(self, timeout=None): + def downgrade_write_to_read(self, timeout: Optional[float] = None) -> None: """ Downgrade from an exclusive write lock to a shared read. @@ -527,7 +551,7 @@ def downgrade_write_to_read(self, timeout=None): else: raise LockDowngradeError(self.path) - def upgrade_read_to_write(self, timeout=None): + def upgrade_read_to_write(self, timeout: Optional[float] = None) -> None: """ Attempts to upgrade from a shared read lock to an exclusive write. @@ -546,7 +570,7 @@ def upgrade_read_to_write(self, timeout=None): else: raise LockUpgradeError(self.path) - def release_read(self, release_fn=None): + def release_read(self, release_fn: ReleaseFnType = None) -> bool: """Releases a read lock. Arguments: @@ -582,7 +606,7 @@ def release_read(self, release_fn=None): self._reads -= 1 return False - def release_write(self, release_fn=None): + def release_write(self, release_fn: ReleaseFnType = None) -> bool: """Releases a write lock. Arguments: @@ -623,58 +647,58 @@ def release_write(self, release_fn=None): else: return False - def cleanup(self): + def cleanup(self) -> None: if self._reads == 0 and self._writes == 0: os.unlink(self.path) else: raise LockError("Attempting to cleanup active lock.") - def _get_counts_desc(self): + def _get_counts_desc(self) -> str: return ( "(reads {0}, writes {1})".format(self._reads, self._writes) if tty.is_verbose() else "" ) - def _log_acquired(self, locktype, wait_time, nattempts): + def _log_acquired(self, locktype, wait_time, nattempts) -> None: attempts_part = _attempts_str(wait_time, nattempts) now = datetime.now() desc = "Acquired at %s" % now.strftime("%H:%M:%S.%f") self._log_debug(self._status_msg(locktype, "{0}{1}".format(desc, attempts_part))) - def _log_acquiring(self, locktype): + def _log_acquiring(self, locktype) -> None: self._log_debug(self._status_msg(locktype, "Acquiring"), level=3) - def _log_debug(self, *args, **kwargs): + def _log_debug(self, *args, **kwargs) -> None: """Output lock debug messages.""" kwargs["level"] = kwargs.get("level", 2) tty.debug(*args, **kwargs) - def _log_downgraded(self, wait_time, nattempts): + def _log_downgraded(self, wait_time, nattempts) -> None: attempts_part = _attempts_str(wait_time, nattempts) now = datetime.now() desc = "Downgraded at %s" % now.strftime("%H:%M:%S.%f") self._log_debug(self._status_msg("READ LOCK", "{0}{1}".format(desc, attempts_part))) - def _log_downgrading(self): + def _log_downgrading(self) -> None: self._log_debug(self._status_msg("WRITE LOCK", "Downgrading"), level=3) - def _log_released(self, locktype): + def _log_released(self, locktype) -> None: now = datetime.now() desc = "Released at %s" % now.strftime("%H:%M:%S.%f") self._log_debug(self._status_msg(locktype, desc)) - def _log_releasing(self, locktype): + def _log_releasing(self, locktype) -> None: self._log_debug(self._status_msg(locktype, "Releasing"), level=3) - def _log_upgraded(self, wait_time, nattempts): + def _log_upgraded(self, wait_time, nattempts) -> None: attempts_part = _attempts_str(wait_time, nattempts) now = datetime.now() desc = "Upgraded at %s" % now.strftime("%H:%M:%S.%f") self._log_debug(self._status_msg("WRITE LOCK", "{0}{1}".format(desc, attempts_part))) - def _log_upgrading(self): + def _log_upgrading(self) -> None: self._log_debug(self._status_msg("READ LOCK", "Upgrading"), level=3) - def _status_msg(self, locktype, status): + def _status_msg(self, locktype: str, status: str) -> str: status_desc = "[{0}] {1}".format(status, self._get_counts_desc()) return "{0}{1.desc}: {1.path}[{1._start}:{1._length}] {2}".format( locktype, self, status_desc @@ -709,7 +733,13 @@ class LockTransaction: """ - def __init__(self, lock, acquire=None, release=None, timeout=None): + def __init__( + self, + lock: Lock, + acquire: Union[ReleaseFnType, ContextManager] = None, + release: Union[ReleaseFnType, ContextManager] = None, + timeout: Optional[float] = None, + ) -> None: self._lock = lock self._timeout = timeout self._acquire_fn = acquire @@ -724,15 +754,20 @@ def __enter__(self): else: return self._as - def __exit__(self, type, value, traceback): + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> bool: suppress = False def release_fn(): if self._release_fn is not None: - return self._release_fn(type, value, traceback) + return self._release_fn(exc_type, exc_value, traceback) if self._as and hasattr(self._as, "__exit__"): - if self._as.__exit__(type, value, traceback): + if self._as.__exit__(exc_type, exc_value, traceback): suppress = True if self._exit(release_fn): @@ -740,6 +775,12 @@ def release_fn(): return suppress + def _enter(self) -> bool: + return NotImplemented + + def _exit(self, release_fn: ReleaseFnType) -> bool: + return NotImplemented + class ReadTransaction(LockTransaction): """LockTransaction context manager that does a read and releases it.""" @@ -785,7 +826,7 @@ def __init__(self, lock_type, path, time, attempts): super().__init__( fmt.format( lock_type, - pretty_seconds(time), + lang.pretty_seconds(time), attempts, "attempt" if attempts == 1 else "attempts", path, diff --git a/lib/spack/spack/test/conftest.py b/lib/spack/spack/test/conftest.py index 2a64d71f4d..6776c7db87 100644 --- a/lib/spack/spack/test/conftest.py +++ b/lib/spack/spack/test/conftest.py @@ -1701,15 +1701,15 @@ def mock_test_stage(mutable_config, tmpdir): @pytest.fixture(autouse=True) def inode_cache(): - llnl.util.lock.file_tracker.purge() + llnl.util.lock.FILE_TRACKER.purge() yield # TODO: it is a bug when the file tracker is non-empty after a test, # since it means a lock was not released, or the inode was not purged # when acquiring the lock failed. So, we could assert that here, but # currently there are too many issues to fix, so look for the more # serious issue of having a closed file descriptor in the cache. - assert not any(f.fh.closed for f in llnl.util.lock.file_tracker._descriptors.values()) - llnl.util.lock.file_tracker.purge() + assert not any(f.fh.closed for f in llnl.util.lock.FILE_TRACKER._descriptors.values()) + llnl.util.lock.FILE_TRACKER.purge() @pytest.fixture(autouse=True) diff --git a/lib/spack/spack/test/llnl/util/lock.py b/lib/spack/spack/test/llnl/util/lock.py index 15129aa9b7..c22288f430 100644 --- a/lib/spack/spack/test/llnl/util/lock.py +++ b/lib/spack/spack/test/llnl/util/lock.py @@ -687,8 +687,8 @@ def test_upgrade_read_to_write_fails_with_readonly_file(private_lock_path): with pytest.raises(lk.LockROFileError): lock.acquire_write() - # TODO: lk.file_tracker does not release private_lock_path - lk.file_tracker.release_by_stat(os.stat(private_lock_path)) + # TODO: lk.FILE_TRACKER does not release private_lock_path + lk.FILE_TRACKER.release_by_stat(os.stat(private_lock_path)) class ComplexAcquireAndRelease: @@ -1345,8 +1345,7 @@ def _lockf(fd, cmd, len, start, whence): with tmpdir.as_cwd(): lockfile = "lockfile" lock = lk.Lock(lockfile) - - touch(lockfile) + lock.acquire_read() monkeypatch.setattr(fcntl, "lockf", _lockf) @@ -1356,6 +1355,9 @@ def _lockf(fd, cmd, len, start, whence): with pytest.raises(IOError, match=err_msg): lock._poll_lock(fcntl.LOCK_EX) + monkeypatch.undo() + lock.release_read() + def test_upgrade_read_okay(tmpdir): """Test the lock read-to-write upgrade operation."""