Simplified the spack.util.gpg implementation (#23889)

* Simplified the spack.util.gpg implementation

All the classes defined in this Python module,
which were previously used to construct singleton
instances, have been removed in favor of four
global variables. These variables are initialized
lazily, like before.

The API of the module has been unchanged for the
most part. A few tests have been modified to use
the new global names.
This commit is contained in:
Massimiliano Culpo 2021-06-01 19:05:42 +02:00 committed by GitHub
parent c6d21fa154
commit 707a3f7df8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 320 additions and 384 deletions

View file

@ -73,7 +73,7 @@ def tutorial(parser, args):
tty.msg("Ensuring that we trust tutorial binaries",
"spack gpg trust %s" % tutorial_key)
spack.util.gpg.Gpg().trust(tutorial_key)
spack.util.gpg.trust(tutorial_key)
# Note that checkout MUST be last. It changes Spack under our feet.
# If you don't put this last, you'll get import errors for the code

View file

@ -318,8 +318,6 @@ def test_relative_rpaths_install_nondefault(mirror_dir):
buildcache_cmd('install', '-auf', cspec.name)
@pytest.mark.skipif(not spack.util.gpg.has_gpg(),
reason='This test requires gpg')
def test_push_and_fetch_keys(mock_gnupghome):
testpath = str(mock_gnupghome)
@ -333,7 +331,7 @@ def test_push_and_fetch_keys(mock_gnupghome):
# dir 1: create a new key, record its fingerprint, and push it to a new
# mirror
with spack.util.gpg.gnupg_home_override(gpg_dir1):
with spack.util.gpg.gnupghome_override(gpg_dir1):
spack.util.gpg.create(name='test-key',
email='fake@test.key',
expires='0',
@ -347,7 +345,7 @@ def test_push_and_fetch_keys(mock_gnupghome):
# dir 2: import the key from the mirror, and confirm that its fingerprint
# matches the one created above
with spack.util.gpg.gnupg_home_override(gpg_dir2):
with spack.util.gpg.gnupghome_override(gpg_dir2):
assert len(spack.util.gpg.public_keys()) == 0
bindist.get_keys(mirrors=mirrors, install=True, trust=True, force=True)

View file

@ -56,8 +56,6 @@ def test_urlencode_string():
assert(s_enc == 'Spack+Test+Project')
@pytest.mark.skipif(not spack.util.gpg.has_gpg(),
reason='This test requires gpg')
def test_import_signing_key(mock_gnupghome):
signing_key_dir = spack_paths.mock_gpg_keys_path
signing_key_path = os.path.join(signing_key_dir, 'package-signing-key')

View file

@ -138,8 +138,6 @@ def test_buildcache_create_fail_on_perm_denied(
tmpdir.chmod(0o700)
@pytest.mark.skipif(not spack.util.gpg.has_gpg(),
reason='This test requires gpg')
def test_update_key_index(tmpdir, mutable_mock_env_path,
install_mockery, mock_packages, mock_fetch,
mock_stage, mock_gnupghome):

View file

@ -647,8 +647,6 @@ def test_ci_generate_with_external_pkg(tmpdir, mutable_mock_env_path,
assert not any('externaltool' in key for key in yaml_contents)
@pytest.mark.skipif(not spack.util.gpg.has_gpg(),
reason='This test requires gpg')
def test_ci_rebuild(tmpdir, mutable_mock_env_path, env_deactivate,
install_mockery, mock_packages, monkeypatch,
mock_gnupghome, mock_fetch):
@ -864,8 +862,6 @@ def fake_dl_method(spec, dest, require_cdashid, m_url=None):
@pytest.mark.disable_clean_stage_check
@pytest.mark.skipif(not spack.util.gpg.has_gpg(),
reason='This test requires gpg')
def test_push_mirror_contents(tmpdir, mutable_mock_env_path, env_deactivate,
install_mockery, mock_packages, mock_fetch,
mock_stage, mock_gnupghome):

View file

@ -41,24 +41,20 @@ def test_find_gpg(cmd_name, version, tmpdir, mock_gnupghome, monkeypatch):
monkeypatch.setitem(os.environ, "PATH", str(tmpdir))
if version == 'undetectable' or version.endswith('1.3.4'):
with pytest.raises(spack.util.gpg.SpackGPGError):
spack.util.gpg.ensure_gpg(reevaluate=True)
spack.util.gpg.init(force=True)
else:
spack.util.gpg.ensure_gpg(reevaluate=True)
gpg_exe = spack.util.gpg.get_global_gpg_instance().gpg_exe
assert isinstance(gpg_exe, spack.util.executable.Executable)
gpgconf_exe = spack.util.gpg.get_global_gpg_instance().gpgconf_exe
assert isinstance(gpgconf_exe, spack.util.executable.Executable)
spack.util.gpg.init(force=True)
assert spack.util.gpg.GPG is not None
assert spack.util.gpg.GPGCONF is not None
def test_no_gpg_in_path(tmpdir, mock_gnupghome, monkeypatch):
monkeypatch.setitem(os.environ, "PATH", str(tmpdir))
with pytest.raises(spack.util.gpg.SpackGPGError):
spack.util.gpg.ensure_gpg(reevaluate=True)
spack.util.gpg.init(force=True)
@pytest.mark.maybeslow
@pytest.mark.skipif(not spack.util.gpg.has_gpg(),
reason='These tests require gnupg2')
def test_gpg(tmpdir, mock_gnupghome):
# Verify a file with an empty keyring.
with pytest.raises(ProcessError):

View file

@ -842,8 +842,14 @@ def mock_gnupghome(monkeypatch):
# have to make our own tmpdir with a shorter name than pytest's.
# This comes up because tmp paths on macOS are already long-ish, and
# pytest makes them longer.
try:
spack.util.gpg.init()
except spack.util.gpg.SpackGPGError:
if not spack.util.gpg.GPG:
pytest.skip('This test requires gpg')
short_name_tmpdir = tempfile.mkdtemp()
with spack.util.gpg.gnupg_home_override(short_name_tmpdir):
with spack.util.gpg.gnupghome_override(short_name_tmpdir):
yield short_name_tmpdir
# clean up, since we are doing this manually

View file

@ -39,8 +39,6 @@ def fake_fetchify(url, pkg):
pkg.fetcher = fetcher
@pytest.mark.skipif(not spack.util.gpg.has_gpg(),
reason='This test requires gpg')
@pytest.mark.usefixtures('install_mockery', 'mock_gnupghome')
def test_buildcache(mock_archive, tmpdir):
# tweak patchelf to only do a download

View file

@ -9,6 +9,12 @@
import spack.util.gpg
@pytest.fixture()
def has_socket_dir():
spack.util.gpg.init()
return bool(spack.util.gpg.SOCKET_DIR)
def test_parse_gpg_output_case_one():
# Two keys, fingerprint for primary keys, but not subkeys
output = """sec::2048:1:AAAAAAAAAAAAAAAA:AAAAAAAAAA:AAAAAAAAAA:::::::::
@ -20,7 +26,7 @@ def test_parse_gpg_output_case_one():
uid:::::::AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA::Joe (Test) <j.s@s.com>:
ssb::2048:1:AAAAAAAAAAAAAAAA:AAAAAAAAAA::::::::::
"""
keys = spack.util.gpg.parse_secret_keys_output(output)
keys = spack.util.gpg._parse_secret_keys_output(output)
assert len(keys) == 2
assert keys[0] == 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
@ -37,7 +43,7 @@ def test_parse_gpg_output_case_two():
fpr:::::::::YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY:
grp:::::::::AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA:
"""
keys = spack.util.gpg.parse_secret_keys_output(output)
keys = spack.util.gpg._parse_secret_keys_output(output)
assert len(keys) == 1
assert keys[0] == 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
@ -56,19 +62,19 @@ def test_parse_gpg_output_case_three():
ssb::2048:1:AAAAAAAAAAAAAAAA:AAAAAAAAAA::::::::::
fpr:::::::::ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ:"""
keys = spack.util.gpg.parse_secret_keys_output(output)
keys = spack.util.gpg._parse_secret_keys_output(output)
assert len(keys) == 2
assert keys[0] == 'WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW'
assert keys[1] == 'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY'
@pytest.mark.skipif(not spack.util.gpg.GpgConstants.user_run_dir,
reason='This test requires /var/run/user/$(id -u)')
@pytest.mark.requires_executables('gpg2')
def test_really_long_gnupg_home_dir(tmpdir):
N = 960
def test_really_long_gnupghome_dir(tmpdir, has_socket_dir):
if not has_socket_dir:
pytest.skip('This test requires /var/run/user/$(id -u)')
N = 960
tdir = str(tmpdir)
while len(tdir) < N:
tdir = os.path.join(tdir, 'filler')
@ -76,10 +82,11 @@ def test_really_long_gnupg_home_dir(tmpdir):
tdir = tdir[:N].rstrip(os.sep)
tdir += '0' * (N - len(tdir))
with spack.util.gpg.gnupg_home_override(tdir):
spack.util.gpg.create(name='Spack testing 1',
email='test@spack.io',
comment='Spack testing key',
expires='0')
with spack.util.gpg.gnupghome_override(tdir):
spack.util.gpg.create(
name='Spack testing 1',
email='test@spack.io',
comment='Spack testing key',
expires='0'
)
spack.util.gpg.list(True, True)

View file

@ -2,74 +2,123 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import contextlib
import errno
import functools
import os
import re
import llnl.util.lang
import spack.error
import spack.paths
import spack.util.executable
import spack.version
_gnupg_version_re = r"^gpg(conf)? \(GnuPG\) (.*)$"
_gnupg_home_override = None
_global_gpg_instance = None
#: Executable instance for "gpg", initialized lazily
GPG = None
#: Executable instance for "gpgconf", initialized lazily
GPGCONF = None
#: Socket directory required if a non default home directory is used
SOCKET_DIR = None
#: GNUPGHOME environment variable in the context of this Python module
GNUPGHOME = None
def get_gnupg_home(gnupg_home=None):
"""Returns the directory that should be used as the GNUPGHOME environment
variable when calling gpg.
def clear():
"""Reset the global state to uninitialized."""
global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME
GPG, GPGCONF, SOCKET_DIR, GNUPGHOME = None, None, None, None
If a [gnupg_home] is passed directly (and not None), that value will be
used.
Otherwise, if there is an override set (and it is not None), then that
value will be used.
def init(gnupghome=None, force=False):
"""Initialize the global objects in the module, if not set.
Otherwise, if the environment variable "SPACK_GNUPGHOME" is set, then that
value will be used.
When calling any gpg executable, the GNUPGHOME environment
variable is set to:
Otherwise, the default gpg path for Spack will be used.
1. The value of the `gnupghome` argument, if not None
2. The value of the "SPACK_GNUPGHOME" environment variable, if set
3. The default gpg path for Spack otherwise
See also: gnupg_home_override()
Args:
gnupghome (str): value to be used for GNUPGHOME when calling
GnuPG executables
force (bool): if True forces the re-initialization even if the
global objects are set already
"""
return (gnupg_home or
_gnupg_home_override or
os.getenv('SPACK_GNUPGHOME') or
spack.paths.gpg_path)
global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME
if force:
clear()
# If the executables are already set, there's nothing to do
if GPG and GNUPGHOME:
return
# Set the value of GNUPGHOME to be used in this module
GNUPGHOME = (gnupghome or
os.getenv('SPACK_GNUPGHOME') or
spack.paths.gpg_path)
# Set the executable objects for "gpg" and "gpgconf"
GPG, GPGCONF = _gpg(), _gpgconf()
GPG.add_default_env('GNUPGHOME', GNUPGHOME)
if GPGCONF:
GPGCONF.add_default_env('GNUPGHOME', GNUPGHOME)
# Set the socket dir if not using GnuPG defaults
SOCKET_DIR = _socket_dir(GPGCONF)
# Make sure that the GNUPGHOME exists
if not os.path.exists(GNUPGHOME):
os.makedirs(GNUPGHOME)
os.chmod(GNUPGHOME, 0o700)
if not os.path.isdir(GNUPGHOME):
msg = 'GNUPGHOME "{0}" exists and is not a directory'.format(GNUPGHOME)
raise SpackGPGError(msg)
if SOCKET_DIR is not None:
GPGCONF('--create-socketdir')
def _autoinit(func):
"""Decorator to ensure that global variables have been initialized before
running the decorated function.
Args:
func (callable): decorated function
"""
@functools.wraps(func)
def _wrapped(*args, **kwargs):
init()
return func(*args, **kwargs)
return _wrapped
@contextlib.contextmanager
def gnupg_home_override(new_gnupg_home):
global _gnupg_home_override
global _global_gpg_instance
def gnupghome_override(dir):
"""Set the GNUPGHOME to a new location for this context.
old_gnupg_home_override = _gnupg_home_override
old_global_gpg_instance = _global_gpg_instance
Args:
dir (str): new value for GNUPGHOME
"""
global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME
_gnupg_home_override = new_gnupg_home
_global_gpg_instance = None
# Store backup values
_GPG, _GPGCONF = GPG, GPGCONF
_SOCKET_DIR, _GNUPGHOME = SOCKET_DIR, GNUPGHOME
clear()
# Clear global state
init(gnupghome=dir, force=True)
yield
_gnupg_home_override = old_gnupg_home_override
_global_gpg_instance = old_global_gpg_instance
clear()
GPG, GPGCONF = _GPG, _GPGCONF
SOCKET_DIR, GNUPGHOME = _SOCKET_DIR, _GNUPGHOME
def get_global_gpg_instance():
global _global_gpg_instance
if _global_gpg_instance is None:
_global_gpg_instance = Gpg()
return _global_gpg_instance
def parse_secret_keys_output(output):
def _parse_secret_keys_output(output):
keys = []
found_sec = False
for line in output.split('\n'):
@ -84,7 +133,7 @@ def parse_secret_keys_output(output):
return keys
def parse_public_keys_output(output):
def _parse_public_keys_output(output):
keys = []
found_pub = False
for line in output.split('\n'):
@ -99,334 +148,224 @@ def parse_public_keys_output(output):
return keys
cached_property = getattr(functools, 'cached_property', None)
# If older python version has no cached_property, emulate it here.
# TODO(opadron): maybe this shim should be moved to llnl.util.lang?
if not cached_property:
def cached_property(*args, **kwargs):
result = property(llnl.util.lang.memoized(*args, **kwargs))
attr = result.fget.__name__
@result.deleter
def result(self):
getattr(type(self), attr).fget.cache.pop((self,), None)
return result
class _GpgConstants(object):
@cached_property
def target_version(self):
return spack.version.Version('2')
@cached_property
def gpgconf_string(self):
exe_str = spack.util.executable.which_string(
'gpgconf', 'gpg2conf', 'gpgconf2')
no_gpgconf_msg = (
'Spack requires gpgconf version >= 2\n'
' To install a suitable version using Spack, run\n'
' spack install gnupg@2:\n'
' and load it by running\n'
' spack load gnupg@2:')
if not exe_str:
raise SpackGPGError(no_gpgconf_msg)
exe = spack.util.executable.Executable(exe_str)
output = exe('--version', output=str)
match = re.search(_gnupg_version_re, output, re.M)
if not match:
raise SpackGPGError('Could not determine gpgconf version')
if spack.version.Version(match.group(2)) < self.target_version:
raise SpackGPGError(no_gpgconf_msg)
# ensure that the gpgconf we found can run "gpgconf --create-socketdir"
try:
exe('--dry-run', '--create-socketdir')
except spack.util.executable.ProcessError:
# no dice
exe_str = None
return exe_str
@cached_property
def gpg_string(self):
exe_str = spack.util.executable.which_string('gpg2', 'gpg')
no_gpg_msg = (
'Spack requires gpg version >= 2\n'
' To install a suitable version using Spack, run\n'
' spack install gnupg@2:\n'
' and load it by running\n'
' spack load gnupg@2:')
if not exe_str:
raise SpackGPGError(no_gpg_msg)
exe = spack.util.executable.Executable(exe_str)
output = exe('--version', output=str)
match = re.search(_gnupg_version_re, output, re.M)
if not match:
raise SpackGPGError('Could not determine gpg version')
if spack.version.Version(match.group(2)) < self.target_version:
raise SpackGPGError(no_gpg_msg)
return exe_str
@cached_property
def user_run_dir(self):
# Try to ensure that (/var)/run/user/$(id -u) exists so that
# `gpgconf --create-socketdir` can be run later.
#
# NOTE(opadron): This action helps prevent a large class of
# "file-name-too-long" errors in gpg.
try:
has_suitable_gpgconf = bool(GpgConstants.gpgconf_string)
except SpackGPGError:
has_suitable_gpgconf = False
# If there is no suitable gpgconf, don't even bother trying to
# precreate a user run dir.
if not has_suitable_gpgconf:
return None
result = None
for var_run in ('/run', '/var/run'):
if not os.path.exists(var_run):
continue
var_run_user = os.path.join(var_run, 'user')
try:
if not os.path.exists(var_run_user):
os.mkdir(var_run_user)
os.chmod(var_run_user, 0o777)
user_dir = os.path.join(var_run_user, str(os.getuid()))
if not os.path.exists(user_dir):
os.mkdir(user_dir)
os.chmod(user_dir, 0o700)
# If the above operation fails due to lack of permissions, then
# just carry on without running gpgconf and hope for the best.
#
# NOTE(opadron): Without a dir in which to create a socket for IPC,
# gnupg may fail if GNUPGHOME is set to a path that
# is too long, where "too long" in this context is
# actually quite short; somewhere in the
# neighborhood of more than 100 characters.
#
# TODO(opadron): Maybe a warning should be printed in this case?
except OSError as exc:
if exc.errno not in (errno.EPERM, errno.EACCES):
raise
user_dir = None
# return the last iteration that provides a usable user run dir
if user_dir is not None:
result = user_dir
return result
def clear(self):
for attr in ('gpgconf_string', 'gpg_string', 'user_run_dir'):
try:
delattr(self, attr)
except AttributeError:
pass
GpgConstants = _GpgConstants()
def ensure_gpg(reevaluate=False):
if reevaluate:
GpgConstants.clear()
if GpgConstants.user_run_dir is not None:
GpgConstants.gpgconf_string
GpgConstants.gpg_string
return True
def has_gpg(*args, **kwargs):
try:
return ensure_gpg(*args, **kwargs)
except SpackGPGError:
return False
# NOTE(opadron): When adding methods to this class, consider adding convenience
# wrapper functions further down in this file.
class Gpg(object):
def __init__(self, gnupg_home=None):
self.gnupg_home = get_gnupg_home(gnupg_home)
@cached_property
def prep(self):
# Make sure that suitable versions of gpgconf and gpg are available
ensure_gpg()
# Make sure that the GNUPGHOME exists
if not os.path.exists(self.gnupg_home):
os.makedirs(self.gnupg_home)
os.chmod(self.gnupg_home, 0o700)
if not os.path.isdir(self.gnupg_home):
raise SpackGPGError(
'GNUPGHOME "{0}" exists and is not a directory'.format(
self.gnupg_home))
if GpgConstants.user_run_dir is not None:
self.gpgconf_exe('--create-socketdir')
return True
@cached_property
def gpgconf_exe(self):
exe = spack.util.executable.Executable(GpgConstants.gpgconf_string)
exe.add_default_env('GNUPGHOME', self.gnupg_home)
return exe
@cached_property
def gpg_exe(self):
exe = spack.util.executable.Executable(GpgConstants.gpg_string)
exe.add_default_env('GNUPGHOME', self.gnupg_home)
return exe
def __call__(self, *args, **kwargs):
if self.prep:
return self.gpg_exe(*args, **kwargs)
def create(self, **kwargs):
r, w = os.pipe()
r = os.fdopen(r, 'r')
w = os.fdopen(w, 'w')
w.write('''
Key-Type: rsa
Key-Length: 4096
Key-Usage: sign
Name-Real: %(name)s
Name-Email: %(email)s
Name-Comment: %(comment)s
Expire-Date: %(expires)s
%%no-protection
%%commit
''' % kwargs)
w.close()
self('--gen-key', '--batch', input=r)
r.close()
def signing_keys(self, *args):
output = self('--list-secret-keys', '--with-colons', '--fingerprint',
*args, output=str)
return parse_secret_keys_output(output)
def public_keys(self, *args):
output = self('--list-public-keys', '--with-colons', '--fingerprint',
*args, output=str)
return parse_public_keys_output(output)
def export_keys(self, location, keys, secret=False):
if secret:
self("--export-secret-keys", "--armor", "--output", location, *keys)
else:
self('--batch', '--yes', '--armor', '--export', '--output',
location, *keys)
def trust(self, keyfile):
self('--import', keyfile)
def untrust(self, signing, *keys):
if signing:
skeys = self.signing_keys(*keys)
self('--batch', '--yes', '--delete-secret-keys', *skeys)
pkeys = self.public_keys(*keys)
self('--batch', '--yes', '--delete-keys', *pkeys)
def sign(self, key, file, output, clearsign=False):
self(('--clearsign' if clearsign else '--detach-sign'),
'--armor', '--default-key', key,
'--output', output, file)
def verify(self, signature, file, suppress_warnings=False):
self('--verify', signature, file,
**({'error': str} if suppress_warnings else {}))
def list(self, trusted, signing):
if trusted:
self('--list-public-keys')
if signing:
self('--list-secret-keys')
class SpackGPGError(spack.error.SpackError):
"""Class raised when GPG errors are detected."""
# Convenience wrappers for methods of the Gpg class
# __call__ is a bit of a special case, since the Gpg instance is, itself, the
# "thing" that is being called.
@functools.wraps(Gpg.__call__)
def gpg(*args, **kwargs):
return get_global_gpg_instance()(*args, **kwargs)
@_autoinit
def create(**kwargs):
"""Create a new key pair."""
r, w = os.pipe()
with contextlib.closing(os.fdopen(r, 'r')) as r:
with contextlib.closing(os.fdopen(w, 'w')) as w:
w.write('''
Key-Type: rsa
Key-Length: 4096
Key-Usage: sign
Name-Real: %(name)s
Name-Email: %(email)s
Name-Comment: %(comment)s
Expire-Date: %(expires)s
%%no-protection
%%commit
''' % kwargs)
GPG('--gen-key', '--batch', input=r)
gpg.name = 'gpg' # type: ignore[attr-defined]
@_autoinit
def signing_keys(*args):
"""Return the keys that can be used to sign binaries."""
output = GPG(
'--list-secret-keys', '--with-colons', '--fingerprint',
*args, output=str
)
return _parse_secret_keys_output(output)
@functools.wraps(Gpg.create)
def create(*args, **kwargs):
return get_global_gpg_instance().create(*args, **kwargs)
@_autoinit
def public_keys(*args):
"""Return the keys that can be used to verify binaries."""
output = GPG(
'--list-public-keys', '--with-colons', '--fingerprint',
*args, output=str
)
return _parse_public_keys_output(output)
@functools.wraps(Gpg.signing_keys)
def signing_keys(*args, **kwargs):
return get_global_gpg_instance().signing_keys(*args, **kwargs)
@_autoinit
def export_keys(location, keys, secret=False):
"""Export public keys to a location passed as argument.
Args:
location (str): where to export the keys
keys (list): keys to be exported
secret (bool): whether to export secret keys or not
"""
if secret:
GPG("--export-secret-keys", "--armor", "--output", location, *keys)
else:
GPG("--batch", "--yes", "--armor", "--export", "--output", location, *keys)
@functools.wraps(Gpg.public_keys)
def public_keys(*args, **kwargs):
return get_global_gpg_instance().public_keys(*args, **kwargs)
@_autoinit
def trust(keyfile):
"""Import a public key from a file.
Args:
keyfile (str): file with the public key
"""
GPG('--import', keyfile)
@functools.wraps(Gpg.export_keys)
def export_keys(*args, **kwargs):
return get_global_gpg_instance().export_keys(*args, **kwargs)
@_autoinit
def untrust(signing, *keys):
"""Delete known keys.
Args:
signing (bool): if True deletes the secret keys
*keys: keys to be deleted
"""
if signing:
skeys = signing_keys(*keys)
GPG('--batch', '--yes', '--delete-secret-keys', *skeys)
pkeys = public_keys(*keys)
GPG('--batch', '--yes', '--delete-keys', *pkeys)
@functools.wraps(Gpg.trust)
def trust(*args, **kwargs):
return get_global_gpg_instance().trust(*args, **kwargs)
@_autoinit
def sign(key, file, output, clearsign=False):
"""Sign a file with a key.
Args:
key: key to be used to sign
file (str): file to be signed
output (str): output file (either the clearsigned file or
the detached signature)
clearsign (bool): if True wraps the document in an ASCII-armored
signature, if False creates a detached signature
"""
signopt = '--clearsign' if clearsign else '--detach-sign'
GPG(signopt, '--armor', '--default-key', key, '--output', output, file)
@functools.wraps(Gpg.untrust)
def untrust(*args, **kwargs):
return get_global_gpg_instance().untrust(*args, **kwargs)
@_autoinit
def verify(signature, file, suppress_warnings=False):
"""Verify the signature on a file.
Args:
signature (str): signature of the file
file (str): file to be verified
suppress_warnings (bool): whether or not to suppress warnings
from GnuPG
"""
kwargs = {'error': str} if suppress_warnings else {}
GPG('--verify', signature, file, **kwargs)
@functools.wraps(Gpg.sign)
def sign(*args, **kwargs):
return get_global_gpg_instance().sign(*args, **kwargs)
@_autoinit
def list(trusted, signing):
"""List known keys.
Args:
trusted (bool): if True list public keys
signing (bool): if True list private keys
"""
if trusted:
GPG('--list-public-keys')
if signing:
GPG('--list-secret-keys')
@functools.wraps(Gpg.verify)
def verify(*args, **kwargs):
return get_global_gpg_instance().verify(*args, **kwargs)
def _verify_exe_or_raise(exe):
msg = (
'Spack requires gpgconf version >= 2\n'
' To install a suitable version using Spack, run\n'
' spack install gnupg@2:\n'
' and load it by running\n'
' spack load gnupg@2:'
)
if not exe:
raise SpackGPGError(msg)
output = exe('--version', output=str)
match = re.search(r"^gpg(conf)? \(GnuPG\) (.*)$", output, re.M)
if not match:
raise SpackGPGError(
'Could not determine "{0}" version'.format(exe.name)
)
if spack.version.Version(match.group(2)) < spack.version.Version('2'):
raise SpackGPGError(msg)
@functools.wraps(Gpg.list)
def list(*args, **kwargs):
return get_global_gpg_instance().list(*args, **kwargs)
def _gpgconf():
exe = spack.util.executable.which('gpgconf', 'gpg2conf', 'gpgconf2')
_verify_exe_or_raise(exe)
# ensure that the gpgconf we found can run "gpgconf --create-socketdir"
try:
exe('--dry-run', '--create-socketdir')
except spack.util.executable.ProcessError:
# no dice
exe = None
return exe
def _gpg():
exe = spack.util.executable.which('gpg2', 'gpg')
_verify_exe_or_raise(exe)
return exe
def _socket_dir(gpgconf):
# Try to ensure that (/var)/run/user/$(id -u) exists so that
# `gpgconf --create-socketdir` can be run later.
#
# NOTE(opadron): This action helps prevent a large class of
# "file-name-too-long" errors in gpg.
# If there is no suitable gpgconf, don't even bother trying to
# pre-create a user run dir.
if not gpgconf:
return None
result = None
for var_run in ('/run', '/var/run'):
if not os.path.exists(var_run):
continue
var_run_user = os.path.join(var_run, 'user')
try:
if not os.path.exists(var_run_user):
os.mkdir(var_run_user)
os.chmod(var_run_user, 0o777)
user_dir = os.path.join(var_run_user, str(os.getuid()))
if not os.path.exists(user_dir):
os.mkdir(user_dir)
os.chmod(user_dir, 0o700)
# If the above operation fails due to lack of permissions, then
# just carry on without running gpgconf and hope for the best.
#
# NOTE(opadron): Without a dir in which to create a socket for IPC,
# gnupg may fail if GNUPGHOME is set to a path that
# is too long, where "too long" in this context is
# actually quite short; somewhere in the
# neighborhood of more than 100 characters.
#
# TODO(opadron): Maybe a warning should be printed in this case?
except OSError as exc:
if exc.errno not in (errno.EPERM, errno.EACCES):
raise
user_dir = None
# return the last iteration that provides a usable user run dir
if user_dir is not None:
result = user_dir
return result