Parametrize lock test and make it work with MPI

- Lock test can be run either as a node-local test or as an MPI test.

- Lock test is now parametrized by filesystem, so you can test the
  locking capabilities of your NFS, Lustre, or GPFS filesystem.  See docs
  for details.
Author: Todd Gamblin
Date:   2017-07-03 17:30:18 -07:00
Commit: b4d1654e68 (parent bd7a591df1)

2 changed files with 255 additions and 46 deletions

lib/spack/llnl/util/lock.py

@@ -127,8 +127,9 @@ def _lock(self, op, timeout=_default_timeout):
                 return
-            except IOError as error:
-                if error.errno == errno.EAGAIN or error.errno == errno.EACCES:
+            except IOError as e:
+                if e.errno in (errno.EAGAIN, errno.EACCES):
+                    # EAGAIN and EACCES == locked by another process
                     pass
                 else:
                     raise
@@ -197,6 +198,8 @@ def acquire_read(self, timeout=_default_timeout):
             tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
                       .format(self))
             self._lock(fcntl.LOCK_SH, timeout=timeout)  # can raise LockError.
+            tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquired]'
+                      .format(self))
             self._reads += 1
             return True
         else:
@@ -219,6 +222,8 @@ def acquire_write(self, timeout=_default_timeout):
                 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
                 .format(self))
             self._lock(fcntl.LOCK_EX, timeout=timeout)  # can raise LockError.
+            tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquired]'
+                      .format(self))
             self._writes += 1
             return True
         else:
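The EAGAIN/EACCES handling above is what makes a busy lock look like "try
again" rather than an error: ``_lock`` wraps a non-blocking ``fcntl`` request
on a byte range of the lock file.  As a rough, self-contained sketch of that
primitive (the helper name and single-attempt behavior are illustrative; the
real method retries until its timeout expires)::

    import errno
    import fcntl

    def try_lock_range(f, op, start=0, length=0):
        """Attempt once to lock bytes [start, start + length) of open file f.

        op is fcntl.LOCK_SH or fcntl.LOCK_EX.  Returns True on success and
        False if another process holds a conflicting lock.
        """
        try:
            fcntl.lockf(f, op | fcntl.LOCK_NB, length, start)
            return True
        except IOError as e:
            if e.errno in (errno.EAGAIN, errno.EACCES):
                # EAGAIN and EACCES == locked by another process
                return False
            raise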

lib/spack/spack/test/lock.py

@@ -22,37 +22,178 @@
 # License along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 ##############################################################################
-"""
-These tests ensure that our lock works correctly.
+"""These tests ensure that our lock works correctly.
+
+This can be run in two ways.
+
+First, it can be run as a node-local test, with a typical invocation like
+this::
+
+    spack test lock
+
+You can *also* run it as an MPI program, which allows you to test locks
+across nodes.  So, e.g., you can run the test like this::
+
+    mpirun -n 7 spack test lock
+
+And it will test locking correctness among MPI processes.  Ideally, you
+want the MPI processes to span across multiple nodes, so, e.g., for SLURM
+you might do this::
+
+    srun -N 7 -n 7 -m cyclic spack test lock
+
+You can use this to test whether your shared filesystem properly supports
+POSIX reader-writer locking with byte ranges through fcntl.
+
+If you want to test on multiple filesystems, you can modify the
+``locations`` list below.  By default it looks like this::
+
+    locations = [
+        tempfile.gettempdir(),  # standard tmp directory (potentially local)
+        '/nfs/tmp2/%u',         # NFS tmp mount
+        '/p/lscratch*/%u'       # Lustre scratch mount
+    ]
+
+Add names and paths for your preferred filesystem mounts to test on them;
+the tests are parametrized to run on all the filesystems in this list.
+Note that 'tmp' will be skipped for MPI testing, as it is often a
+node-local filesystem, and multi-node tests will fail if the locks aren't
+actually on a shared filesystem.
 """
 import os
 import shutil
-import functools
 import tempfile
 import traceback
+import glob
+import getpass
+from contextlib import contextmanager
 from multiprocessing import Process
 
 import pytest
 
-from llnl.util.filesystem import join_path, touch, mkdirp
+from llnl.util.filesystem import join_path, touch
 from llnl.util.lock import *
 from spack.util.multiproc import Barrier
 
-# This is the longest a failed test will take, as the barriers will
-# time out and raise an exception.
+#
+# This test can be run with MPI.  MPI is "enabled" if we can import
+# mpi4py and the number of total MPI processes is greater than 1.
+# Otherwise it just runs as a node-local test.
+#
+# NOTE: MPI mode is different from node-local mode in that node-local
+# mode will spawn its own test processes, while MPI mode assumes you've
+# run this script as a SPMD application.  In MPI mode, no additional
+# processes are spawned, and you need to ensure that you mpirun the
+# script with enough processes for all the multiproc_test cases below.
+#
+# If you don't run with enough processes, tests that require more
+# processes than you currently have will be skipped.
+#
+mpi = False
+comm = None
+try:
+    from mpi4py import MPI
+    comm = MPI.COMM_WORLD
+    if comm.size > 1:
+        mpi = True
+except:
+    pass
+
+
+"""This is a list of filesystem locations to test locks in.
+
+Paths are expanded so that %u is replaced with the current username.
+'~' is also legal and will be expanded to the user's home directory.
+
+Tests are skipped for directories that don't exist, so you'll need to
+update this with the locations of NFS, Lustre, and other mounts on your
+system.
+"""
+locations = [
+    tempfile.gettempdir(),
+    os.path.join('/nfs/tmp2/', getpass.getuser()),
+    os.path.join('/p/lscratch*/', getpass.getuser()),
+]
+
+"""This is the longest a failed multiproc test will take.
+
+Barriers will time out and raise an exception after this interval.
+In MPI mode, barriers don't time out (they hang).  See mpi_multiproc_test.
+"""
 barrier_timeout = 5
 
+"""This is the lock timeout for expected failures.
+
+This may need to be higher for some filesystems."""
+lock_fail_timeout = 0.1
+
 
-@pytest.fixture()
-def lock_path():
-    tempdir = tempfile.mkdtemp()
-    lock_file = join_path(tempdir, 'lockfile')
+@contextmanager
+def read_only(path):
+    original_mode = os.stat(path).st_mode
+    os.chmod(path, 0o444)
+    yield
+    os.chmod(path, original_mode)
+
+
+@pytest.fixture(scope='session', params=locations)
+def lock_test_directory(request):
+    """This fixture causes tests to be executed for many different mounts.
+
+    See the ``locations`` list above for details.
+    """
+    return request.param
+
+
+@pytest.fixture(scope='session')
+def lock_dir(lock_test_directory):
+    parent = next((p for p in glob.glob(lock_test_directory)
+                   if os.path.exists(p) and os.access(p, os.W_OK)), None)
+    if not parent:
+        # Skip filesystems that don't exist or aren't writable
+        pytest.skip("requires filesystem: '%s'" % lock_test_directory)
+    elif mpi and parent == tempfile.gettempdir():
+        # Skip local tmp test for MPI runs
+        pytest.skip("skipping local tmp directory for MPI test.")
+
+    tempdir = None
+    if not mpi or comm.rank == 0:
+        tempdir = tempfile.mkdtemp(dir=parent)
+    if mpi:
+        tempdir = comm.bcast(tempdir)
+
+    yield tempdir
+
+    if mpi:
+        # rank 0 may get here before others, in which case it'll try to
+        # remove the directory while other processes try to re-create the
+        # lock.  This will give errno 39: directory not empty.  Use a
+        # barrier to ensure everyone is done first.
+        comm.barrier()
+
+    if not mpi or comm.rank == 0:
+        shutil.rmtree(tempdir)
+
+
+@pytest.fixture
+def private_lock_path(lock_dir):
+    """In MPI mode, this is a private lock for each rank in a multiproc test.
+
+    For other modes, it is the same as a shared lock.
+    """
+    lock_file = join_path(lock_dir, 'lockfile')
+    if mpi:
+        lock_file += '.%s' % comm.rank
     yield lock_file
-    shutil.rmtree(tempdir)
 
 
-def multiproc_test(*functions):
+@pytest.fixture
+def lock_path(lock_dir):
+    """This lock is shared among all processes in a multiproc test."""
+    lock_file = join_path(lock_dir, 'lockfile')
+    yield lock_file
+
+
+def local_multiproc_test(*functions):
     """Order some processes using simple barrier synchronization."""
     b = Barrier(len(functions), timeout=barrier_timeout)
     procs = [Process(target=f, args=(b,)) for f in functions]
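As an aside, the commit message mentions GPFS as well.  Per the module
docstring, covering another mount is just a matter of extending
``locations``.  A hypothetical example (the path below is illustrative, not
part of this commit)::

    # e.g., a GPFS scratch mount; adjust the path for your site
    locations.append(os.path.join('/gpfs/scratch/', getpass.getuser()))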
@@ -65,6 +206,52 @@ def multiproc_test(*functions):
     assert p.exitcode == 0
 
 
+def mpi_multiproc_test(*functions):
+    """SPMD version of multiproc test.
+
+    This needs to be run like so:
+
+        srun spack test lock
+
+    Each process executes its corresponding function.  This is different
+    from ``local_multiproc_test`` above, which spawns the processes.  This
+    will skip tests if there are too few processes to run them.
+    """
+    procs = len(functions)
+    if procs > comm.size:
+        pytest.skip("requires at least %d MPI processes" % procs)
+
+    comm.Barrier()  # barrier before each MPI test
+
+    include = comm.rank < len(functions)
+    subcomm = comm.Split(include)
+
+    class subcomm_barrier(object):
+        """Stand-in for multiproc barrier for MPI-parallel jobs."""
+        def wait(self):
+            subcomm.Barrier()
+
+    if include:
+        try:
+            functions[subcomm.rank](subcomm_barrier())
+        except:
+            # aborting is the best we can do for MPI tests without
+            # hanging, since we're using MPI barriers.  This will fail
+            # early and it loses the nice pytest output, but at least it
+            # gets us a stacktrace on the processes that failed.
+            traceback.print_exc()
+            comm.Abort()
+    subcomm.Free()
+
+    comm.Barrier()  # barrier after each MPI test.
+
+
+"""``multiproc_test()`` should be called by tests below.
+
+``multiproc_test()`` will work for either MPI runs or for local runs.
+"""
+multiproc_test = mpi_multiproc_test if mpi else local_multiproc_test
+
+
 #
 # Process snippets below can be composed into tests.
 #
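The "process snippets" referred to here are small factories, defined just
below this comment in the full file, that return barrier-driven functions
for ``multiproc_test`` to run in separate processes or MPI ranks.  This diff
only swaps their hard-coded 0.1 timeouts for ``lock_fail_timeout``.  A rough
sketch of the pattern (illustrative, not the exact committed bodies)::

    def acquire_write(lock_path, start=0, length=0):
        def fn(barrier):
            lock = Lock(lock_path, start, length)
            lock.acquire_write()  # grab the exclusive lock
            barrier.wait()        # others now try to lock and time out
            barrier.wait()        # hold the lock until they are done
        return fn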
@@ -91,7 +278,7 @@ def fn(barrier):
         lock = Lock(lock_path, start, length)
         barrier.wait()  # wait for lock acquire in first process
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         barrier.wait()
     return fn
@@ -101,7 +288,7 @@ def fn(barrier):
         lock = Lock(lock_path, start, length)
         barrier.wait()  # wait for lock acquire in first process
         with pytest.raises(LockError):
-            lock.acquire_read(0.1)
+            lock.acquire_read(lock_fail_timeout)
         barrier.wait()
     return fn
@@ -111,7 +298,9 @@ def fn(barrier):
 # exclusive lock is held.
 #
 def test_write_lock_timeout_on_write(lock_path):
-    multiproc_test(acquire_write(lock_path), timeout_write(lock_path))
+    multiproc_test(
+        acquire_write(lock_path),
+        timeout_write(lock_path))
 
 
 def test_write_lock_timeout_on_write_2(lock_path):
@@ -258,7 +447,8 @@ def test_write_lock_timeout_on_read_ranges_3(lock_path):
 def test_write_lock_timeout_on_read_ranges_4(lock_path):
     multiproc_test(
         acquire_read(lock_path, 0, 64),
-        timeout_write(lock_path, 10, 1), timeout_write(lock_path, 32, 1))
+        timeout_write(lock_path, 10, 1),
+        timeout_write(lock_path, 32, 1))
 
 
 def test_write_lock_timeout_on_read_ranges_5(lock_path):
@@ -268,6 +458,7 @@ def test_write_lock_timeout_on_read_ranges_5(lock_path):
         timeout_write(lock_path, 127, 1),
         timeout_write(lock_path, 90, 10))
 
+
 #
 # Test that exclusive locks time out while lots of shared locks are held.
 #
@@ -339,12 +530,19 @@ def test_write_lock_timeout_with_multiple_readers_3_2_ranges(lock_path):
 #
 # Test that read can be upgraded to write.
 #
-def test_upgrade_read_to_write(lock_path):
+def test_upgrade_read_to_write(private_lock_path):
+    """Test that a read lock can be upgraded to a write lock.
+
+    Note that to upgrade a read lock to a write lock, you have to be the
+    only holder of a read lock.  Client code needs to coordinate that for
+    shared locks.  For this test, we use a private lock just to test that
+    an upgrade is possible.
+    """
     # ensure lock file exists the first time, so we open it read-only
     # to begin with.
-    touch(lock_path)
+    touch(private_lock_path)
 
-    lock = Lock(lock_path)
+    lock = Lock(private_lock_path)
     assert lock._reads == 0
     assert lock._writes == 0
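To make the upgrade rule in the new docstring concrete, the sequence this
test drives is roughly the following (assertions omitted; a sketch, not the
exact committed body)::

    touch(private_lock_path)         # make sure the lock file exists
    lock = Lock(private_lock_path)
    lock.acquire_read()              # opens read-only, takes a shared lock
    lock.acquire_write()             # reopens writable, upgrades to exclusive
    lock.release_write()             # back to holding only the read lock
    lock.release_read()              # everything released, file closed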
@@ -368,26 +566,28 @@ def test_upgrade_read_to_write(lock_path):
     assert lock._writes == 0
     assert lock._file is None
 
+
 #
 # Test that read-only file can be read-locked but not write-locked.
 #
-def test_upgrade_read_to_write_fails_with_readonly_file(lock_path):
+def test_upgrade_read_to_write_fails_with_readonly_file(private_lock_path):
     # ensure lock file exists the first time, so we open it read-only
     # to begin with.
-    touch(lock_path)
-    os.chmod(lock_path, 0o444)
+    touch(private_lock_path)
 
-    lock = Lock(lock_path)
-    assert lock._reads == 0
-    assert lock._writes == 0
+    with read_only(private_lock_path):
+        lock = Lock(private_lock_path)
+        assert lock._reads == 0
+        assert lock._writes == 0
 
-    lock.acquire_read()
-    assert lock._reads == 1
-    assert lock._writes == 0
-    assert lock._file.mode == 'r'
+        lock.acquire_read()
+        assert lock._reads == 1
+        assert lock._writes == 0
+        assert lock._file.mode == 'r'
 
-    with pytest.raises(LockError):
-        lock.acquire_write()
+        with pytest.raises(LockError):
+            lock.acquire_write()
 
 
 #
 # Longer test case that ensures locks are reusable.  Ordering is
@@ -404,7 +604,7 @@ def p1(barrier):
         lock.release_write()  # release and others acquire read
         barrier.wait()  # ---------------------------------------- 3
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         lock.acquire_read()
         barrier.wait()  # ---------------------------------------- 4
         lock.release_read()
@@ -413,9 +613,9 @@ def p1(barrier):
         # p2 upgrades read to write
         barrier.wait()  # ---------------------------------------- 6
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         with pytest.raises(LockError):
-            lock.acquire_read(0.1)
+            lock.acquire_read(lock_fail_timeout)
         barrier.wait()  # ---------------------------------------- 7
         # p2 releases write and read
         barrier.wait()  # ---------------------------------------- 8
@@ -425,9 +625,9 @@ def p1(barrier):
         # p3 upgrades read to write
         barrier.wait()  # ---------------------------------------- 10
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         with pytest.raises(LockError):
-            lock.acquire_read(0.1)
+            lock.acquire_read(lock_fail_timeout)
         barrier.wait()  # ---------------------------------------- 11
         # p3 releases locks
         barrier.wait()  # ---------------------------------------- 12
@@ -441,9 +641,9 @@ def p2(barrier):
         # p1 acquires write
         barrier.wait()  # ---------------------------------------- 1
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         with pytest.raises(LockError):
-            lock.acquire_read(0.1)
+            lock.acquire_read(lock_fail_timeout)
         barrier.wait()  # ---------------------------------------- 2
         lock.acquire_read()
         barrier.wait()  # ---------------------------------------- 3
@@ -465,9 +665,9 @@ def p2(barrier):
         # p3 upgrades read to write
         barrier.wait()  # ---------------------------------------- 10
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         with pytest.raises(LockError):
-            lock.acquire_read(0.1)
+            lock.acquire_read(lock_fail_timeout)
         barrier.wait()  # ---------------------------------------- 11
         # p3 releases locks
         barrier.wait()  # ---------------------------------------- 12
@@ -481,9 +681,9 @@ def p3(barrier):
         # p1 acquires write
         barrier.wait()  # ---------------------------------------- 1
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         with pytest.raises(LockError):
-            lock.acquire_read(0.1)
+            lock.acquire_read(lock_fail_timeout)
         barrier.wait()  # ---------------------------------------- 2
         lock.acquire_read()
         barrier.wait()  # ---------------------------------------- 3
@@ -495,9 +695,9 @@ def p3(barrier):
         # p2 upgrades read to write
         barrier.wait()  # ---------------------------------------- 6
         with pytest.raises(LockError):
-            lock.acquire_write(0.1)
+            lock.acquire_write(lock_fail_timeout)
         with pytest.raises(LockError):
-            lock.acquire_read(0.1)
+            lock.acquire_read(lock_fail_timeout)
         barrier.wait()  # ---------------------------------------- 7
         # p2 releases write & read
         barrier.wait()  # ---------------------------------------- 8
@@ -517,6 +717,7 @@ def p3(barrier):
     multiproc_test(p1, p2, p3)
 
 
+
 def test_transaction(lock_path):
     def enter_fn():
         vals['entered'] = True
@@ -542,6 +743,7 @@ def exit_fn(t, v, tb):
     assert vals['exited']
     assert not vals['exception']
 
+
 def test_transaction_with_exception(lock_path):
     def enter_fn():
         vals['entered'] = True
@@ -574,6 +776,7 @@ def do_write_with_exception():
     assert vals['exited']
     assert vals['exception']
 
+
 def test_transaction_with_context_manager(lock_path):
     class TestContextManager(object):
@@ -634,6 +837,7 @@ def exit_fn(t, v, tb):
     assert not vals['exited_fn']
     assert not vals['exception_fn']
 
+
 def test_transaction_with_context_manager_and_exception(lock_path):
     class TestContextManager(object):
         def __enter__(self):