diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py index 2cab436e2d..55837c371e 100644 --- a/lib/spack/llnl/util/lock.py +++ b/lib/spack/llnl/util/lock.py @@ -127,8 +127,9 @@ def _lock(self, op, timeout=_default_timeout): return - except IOError as error: - if error.errno == errno.EAGAIN or error.errno == errno.EACCES: + except IOError as e: + if e.errno in (errno.EAGAIN, errno.EACCES): + # EAGAIN and EACCES == locked by another process pass else: raise @@ -197,6 +198,8 @@ def acquire_read(self, timeout=_default_timeout): tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' .format(self)) self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError. + tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquired]' + .format(self)) self._reads += 1 return True else: @@ -219,6 +222,8 @@ def acquire_write(self, timeout=_default_timeout): 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]' .format(self)) self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError. + tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquired]' + .format(self)) self._writes += 1 return True else: diff --git a/lib/spack/spack/test/lock.py b/lib/spack/spack/test/lock.py index 7cf4571016..347b72b575 100644 --- a/lib/spack/spack/test/lock.py +++ b/lib/spack/spack/test/lock.py @@ -22,37 +22,178 @@ # License along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ############################################################################## -""" -These tests ensure that our lock works correctly. +"""These tests ensure that our lock works correctly. + +This can be run in two ways. + +First, it can be run as a node-local test, with a typical invocation like +this:: + + spack test lock + +You can *also* run it as an MPI program, which allows you to test locks +across nodes. So, e.g., you can run the test like this:: + + mpirun -n 7 spack test lock + +And it will test locking correctness among MPI processes. Ideally, you +want the MPI processes to span across multiple nodes, so, e.g., for SLURM +you might do this:: + + srun -N 7 -n 7 -m cyclic spack test lock + +You can use this to test whether your shared filesystem properly supports +POSIX reader-writer locking with byte ranges through fcntl. + +If you want to test on multiple filesystems, you can modify the +``locations`` list below. By default it looks like this:: + + locations = [ + tempfile.gettempdir(), # standard tmp directory (potentially local) + '/nfs/tmp2/%u', # NFS tmp mount + '/p/lscratch*/%u' # Lustre scratch mount + ] + +Add names and paths for your preferred filesystem mounts to test on them; +the tests are parametrized to run on all the filesystems listed in this +dict. Note that 'tmp' will be skipped for MPI testing, as it is often a +node-local filesystem, and multi-node tests will fail if the locks aren't +actually on a shared filesystem. + """ import os import shutil -import functools import tempfile import traceback +import glob +import getpass +from contextlib import contextmanager from multiprocessing import Process import pytest -from llnl.util.filesystem import join_path, touch, mkdirp +from llnl.util.filesystem import join_path, touch from llnl.util.lock import * from spack.util.multiproc import Barrier -# This is the longest a failed test will take, as the barriers will -# time out and raise an exception. +# +# This test can be run with MPI. MPI is "enabled" if we can import +# mpi4py and the number of total MPI processes is greater than 1. +# Otherwise it just runs as a node-local test. +# +# NOTE: MPI mode is different from node-local mode in that node-local +# mode will spawn its own test processes, while MPI mode assumes you've +# run this script as a SPMD application. In MPI mode, no additional +# processes are spawned, and you need to ensure that you mpirun the +# script with enough processes for all the multiproc_test cases below. +# +# If you don't run with enough processes, tests that require more +# processes than you currently have will be skipped. +# +mpi = False +comm = None +try: + from mpi4py import MPI + comm = MPI.COMM_WORLD + if comm.size > 1: + mpi = True +except: + pass + + +"""This is a list of filesystem locations to test locks in. Paths are +expanded so that %u is replaced with the current username. '~' is also +legal and will be expanded to the user's home directory. + +Tests are skipped for directories that don't exist, so you'll need to +update this with the locations of NFS, Lustre, and other mounts on your +system. +""" +locations = [ + tempfile.gettempdir(), + os.path.join('/nfs/tmp2/', getpass.getuser()), + os.path.join('/p/lscratch*/', getpass.getuser()), +] + +"""This is the longest a failed multiproc test will take. +Barriers will time out and raise an exception after this interval. +In MPI mode, barriers don't time out (they hang). See mpi_multiproc_test. +""" barrier_timeout = 5 +"""This is the lock timeout for expected failures. +This may need to be higher for some filesystems.""" +lock_fail_timeout = 0.1 -@pytest.fixture() -def lock_path(): - tempdir = tempfile.mkdtemp() - lock_file = join_path(tempdir, 'lockfile') + +@contextmanager +def read_only(path): + orginal_mode = os.stat(path).st_mode + os.chmod(path, 0o444) + yield + os.chmod(path, orginal_mode) + + +@pytest.fixture(scope='session', params=locations) +def lock_test_directory(request): + """This fixture causes tests to be executed for many different mounts. + + See the ``locations`` dict above for details. + """ + return request.param + + +@pytest.fixture(scope='session') +def lock_dir(lock_test_directory): + parent = next((p for p in glob.glob(lock_test_directory) + if os.path.exists(p) and os.access(p, os.W_OK)), None) + if not parent: + # Skip filesystems that don't exist or aren't writable + pytest.skip("requires filesystem: '%s'" % lock_test_directory) + elif mpi and parent == tempfile.gettempdir(): + # Skip local tmp test for MPI runs + pytest.skip("skipping local tmp directory for MPI test.") + + tempdir = None + if not mpi or comm.rank == 0: + tempdir = tempfile.mkdtemp(dir=parent) + if mpi: + tempdir = comm.bcast(tempdir) + + yield tempdir + + if mpi: + # rank 0 may get here before others, in which case it'll try to + # remove the directory while other processes try to re-create the + # lock. This will give errno 39: directory not empty. Use a + # barrier to ensure everyone is done first. + comm.barrier() + + if not mpi or comm.rank == 0: + shutil.rmtree(tempdir) + + +@pytest.fixture +def private_lock_path(lock_dir): + """In MPI mode, this is a private lock for each rank in a multiproc test. + + For other modes, it is the same as a shared lock. + """ + lock_file = join_path(lock_dir, 'lockfile') + if mpi: + lock_file += '.%s' % comm.rank yield lock_file - shutil.rmtree(tempdir) -def multiproc_test(*functions): +@pytest.fixture +def lock_path(lock_dir): + """This lock is shared among all processes in a multiproc test.""" + lock_file = join_path(lock_dir, 'lockfile') + yield lock_file + + +def local_multiproc_test(*functions): """Order some processes using simple barrier synchronization.""" b = Barrier(len(functions), timeout=barrier_timeout) procs = [Process(target=f, args=(b,)) for f in functions] @@ -65,6 +206,52 @@ def multiproc_test(*functions): assert p.exitcode == 0 +def mpi_multiproc_test(*functions): + """SPMD version of multiproc test. + + This needs to be run like so: + + srun spack test lock + + Each process executes its corresponding function. This is different + from ``multiproc_test`` above, which spawns the processes. This will + skip tests if there are too few processes to run them. + """ + procs = len(functions) + if procs > comm.size: + pytest.skip("requires at least %d MPI processes" % procs) + + comm.Barrier() # barrier before each MPI test + + include = comm.rank < len(functions) + subcomm = comm.Split(include) + + class subcomm_barrier(object): + """Stand-in for multiproc barrier for MPI-parallel jobs.""" + def wait(self): + subcomm.Barrier() + + if include: + try: + functions[subcomm.rank](subcomm_barrier()) + except: + # aborting is the best we can do for MPI tests without + # hanging, since we're using MPI barriers. This will fail + # early and it loses the nice pytest output, but at least it + # gets use a stacktrace on the processes that failed. + traceback.print_exc() + comm.Abort() + subcomm.Free() + + comm.Barrier() # barrier after each MPI test. + + +"""``multiproc_test()`` should be called by tests below. +``multiproc_test()`` will work for either MPI runs or for local runs. +""" +multiproc_test = mpi_multiproc_test if mpi else local_multiproc_test + + # # Process snippets below can be composed into tests. # @@ -91,7 +278,7 @@ def fn(barrier): lock = Lock(lock_path, start, length) barrier.wait() # wait for lock acquire in first process with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) barrier.wait() return fn @@ -101,7 +288,7 @@ def fn(barrier): lock = Lock(lock_path, start, length) barrier.wait() # wait for lock acquire in first process with pytest.raises(LockError): - lock.acquire_read(0.1) + lock.acquire_read(lock_fail_timeout) barrier.wait() return fn @@ -111,7 +298,9 @@ def fn(barrier): # exclusive lock is held. # def test_write_lock_timeout_on_write(lock_path): - multiproc_test(acquire_write(lock_path), timeout_write(lock_path)) + multiproc_test( + acquire_write(lock_path), + timeout_write(lock_path)) def test_write_lock_timeout_on_write_2(lock_path): @@ -258,7 +447,8 @@ def test_write_lock_timeout_on_read_ranges_3(lock_path): def test_write_lock_timeout_on_read_ranges_4(lock_path): multiproc_test( acquire_read(lock_path, 0, 64), - timeout_write(lock_path, 10, 1), timeout_write(lock_path, 32, 1)) + timeout_write(lock_path, 10, 1), + timeout_write(lock_path, 32, 1)) def test_write_lock_timeout_on_read_ranges_5(lock_path): @@ -268,6 +458,7 @@ def test_write_lock_timeout_on_read_ranges_5(lock_path): timeout_write(lock_path, 127, 1), timeout_write(lock_path, 90, 10)) + # # Test that exclusive locks time while lots of shared locks are held. # @@ -339,12 +530,19 @@ def test_write_lock_timeout_with_multiple_readers_3_2_ranges(lock_path): # # Test that read can be upgraded to write. # -def test_upgrade_read_to_write(lock_path): +def test_upgrade_read_to_write(private_lock_path): + """Test that a read lock can be upgraded to a write lock. + + Note that to upgrade a read lock to a write lock, you have the be the + only holder of a read lock. Client code needs to coordinate that for + shared locks. For this test, we use a private lock just to test that an + upgrade is possible. + """ # ensure lock file exists the first time, so we open it read-only # to begin wtih. - touch(lock_path) + touch(private_lock_path) - lock = Lock(lock_path) + lock = Lock(private_lock_path) assert lock._reads == 0 assert lock._writes == 0 @@ -368,26 +566,28 @@ def test_upgrade_read_to_write(lock_path): assert lock._writes == 0 assert lock._file is None + # # Test that read-only file can be read-locked but not write-locked. # -def test_upgrade_read_to_write_fails_with_readonly_file(lock_path): +def test_upgrade_read_to_write_fails_with_readonly_file(private_lock_path): # ensure lock file exists the first time, so we open it read-only # to begin wtih. - touch(lock_path) - os.chmod(lock_path, 0o444) + touch(private_lock_path) - lock = Lock(lock_path) - assert lock._reads == 0 - assert lock._writes == 0 + with read_only(private_lock_path): + lock = Lock(private_lock_path) + assert lock._reads == 0 + assert lock._writes == 0 - lock.acquire_read() - assert lock._reads == 1 - assert lock._writes == 0 - assert lock._file.mode == 'r' + lock.acquire_read() + assert lock._reads == 1 + assert lock._writes == 0 + assert lock._file.mode == 'r' + + with pytest.raises(LockError): + lock.acquire_write() - with pytest.raises(LockError): - lock.acquire_write() # # Longer test case that ensures locks are reusable. Ordering is @@ -404,7 +604,7 @@ def p1(barrier): lock.release_write() # release and others acquire read barrier.wait() # ---------------------------------------- 3 with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) lock.acquire_read() barrier.wait() # ---------------------------------------- 4 lock.release_read() @@ -413,9 +613,9 @@ def p1(barrier): # p2 upgrades read to write barrier.wait() # ---------------------------------------- 6 with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) with pytest.raises(LockError): - lock.acquire_read(0.1) + lock.acquire_read(lock_fail_timeout) barrier.wait() # ---------------------------------------- 7 # p2 releases write and read barrier.wait() # ---------------------------------------- 8 @@ -425,9 +625,9 @@ def p1(barrier): # p3 upgrades read to write barrier.wait() # ---------------------------------------- 10 with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) with pytest.raises(LockError): - lock.acquire_read(0.1) + lock.acquire_read(lock_fail_timeout) barrier.wait() # ---------------------------------------- 11 # p3 releases locks barrier.wait() # ---------------------------------------- 12 @@ -441,9 +641,9 @@ def p2(barrier): # p1 acquires write barrier.wait() # ---------------------------------------- 1 with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) with pytest.raises(LockError): - lock.acquire_read(0.1) + lock.acquire_read(lock_fail_timeout) barrier.wait() # ---------------------------------------- 2 lock.acquire_read() barrier.wait() # ---------------------------------------- 3 @@ -465,9 +665,9 @@ def p2(barrier): # p3 upgrades read to write barrier.wait() # ---------------------------------------- 10 with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) with pytest.raises(LockError): - lock.acquire_read(0.1) + lock.acquire_read(lock_fail_timeout) barrier.wait() # ---------------------------------------- 11 # p3 releases locks barrier.wait() # ---------------------------------------- 12 @@ -481,9 +681,9 @@ def p3(barrier): # p1 acquires write barrier.wait() # ---------------------------------------- 1 with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) with pytest.raises(LockError): - lock.acquire_read(0.1) + lock.acquire_read(lock_fail_timeout) barrier.wait() # ---------------------------------------- 2 lock.acquire_read() barrier.wait() # ---------------------------------------- 3 @@ -495,9 +695,9 @@ def p3(barrier): # p2 upgrades read to write barrier.wait() # ---------------------------------------- 6 with pytest.raises(LockError): - lock.acquire_write(0.1) + lock.acquire_write(lock_fail_timeout) with pytest.raises(LockError): - lock.acquire_read(0.1) + lock.acquire_read(lock_fail_timeout) barrier.wait() # ---------------------------------------- 7 # p2 releases write & read barrier.wait() # ---------------------------------------- 8 @@ -517,6 +717,7 @@ def p3(barrier): multiproc_test(p1, p2, p3) + def test_transaction(lock_path): def enter_fn(): vals['entered'] = True @@ -542,6 +743,7 @@ def exit_fn(t, v, tb): assert vals['exited'] assert not vals['exception'] + def test_transaction_with_exception(lock_path): def enter_fn(): vals['entered'] = True @@ -574,6 +776,7 @@ def do_write_with_exception(): assert vals['exited'] assert vals['exception'] + def test_transaction_with_context_manager(lock_path): class TestContextManager(object): @@ -634,6 +837,7 @@ def exit_fn(t, v, tb): assert not vals['exited_fn'] assert not vals['exception_fn'] + def test_transaction_with_context_manager_and_exception(lock_path): class TestContextManager(object): def __enter__(self):