Create rename utility function

os.rename() fails on Windows if file already exists.

Create getuid utility function (#21736)

On Windows, replace os.getuid with ctypes.windll.shell32.IsUserAnAdmin().

Tests: Use getuid util function

Co-authored-by: lou.lawrence@kitware.com <lou.lawrence@kitware.com>
Co-authored-by: Betsy McPhail <betsy.mcphail@kitware.com>
This commit is contained in:
Betsy McPhail 2021-01-26 14:30:32 -05:00 committed by Peter Scheibel
parent b60a0eea01
commit a7de2fa380
11 changed files with 45 additions and 23 deletions

View file

@ -6,6 +6,7 @@
import errno
import glob
import grp
import ctypes
import hashlib
import itertools
import numbers
@ -44,6 +45,7 @@
'fix_darwin_install_name',
'force_remove',
'force_symlink',
'getuid',
'chgrp',
'chmod_x',
'copy',
@ -60,6 +62,7 @@
'remove_directory_contents',
'remove_if_dead_link',
'remove_linked_tree',
'rename',
'set_executable',
'set_install_permissions',
'touch',
@ -71,6 +74,23 @@
]
def getuid():
if _platform == "win32":
if ctypes.windll.shell32.IsUserAnAdmin() == 0:
return 1
return 0
else:
return os.getuid()
def rename(src, dst):
# On Windows, os.rename will fail if the destination file already exists
if is_windows:
if os.path.exists(dst):
os.remove(dst)
os.rename(src, dst)
def path_contains_subdirectory(path, root):
norm_root = os.path.abspath(root).rstrip(os.path.sep) + os.path.sep
norm_path = os.path.abspath(path).rstrip(os.path.sep) + os.path.sep
@ -293,7 +313,7 @@ def group_ids(uid=None):
(list of int): gids of groups the user is a member of
"""
if uid is None:
uid = os.getuid()
uid = getuid()
user = pwd.getpwuid(uid).pw_name
return [g.gr_gid for g in grp.getgrall() if user in g.gr_mem]

View file

@ -45,7 +45,7 @@
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.filesystem import mkdirp
from llnl.util.filesystem import mkdirp, rename
import spack.compilers
import spack.paths
@ -292,7 +292,7 @@ def _write_section(self, section):
with open(tmp, 'w') as f:
syaml.dump_config(data_to_write, stream=f,
default_flow_style=False)
os.rename(tmp, self.path)
rename(tmp, self.path)
except (yaml.YAMLError, IOError) as e:
raise ConfigFileError(
"Error writing to config file: '%s'" % str(e))

View file

@ -1001,7 +1001,8 @@ def _write(self, type, value, traceback):
try:
with open(temp_file, 'w') as f:
self._write_to_file(f)
os.rename(temp_file, self._index_path)
fs.rename(temp_file, self._index_path)
if _use_uuid:
with open(self._verifier_path, 'w') as f:
new_verifier = str(uuid.uuid4())

View file

@ -565,7 +565,7 @@ def _write_extensions(self, spec, extensions):
}, tmp, default_flow_style=False, encoding='utf-8')
# Atomic update by moving tmpfile on top of old one.
os.rename(tmp.name, path)
fs.rename(tmp.name, path)
class DirectoryLayoutError(SpackError):

View file

@ -38,6 +38,7 @@
from llnl.util.filesystem import (
get_single_file,
mkdirp,
rename,
temp_cwd,
temp_rename,
working_dir,
@ -324,7 +325,7 @@ def fetch(self):
try:
partial_file, save_file = self._fetch_from_url(url)
if save_file and (partial_file is not None):
os.rename(partial_file, save_file)
rename(partial_file, save_file)
break
except FailedDownloadError as e:
errors.append(str(e))
@ -1399,7 +1400,7 @@ def fetch(self):
warn_content_type_mismatch(self.archive_file or "the archive")
if self.stage.save_filename:
os.rename(
rename(
os.path.join(self.stage.path, basename),
self.stage.save_filename)

View file

@ -456,12 +456,12 @@ def test_pkg_build_paths(install_mockery):
# Now check the newer log filename
last_log = 'spack-build.txt'
os.rename(older_log, last_log)
fs.rename(older_log, last_log)
assert spec.package.log_path.endswith(last_log)
# Check the old environment file
last_env = 'spack-build.env'
os.rename(last_log, last_env)
fs.rename(last_log, last_env)
assert spec.package.env_path.endswith(last_env)
# Cleanup
@ -492,12 +492,12 @@ def test_pkg_install_paths(install_mockery):
# Now check the newer install log filename
last_log = 'build.txt'
os.rename(older_log, last_log)
fs.rename(older_log, last_log)
assert spec.package.install_log_path.endswith(last_log)
# Check the old install environment file
last_env = 'build.env'
os.rename(last_log, last_env)
fs.rename(last_log, last_env)
assert spec.package.install_env_path.endswith(last_env)
# Cleanup

View file

@ -59,7 +59,7 @@
import llnl.util.lock as lk
import llnl.util.multiproc as mp
from llnl.util.filesystem import touch
from llnl.util.filesystem import getuid, touch
#
# This test can be run with MPI. MPI is "enabled" if we can import
@ -580,7 +580,7 @@ def test_write_lock_timeout_with_multiple_readers_3_2_ranges(lock_path):
TimeoutWrite(lock_path, 5, 1))
@pytest.mark.skipif(os.getuid() == 0, reason='user is root')
@pytest.mark.skipif(getuid() == 0, reason='user is root')
def test_read_lock_on_read_only_lockfile(lock_dir, lock_path):
"""read-only directory, read-only lockfile."""
touch(lock_path)

View file

@ -13,7 +13,7 @@
import pytest
from llnl.util.filesystem import mkdirp, partition_path, touch, working_dir
from llnl.util.filesystem import getuid, mkdirp, partition_path, touch, working_dir
import spack.paths
import spack.stage
@ -357,7 +357,7 @@ def check_stage_dir_perms(prefix, path):
user = getpass.getuser()
prefix_status = os.stat(prefix)
uid = os.getuid()
uid = getuid()
# Obtain lists of ancestor and descendant paths of the $user node, if any.
#
@ -753,7 +753,7 @@ def _stat(path):
# The following check depends on the patched os.stat as a poor
# substitute for confirming the generated warnings.
assert os.stat(user_path).st_uid != os.getuid()
assert os.stat(user_path).st_uid != getuid()
def test_resolve_paths(self):
"""Test _resolve_paths."""

View file

@ -8,7 +8,7 @@
import pytest
from llnl.util.filesystem import group_ids
from llnl.util.filesystem import getuid, group_ids
import spack.config
import spack.util.lock as lk
@ -42,7 +42,7 @@ def test_disable_locking(tmpdir):
@pytest.mark.nomockstage
def test_lock_checks_user(tmpdir):
"""Ensure lock checks work with a self-owned, self-group repo."""
uid = os.getuid()
uid = getuid()
if uid not in group_ids():
pytest.skip("user has no group with gid == uid")
@ -76,7 +76,7 @@ def test_lock_checks_user(tmpdir):
@pytest.mark.nomockstage
def test_lock_checks_group(tmpdir):
"""Ensure lock checks work with a self-owned, non-self-group repo."""
uid = os.getuid()
uid = getuid()
gid = next((g for g in group_ids() if g != uid), None)
if not gid:
pytest.skip("user has no group with gid != uid")

View file

@ -6,7 +6,7 @@
import os
import shutil
from llnl.util.filesystem import mkdirp
from llnl.util.filesystem import mkdirp, rename
from spack.error import SpackError
from spack.util.lock import Lock, ReadTransaction, WriteTransaction
@ -145,7 +145,7 @@ def __exit__(cm, type, value, traceback): # noqa
shutil.rmtree(cm.tmp_filename, True)
else:
os.rename(cm.tmp_filename, cm.orig_filename)
rename(cm.tmp_filename, cm.orig_filename)
return WriteTransaction(
self._get_lock(key), acquire=WriteContextManager)

View file

@ -22,7 +22,7 @@
import llnl.util.lang
import llnl.util.tty as tty
from llnl.util.filesystem import mkdirp
from llnl.util.filesystem import mkdirp, rename
import spack.config
import spack.error
@ -173,7 +173,7 @@ def push_to_url(
shutil.copy(local_file_path, remote_file_path)
else:
try:
os.rename(local_file_path, remote_file_path)
rename(local_file_path, remote_file_path)
except OSError as e:
if e.errno == errno.EXDEV:
# NOTE(opadron): The above move failed because it crosses