Remove **kwargs from function signatures in llnl.util.filesystem (#34804)

Since we dropped support for Python 2.7, we can embrace using keyword only arguments 
for many functions in Spack that use **kwargs in the function signature. Here this is done 
for the llnl.util.filesystem module.

There were a couple of bugs lurking in the code related to typo-like errors when retrieving
from kwargs. Those have been fixed as well.
This commit is contained in:
Massimiliano Culpo 2023-01-10 14:23:42 +01:00 committed by GitHub
parent cc333b600c
commit 9d00e7d15d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -17,6 +17,7 @@
import tempfile
from contextlib import contextmanager
from sys import platform as _platform
from typing import Callable, List, Match, Optional, Tuple, Union
from llnl.util import tty
from llnl.util.lang import dedupe, memoized
@ -214,7 +215,16 @@ def same_path(path1, path2):
return norm1 == norm2
def filter_file(regex, repl, *filenames, **kwargs):
def filter_file(
regex: str,
repl: Union[str, Callable[[Match], str]],
*filenames: str,
string: bool = False,
backup: bool = False,
ignore_absent: bool = False,
start_at: Optional[str] = None,
stop_at: Optional[str] = None,
) -> None:
r"""Like sed, but uses python regular expressions.
Filters every line of each file through regex and replaces the file
@ -226,12 +236,10 @@ def filter_file(regex, repl, *filenames, **kwargs):
can contain ``\1``, ``\2``, etc. to represent back-substitution
as sed would allow.
Parameters:
Args:
regex (str): The regular expression to search for
repl (str): The string to replace matches with
*filenames: One or more files to search and replace
Keyword Arguments:
string (bool): Treat regex as a plain string. Default it False
backup (bool): Make backup file(s) suffixed with ``~``. Default is False
ignore_absent (bool): Ignore any files that don't exist.
@ -246,17 +254,11 @@ def filter_file(regex, repl, *filenames, **kwargs):
file is copied verbatim. Default is to filter until the end of the
file.
"""
string = kwargs.get("string", False)
backup = kwargs.get("backup", False)
ignore_absent = kwargs.get("ignore_absent", False)
start_at = kwargs.get("start_at", None)
stop_at = kwargs.get("stop_at", None)
# Allow strings to use \1, \2, etc. for replacement, like sed
if not callable(repl):
unescaped = repl.replace(r"\\", "\\")
def replace_groups_with_groupid(m):
def replace_groups_with_groupid(m: Match) -> str:
def groupid_to_group(x):
return m.group(int(x.group(1)))
@ -290,15 +292,14 @@ def groupid_to_group(x):
shutil.copy(filename, tmp_filename)
try:
# To avoid translating line endings (\n to \r\n and vis versa)
# Open as a text file and filter until the end of the file is
# reached, or we found a marker in the line if it was specified
#
# To avoid translating line endings (\n to \r\n and vice-versa)
# we force os.open to ignore translations and use the line endings
# the file comes with
extra_kwargs = {"errors": "surrogateescape", "newline": ""}
# Open as a text file and filter until the end of the file is
# reached or we found a marker in the line if it was specified
with open(tmp_filename, mode="r", **extra_kwargs) as input_file:
with open(filename, mode="w", **extra_kwargs) as output_file:
with open(tmp_filename, mode="r", errors="surrogateescape", newline="") as input_file:
with open(filename, mode="w", errors="surrogateescape", newline="") as output_file:
do_filtering = start_at is None
# Using iter and readline is a workaround needed not to
# disable input_file.tell(), which will happen if we call
@ -321,10 +322,10 @@ def groupid_to_group(x):
# If we stopped filtering at some point, reopen the file in
# binary mode and copy verbatim the remaining part
if current_position and stop_at:
with open(tmp_filename, mode="rb") as input_file:
input_file.seek(current_position)
with open(filename, mode="ab") as output_file:
output_file.writelines(input_file.readlines())
with open(tmp_filename, mode="rb") as input_binary_buffer:
input_binary_buffer.seek(current_position)
with open(filename, mode="ab") as output_binary_buffer:
output_binary_buffer.writelines(input_binary_buffer.readlines())
except BaseException:
# clean up the original file on failure.
@ -343,8 +344,26 @@ class FileFilter(object):
def __init__(self, *filenames):
self.filenames = filenames
def filter(self, regex, repl, **kwargs):
return filter_file(regex, repl, *self.filenames, **kwargs)
def filter(
self,
regex: str,
repl: Union[str, Callable[[Match], str]],
string: bool = False,
backup: bool = False,
ignore_absent: bool = False,
start_at: Optional[str] = None,
stop_at: Optional[str] = None,
) -> None:
return filter_file(
regex,
repl,
*self.filenames,
string=string,
backup=backup,
ignore_absent=ignore_absent,
start_at=start_at,
stop_at=stop_at,
)
def change_sed_delimiter(old_delim, new_delim, *filenames):
@ -652,7 +671,13 @@ def resolve_link_target_relative_to_the_link(link):
@system_path_filter
def copy_tree(src, dest, symlinks=True, ignore=None, _permissions=False):
def copy_tree(
src: str,
dest: str,
symlinks: bool = True,
ignore: Optional[Callable[[str], bool]] = None,
_permissions: bool = False,
):
"""Recursively copy an entire directory tree rooted at *src*.
If the destination directory *dest* does not already exist, it will
@ -710,7 +735,7 @@ def copy_tree(src, dest, symlinks=True, ignore=None, _permissions=False):
abs_src,
abs_dest,
order="pre",
follow_symlinks=not symlinks,
follow_links=not symlinks,
ignore=ignore,
follow_nonexisting=True,
):
@ -812,45 +837,32 @@ def chgrp_if_not_world_writable(path, group):
chgrp(path, group)
def mkdirp(*paths, **kwargs):
def mkdirp(
*paths: str,
mode: Optional[int] = None,
group: Optional[Union[str, int]] = None,
default_perms: Optional[str] = None,
):
"""Creates a directory, as well as parent directories if needed.
Arguments:
paths (str): paths to create with mkdirp
Keyword Aguments:
mode (permission bits or None): optional permissions to set
on the created directory -- use OS default if not provided
group (group name or None): optional group for permissions of
final created directory -- use OS default if not provided. Only
used if world write permissions are not set
default_perms (str or None): one of 'parents' or 'args'. The default permissions
that are set for directories that are not themselves an argument
for mkdirp. 'parents' means intermediate directories get the
permissions of their direct parent directory, 'args' means
intermediate get the same permissions specified in the arguments to
paths: paths to create with mkdirp
mode: optional permissions to set on the created directory -- use OS default
if not provided
group: optional group for permissions of final created directory -- use OS
default if not provided. Only used if world write permissions are not set
default_perms: one of 'parents' or 'args'. The default permissions that are set for
directories that are not themselves an argument for mkdirp. 'parents' means
intermediate directories get the permissions of their direct parent directory,
'args' means intermediate get the same permissions specified in the arguments to
mkdirp -- default value is 'args'
"""
mode = kwargs.get("mode", None)
group = kwargs.get("group", None)
default_perms = kwargs.get("default_perms", "args")
default_perms = default_perms or "args"
paths = path_to_os_path(*paths)
for path in paths:
if not os.path.exists(path):
try:
# detect missing intermediate folders
intermediate_folders = []
last_parent = ""
intermediate_path = os.path.dirname(path)
while intermediate_path:
if os.path.exists(intermediate_path):
last_parent = intermediate_path
break
intermediate_folders.append(intermediate_path)
intermediate_path = os.path.dirname(intermediate_path)
last_parent, intermediate_folders = longest_existing_parent(path)
# create folders
os.makedirs(path)
@ -884,7 +896,10 @@ def mkdirp(*paths, **kwargs):
os.chmod(intermediate_path, intermediate_mode)
if intermediate_group is not None:
chgrp_if_not_world_writable(intermediate_path, intermediate_group)
os.chmod(intermediate_path, intermediate_mode) # reset sticky bit after
if intermediate_mode is not None:
os.chmod(
intermediate_path, intermediate_mode
) # reset sticky bit after
except OSError as e:
if e.errno != errno.EEXIST or not os.path.isdir(path):
@ -893,6 +908,27 @@ def mkdirp(*paths, **kwargs):
raise OSError(errno.EEXIST, "File already exists", path)
def longest_existing_parent(path: str) -> Tuple[str, List[str]]:
"""Return the last existing parent and a list of all intermediate directories
to be created for the directory passed as input.
Args:
path: directory to be created
"""
# detect missing intermediate folders
intermediate_folders = []
last_parent = ""
intermediate_path = os.path.dirname(path)
while intermediate_path:
if os.path.lexists(intermediate_path):
last_parent = intermediate_path
break
intermediate_folders.append(intermediate_path)
intermediate_path = os.path.dirname(intermediate_path)
return last_parent, intermediate_folders
@system_path_filter
def force_remove(*paths):
"""Remove files without printing errors. Like ``rm -f``, does NOT
@ -906,8 +942,8 @@ def force_remove(*paths):
@contextmanager
@system_path_filter
def working_dir(dirname, **kwargs):
if kwargs.get("create", False):
def working_dir(dirname: str, *, create: bool = False):
if create:
mkdirp(dirname)
orig_dir = os.getcwd()
@ -1118,7 +1154,16 @@ def can_access(file_name):
@system_path_filter
def traverse_tree(source_root, dest_root, rel_path="", **kwargs):
def traverse_tree(
source_root: str,
dest_root: str,
rel_path: str = "",
*,
order: str = "pre",
ignore: Optional[Callable[[str], bool]] = None,
follow_nonexisting: bool = True,
follow_links: bool = False,
):
"""Traverse two filesystem trees simultaneously.
Walks the LinkTree directory in pre or post order. Yields each
@ -1150,16 +1195,11 @@ def traverse_tree(source_root, dest_root, rel_path="", **kwargs):
``src`` that do not exit in ``dest``. Default is True
follow_links (bool): Whether to descend into symlinks in ``src``
"""
follow_nonexisting = kwargs.get("follow_nonexisting", True)
follow_links = kwargs.get("follow_link", False)
# Yield in pre or post order?
order = kwargs.get("order", "pre")
if order not in ("pre", "post"):
raise ValueError("Order must be 'pre' or 'post'.")
# List of relative paths to ignore under the src root.
ignore = kwargs.get("ignore", None) or (lambda filename: False)
ignore = ignore or (lambda filename: False)
# Don't descend into ignored directories
if ignore(rel_path):
@ -1186,7 +1226,15 @@ def traverse_tree(source_root, dest_root, rel_path="", **kwargs):
# When follow_nonexisting isn't set, don't descend into dirs
# in source that do not exist in dest
if follow_nonexisting or os.path.exists(dest_child):
tuples = traverse_tree(source_root, dest_root, rel_child, **kwargs)
tuples = traverse_tree(
source_root,
dest_root,
rel_child,
order=order,
ignore=ignore,
follow_nonexisting=follow_nonexisting,
follow_links=follow_links,
)
for t in tuples:
yield t
@ -2573,13 +2621,15 @@ def keep_modification_time(*filenames):
@contextmanager
def temporary_dir(*args, **kwargs):
def temporary_dir(
suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None
):
"""Create a temporary directory and cd's into it. Delete the directory
on exit.
Takes the same arguments as tempfile.mkdtemp()
"""
tmp_dir = tempfile.mkdtemp(*args, **kwargs)
tmp_dir = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
try:
with working_dir(tmp_dir):
yield tmp_dir