Windows symlinking support (#38599)

This reapplies 66f7540, which adds supports for hardlinks/junctions on
Windows systems where developer mode is not enabled.

The commit was reverted on account of multiple issues:

* Checks added to prevent dangling symlinks were interfering with
  existing CI builds on Linux (i.e. builds that otherwise succeed were
  failing for creating dangling symlinks).
* The logic also updated symlinking to perform redirection of relative
  paths, which lead to malformed symlinks.

This commit fixes these issues.
This commit is contained in:
James Smillie 2023-08-25 13:18:19 -06:00 committed by GitHub
parent ecfd9ef12b
commit 349ba83bc6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 786 additions and 160 deletions

View file

@ -18,11 +18,13 @@
import sys
import tempfile
from contextlib import contextmanager
from itertools import accumulate
from typing import Callable, Iterable, List, Match, Optional, Tuple, Union
import llnl.util.symlink
from llnl.util import tty
from llnl.util.lang import dedupe, memoized
from llnl.util.symlink import islink, symlink
from llnl.util.symlink import islink, readlink, resolve_link_target_relative_to_the_link, symlink
from spack.util.executable import Executable, which
from spack.util.path import path_to_os_path, system_path_filter
@ -101,7 +103,7 @@ def _nop(args, ns=None, follow_symlinks=None):
pass
# follow symlinks (aka don't not follow symlinks)
follow = follow_symlinks or not (os.path.islink(src) and os.path.islink(dst))
follow = follow_symlinks or not (islink(src) and islink(dst))
if follow:
# use the real function if it exists
def lookup(name):
@ -169,7 +171,7 @@ def rename(src, dst):
if sys.platform == "win32":
# Windows path existence checks will sometimes fail on junctions/links/symlinks
# so check for that case
if os.path.exists(dst) or os.path.islink(dst):
if os.path.exists(dst) or islink(dst):
os.remove(dst)
os.rename(src, dst)
@ -566,7 +568,7 @@ def set_install_permissions(path):
# If this points to a file maintained in a Spack prefix, it is assumed that
# this function will be invoked on the target. If the file is outside a
# Spack-maintained prefix, the permissions should not be modified.
if os.path.islink(path):
if islink(path):
return
if os.path.isdir(path):
os.chmod(path, 0o755)
@ -635,7 +637,7 @@ def chmod_x(entry, perms):
@system_path_filter
def copy_mode(src, dest):
"""Set the mode of dest to that of src unless it is a link."""
if os.path.islink(dest):
if islink(dest):
return
src_mode = os.stat(src).st_mode
dest_mode = os.stat(dest).st_mode
@ -721,26 +723,12 @@ def install(src, dest):
copy(src, dest, _permissions=True)
@system_path_filter
def resolve_link_target_relative_to_the_link(link):
"""
os.path.isdir uses os.path.exists, which for links will check
the existence of the link target. If the link target is relative to
the link, we need to construct a pathname that is valid from
our cwd (which may not be the same as the link's directory)
"""
target = os.readlink(link)
if os.path.isabs(target):
return target
link_dir = os.path.dirname(os.path.abspath(link))
return os.path.join(link_dir, target)
@system_path_filter
def copy_tree(
src: str,
dest: str,
symlinks: bool = True,
allow_broken_symlinks: bool = sys.platform != "win32",
ignore: Optional[Callable[[str], bool]] = None,
_permissions: bool = False,
):
@ -763,6 +751,8 @@ def copy_tree(
src (str): the directory to copy
dest (str): the destination directory
symlinks (bool): whether or not to preserve symlinks
allow_broken_symlinks (bool): whether or not to allow broken (dangling) symlinks,
On Windows, setting this to True will raise an exception. Defaults to true on unix.
ignore (typing.Callable): function indicating which files to ignore
_permissions (bool): for internal use only
@ -770,6 +760,8 @@ def copy_tree(
IOError: if *src* does not match any files or directories
ValueError: if *src* is a parent directory of *dest*
"""
if allow_broken_symlinks and sys.platform == "win32":
raise llnl.util.symlink.SymlinkError("Cannot allow broken symlinks on Windows!")
if _permissions:
tty.debug("Installing {0} to {1}".format(src, dest))
else:
@ -783,6 +775,11 @@ def copy_tree(
if not files:
raise IOError("No such file or directory: '{0}'".format(src))
# For Windows hard-links and junctions, the source path must exist to make a symlink. Add
# all symlinks to this list while traversing the tree, then when finished, make all
# symlinks at the end.
links = []
for src in files:
abs_src = os.path.abspath(src)
if not abs_src.endswith(os.path.sep):
@ -805,7 +802,7 @@ def copy_tree(
ignore=ignore,
follow_nonexisting=True,
):
if os.path.islink(s):
if islink(s):
link_target = resolve_link_target_relative_to_the_link(s)
if symlinks:
target = os.readlink(s)
@ -819,7 +816,9 @@ def escaped_path(path):
tty.debug("Redirecting link {0} to {1}".format(target, new_target))
target = new_target
symlink(target, d)
links.append((target, d, s))
continue
elif os.path.isdir(link_target):
mkdirp(d)
else:
@ -834,9 +833,17 @@ def escaped_path(path):
set_install_permissions(d)
copy_mode(s, d)
for target, d, s in links:
symlink(target, d, allow_broken_symlinks=allow_broken_symlinks)
if _permissions:
set_install_permissions(d)
copy_mode(s, d)
@system_path_filter
def install_tree(src, dest, symlinks=True, ignore=None):
def install_tree(
src, dest, symlinks=True, ignore=None, allow_broken_symlinks=sys.platform != "win32"
):
"""Recursively install an entire directory tree rooted at *src*.
Same as :py:func:`copy_tree` with the addition of setting proper
@ -847,12 +854,21 @@ def install_tree(src, dest, symlinks=True, ignore=None):
dest (str): the destination directory
symlinks (bool): whether or not to preserve symlinks
ignore (typing.Callable): function indicating which files to ignore
allow_broken_symlinks (bool): whether or not to allow broken (dangling) symlinks,
On Windows, setting this to True will raise an exception.
Raises:
IOError: if *src* does not match any files or directories
ValueError: if *src* is a parent directory of *dest*
"""
copy_tree(src, dest, symlinks=symlinks, ignore=ignore, _permissions=True)
copy_tree(
src,
dest,
symlinks=symlinks,
allow_broken_symlinks=allow_broken_symlinks,
ignore=ignore,
_permissions=True,
)
@system_path_filter
@ -1256,7 +1272,12 @@ def traverse_tree(
Keyword Arguments:
order (str): Whether to do pre- or post-order traversal. Accepted
values are 'pre' and 'post'
ignore (typing.Callable): function indicating which files to ignore
ignore (typing.Callable): function indicating which files to ignore. This will also
ignore symlinks if they point to an ignored file (regardless of whether the symlink
is explicitly ignored); note this only supports one layer of indirection (i.e. if
you have x -> y -> z, and z is ignored but x/y are not, then y would be ignored
but not x). To avoid this, make sure the ignore function also ignores the symlink
paths too.
follow_nonexisting (bool): Whether to descend into directories in
``src`` that do not exit in ``dest``. Default is True
follow_links (bool): Whether to descend into symlinks in ``src``
@ -1283,11 +1304,24 @@ def traverse_tree(
dest_child = os.path.join(dest_path, f)
rel_child = os.path.join(rel_path, f)
# If the source path is a link and the link's source is ignored, then ignore the link too,
# but only do this if the ignore is defined.
if ignore is not None:
if islink(source_child) and not follow_links:
target = readlink(source_child)
all_parents = accumulate(target.split(os.sep), lambda x, y: os.path.join(x, y))
if any(map(ignore, all_parents)):
tty.warn(
f"Skipping {source_path} because the source or a part of the source's "
f"path is included in the ignores."
)
continue
# Treat as a directory
# TODO: for symlinks, os.path.isdir looks for the link target. If the
# target is relative to the link, then that may not resolve properly
# relative to our cwd - see resolve_link_target_relative_to_the_link
if os.path.isdir(source_child) and (follow_links or not os.path.islink(source_child)):
if os.path.isdir(source_child) and (follow_links or not islink(source_child)):
# When follow_nonexisting isn't set, don't descend into dirs
# in source that do not exist in dest
if follow_nonexisting or os.path.exists(dest_child):
@ -1313,7 +1347,11 @@ def traverse_tree(
def lexists_islink_isdir(path):
"""Computes the tuple (lexists(path), islink(path), isdir(path)) in a minimal
number of stat calls."""
number of stat calls on unix. Use os.path and symlink.islink methods for windows."""
if sys.platform == "win32":
if not os.path.lexists(path):
return False, False, False
return os.path.lexists(path), islink(path), os.path.isdir(path)
# First try to lstat, so we know if it's a link or not.
try:
lst = os.lstat(path)
@ -1528,7 +1566,7 @@ def remove_if_dead_link(path):
Parameters:
path (str): The potential dead link
"""
if os.path.islink(path) and not os.path.exists(path):
if islink(path) and not os.path.exists(path):
os.unlink(path)
@ -1587,7 +1625,7 @@ def remove_linked_tree(path):
kwargs["onerror"] = readonly_file_handler(ignore_errors=True)
if os.path.exists(path):
if os.path.islink(path):
if islink(path):
shutil.rmtree(os.path.realpath(path), **kwargs)
os.unlink(path)
else:
@ -2693,7 +2731,7 @@ def remove_directory_contents(dir):
"""Remove all contents of a directory."""
if os.path.exists(dir):
for entry in [os.path.join(dir, entry) for entry in os.listdir(dir)]:
if os.path.isfile(entry) or os.path.islink(entry):
if os.path.isfile(entry) or islink(entry):
os.unlink(entry)
else:
shutil.rmtree(entry)

View file

@ -2,77 +2,188 @@
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import errno
import os
import re
import shutil
import subprocess
import sys
import tempfile
from os.path import exists, join
from llnl.util import lang
from llnl.util import lang, tty
from spack.error import SpackError
from spack.util.path import system_path_filter
if sys.platform == "win32":
from win32file import CreateHardLink
is_windows = sys.platform == "win32"
def symlink(real_path, link_path):
"""
Create a symbolic link.
On Windows, use junctions if os.symlink fails.
def symlink(source_path: str, link_path: str, allow_broken_symlinks: bool = not is_windows):
"""
if sys.platform != "win32":
os.symlink(real_path, link_path)
elif _win32_can_symlink():
# Windows requires target_is_directory=True when the target is a dir.
os.symlink(real_path, link_path, target_is_directory=os.path.isdir(real_path))
else:
try:
# Try to use junctions
_win32_junction(real_path, link_path)
except OSError as e:
if e.errno == errno.EEXIST:
# EEXIST error indicates that file we're trying to "link"
# is already present, don't bother trying to copy which will also fail
# just raise
raise
Create a link.
On non-Windows and Windows with System Administrator
privleges this will be a normal symbolic link via
os.symlink.
On Windows without privledges the link will be a
junction for a directory and a hardlink for a file.
On Windows the various link types are:
Symbolic Link: A link to a file or directory on the
same or different volume (drive letter) or even to
a remote file or directory (using UNC in its path).
Need System Administrator privileges to make these.
Hard Link: A link to a file on the same volume (drive
letter) only. Every file (file's data) has at least 1
hard link (file's name). But when this method creates
a new hard link there will be 2. Deleting all hard
links effectively deletes the file. Don't need System
Administrator privileges.
Junction: A link to a directory on the same or different
volume (drive letter) but not to a remote directory. Don't
need System Administrator privileges.
Parameters:
source_path (str): The real file or directory that the link points to.
Must be absolute OR relative to the link.
link_path (str): The path where the link will exist.
allow_broken_symlinks (bool): On Linux or Mac, don't raise an exception if the source_path
doesn't exist. This will still raise an exception on Windows.
"""
source_path = os.path.normpath(source_path)
win_source_path = source_path
link_path = os.path.normpath(link_path)
# Never allow broken links on Windows.
if sys.platform == "win32" and allow_broken_symlinks:
raise ValueError("allow_broken_symlinks parameter cannot be True on Windows.")
if not allow_broken_symlinks:
# Perform basic checks to make sure symlinking will succeed
if os.path.lexists(link_path):
raise SymlinkError(f"Link path ({link_path}) already exists. Cannot create link.")
if not os.path.exists(source_path):
if os.path.isabs(source_path) and not allow_broken_symlinks:
# An absolute source path that does not exist will result in a broken link.
raise SymlinkError(
f"Source path ({source_path}) is absolute but does not exist. Resulting "
f"link would be broken so not making link."
)
else:
# If all else fails, fall back to copying files
shutil.copyfile(real_path, link_path)
# os.symlink can create a link when the given source path is relative to
# the link path. Emulate this behavior and check to see if the source exists
# relative to the link patg ahead of link creation to prevent broken
# links from being made.
link_parent_dir = os.path.dirname(link_path)
relative_path = os.path.join(link_parent_dir, source_path)
if os.path.exists(relative_path):
# In order to work on windows, the source path needs to be modified to be
# relative because hardlink/junction dont resolve relative paths the same
# way as os.symlink. This is ignored on other operating systems.
win_source_path = relative_path
elif not allow_broken_symlinks:
raise SymlinkError(
f"The source path ({source_path}) is not relative to the link path "
f"({link_path}). Resulting link would be broken so not making link."
)
# Create the symlink
if sys.platform == "win32" and not _windows_can_symlink():
_windows_create_link(win_source_path, link_path)
else:
os.symlink(source_path, link_path, target_is_directory=os.path.isdir(source_path))
def islink(path):
return os.path.islink(path) or _win32_is_junction(path)
def islink(path: str) -> bool:
"""Override os.islink to give correct answer for spack logic.
For Non-Windows: a link can be determined with the os.path.islink method.
Windows-only methods will return false for other operating systems.
For Windows: spack considers symlinks, hard links, and junctions to
all be links, so if any of those are True, return True.
Args:
path (str): path to check if it is a link.
Returns:
bool - whether the path is any kind link or not.
"""
return any([os.path.islink(path), _windows_is_junction(path), _windows_is_hardlink(path)])
# '_win32' functions based on
# https://github.com/Erotemic/ubelt/blob/master/ubelt/util_links.py
def _win32_junction(path, link):
# junctions require absolute paths
if not os.path.isabs(link):
link = os.path.abspath(link)
def _windows_is_hardlink(path: str) -> bool:
"""Determines if a path is a windows hard link. This is accomplished
by looking at the number of links using os.stat. A non-hard-linked file
will have a st_nlink value of 1, whereas a hard link will have a value
larger than 1. Note that both the original and hard-linked file will
return True because they share the same inode.
# os.symlink will fail if link exists, emulate the behavior here
if exists(link):
raise OSError(errno.EEXIST, "File exists: %s -> %s" % (link, path))
Args:
path (str): Windows path to check for a hard link
if not os.path.isabs(path):
parent = os.path.join(link, os.pardir)
path = os.path.join(parent, path)
path = os.path.abspath(path)
Returns:
bool - Whether the path is a hard link or not.
"""
if sys.platform != "win32" or os.path.islink(path) or not os.path.exists(path):
return False
CreateHardLink(link, path)
return os.stat(path).st_nlink > 1
def _windows_is_junction(path: str) -> bool:
"""Determines if a path is a windows junction. A junction can be
determined using a bitwise AND operation between the file's
attribute bitmask and the known junction bitmask (0x400).
Args:
path (str): A non-file path
Returns:
bool - whether the path is a junction or not.
"""
if sys.platform != "win32" or os.path.islink(path) or os.path.isfile(path):
return False
import ctypes.wintypes
get_file_attributes = ctypes.windll.kernel32.GetFileAttributesW # type: ignore[attr-defined]
get_file_attributes.argtypes = (ctypes.wintypes.LPWSTR,)
get_file_attributes.restype = ctypes.wintypes.DWORD
invalid_file_attributes = 0xFFFFFFFF
reparse_point = 0x400
file_attr = get_file_attributes(str(path))
if file_attr == invalid_file_attributes:
return False
return file_attr & reparse_point > 0
@lang.memoized
def _win32_can_symlink():
def _windows_can_symlink() -> bool:
"""
Determines if windows is able to make a symlink depending on
the system configuration and the level of the user's permissions.
"""
if sys.platform != "win32":
tty.warn("windows_can_symlink method can't be used on non-Windows OS.")
return False
tempdir = tempfile.mkdtemp()
dpath = join(tempdir, "dpath")
fpath = join(tempdir, "fpath.txt")
dpath = os.path.join(tempdir, "dpath")
fpath = os.path.join(tempdir, "fpath.txt")
dlink = join(tempdir, "dlink")
flink = join(tempdir, "flink.txt")
dlink = os.path.join(tempdir, "dlink")
flink = os.path.join(tempdir, "flink.txt")
import llnl.util.filesystem as fs
@ -96,24 +207,136 @@ def _win32_can_symlink():
return can_symlink_directories and can_symlink_files
def _win32_is_junction(path):
def _windows_create_link(source: str, link: str):
"""
Determines if a path is a win32 junction
Attempts to create a Hard Link or Junction as an alternative
to a symbolic link. This is called when symbolic links cannot
be created.
"""
if os.path.islink(path):
return False
if sys.platform != "win32":
raise SymlinkError("windows_create_link method can't be used on non-Windows OS.")
elif os.path.isdir(source):
_windows_create_junction(source=source, link=link)
elif os.path.isfile(source):
_windows_create_hard_link(path=source, link=link)
else:
raise SymlinkError(
f"Cannot create link from {source}. It is neither a file nor a directory."
)
if sys.platform == "win32":
import ctypes.wintypes
GetFileAttributes = ctypes.windll.kernel32.GetFileAttributesW
GetFileAttributes.argtypes = (ctypes.wintypes.LPWSTR,)
GetFileAttributes.restype = ctypes.wintypes.DWORD
def _windows_create_junction(source: str, link: str):
"""Duly verify that the path and link are eligible to create a junction,
then create the junction.
"""
if sys.platform != "win32":
raise SymlinkError("windows_create_junction method can't be used on non-Windows OS.")
elif not os.path.exists(source):
raise SymlinkError("Source path does not exist, cannot create a junction.")
elif os.path.lexists(link):
raise SymlinkError("Link path already exists, cannot create a junction.")
elif not os.path.isdir(source):
raise SymlinkError("Source path is not a directory, cannot create a junction.")
INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
FILE_ATTRIBUTE_REPARSE_POINT = 0x400
import subprocess
res = GetFileAttributes(path)
return res != INVALID_FILE_ATTRIBUTES and bool(res & FILE_ATTRIBUTE_REPARSE_POINT)
cmd = ["cmd", "/C", "mklink", "/J", link, source]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
tty.debug(out.decode())
if proc.returncode != 0:
err = err.decode()
tty.error(err)
raise SymlinkError("Make junction command returned a non-zero return code.", err)
return False
def _windows_create_hard_link(path: str, link: str):
"""Duly verify that the path and link are eligible to create a hard
link, then create the hard link.
"""
if sys.platform != "win32":
raise SymlinkError("windows_create_hard_link method can't be used on non-Windows OS.")
elif not os.path.exists(path):
raise SymlinkError(f"File path {path} does not exist. Cannot create hard link.")
elif os.path.lexists(link):
raise SymlinkError(f"Link path ({link}) already exists. Cannot create hard link.")
elif not os.path.isfile(path):
raise SymlinkError(f"File path ({link}) is not a file. Cannot create hard link.")
else:
tty.debug(f"Creating hard link {link} pointing to {path}")
CreateHardLink(link, path)
def readlink(path: str):
"""Spack utility to override of os.readlink method to work cross platform"""
if _windows_is_hardlink(path):
return _windows_read_hard_link(path)
elif _windows_is_junction(path):
return _windows_read_junction(path)
else:
return os.readlink(path)
def _windows_read_hard_link(link: str) -> str:
"""Find all of the files that point to the same inode as the link"""
if sys.platform != "win32":
raise SymlinkError("Can't read hard link on non-Windows OS.")
link = os.path.abspath(link)
fsutil_cmd = ["fsutil", "hardlink", "list", link]
proc = subprocess.Popen(fsutil_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = proc.communicate()
if proc.returncode != 0:
raise SymlinkError(f"An error occurred while reading hard link: {err.decode()}")
# fsutil response does not include the drive name, so append it back to each linked file.
drive, link_tail = os.path.splitdrive(os.path.abspath(link))
links = set([os.path.join(drive, p) for p in out.decode().splitlines()])
links.remove(link)
if len(links) == 1:
return links.pop()
elif len(links) > 1:
# TODO: How best to handle the case where 3 or more paths point to a single inode?
raise SymlinkError(f"Found multiple paths pointing to the same inode {links}")
else:
raise SymlinkError("Cannot determine hard link source path.")
def _windows_read_junction(link: str):
"""Find the path that a junction points to."""
if sys.platform != "win32":
raise SymlinkError("Can't read junction on non-Windows OS.")
link = os.path.abspath(link)
link_basename = os.path.basename(link)
link_parent = os.path.dirname(link)
fsutil_cmd = ["dir", "/a:l", link_parent]
proc = subprocess.Popen(fsutil_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = proc.communicate()
if proc.returncode != 0:
raise SymlinkError(f"An error occurred while reading junction: {err.decode()}")
matches = re.search(rf"<JUNCTION>\s+{link_basename} \[(.*)]", out.decode())
if matches:
return matches.group(1)
else:
raise SymlinkError("Could not find junction path.")
@system_path_filter
def resolve_link_target_relative_to_the_link(link):
"""
os.path.isdir uses os.path.exists, which for links will check
the existence of the link target. If the link target is relative to
the link, we need to construct a pathname that is valid from
our cwd (which may not be the same as the link's directory)
"""
target = readlink(link)
if os.path.isabs(target):
return target
link_dir = os.path.dirname(os.path.abspath(link))
return os.path.join(link_dir, target)
class SymlinkError(SpackError):
"""Exception class for errors raised while creating symlinks,
junctions and hard links
"""

View file

@ -592,7 +592,9 @@ def dump_packages(spec: "spack.spec.Spec", path: str) -> None:
if node is spec:
spack.repo.PATH.dump_provenance(node, dest_pkg_dir)
elif source_pkg_dir:
fs.install_tree(source_pkg_dir, dest_pkg_dir)
fs.install_tree(
source_pkg_dir, dest_pkg_dir, allow_broken_symlinks=(sys.platform != "win32")
)
def get_dependent_ids(spec: "spack.spec.Spec") -> List[str]:
@ -1316,7 +1318,6 @@ def _prepare_for_install(self, task: BuildTask) -> None:
"""
Check the database and leftover installation directories/files and
prepare for a new install attempt for an uninstalled package.
Preparation includes cleaning up installation and stage directories
and ensuring the database is up-to-date.
@ -2394,7 +2395,9 @@ def _install_source(self) -> None:
src_target = os.path.join(pkg.spec.prefix, "share", pkg.name, "src")
tty.debug("{0} Copying source to {1}".format(self.pre, src_target))
fs.install_tree(pkg.stage.source_path, src_target)
fs.install_tree(
pkg.stage.source_path, src_target, allow_broken_symlinks=(sys.platform != "win32")
)
def _real_install(self) -> None:
import spack.builder

View file

@ -4,6 +4,7 @@
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import os
import platform
import sys
import pytest
@ -223,6 +224,7 @@ def test_concretize_target_ranges(root_target_range, dep_target_range, result, m
(["21.11", "21.9"], None, False),
],
)
@pytest.mark.skipif(sys.platform == "win32", reason="Cray does not use windows")
def test_cray_platform_detection(versions, default, expected, tmpdir, monkeypatch, working_env):
ex_path = str(tmpdir.join("fake_craype_dir"))
fs.mkdirp(ex_path)

View file

@ -31,13 +31,16 @@ def test_fetch_missing_cache(tmpdir, _fetch_method):
@pytest.mark.parametrize("_fetch_method", ["curl", "urllib"])
def test_fetch(tmpdir, _fetch_method):
"""Ensure a fetch after expanding is effectively a no-op."""
testpath = str(tmpdir)
cache = os.path.join(testpath, "cache.tar.gz")
cache_dir = tmpdir.join("cache")
stage_dir = tmpdir.join("stage")
mkdirp(cache_dir)
mkdirp(stage_dir)
cache = os.path.join(cache_dir, "cache.tar.gz")
touch(cache)
url = url_util.path_to_file_url(cache)
with spack.config.override("config:url_fetch_method", _fetch_method):
fetcher = CacheURLFetchStrategy(url=url)
with Stage(fetcher, path=testpath) as stage:
with Stage(fetcher, path=str(stage_dir)) as stage:
source_path = stage.source_path
mkdirp(source_path)
fetcher.fetch()

View file

@ -27,7 +27,7 @@
import llnl.util.lang
import llnl.util.lock
import llnl.util.tty as tty
from llnl.util.filesystem import copy_tree, mkdirp, remove_linked_tree, working_dir
from llnl.util.filesystem import copy_tree, mkdirp, remove_linked_tree, touchp, working_dir
import spack.binary_distribution
import spack.caches
@ -565,6 +565,8 @@ def mock_repo_path():
def _pkg_install_fn(pkg, spec, prefix):
# sanity_check_prefix requires something in the install directory
mkdirp(prefix.bin)
if not os.path.exists(spec.package.build_log_path):
touchp(spec.package.build_log_path)
@pytest.fixture

View file

@ -13,7 +13,8 @@
import pytest
import llnl.util.filesystem as fs
from llnl.util.symlink import islink, symlink
import llnl.util.symlink
from llnl.util.symlink import SymlinkError, _windows_can_symlink, islink, symlink
import spack.paths
@ -150,7 +151,6 @@ def test_multiple_src_file_dest(self, stage):
fs.install("source/a/*/*", "dest/1")
@pytest.mark.not_on_windows("Skip test on Windows")
class TestCopyTree:
"""Tests for ``filesystem.copy_tree``"""
@ -189,7 +189,7 @@ def test_symlinks_true(self, stage):
def test_symlinks_true_ignore(self, stage):
"""Test copying when specifying relative paths that should be ignored"""
with fs.working_dir(str(stage)):
ignore = lambda p: p in ["c/d/e", "a"]
ignore = lambda p: p in [os.path.join("c", "d", "e"), "a"]
fs.copy_tree("source", "dest", symlinks=True, ignore=ignore)
assert not os.path.exists("dest/a")
assert os.path.exists("dest/c/d")
@ -231,7 +231,6 @@ def test_parent_dir(self, stage):
fs.copy_tree("source", "source/sub/directory")
@pytest.mark.not_on_windows("Skip test on Windows")
class TestInstallTree:
"""Tests for ``filesystem.install_tree``"""
@ -275,6 +274,15 @@ def test_symlinks_false(self, stage):
assert not os.path.islink("dest/2")
check_added_exe_permissions("source/2", "dest/2")
@pytest.mark.skipif(sys.platform == "win32", reason="Broken symlinks not allowed on Windows")
def test_allow_broken_symlinks(self, stage):
"""Test installing with a broken symlink."""
with fs.working_dir(str(stage)):
symlink("nonexistant.txt", "source/broken", allow_broken_symlinks=True)
fs.install_tree("source", "dest", symlinks=True, allow_broken_symlinks=True)
assert os.path.islink("dest/broken")
assert not os.path.exists(os.readlink("dest/broken"))
def test_glob_src(self, stage):
"""Test using a glob as the source."""
@ -746,6 +754,7 @@ def test_is_nonsymlink_exe_with_shebang(tmpdir):
assert not fs.is_nonsymlink_exe_with_shebang("symlink_to_executable_script")
@pytest.mark.skipif(sys.platform == "win32", reason="Unix-only test.")
def test_lexists_islink_isdir(tmpdir):
root = str(tmpdir)
@ -764,12 +773,12 @@ def test_lexists_islink_isdir(tmpdir):
with open(file, "wb") as f:
f.write(b"file")
os.symlink("dir", symlink_to_dir)
os.symlink("file", symlink_to_file)
os.symlink("does_not_exist", dangling_symlink)
os.symlink("dangling_symlink", symlink_to_dangling_symlink)
os.symlink("symlink_to_dir", symlink_to_symlink_to_dir)
os.symlink("symlink_to_file", symlink_to_symlink_to_file)
symlink("dir", symlink_to_dir)
symlink("file", symlink_to_file)
symlink("does_not_exist", dangling_symlink)
symlink("dangling_symlink", symlink_to_dangling_symlink)
symlink("symlink_to_dir", symlink_to_symlink_to_dir)
symlink("symlink_to_file", symlink_to_symlink_to_file)
assert fs.lexists_islink_isdir(dir) == (True, False, True)
assert fs.lexists_islink_isdir(file) == (True, False, False)
@ -781,6 +790,57 @@ def test_lexists_islink_isdir(tmpdir):
assert fs.lexists_islink_isdir(symlink_to_symlink_to_file) == (True, True, False)
@pytest.mark.skipif(sys.platform != "win32", reason="For Windows Only")
@pytest.mark.parametrize("win_can_symlink", [True, False])
def test_lexists_islink_isdir_windows(tmpdir, monkeypatch, win_can_symlink):
"""Run on windows without elevated privileges to test junctions and hard links which have
different results from the lexists_islink_isdir method.
"""
if win_can_symlink and not _windows_can_symlink():
pytest.skip("Cannot test dev mode behavior without dev mode enabled.")
with tmpdir.as_cwd():
monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: win_can_symlink)
dir = str(tmpdir.join("dir"))
file = str(tmpdir.join("file"))
nonexistent = str(tmpdir.join("does_not_exist"))
symlink_to_dir = str(tmpdir.join("symlink_to_dir"))
symlink_to_file = str(tmpdir.join("symlink_to_file"))
dangling_symlink = str(tmpdir.join("dangling_symlink"))
symlink_to_dangling_symlink = str(tmpdir.join("symlink_to_dangling_symlink"))
symlink_to_symlink_to_dir = str(tmpdir.join("symlink_to_symlink_to_dir"))
symlink_to_symlink_to_file = str(tmpdir.join("symlink_to_symlink_to_file"))
os.mkdir(dir)
assert fs.lexists_islink_isdir(dir) == (True, False, True)
symlink("dir", symlink_to_dir)
assert fs.lexists_islink_isdir(dir) == (True, False, True)
assert fs.lexists_islink_isdir(symlink_to_dir) == (True, True, True)
with open(file, "wb") as f:
f.write(b"file")
assert fs.lexists_islink_isdir(file) == (True, False, False)
symlink("file", symlink_to_file)
if win_can_symlink:
assert fs.lexists_islink_isdir(file) == (True, False, False)
else:
assert fs.lexists_islink_isdir(file) == (True, True, False)
assert fs.lexists_islink_isdir(symlink_to_file) == (True, True, False)
with pytest.raises(SymlinkError):
symlink("does_not_exist", dangling_symlink)
symlink("dangling_symlink", symlink_to_dangling_symlink)
symlink("symlink_to_dir", symlink_to_symlink_to_dir)
symlink("symlink_to_file", symlink_to_symlink_to_file)
assert fs.lexists_islink_isdir(nonexistent) == (False, False, False)
assert fs.lexists_islink_isdir(symlink_to_dangling_symlink) == (False, False, False)
assert fs.lexists_islink_isdir(symlink_to_symlink_to_dir) == (True, True, True)
assert fs.lexists_islink_isdir(symlink_to_symlink_to_file) == (True, True, False)
class RegisterVisitor(fs.BaseDirectoryVisitor):
"""A directory visitor that keeps track of all visited paths"""

View file

@ -4,12 +4,14 @@
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import os
import sys
import pytest
import llnl.util.symlink
from llnl.util.filesystem import mkdirp, touchp, visit_directory_tree, working_dir
from llnl.util.link_tree import DestinationMergeVisitor, LinkTree, SourceMergeVisitor
from llnl.util.symlink import islink
from llnl.util.symlink import _windows_can_symlink, islink, readlink, symlink
from spack.stage import Stage
@ -44,77 +46,116 @@ def link_tree(stage):
def check_file_link(filename, expected_target):
assert os.path.isfile(filename)
assert islink(filename)
assert os.path.abspath(os.path.realpath(filename)) == os.path.abspath(expected_target)
if sys.platform != "win32" or llnl.util.symlink._windows_can_symlink():
assert os.path.abspath(os.path.realpath(filename)) == os.path.abspath(expected_target)
def check_dir(filename):
assert os.path.isdir(filename)
def test_merge_to_new_directory(stage, link_tree):
@pytest.mark.parametrize("run_as_root", [True, False])
def test_merge_to_new_directory(stage, link_tree, monkeypatch, run_as_root):
if sys.platform != "win32":
if run_as_root:
pass
else:
pytest.skip("Skipping duplicate test.")
elif _windows_can_symlink() or not run_as_root:
monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root)
else:
# Skip if trying to run as dev-mode without having dev-mode.
pytest.skip("Skipping portion of test which required dev-mode privileges.")
with working_dir(stage.path):
link_tree.merge("dest")
check_file_link("dest/1", "source/1")
check_file_link("dest/a/b/2", "source/a/b/2")
check_file_link("dest/a/b/3", "source/a/b/3")
check_file_link("dest/c/4", "source/c/4")
check_file_link("dest/c/d/5", "source/c/d/5")
check_file_link("dest/c/d/6", "source/c/d/6")
check_file_link("dest/c/d/e/7", "source/c/d/e/7")
files = [
("dest/1", "source/1"),
("dest/a/b/2", "source/a/b/2"),
("dest/a/b/3", "source/a/b/3"),
("dest/c/4", "source/c/4"),
("dest/c/d/5", "source/c/d/5"),
("dest/c/d/6", "source/c/d/6"),
("dest/c/d/e/7", "source/c/d/e/7"),
]
assert os.path.isabs(os.readlink("dest/1"))
assert os.path.isabs(os.readlink("dest/a/b/2"))
assert os.path.isabs(os.readlink("dest/a/b/3"))
assert os.path.isabs(os.readlink("dest/c/4"))
assert os.path.isabs(os.readlink("dest/c/d/5"))
assert os.path.isabs(os.readlink("dest/c/d/6"))
assert os.path.isabs(os.readlink("dest/c/d/e/7"))
for dest, source in files:
check_file_link(dest, source)
assert os.path.isabs(readlink(dest))
link_tree.unmerge("dest")
assert not os.path.exists("dest")
def test_merge_to_new_directory_relative(stage, link_tree):
@pytest.mark.parametrize("run_as_root", [True, False])
def test_merge_to_new_directory_relative(stage, link_tree, monkeypatch, run_as_root):
if sys.platform != "win32":
if run_as_root:
pass
else:
pytest.skip("Skipping duplicate test.")
elif _windows_can_symlink() or not run_as_root:
monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root)
else:
# Skip if trying to run as dev-mode without having dev-mode.
pytest.skip("Skipping portion of test which required dev-mode privileges.")
with working_dir(stage.path):
link_tree.merge("dest", relative=True)
check_file_link("dest/1", "source/1")
check_file_link("dest/a/b/2", "source/a/b/2")
check_file_link("dest/a/b/3", "source/a/b/3")
check_file_link("dest/c/4", "source/c/4")
check_file_link("dest/c/d/5", "source/c/d/5")
check_file_link("dest/c/d/6", "source/c/d/6")
check_file_link("dest/c/d/e/7", "source/c/d/e/7")
files = [
("dest/1", "source/1"),
("dest/a/b/2", "source/a/b/2"),
("dest/a/b/3", "source/a/b/3"),
("dest/c/4", "source/c/4"),
("dest/c/d/5", "source/c/d/5"),
("dest/c/d/6", "source/c/d/6"),
("dest/c/d/e/7", "source/c/d/e/7"),
]
assert not os.path.isabs(os.readlink("dest/1"))
assert not os.path.isabs(os.readlink("dest/a/b/2"))
assert not os.path.isabs(os.readlink("dest/a/b/3"))
assert not os.path.isabs(os.readlink("dest/c/4"))
assert not os.path.isabs(os.readlink("dest/c/d/5"))
assert not os.path.isabs(os.readlink("dest/c/d/6"))
assert not os.path.isabs(os.readlink("dest/c/d/e/7"))
for dest, source in files:
check_file_link(dest, source)
# Hard links/junctions are inherently absolute.
if sys.platform != "win32" or run_as_root:
assert not os.path.isabs(readlink(dest))
link_tree.unmerge("dest")
assert not os.path.exists("dest")
def test_merge_to_existing_directory(stage, link_tree):
@pytest.mark.parametrize("run_as_root", [True, False])
def test_merge_to_existing_directory(stage, link_tree, monkeypatch, run_as_root):
if sys.platform != "win32":
if run_as_root:
pass
else:
pytest.skip("Skipping duplicate test.")
elif _windows_can_symlink() or not run_as_root:
monkeypatch.setattr(llnl.util.symlink, "_windows_can_symlink", lambda: run_as_root)
else:
# Skip if trying to run as dev-mode without having dev-mode.
pytest.skip("Skipping portion of test which required dev-mode privileges.")
with working_dir(stage.path):
touchp("dest/x")
touchp("dest/a/b/y")
link_tree.merge("dest")
check_file_link("dest/1", "source/1")
check_file_link("dest/a/b/2", "source/a/b/2")
check_file_link("dest/a/b/3", "source/a/b/3")
check_file_link("dest/c/4", "source/c/4")
check_file_link("dest/c/d/5", "source/c/d/5")
check_file_link("dest/c/d/6", "source/c/d/6")
check_file_link("dest/c/d/e/7", "source/c/d/e/7")
files = [
("dest/1", "source/1"),
("dest/a/b/2", "source/a/b/2"),
("dest/a/b/3", "source/a/b/3"),
("dest/c/4", "source/c/4"),
("dest/c/d/5", "source/c/d/5"),
("dest/c/d/6", "source/c/d/6"),
("dest/c/d/e/7", "source/c/d/e/7"),
]
for dest, source in files:
check_file_link(dest, source)
assert os.path.isfile("dest/x")
assert os.path.isfile("dest/a/b/y")
@ -124,13 +165,8 @@ def test_merge_to_existing_directory(stage, link_tree):
assert os.path.isfile("dest/x")
assert os.path.isfile("dest/a/b/y")
assert not os.path.isfile("dest/1")
assert not os.path.isfile("dest/a/b/2")
assert not os.path.isfile("dest/a/b/3")
assert not os.path.isfile("dest/c/4")
assert not os.path.isfile("dest/c/d/5")
assert not os.path.isfile("dest/c/d/6")
assert not os.path.isfile("dest/c/d/e/7")
for dest, _ in files:
assert not os.path.isfile(dest)
def test_merge_with_empty_directories(stage, link_tree):
@ -192,9 +228,9 @@ def test_source_merge_visitor_does_not_follow_symlinked_dirs_at_depth(tmpdir):
os.mkdir(j("a", "b"))
os.mkdir(j("a", "b", "c"))
os.mkdir(j("a", "b", "c", "d"))
os.symlink(j("b"), j("a", "symlink_b"))
os.symlink(j("c"), j("a", "b", "symlink_c"))
os.symlink(j("d"), j("a", "b", "c", "symlink_d"))
symlink(j("b"), j("a", "symlink_b"))
symlink(j("c"), j("a", "b", "symlink_c"))
symlink(j("d"), j("a", "b", "c", "symlink_d"))
with open(j("a", "b", "c", "d", "file"), "wb"):
pass
@ -236,10 +272,11 @@ def test_source_merge_visitor_cant_be_cyclical(tmpdir):
j = os.path.join
with tmpdir.as_cwd():
os.mkdir(j("a"))
os.symlink(j("..", "b"), j("a", "symlink_b"))
os.symlink(j("symlink_b"), j("a", "symlink_b_b"))
os.mkdir(j("b"))
os.symlink(j("..", "a"), j("b", "symlink_a"))
symlink(j("..", "b"), j("a", "symlink_b"))
symlink(j("symlink_b"), j("a", "symlink_b_b"))
symlink(j("..", "a"), j("b", "symlink_a"))
visitor = SourceMergeVisitor()
visit_directory_tree(str(tmpdir), visitor)

View file

@ -0,0 +1,247 @@
# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""Tests for ``llnl/util/symlink.py``"""
import os
import sys
import tempfile
import pytest
from llnl.util import symlink
def test_symlink_file(tmpdir):
"""Test the symlink.symlink functionality on all operating systems for a file"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
link_file = str(tmpdir.join("link.txt"))
assert os.path.exists(link_file) is False
symlink.symlink(source_path=real_file, link_path=link_file)
assert os.path.exists(link_file)
assert symlink.islink(link_file)
def test_symlink_dir(tmpdir):
"""Test the symlink.symlink functionality on all operating systems for a directory"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
real_dir = os.path.join(test_dir, "real_dir")
link_dir = os.path.join(test_dir, "link_dir")
os.mkdir(real_dir)
symlink.symlink(source_path=real_dir, link_path=link_dir)
assert os.path.exists(link_dir)
assert symlink.islink(link_dir)
def test_symlink_source_not_exists(tmpdir):
"""Test the symlink.symlink method for the case where a source path does not exist"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
real_dir = os.path.join(test_dir, "real_dir")
link_dir = os.path.join(test_dir, "link_dir")
with pytest.raises(symlink.SymlinkError):
symlink.symlink(source_path=real_dir, link_path=link_dir, allow_broken_symlinks=False)
def test_symlink_src_relative_to_link(tmpdir):
"""Test the symlink.symlink functionality where the source value exists relative to the link
but not relative to the cwd"""
with tmpdir.as_cwd():
subdir_1 = tmpdir.join("a")
subdir_2 = os.path.join(subdir_1, "b")
link_dir = os.path.join(subdir_1, "c")
os.mkdir(subdir_1)
os.mkdir(subdir_2)
fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=subdir_2)
link_file = os.path.join(subdir_1, "link.txt")
symlink.symlink(
source_path=f"b/{os.path.basename(real_file)}",
link_path=f"a/{os.path.basename(link_file)}",
)
assert os.path.exists(link_file)
assert symlink.islink(link_file)
# Check dirs
assert not os.path.lexists(link_dir)
symlink.symlink(source_path="b", link_path="a/c")
assert os.path.lexists(link_dir)
def test_symlink_src_not_relative_to_link(tmpdir):
"""Test the symlink.symlink functionality where the source value does not exist relative to
the link and not relative to the cwd. NOTE that this symlink api call is EXPECTED to raise
a symlink.SymlinkError exception that we catch."""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
subdir_1 = os.path.join(test_dir, "a")
subdir_2 = os.path.join(subdir_1, "b")
link_dir = os.path.join(subdir_1, "c")
os.mkdir(subdir_1)
os.mkdir(subdir_2)
fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=subdir_2)
link_file = str(tmpdir.join("link.txt"))
# Expected SymlinkError because source path does not exist relative to link path
with pytest.raises(symlink.SymlinkError):
symlink.symlink(
source_path=f"d/{os.path.basename(real_file)}",
link_path=f"a/{os.path.basename(link_file)}",
allow_broken_symlinks=False,
)
assert not os.path.exists(link_file)
# Check dirs
assert not os.path.lexists(link_dir)
with pytest.raises(symlink.SymlinkError):
symlink.symlink(source_path="d", link_path="a/c", allow_broken_symlinks=False)
assert not os.path.lexists(link_dir)
def test_symlink_link_already_exists(tmpdir):
"""Test the symlink.symlink method for the case where a link already exists"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
real_dir = os.path.join(test_dir, "real_dir")
link_dir = os.path.join(test_dir, "link_dir")
os.mkdir(real_dir)
symlink.symlink(real_dir, link_dir, allow_broken_symlinks=False)
assert os.path.exists(link_dir)
with pytest.raises(symlink.SymlinkError):
symlink.symlink(source_path=real_dir, link_path=link_dir, allow_broken_symlinks=False)
@pytest.mark.skipif(not symlink._windows_can_symlink(), reason="Test requires elevated privileges")
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
def test_symlink_win_file(tmpdir):
"""Check that symlink.symlink makes a symlink file when run with elevated permissions"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
link_file = str(tmpdir.join("link.txt"))
symlink.symlink(source_path=real_file, link_path=link_file)
# Verify that all expected conditions are met
assert os.path.exists(link_file)
assert symlink.islink(link_file)
assert os.path.islink(link_file)
assert not symlink._windows_is_hardlink(link_file)
assert not symlink._windows_is_junction(link_file)
@pytest.mark.skipif(not symlink._windows_can_symlink(), reason="Test requires elevated privileges")
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
def test_symlink_win_dir(tmpdir):
"""Check that symlink.symlink makes a symlink dir when run with elevated permissions"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
real_dir = os.path.join(test_dir, "real")
link_dir = os.path.join(test_dir, "link")
os.mkdir(real_dir)
symlink.symlink(source_path=real_dir, link_path=link_dir)
# Verify that all expected conditions are met
assert os.path.exists(link_dir)
assert symlink.islink(link_dir)
assert os.path.islink(link_dir)
assert not symlink._windows_is_hardlink(link_dir)
assert not symlink._windows_is_junction(link_dir)
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
def test_windows_create_junction(tmpdir):
"""Test the symlink._windows_create_junction method"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
junction_real_dir = os.path.join(test_dir, "real_dir")
junction_link_dir = os.path.join(test_dir, "link_dir")
os.mkdir(junction_real_dir)
symlink._windows_create_junction(junction_real_dir, junction_link_dir)
# Verify that all expected conditions are met
assert os.path.exists(junction_link_dir)
assert symlink._windows_is_junction(junction_link_dir)
assert symlink.islink(junction_link_dir)
assert not os.path.islink(junction_link_dir)
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
def test_windows_create_hard_link(tmpdir):
"""Test the symlink._windows_create_hard_link method"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
link_file = str(tmpdir.join("link.txt"))
symlink._windows_create_hard_link(real_file, link_file)
# Verify that all expected conditions are met
assert os.path.exists(link_file)
assert symlink._windows_is_hardlink(real_file)
assert symlink._windows_is_hardlink(link_file)
assert symlink.islink(link_file)
assert not os.path.islink(link_file)
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
def test_windows_create_link_dir(tmpdir):
"""Test the functionality of the windows_create_link method with a directory
which should result in making a junction.
"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
real_dir = os.path.join(test_dir, "real")
link_dir = os.path.join(test_dir, "link")
os.mkdir(real_dir)
symlink._windows_create_link(real_dir, link_dir)
# Verify that all expected conditions are met
assert os.path.exists(link_dir)
assert symlink.islink(link_dir)
assert not symlink._windows_is_hardlink(link_dir)
assert symlink._windows_is_junction(link_dir)
assert not os.path.islink(link_dir)
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
def test_windows_create_link_file(tmpdir):
"""Test the functionality of the windows_create_link method with a file
which should result in the creation of a hard link. It also tests the
functionality of the symlink islink infrastructure.
"""
with tmpdir.as_cwd():
test_dir = str(tmpdir)
fd, real_file = tempfile.mkstemp(prefix="real", suffix=".txt", dir=test_dir)
link_file = str(tmpdir.join("link.txt"))
symlink._windows_create_link(real_file, link_file)
# Verify that all expected conditions are met
assert symlink._windows_is_hardlink(link_file)
assert symlink.islink(link_file)
assert not symlink._windows_is_junction(link_file)
@pytest.mark.skipif(sys.platform != "win32", reason="Test is only for Windows")
def test_windows_read_link(tmpdir):
"""Makes sure symlink.readlink can read the link source for hard links and
junctions on windows."""
with tmpdir.as_cwd():
real_dir_1 = "real_dir_1"
real_dir_2 = "real_dir_2"
link_dir_1 = "link_dir_1"
link_dir_2 = "link_dir_2"
os.mkdir(real_dir_1)
os.mkdir(real_dir_2)
# Create a file and a directory
_, real_file_1 = tempfile.mkstemp(prefix="real_1", suffix=".txt", dir=".")
_, real_file_2 = tempfile.mkstemp(prefix="real_2", suffix=".txt", dir=".")
link_file_1 = "link_1.txt"
link_file_2 = "link_2.txt"
# Make hard link/junction
symlink._windows_create_hard_link(real_file_1, link_file_1)
symlink._windows_create_hard_link(real_file_2, link_file_2)
symlink._windows_create_junction(real_dir_1, link_dir_1)
symlink._windows_create_junction(real_dir_2, link_dir_2)
assert symlink.readlink(link_file_1) == os.path.abspath(real_file_1)
assert symlink.readlink(link_file_2) == os.path.abspath(real_file_2)
assert symlink.readlink(link_dir_1) == os.path.abspath(real_dir_1)
assert symlink.readlink(link_dir_2) == os.path.abspath(real_dir_2)

View file

@ -8,7 +8,7 @@
import pytest
from llnl.util.filesystem import resolve_link_target_relative_to_the_link
from llnl.util.symlink import resolve_link_target_relative_to_the_link
import spack.mirror
import spack.repo
@ -228,6 +228,9 @@ def successful_expand(_class):
def successful_apply(*args, **kwargs):
pass
def successful_symlink(*args, **kwargs):
pass
with Stage("spack-mirror-test") as stage:
mirror_root = os.path.join(stage.path, "test-mirror")
@ -235,6 +238,7 @@ def successful_apply(*args, **kwargs):
monkeypatch.setattr(spack.fetch_strategy.URLFetchStrategy, "expand", successful_expand)
monkeypatch.setattr(spack.patch, "apply_patch", successful_apply)
monkeypatch.setattr(spack.caches.MirrorCache, "store", record_store)
monkeypatch.setattr(spack.caches.MirrorCache, "symlink", successful_symlink)
with spack.config.override("config:checksum", False):
spack.mirror.create(mirror_root, list(spec.traverse()))

View file

@ -147,8 +147,15 @@ def test_relocate_links(tmpdir):
own_prefix_path = str(tmpdir.join("prefix_a", "file"))
dep_prefix_path = str(tmpdir.join("prefix_b", "file"))
new_own_prefix_path = str(tmpdir.join("new_prefix_a", "file"))
new_dep_prefix_path = str(tmpdir.join("new_prefix_b", "file"))
system_path = os.path.join(os.path.sep, "system", "path")
fs.touchp(own_prefix_path)
fs.touchp(new_own_prefix_path)
fs.touchp(dep_prefix_path)
fs.touchp(new_dep_prefix_path)
# Old prefixes to new prefixes
prefix_to_prefix = OrderedDict(
[