spack.patch: add type hints (#42811)
Co-authored-by: Todd Gamblin <tgamblin@llnl.gov>
This commit is contained in:
parent
9ea9ee05c8
commit
f35ff441f2
2 changed files with 222 additions and 77 deletions
|
@ -703,11 +703,14 @@ def _execute_patch(pkg_or_dep: Union["spack.package_base.PackageBase", Dependenc
|
|||
|
||||
patch: spack.patch.Patch
|
||||
if "://" in url_or_filename:
|
||||
if sha256 is None:
|
||||
raise ValueError("patch() with a url requires a sha256")
|
||||
|
||||
patch = spack.patch.UrlPatch(
|
||||
pkg,
|
||||
url_or_filename,
|
||||
level,
|
||||
working_dir,
|
||||
working_dir=working_dir,
|
||||
ordering_key=ordering_key,
|
||||
sha256=sha256,
|
||||
archive_sha256=archive_sha256,
|
||||
|
|
|
@ -9,9 +9,9 @@
|
|||
import os.path
|
||||
import pathlib
|
||||
import sys
|
||||
from typing import Any, Dict, Optional, Tuple, Type
|
||||
|
||||
import llnl.util.filesystem
|
||||
import llnl.util.lang
|
||||
from llnl.url import allowed_archive
|
||||
|
||||
import spack
|
||||
|
@ -25,15 +25,16 @@
|
|||
from spack.util.executable import which, which_string
|
||||
|
||||
|
||||
def apply_patch(stage, patch_path, level=1, working_dir="."):
|
||||
def apply_patch(
|
||||
stage: spack.stage.Stage, patch_path: str, level: int = 1, working_dir: str = "."
|
||||
) -> None:
|
||||
"""Apply the patch at patch_path to code in the stage.
|
||||
|
||||
Args:
|
||||
stage (spack.stage.Stage): stage with code that will be patched
|
||||
patch_path (str): filesystem location for the patch to apply
|
||||
level (int or None): patch level (default 1)
|
||||
working_dir (str): relative path *within* the stage to change to
|
||||
(default '.')
|
||||
stage: stage with code that will be patched
|
||||
patch_path: filesystem location for the patch to apply
|
||||
level: patch level
|
||||
working_dir: relative path *within* the stage to change to
|
||||
"""
|
||||
git_utils_path = os.environ.get("PATH", "")
|
||||
if sys.platform == "win32":
|
||||
|
@ -58,16 +59,24 @@ def apply_patch(stage, patch_path, level=1, working_dir="."):
|
|||
class Patch:
|
||||
"""Base class for patches.
|
||||
|
||||
Arguments:
|
||||
pkg (str): the package that owns the patch
|
||||
|
||||
The owning package is not necessarily the package to apply the patch
|
||||
to -- in the case where a dependent package patches its dependency,
|
||||
it is the dependent's fullname.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, pkg, path_or_url, level, working_dir):
|
||||
sha256: str
|
||||
|
||||
def __init__(
|
||||
self, pkg: "spack.package_base.PackageBase", path_or_url: str, level: int, working_dir: str
|
||||
) -> None:
|
||||
"""Initialize a new Patch instance.
|
||||
|
||||
Args:
|
||||
pkg: the package that owns the patch
|
||||
path_or_url: the relative path or URL to a patch file
|
||||
level: patch level
|
||||
working_dir: relative path *within* the stage to change to
|
||||
"""
|
||||
# validate level (must be an integer >= 0)
|
||||
if not isinstance(level, int) or not level >= 0:
|
||||
raise ValueError("Patch level needs to be a non-negative integer.")
|
||||
|
@ -75,27 +84,28 @@ def __init__(self, pkg, path_or_url, level, working_dir):
|
|||
# Attributes shared by all patch subclasses
|
||||
self.owner = pkg.fullname
|
||||
self.path_or_url = path_or_url # needed for debug output
|
||||
self.path = None # must be set before apply()
|
||||
self.path: Optional[str] = None # must be set before apply()
|
||||
self.level = level
|
||||
self.working_dir = working_dir
|
||||
|
||||
def apply(self, stage: "spack.stage.Stage"):
|
||||
def apply(self, stage: spack.stage.Stage) -> None:
|
||||
"""Apply a patch to source in a stage.
|
||||
|
||||
Arguments:
|
||||
stage (spack.stage.Stage): stage where source code lives
|
||||
Args:
|
||||
stage: stage where source code lives
|
||||
"""
|
||||
if not self.path or not os.path.isfile(self.path):
|
||||
raise NoSuchPatchError(f"No such patch: {self.path}")
|
||||
|
||||
apply_patch(stage, self.path, self.level, self.working_dir)
|
||||
|
||||
@property
|
||||
def stage(self):
|
||||
return None
|
||||
# TODO: Use TypedDict once Spack supports Python 3.8+ only
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Dictionary representation of the patch.
|
||||
|
||||
def to_dict(self):
|
||||
"""Partial dictionary -- subclases should add to this."""
|
||||
Returns:
|
||||
A dictionary representation.
|
||||
"""
|
||||
return {
|
||||
"owner": self.owner,
|
||||
"sha256": self.sha256,
|
||||
|
@ -103,31 +113,55 @@ def to_dict(self):
|
|||
"working_dir": self.working_dir,
|
||||
}
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: object) -> bool:
|
||||
"""Equality check.
|
||||
|
||||
Args:
|
||||
other: another patch
|
||||
|
||||
Returns:
|
||||
True if both patches have the same checksum, else False
|
||||
"""
|
||||
if not isinstance(other, Patch):
|
||||
return NotImplemented
|
||||
return self.sha256 == other.sha256
|
||||
|
||||
def __hash__(self):
|
||||
def __hash__(self) -> int:
|
||||
"""Unique hash.
|
||||
|
||||
Returns:
|
||||
A unique hash based on the sha256.
|
||||
"""
|
||||
return hash(self.sha256)
|
||||
|
||||
|
||||
class FilePatch(Patch):
|
||||
"""Describes a patch that is retrieved from a file in the repository.
|
||||
"""Describes a patch that is retrieved from a file in the repository."""
|
||||
|
||||
Arguments:
|
||||
pkg (str): the class object for the package that owns the patch
|
||||
relative_path (str): path to patch, relative to the repository
|
||||
directory for a package.
|
||||
level (int): level to pass to patch command
|
||||
working_dir (str): path within the source directory where patch
|
||||
should be applied
|
||||
"""
|
||||
_sha256: Optional[str] = None
|
||||
|
||||
def __init__(self, pkg, relative_path, level, working_dir, ordering_key=None):
|
||||
def __init__(
|
||||
self,
|
||||
pkg: "spack.package_base.PackageBase",
|
||||
relative_path: str,
|
||||
level: int,
|
||||
working_dir: str,
|
||||
ordering_key: Optional[Tuple[str, int]] = None,
|
||||
) -> None:
|
||||
"""Initialize a new FilePatch instance.
|
||||
|
||||
Args:
|
||||
pkg: the class object for the package that owns the patch
|
||||
relative_path: path to patch, relative to the repository directory for a package.
|
||||
level: level to pass to patch command
|
||||
working_dir: path within the source directory where patch should be applied
|
||||
ordering_key: key used to ensure patches are applied in a consistent order
|
||||
"""
|
||||
self.relative_path = relative_path
|
||||
|
||||
# patches may be defined by relative paths to parent classes
|
||||
# search mro to look for the file
|
||||
abs_path = None
|
||||
abs_path: Optional[str] = None
|
||||
# At different times we call FilePatch on instances and classes
|
||||
pkg_cls = pkg if inspect.isclass(pkg) else pkg.__class__
|
||||
for cls in inspect.getmro(pkg_cls):
|
||||
|
@ -150,50 +184,90 @@ def __init__(self, pkg, relative_path, level, working_dir, ordering_key=None):
|
|||
|
||||
super().__init__(pkg, abs_path, level, working_dir)
|
||||
self.path = abs_path
|
||||
self._sha256 = None
|
||||
self.ordering_key = ordering_key
|
||||
|
||||
@property
|
||||
def sha256(self):
|
||||
if self._sha256 is None:
|
||||
def sha256(self) -> str:
|
||||
"""Get the patch checksum.
|
||||
|
||||
Returns:
|
||||
The sha256 of the patch file.
|
||||
"""
|
||||
if self._sha256 is None and self.path is not None:
|
||||
self._sha256 = checksum(hashlib.sha256, self.path)
|
||||
assert isinstance(self._sha256, str)
|
||||
return self._sha256
|
||||
|
||||
def to_dict(self):
|
||||
return llnl.util.lang.union_dicts(super().to_dict(), {"relative_path": self.relative_path})
|
||||
@sha256.setter
|
||||
def sha256(self, value: str) -> None:
|
||||
"""Set the patch checksum.
|
||||
|
||||
Args:
|
||||
value: the sha256
|
||||
"""
|
||||
self._sha256 = value
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Dictionary representation of the patch.
|
||||
|
||||
Returns:
|
||||
A dictionary representation.
|
||||
"""
|
||||
data = super().to_dict()
|
||||
data["relative_path"] = self.relative_path
|
||||
return data
|
||||
|
||||
|
||||
class UrlPatch(Patch):
|
||||
"""Describes a patch that is retrieved from a URL.
|
||||
"""Describes a patch that is retrieved from a URL."""
|
||||
|
||||
Arguments:
|
||||
pkg (str): the package that owns the patch
|
||||
url (str): URL where the patch can be fetched
|
||||
level (int): level to pass to patch command
|
||||
working_dir (str): path within the source directory where patch
|
||||
should be applied
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
pkg: "spack.package_base.PackageBase",
|
||||
url: str,
|
||||
level: int = 1,
|
||||
*,
|
||||
working_dir: str = ".",
|
||||
sha256: str, # This is required for UrlPatch
|
||||
ordering_key: Optional[Tuple[str, int]] = None,
|
||||
archive_sha256: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Initialize a new UrlPatch instance.
|
||||
|
||||
def __init__(self, pkg, url, level=1, working_dir=".", ordering_key=None, **kwargs):
|
||||
Arguments:
|
||||
pkg: the package that owns the patch
|
||||
url: URL where the patch can be fetched
|
||||
level: level to pass to patch command
|
||||
working_dir: path within the source directory where patch should be applied
|
||||
ordering_key: key used to ensure patches are applied in a consistent order
|
||||
sha256: sha256 sum of the patch, used to verify the patch
|
||||
archive_sha256: sha256 sum of the *archive*, if the patch is compressed
|
||||
(only required for compressed URL patches)
|
||||
"""
|
||||
super().__init__(pkg, url, level, working_dir)
|
||||
|
||||
self.url = url
|
||||
self._stage = None
|
||||
self._stage: Optional[spack.stage.Stage] = None
|
||||
|
||||
self.ordering_key = ordering_key
|
||||
|
||||
self.archive_sha256 = kwargs.get("archive_sha256")
|
||||
if allowed_archive(self.url) and not self.archive_sha256:
|
||||
if allowed_archive(self.url) and not archive_sha256:
|
||||
raise PatchDirectiveError(
|
||||
"Compressed patches require 'archive_sha256' "
|
||||
"and patch 'sha256' attributes: %s" % self.url
|
||||
)
|
||||
self.archive_sha256 = archive_sha256
|
||||
|
||||
self.sha256 = kwargs.get("sha256")
|
||||
if not self.sha256:
|
||||
if not sha256:
|
||||
raise PatchDirectiveError("URL patches require a sha256 checksum")
|
||||
self.sha256 = sha256
|
||||
|
||||
def apply(self, stage: "spack.stage.Stage"):
|
||||
def apply(self, stage: spack.stage.Stage) -> None:
|
||||
"""Apply a patch to source in a stage.
|
||||
|
||||
Args:
|
||||
stage: stage where source code lives
|
||||
"""
|
||||
assert self.stage.expanded, "Stage must be expanded before applying patches"
|
||||
|
||||
# Get the patch file.
|
||||
|
@ -204,15 +278,20 @@ def apply(self, stage: "spack.stage.Stage"):
|
|||
return super().apply(stage)
|
||||
|
||||
@property
|
||||
def stage(self):
|
||||
def stage(self) -> spack.stage.Stage:
|
||||
"""The stage in which to download (and unpack) the URL patch.
|
||||
|
||||
Returns:
|
||||
The stage object.
|
||||
"""
|
||||
if self._stage:
|
||||
return self._stage
|
||||
|
||||
fetch_digest = self.archive_sha256 or self.sha256
|
||||
|
||||
# Two checksums, one for compressed file, one for its contents
|
||||
if self.archive_sha256:
|
||||
fetcher = fs.FetchAndVerifyExpandedFile(
|
||||
if self.archive_sha256 and self.sha256:
|
||||
fetcher: fs.FetchStrategy = fs.FetchAndVerifyExpandedFile(
|
||||
self.url, archive_sha256=self.archive_sha256, expanded_sha256=self.sha256
|
||||
)
|
||||
else:
|
||||
|
@ -231,7 +310,12 @@ def stage(self):
|
|||
)
|
||||
return self._stage
|
||||
|
||||
def to_dict(self):
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Dictionary representation of the patch.
|
||||
|
||||
Returns:
|
||||
A dictionary representation.
|
||||
"""
|
||||
data = super().to_dict()
|
||||
data["url"] = self.url
|
||||
if self.archive_sha256:
|
||||
|
@ -239,8 +323,21 @@ def to_dict(self):
|
|||
return data
|
||||
|
||||
|
||||
def from_dict(dictionary, repository=None):
|
||||
"""Create a patch from json dictionary."""
|
||||
def from_dict(
|
||||
dictionary: Dict[str, Any], repository: Optional["spack.repo.RepoPath"] = None
|
||||
) -> Patch:
|
||||
"""Create a patch from json dictionary.
|
||||
|
||||
Args:
|
||||
dictionary: dictionary representation of a patch
|
||||
repository: repository containing package
|
||||
|
||||
Returns:
|
||||
A patch object.
|
||||
|
||||
Raises:
|
||||
ValueError: If *owner* or *url*/*relative_path* are missing in the dictionary.
|
||||
"""
|
||||
repository = repository or spack.repo.PATH
|
||||
owner = dictionary.get("owner")
|
||||
if "owner" not in dictionary:
|
||||
|
@ -252,7 +349,7 @@ def from_dict(dictionary, repository=None):
|
|||
pkg_cls,
|
||||
dictionary["url"],
|
||||
dictionary["level"],
|
||||
dictionary["working_dir"],
|
||||
working_dir=dictionary["working_dir"],
|
||||
sha256=dictionary["sha256"],
|
||||
archive_sha256=dictionary.get("archive_sha256"),
|
||||
)
|
||||
|
@ -267,7 +364,7 @@ def from_dict(dictionary, repository=None):
|
|||
# TODO: handle this more gracefully.
|
||||
sha256 = dictionary["sha256"]
|
||||
checker = Checker(sha256)
|
||||
if not checker.check(patch.path):
|
||||
if patch.path and not checker.check(patch.path):
|
||||
raise fs.ChecksumError(
|
||||
"sha256 checksum failed for %s" % patch.path,
|
||||
"Expected %s but got %s " % (sha256, checker.sum)
|
||||
|
@ -295,10 +392,17 @@ class PatchCache:
|
|||
namespace2.package2:
|
||||
<patch json>
|
||||
... etc. ...
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, repository, data=None):
|
||||
def __init__(
|
||||
self, repository: "spack.repo.RepoPath", data: Optional[Dict[str, Any]] = None
|
||||
) -> None:
|
||||
"""Initialize a new PatchCache instance.
|
||||
|
||||
Args:
|
||||
repository: repository containing package
|
||||
data: nested dictionary of patches
|
||||
"""
|
||||
if data is None:
|
||||
self.index = {}
|
||||
else:
|
||||
|
@ -309,21 +413,39 @@ def __init__(self, repository, data=None):
|
|||
self.repository = repository
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, stream, repository):
|
||||
def from_json(cls, stream: Any, repository: "spack.repo.RepoPath") -> "PatchCache":
|
||||
"""Initialize a new PatchCache instance from JSON.
|
||||
|
||||
Args:
|
||||
stream: stream of data
|
||||
repository: repository containing package
|
||||
|
||||
Returns:
|
||||
A new PatchCache instance.
|
||||
"""
|
||||
return PatchCache(repository=repository, data=sjson.load(stream))
|
||||
|
||||
def to_json(self, stream):
|
||||
def to_json(self, stream: Any) -> None:
|
||||
"""Dump a JSON representation to a stream.
|
||||
|
||||
Args:
|
||||
stream: stream of data
|
||||
"""
|
||||
sjson.dump({"patches": self.index}, stream)
|
||||
|
||||
def patch_for_package(self, sha256: str, pkg):
|
||||
def patch_for_package(self, sha256: str, pkg: "spack.package_base.PackageBase") -> Patch:
|
||||
"""Look up a patch in the index and build a patch object for it.
|
||||
|
||||
Arguments:
|
||||
sha256: sha256 hash to look up
|
||||
pkg (spack.package_base.PackageBase): Package object to get patch for.
|
||||
|
||||
We build patch objects lazily because building them requires that
|
||||
we have information about the package's location in its repo."""
|
||||
we have information about the package's location in its repo.
|
||||
|
||||
Args:
|
||||
sha256: sha256 hash to look up
|
||||
pkg: Package object to get patch for.
|
||||
|
||||
Returns:
|
||||
The patch object.
|
||||
"""
|
||||
sha_index = self.index.get(sha256)
|
||||
if not sha_index:
|
||||
raise PatchLookupError(
|
||||
|
@ -346,7 +468,12 @@ def patch_for_package(self, sha256: str, pkg):
|
|||
patch_dict["sha256"] = sha256
|
||||
return from_dict(patch_dict, repository=self.repository)
|
||||
|
||||
def update_package(self, pkg_fullname):
|
||||
def update_package(self, pkg_fullname: str) -> None:
|
||||
"""Update the patch cache.
|
||||
|
||||
Args:
|
||||
pkg_fullname: package to update.
|
||||
"""
|
||||
# remove this package from any patch entries that reference it.
|
||||
empty = []
|
||||
for sha256, package_to_patch in self.index.items():
|
||||
|
@ -372,14 +499,29 @@ def update_package(self, pkg_fullname):
|
|||
p2p = self.index.setdefault(sha256, {})
|
||||
p2p.update(package_to_patch)
|
||||
|
||||
def update(self, other):
|
||||
"""Update this cache with the contents of another."""
|
||||
def update(self, other: "PatchCache") -> None:
|
||||
"""Update this cache with the contents of another.
|
||||
|
||||
Args:
|
||||
other: another patch cache to merge
|
||||
"""
|
||||
for sha256, package_to_patch in other.index.items():
|
||||
p2p = self.index.setdefault(sha256, {})
|
||||
p2p.update(package_to_patch)
|
||||
|
||||
@staticmethod
|
||||
def _index_patches(pkg_class, repository):
|
||||
def _index_patches(
|
||||
pkg_class: Type["spack.package_base.PackageBase"], repository: "spack.repo.RepoPath"
|
||||
) -> Dict[Any, Any]:
|
||||
"""Patch index for a specific patch.
|
||||
|
||||
Args:
|
||||
pkg_class: package object to get patches for
|
||||
repository: repository containing the package
|
||||
|
||||
Returns:
|
||||
The patch index for that package.
|
||||
"""
|
||||
index = {}
|
||||
|
||||
# Add patches from the class
|
||||
|
|
Loading…
Reference in a new issue