spack checksum: improve interactive filtering (#40403)

* spack checksum: improve interactive filtering

* fix signature of executable

* Fix restart when using editor

* Don't show [x version(s) are new] when no known versions (e.g. in spack create <url>)

* Test ^D in test_checksum_interactive_quit_from_ask_each

* formatting

* colorize / skip header on invalid command

* show original total, not modified total

* use colify for command list

* Warn about possible URL changes

* show possible URL change as comment

* make mypy happy

* drop numbers

* [o]pen editor -> [e]dit
This commit is contained in:
Harmen Stoppels 2023-10-13 21:43:22 +02:00 committed by GitHub
parent edf4aa9f52
commit a5cb7a9816
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 352 additions and 55 deletions

View file

@ -7,6 +7,7 @@
import re
import sys
import llnl.string
import llnl.util.lang
from llnl.util import tty
@ -15,6 +16,7 @@
import spack.spec
import spack.stage
import spack.util.crypto
import spack.util.web as web_util
from spack.cmd.common import arguments
from spack.package_base import PackageBase, deprecated_version, preferred_version
from spack.util.editor import editor
@ -128,18 +130,38 @@ def checksum(parser, args):
remote_versions = pkg.fetch_remote_versions(args.jobs)
url_dict = remote_versions
# A spidered URL can differ from the package.py *computed* URL, pointing to different tarballs.
# For example, GitHub release pages sometimes have multiple tarballs with different shasum:
# - releases/download/1.0/<pkg>-1.0.tar.gz (uploaded tarball)
# - archive/refs/tags/1.0.tar.gz (generated tarball)
# We wanna ensure that `spack checksum` and `spack install` ultimately use the same URL, so
# here we check whether the crawled and computed URLs disagree, and if so, prioritize the
# former if that URL exists (just sending a HEAD request that is).
url_changed_for_version = set()
for version, url in url_dict.items():
possible_urls = pkg.all_urls_for_version(version)
if url not in possible_urls:
for possible_url in possible_urls:
if web_util.url_exists(possible_url):
url_dict[version] = possible_url
break
else:
url_changed_for_version.add(version)
if not url_dict:
tty.die(f"Could not find any remote versions for {pkg.name}")
# print an empty line to create a new output section block
print()
elif len(url_dict) > 1 and not args.batch and sys.stdin.isatty():
filtered_url_dict = spack.stage.interactive_version_filter(
url_dict, pkg.versions, url_changes=url_changed_for_version
)
if filtered_url_dict is None:
exit(0)
url_dict = filtered_url_dict
else:
tty.info(f"Found {llnl.string.plural(len(url_dict), 'version')} of {pkg.name}")
version_hashes = spack.stage.get_checksums_for_versions(
url_dict,
pkg.name,
keep_stage=args.keep_stage,
batch=(args.batch or len(versions) > 0 or len(url_dict) == 1),
fetch_options=pkg.fetch_options,
url_dict, pkg.name, keep_stage=args.keep_stage, fetch_options=pkg.fetch_options
)
if args.verify:

View file

@ -5,6 +5,7 @@
import os
import re
import sys
import urllib.parse
import llnl.util.tty as tty
@ -823,6 +824,11 @@ def get_versions(args, name):
# Find available versions
try:
url_dict = spack.url.find_versions_of_archive(args.url)
if len(url_dict) > 1 and not args.batch and sys.stdin.isatty():
url_dict_filtered = spack.stage.interactive_version_filter(url_dict)
if url_dict_filtered is None:
exit(0)
url_dict = url_dict_filtered
except UndetectableVersionError:
# Use fake versions
tty.warn("Couldn't detect version in: {0}".format(args.url))
@ -834,11 +840,7 @@ def get_versions(args, name):
url_dict = {version: args.url}
version_hashes = spack.stage.get_checksums_for_versions(
url_dict,
name,
first_stage_function=guesser,
keep_stage=args.keep_stage,
batch=(args.batch or len(url_dict) == 1),
url_dict, name, first_stage_function=guesser, keep_stage=args.keep_stage
)
versions = get_version_lines(version_hashes, url_dict)

View file

@ -7,12 +7,13 @@
import getpass
import glob
import hashlib
import io
import os
import shutil
import stat
import sys
import tempfile
from typing import Callable, Dict, Iterable, Optional
from typing import Callable, Dict, Iterable, Optional, Set
import llnl.string
import llnl.util.lang
@ -27,6 +28,8 @@
partition_path,
remove_linked_tree,
)
from llnl.util.tty.colify import colify
from llnl.util.tty.color import colorize
import spack.caches
import spack.config
@ -35,11 +38,14 @@
import spack.mirror
import spack.paths
import spack.spec
import spack.stage
import spack.util.lock
import spack.util.path as sup
import spack.util.pattern as pattern
import spack.util.url as url_util
from spack.util.crypto import bit_length, prefix_bits
from spack.util.editor import editor, executable
from spack.version import StandardVersion, VersionList
# The well-known stage source subdirectory name.
_source_path_subdir = "spack-src"
@ -860,11 +866,187 @@ def purge():
os.remove(stage_path)
def interactive_version_filter(
url_dict: Dict[StandardVersion, str],
known_versions: Iterable[StandardVersion] = (),
*,
url_changes: Set[StandardVersion] = set(),
input: Callable[..., str] = input,
) -> Optional[Dict[StandardVersion, str]]:
"""Interactively filter the list of spidered versions.
Args:
url_dict: Dictionary of versions to URLs
known_versions: Versions that can be skipped because they are already known
Returns:
Filtered dictionary of versions to URLs or None if the user wants to quit
"""
# Find length of longest string in the list for padding
sorted_and_filtered = sorted(url_dict.keys(), reverse=True)
version_filter = VersionList([":"])
max_len = max(len(str(v)) for v in sorted_and_filtered)
orig_url_dict = url_dict # only copy when using editor to modify
print_header = True
VERSION_COLOR = spack.spec.VERSION_COLOR
while True:
if print_header:
has_filter = version_filter != VersionList([":"])
header = []
if len(sorted_and_filtered) == len(orig_url_dict):
header.append(
f"Selected {llnl.string.plural(len(sorted_and_filtered), 'version')}"
)
else:
header.append(
f"Selected {len(sorted_and_filtered)} of {len(orig_url_dict)} versions"
)
if known_versions:
num_new = sum(1 for v in sorted_and_filtered if v not in known_versions)
header.append(f"{llnl.string.plural(num_new, 'new version')}")
if has_filter:
header.append(colorize(f"Filtered by {VERSION_COLOR}{version_filter}@."))
version_with_url = [
colorize(
f"{VERSION_COLOR}{str(v):{max_len}}@. {url_dict[v]}"
f"{' @K{# NOTE: change of URL}' if v in url_changes else ''}"
)
for v in sorted_and_filtered
]
tty.msg(". ".join(header), *llnl.util.lang.elide_list(version_with_url))
print()
print_header = True
print("commands:")
commands = (
"@*b{[c]}hecksum",
"@*b{[e]}dit",
"@*b{[f]}ilter",
"@*b{[a]}sk each",
"@*b{[n]}ew only",
"@*b{[r]}estart",
"@*b{[q]}uit",
)
colify(list(map(colorize, commands)), indent=2)
try:
command = input(colorize("@*g{command>} ")).strip().lower()
except EOFError:
print()
command = "q"
if command == "c":
break
elif command == "e":
# Create a temporary file in the stage dir with lines of the form
# <version> <url>
# which the user can modify. Once the editor is closed, the file is
# read back in and the versions to url dict is updated.
# Create a temporary file by hashing its contents.
buffer = io.StringIO()
buffer.write("# Edit this file to change the versions and urls to fetch\n")
for v in sorted_and_filtered:
buffer.write(f"{str(v):{max_len}} {url_dict[v]}\n")
data = buffer.getvalue().encode("utf-8")
short_hash = hashlib.sha1(data).hexdigest()[:7]
filename = f"{spack.stage.stage_prefix}versions-{short_hash}.txt"
filepath = os.path.join(spack.stage.get_stage_root(), filename)
# Write contents
with open(filepath, "wb") as f:
f.write(data)
# Open editor
editor(filepath, exec_fn=executable)
# Read back in
with open(filepath, "r") as f:
orig_url_dict, url_dict = url_dict, {}
for line in f:
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith("#"):
continue
try:
version, url = line.split(None, 1)
except ValueError:
tty.warn(f"Couldn't parse: {line}")
continue
try:
url_dict[StandardVersion.from_string(version)] = url
except ValueError:
tty.warn(f"Invalid version: {version}")
continue
sorted_and_filtered = sorted(url_dict.keys(), reverse=True)
os.unlink(filepath)
elif command == "f":
tty.msg(
colorize(
f"Examples filters: {VERSION_COLOR}1.2@. "
f"or {VERSION_COLOR}1.1:1.3@. "
f"or {VERSION_COLOR}=1.2, 1.2.2:@."
)
)
try:
# Allow a leading @ version specifier
filter_spec = input(colorize("@*g{filter>} ")).strip().lstrip("@")
except EOFError:
print()
continue
try:
version_filter.intersect(VersionList([filter_spec]))
except ValueError:
tty.warn(f"Invalid version specifier: {filter_spec}")
continue
# Apply filter
sorted_and_filtered = [v for v in sorted_and_filtered if v.satisfies(version_filter)]
elif command == "a":
i = 0
while i < len(sorted_and_filtered):
v = sorted_and_filtered[i]
try:
answer = input(f" {str(v):{max_len}} {url_dict[v]} [Y/n]? ").strip().lower()
except EOFError:
# If ^D, don't fully exit, but go back to the command prompt, now with possibly
# fewer versions
print()
break
if answer in ("n", "no"):
del sorted_and_filtered[i]
elif answer in ("y", "yes", ""):
i += 1
else:
# Went over each version, so go to checksumming
break
elif command == "n":
sorted_and_filtered = [v for v in sorted_and_filtered if v not in known_versions]
elif command == "r":
url_dict = orig_url_dict
sorted_and_filtered = sorted(url_dict.keys(), reverse=True)
version_filter = VersionList([":"])
elif command == "q":
try:
if input("Really quit [y/N]? ").strip().lower() in ("y", "yes"):
return None
except EOFError:
print()
return None
else:
tty.warn(f"Ignoring invalid command: {command}")
print_header = False
continue
return {v: url_dict[v] for v in sorted_and_filtered}
def get_checksums_for_versions(
url_by_version: Dict[str, str],
package_name: str,
*,
batch: bool = False,
first_stage_function: Optional[Callable[[Stage, str], None]] = None,
keep_stage: bool = False,
concurrency: Optional[int] = None,
@ -890,32 +1072,7 @@ def get_checksums_for_versions(
Returns:
A dictionary mapping each version to the corresponding checksum
"""
sorted_versions = sorted(url_by_version.keys(), reverse=True)
# Find length of longest string in the list for padding
max_len = max(len(str(v)) for v in sorted_versions)
num_ver = len(sorted_versions)
tty.msg(
f"Found {llnl.string.plural(num_ver, 'version')} of {package_name}:",
"",
*llnl.util.lang.elide_list(
["{0:{1}} {2}".format(str(v), max_len, url_by_version[v]) for v in sorted_versions]
),
)
print()
if batch:
archives_to_fetch = len(sorted_versions)
else:
archives_to_fetch = tty.get_number(
"How many would you like to checksum?", default=1, abort="q"
)
if not archives_to_fetch:
tty.die("Aborted.")
versions = sorted_versions[:archives_to_fetch]
versions = sorted(url_by_version.keys(), reverse=True)
search_arguments = [(url_by_version[v], v) for v in versions]
version_hashes, errors = {}, []

View file

@ -7,12 +7,12 @@
import pytest
import llnl.util.tty as tty
import spack.cmd.checksum
import spack.repo
import spack.spec
from spack.main import SpackCommand
from spack.stage import interactive_version_filter
from spack.version import Version
spack_checksum = SpackCommand("checksum")
@ -56,18 +56,134 @@ def test_checksum(arguments, expected, mock_packages, mock_clone_repo, mock_stag
assert "version(" in output
@pytest.mark.not_on_windows("Not supported on Windows (yet)")
def test_checksum_interactive(mock_packages, mock_fetch, mock_stage, monkeypatch):
# TODO: mock_fetch doesn't actually work with stage, working around with ignoring
# fail_on_error for now
def _get_number(*args, **kwargs):
return 1
def input_from_commands(*commands):
"""Create a function that returns the next command from a list of inputs for interactive spack
checksum. If None is encountered, this is equivalent to EOF / ^D."""
commands = iter(commands)
monkeypatch.setattr(tty, "get_number", _get_number)
def _input(prompt):
cmd = next(commands)
if cmd is None:
raise EOFError
assert isinstance(cmd, str)
return cmd
output = spack_checksum("preferred-test", fail_on_error=False)
assert "version of preferred-test" in output
assert "version(" in output
return _input
def test_checksum_interactive_filter():
# Filter effectively by 1:1.0, then checksum.
input = input_from_commands("f", "@1:", "f", "@:1.0", "c")
assert interactive_version_filter(
{
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0.1"): "https://www.example.com/pkg-1.0.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
},
input=input,
) == {
Version("1.0.1"): "https://www.example.com/pkg-1.0.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
}
def test_checksum_interactive_return_from_filter_prompt():
# Enter and then exit filter subcommand.
input = input_from_commands("f", None, "c")
assert interactive_version_filter(
{
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0.1"): "https://www.example.com/pkg-1.0.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
},
input=input,
) == {
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0.1"): "https://www.example.com/pkg-1.0.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
}
def test_checksum_interactive_quit_returns_none():
# Quit after filtering something out (y to confirm quit)
input = input_from_commands("f", "@1:", "q", "y")
assert (
interactive_version_filter(
{
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
},
input=input,
)
is None
)
def test_checksum_interactive_reset_resets():
# Filter 1:, then reset, then filter :0, should just given 0.9 (it was filtered out
# before reset)
input = input_from_commands("f", "@1:", "r", "f", ":0", "c")
assert interactive_version_filter(
{
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
},
input=input,
) == {Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz"}
def test_checksum_interactive_ask_each():
# Ask each should run on the filtered list. First select 1.x, then select only the second
# entry, which is 1.0.1.
input = input_from_commands("f", "@1:", "a", "n", "y", "n")
assert interactive_version_filter(
{
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0.1"): "https://www.example.com/pkg-1.0.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
},
input=input,
) == {Version("1.0.1"): "https://www.example.com/pkg-1.0.1.tar.gz"}
def test_checksum_interactive_quit_from_ask_each():
# Enter ask each mode, select the second item, then quit from submenu, then checksum, which
# should still include the last item at which ask each stopped.
input = input_from_commands("a", "n", "y", None, "c")
assert interactive_version_filter(
{
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
},
input=input,
) == {
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
}
def test_checksum_interactive_new_only():
# The 1.0 version is known already, and should be dropped on `n`.
input = input_from_commands("n", "c")
assert interactive_version_filter(
{
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("1.0"): "https://www.example.com/pkg-1.0.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
},
known_versions=[Version("1.0")],
input=input,
) == {
Version("1.1"): "https://www.example.com/pkg-1.1.tar.gz",
Version("0.9"): "https://www.example.com/pkg-0.9.tar.gz",
}
def test_checksum_versions(mock_packages, mock_clone_repo, mock_fetch, mock_stage):

View file

@ -647,7 +647,7 @@ def find_versions_of_archive(
list_urls |= additional_list_urls
# Grab some web pages to scrape.
pages, links = spack.util.web.spider(list_urls, depth=list_depth, concurrency=concurrency)
_, links = spack.util.web.spider(list_urls, depth=list_depth, concurrency=concurrency)
# Scrape them for archive URLs
regexes = []

View file

@ -61,7 +61,7 @@ def executable(exe: str, args: List[str]) -> int:
return cmd.returncode
def editor(*args: List[str], exec_fn: Callable[[str, List[str]], int] = os.execv) -> bool:
def editor(*args: str, exec_fn: Callable[[str, List[str]], int] = os.execv) -> bool:
"""Invoke the user's editor.
This will try to execute the following, in order: