locks: allow locks to work under high contention (#27846)

* locks: allow locks to work under high contention

This is a bug found by Harshitha Menon.  

The `lock=None` line shouldn't be a release but should be 
```
return (lock_type, None)
``` 
to inform the caller it couldn't get the lock type requested without
disturbing the existing lock object in the database.  There were also a
couple of bugs due to taking write locks at the beginning without any
checking or release, and not releasing read locks before requeueing.
This version no longer gives me read upgrade to write errors, even
running 200 instances on one box.

* Change lock in check_deps_status to read, release if not installed,
  not sure why this was ever write, but read definitely is more
  appropriate here, and the read lock is only held out of the scope if
  the package is installed.

* Release read lock before requeueing to reduce chance of livelock, the
  timeout that caused the original issue now happens in roughly 3 of 200
  workers instead of 199 on average.
This commit is contained in:
Tom Scogland 2021-12-22 07:25:05 -08:00 committed by GitHub
parent 34873f5fe7
commit b7b6542804
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -794,10 +794,10 @@ def _check_deps_status(self, request):
.format(dep_id, action) .format(dep_id, action)
raise InstallError(err.format(request.pkg_id, msg)) raise InstallError(err.format(request.pkg_id, msg))
# Attempt to get a write lock to ensure another process does not # Attempt to get a read lock to ensure another process does not
# uninstall the dependency while the requested spec is being # uninstall the dependency while the requested spec is being
# installed # installed
ltype, lock = self._ensure_locked('write', dep_pkg) ltype, lock = self._ensure_locked('read', dep_pkg)
if lock is None: if lock is None:
msg = '{0} is write locked by another process'.format(dep_id) msg = '{0} is write locked by another process'.format(dep_id)
raise InstallError(err.format(request.pkg_id, msg)) raise InstallError(err.format(request.pkg_id, msg))
@ -816,6 +816,8 @@ def _check_deps_status(self, request):
tty.debug('Flagging {0} as installed per the database' tty.debug('Flagging {0} as installed per the database'
.format(dep_id)) .format(dep_id))
self._flag_installed(dep_pkg) self._flag_installed(dep_pkg)
else:
lock.release_read()
def _prepare_for_install(self, task): def _prepare_for_install(self, task):
""" """
@ -1027,7 +1029,7 @@ def _ensure_locked(self, lock_type, pkg):
except (lk.LockDowngradeError, lk.LockTimeoutError) as exc: except (lk.LockDowngradeError, lk.LockTimeoutError) as exc:
tty.debug(err.format(op, desc, pkg_id, exc.__class__.__name__, tty.debug(err.format(op, desc, pkg_id, exc.__class__.__name__,
str(exc))) str(exc)))
lock = None return (lock_type, None)
except (Exception, KeyboardInterrupt, SystemExit) as exc: except (Exception, KeyboardInterrupt, SystemExit) as exc:
tty.error(err.format(op, desc, pkg_id, exc.__class__.__name__, tty.error(err.format(op, desc, pkg_id, exc.__class__.__name__,
@ -1627,6 +1629,7 @@ def install(self):
# established by the other process -- failed, installed, or # established by the other process -- failed, installed, or
# uninstalled -- on the next pass. # uninstalled -- on the next pass.
if ltype == 'read': if ltype == 'read':
lock.release_read()
self._requeue_task(task) self._requeue_task(task)
continue continue