Update archspec to v0.2.3 (#42854)

This commit is contained in:
Massimiliano Culpo 2024-03-12 09:31:15 +01:00 committed by GitHub
parent e12a8a69c7
commit 5f5fc78236
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 1836 additions and 213 deletions

View file

@ -18,7 +18,7 @@
* Homepage: https://pypi.python.org/pypi/archspec * Homepage: https://pypi.python.org/pypi/archspec
* Usage: Labeling, comparison and detection of microarchitectures * Usage: Labeling, comparison and detection of microarchitectures
* Version: 0.2.2 (commit 1dc58a5776dd77e6fc6e4ba5626af5b1fb24996e) * Version: 0.2.3 (commit 7b8fe60b69e2861e7dac104bc1c183decfcd3daf)
astunparse astunparse
---------------- ----------------

View file

@ -1,2 +1,3 @@
"""Init file to avoid namespace packages""" """Init file to avoid namespace packages"""
__version__ = "0.2.2"
__version__ = "0.2.3"

View file

@ -3,6 +3,7 @@
""" """
import sys import sys
from .cli import main from .cli import main
sys.exit(main()) sys.exit(main())

View file

@ -46,7 +46,11 @@ def _make_parser() -> argparse.ArgumentParser:
def cpu() -> int: def cpu() -> int:
"""Run the `archspec cpu` subcommand.""" """Run the `archspec cpu` subcommand."""
print(archspec.cpu.host()) try:
print(archspec.cpu.host())
except FileNotFoundError as exc:
print(exc)
return 1
return 0 return 0

View file

@ -5,10 +5,14 @@
"""The "cpu" package permits to query and compare different """The "cpu" package permits to query and compare different
CPU microarchitectures. CPU microarchitectures.
""" """
from .microarchitecture import Microarchitecture, UnsupportedMicroarchitecture
from .microarchitecture import TARGETS, generic_microarchitecture
from .microarchitecture import version_components
from .detect import host from .detect import host
from .microarchitecture import (
TARGETS,
Microarchitecture,
UnsupportedMicroarchitecture,
generic_microarchitecture,
version_components,
)
__all__ = [ __all__ = [
"Microarchitecture", "Microarchitecture",

View file

@ -4,15 +4,17 @@
# SPDX-License-Identifier: (Apache-2.0 OR MIT) # SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""Detection of CPU microarchitectures""" """Detection of CPU microarchitectures"""
import collections import collections
import functools
import os import os
import platform import platform
import re import re
import struct
import subprocess import subprocess
import warnings import warnings
from typing import Dict, List, Optional, Set, Tuple, Union
from .microarchitecture import generic_microarchitecture, TARGETS from ..vendor.cpuid.cpuid import CPUID
from .schema import TARGETS_JSON from .microarchitecture import TARGETS, Microarchitecture, generic_microarchitecture
from .schema import CPUID_JSON, TARGETS_JSON
#: Mapping from operating systems to chain of commands #: Mapping from operating systems to chain of commands
#: to obtain a dictionary of raw info on the current cpu #: to obtain a dictionary of raw info on the current cpu
@ -22,43 +24,46 @@
#: functions checking the compatibility of the host with a given target #: functions checking the compatibility of the host with a given target
COMPATIBILITY_CHECKS = {} COMPATIBILITY_CHECKS = {}
# Constants for commonly used architectures
X86_64 = "x86_64"
AARCH64 = "aarch64"
PPC64LE = "ppc64le"
PPC64 = "ppc64"
RISCV64 = "riscv64"
def info_dict(operating_system):
"""Decorator to mark functions that are meant to return raw info on def detection(operating_system: str):
the current cpu. """Decorator to mark functions that are meant to return partial information on the current cpu.
Args: Args:
operating_system (str or tuple): operating system for which the marked operating_system: operating system where this function can be used.
function is a viable factory of raw info dictionaries.
""" """
def decorator(factory): def decorator(factory):
INFO_FACTORY[operating_system].append(factory) INFO_FACTORY[operating_system].append(factory)
return factory
@functools.wraps(factory)
def _impl():
info = factory()
# Check that info contains a few mandatory fields
msg = 'field "{0}" is missing from raw info dictionary'
assert "vendor_id" in info, msg.format("vendor_id")
assert "flags" in info, msg.format("flags")
assert "model" in info, msg.format("model")
assert "model_name" in info, msg.format("model_name")
return info
return _impl
return decorator return decorator
@info_dict(operating_system="Linux") def partial_uarch(
def proc_cpuinfo(): name: str = "", vendor: str = "", features: Optional[Set[str]] = None, generation: int = 0
"""Returns a raw info dictionary by parsing the first entry of ) -> Microarchitecture:
``/proc/cpuinfo`` """Construct a partial microarchitecture, from information gathered during system scan."""
""" return Microarchitecture(
info = {} name=name,
parents=[],
vendor=vendor,
features=features or set(),
compilers={},
generation=generation,
)
@detection(operating_system="Linux")
def proc_cpuinfo() -> Microarchitecture:
"""Returns a partial Microarchitecture, obtained from scanning ``/proc/cpuinfo``"""
data = {}
with open("/proc/cpuinfo") as file: # pylint: disable=unspecified-encoding with open("/proc/cpuinfo") as file: # pylint: disable=unspecified-encoding
for line in file: for line in file:
key, separator, value = line.partition(":") key, separator, value = line.partition(":")
@ -70,11 +75,96 @@ def proc_cpuinfo():
# #
# we are on a blank line separating two cpus. Exit early as # we are on a blank line separating two cpus. Exit early as
# we want to read just the first entry in /proc/cpuinfo # we want to read just the first entry in /proc/cpuinfo
if separator != ":" and info: if separator != ":" and data:
break break
info[key.strip()] = value.strip() data[key.strip()] = value.strip()
return info
architecture = _machine()
if architecture == X86_64:
return partial_uarch(
vendor=data.get("vendor_id", "generic"), features=_feature_set(data, key="flags")
)
if architecture == AARCH64:
return partial_uarch(
vendor=_canonicalize_aarch64_vendor(data),
features=_feature_set(data, key="Features"),
)
if architecture in (PPC64LE, PPC64):
generation_match = re.search(r"POWER(\d+)", data.get("cpu", ""))
try:
generation = int(generation_match.group(1))
except AttributeError:
# There might be no match under emulated environments. For instance
# emulating a ppc64le with QEMU and Docker still reports the host
# /proc/cpuinfo and not a Power
generation = 0
return partial_uarch(generation=generation)
if architecture == RISCV64:
if data.get("uarch") == "sifive,u74-mc":
data["uarch"] = "u74mc"
return partial_uarch(name=data.get("uarch", RISCV64))
return generic_microarchitecture(architecture)
class CpuidInfoCollector:
"""Collects the information we need on the host CPU from cpuid"""
# pylint: disable=too-few-public-methods
def __init__(self):
self.cpuid = CPUID()
registers = self.cpuid.registers_for(**CPUID_JSON["vendor"]["input"])
self.highest_basic_support = registers.eax
self.vendor = struct.pack("III", registers.ebx, registers.edx, registers.ecx).decode(
"utf-8"
)
registers = self.cpuid.registers_for(**CPUID_JSON["highest_extension_support"]["input"])
self.highest_extension_support = registers.eax
self.features = self._features()
def _features(self):
result = set()
def check_features(data):
registers = self.cpuid.registers_for(**data["input"])
for feature_check in data["bits"]:
current = getattr(registers, feature_check["register"])
if self._is_bit_set(current, feature_check["bit"]):
result.add(feature_check["name"])
for call_data in CPUID_JSON["flags"]:
if call_data["input"]["eax"] > self.highest_basic_support:
continue
check_features(call_data)
for call_data in CPUID_JSON["extension-flags"]:
if call_data["input"]["eax"] > self.highest_extension_support:
continue
check_features(call_data)
return result
def _is_bit_set(self, register: int, bit: int) -> bool:
mask = 1 << bit
return register & mask > 0
@detection(operating_system="Windows")
def cpuid_info():
"""Returns a partial Microarchitecture, obtained from running the cpuid instruction"""
architecture = _machine()
if architecture == X86_64:
data = CpuidInfoCollector()
return partial_uarch(vendor=data.vendor, features=data.features)
return generic_microarchitecture(architecture)
def _check_output(args, env): def _check_output(args, env):
@ -83,14 +173,25 @@ def _check_output(args, env):
return str(output.decode("utf-8")) return str(output.decode("utf-8"))
WINDOWS_MAPPING = {
"AMD64": "x86_64",
"ARM64": "aarch64",
}
def _machine(): def _machine():
""" "Return the machine architecture we are on""" """Return the machine architecture we are on"""
operating_system = platform.system() operating_system = platform.system()
# If we are not on Darwin, trust what Python tells us # If we are not on Darwin or Windows, trust what Python tells us
if operating_system != "Darwin": if operating_system not in ("Darwin", "Windows"):
return platform.machine() return platform.machine()
# Normalize windows specific names
if operating_system == "Windows":
platform_machine = platform.machine()
return WINDOWS_MAPPING.get(platform_machine, platform_machine)
# On Darwin it might happen that we are on M1, but using an interpreter # On Darwin it might happen that we are on M1, but using an interpreter
# built for x86_64. In that case "platform.machine() == 'x86_64'", so we # built for x86_64. In that case "platform.machine() == 'x86_64'", so we
# need to fix that. # need to fix that.
@ -103,54 +204,47 @@ def _machine():
if "Apple" in output: if "Apple" in output:
# Note that a native Python interpreter on Apple M1 would return # Note that a native Python interpreter on Apple M1 would return
# "arm64" instead of "aarch64". Here we normalize to the latter. # "arm64" instead of "aarch64". Here we normalize to the latter.
return "aarch64" return AARCH64
return "x86_64" return X86_64
@info_dict(operating_system="Darwin") @detection(operating_system="Darwin")
def sysctl_info_dict(): def sysctl_info() -> Microarchitecture:
"""Returns a raw info dictionary parsing the output of sysctl.""" """Returns a raw info dictionary parsing the output of sysctl."""
child_environment = _ensure_bin_usrbin_in_path() child_environment = _ensure_bin_usrbin_in_path()
def sysctl(*args): def sysctl(*args: str) -> str:
return _check_output(["sysctl"] + list(args), env=child_environment).strip() return _check_output(["sysctl"] + list(args), env=child_environment).strip()
if _machine() == "x86_64": if _machine() == X86_64:
flags = ( features = (
sysctl("-n", "machdep.cpu.features").lower() f'{sysctl("-n", "machdep.cpu.features").lower()} '
+ " " f'{sysctl("-n", "machdep.cpu.leaf7_features").lower()}'
+ sysctl("-n", "machdep.cpu.leaf7_features").lower()
) )
info = { features = set(features.split())
"vendor_id": sysctl("-n", "machdep.cpu.vendor"),
"flags": flags,
"model": sysctl("-n", "machdep.cpu.model"),
"model name": sysctl("-n", "machdep.cpu.brand_string"),
}
else:
model = "unknown"
model_str = sysctl("-n", "machdep.cpu.brand_string").lower()
if "m2" in model_str:
model = "m2"
elif "m1" in model_str:
model = "m1"
elif "apple" in model_str:
model = "m1"
info = { # Flags detected on Darwin turned to their linux counterpart
"vendor_id": "Apple", for darwin_flag, linux_flag in TARGETS_JSON["conversions"]["darwin_flags"].items():
"flags": [], if darwin_flag in features:
"model": model, features.update(linux_flag.split())
"CPU implementer": "Apple",
"model name": sysctl("-n", "machdep.cpu.brand_string"), return partial_uarch(vendor=sysctl("-n", "machdep.cpu.vendor"), features=features)
}
return info model = "unknown"
model_str = sysctl("-n", "machdep.cpu.brand_string").lower()
if "m2" in model_str:
model = "m2"
elif "m1" in model_str:
model = "m1"
elif "apple" in model_str:
model = "m1"
return partial_uarch(name=model, vendor="Apple")
def _ensure_bin_usrbin_in_path(): def _ensure_bin_usrbin_in_path():
# Make sure that /sbin and /usr/sbin are in PATH as sysctl is # Make sure that /sbin and /usr/sbin are in PATH as sysctl is usually found there
# usually found there
child_environment = dict(os.environ.items()) child_environment = dict(os.environ.items())
search_paths = child_environment.get("PATH", "").split(os.pathsep) search_paths = child_environment.get("PATH", "").split(os.pathsep)
for additional_path in ("/sbin", "/usr/sbin"): for additional_path in ("/sbin", "/usr/sbin"):
@ -160,22 +254,10 @@ def _ensure_bin_usrbin_in_path():
return child_environment return child_environment
def adjust_raw_flags(info): def _canonicalize_aarch64_vendor(data: Dict[str, str]) -> str:
"""Adjust the flags detected on the system to homogenize """Adjust the vendor field to make it human-readable"""
slightly different representations. if "CPU implementer" not in data:
""" return "generic"
# Flags detected on Darwin turned to their linux counterpart
flags = info.get("flags", [])
d2l = TARGETS_JSON["conversions"]["darwin_flags"]
for darwin_flag, linux_flag in d2l.items():
if darwin_flag in flags:
info["flags"] += " " + linux_flag
def adjust_raw_vendor(info):
"""Adjust the vendor field to make it human readable"""
if "CPU implementer" not in info:
return
# Mapping numeric codes to vendor (ARM). This list is a merge from # Mapping numeric codes to vendor (ARM). This list is a merge from
# different sources: # different sources:
@ -185,43 +267,37 @@ def adjust_raw_vendor(info):
# https://github.com/gcc-mirror/gcc/blob/master/gcc/config/aarch64/aarch64-cores.def # https://github.com/gcc-mirror/gcc/blob/master/gcc/config/aarch64/aarch64-cores.def
# https://patchwork.kernel.org/patch/10524949/ # https://patchwork.kernel.org/patch/10524949/
arm_vendors = TARGETS_JSON["conversions"]["arm_vendors"] arm_vendors = TARGETS_JSON["conversions"]["arm_vendors"]
arm_code = info["CPU implementer"] arm_code = data["CPU implementer"]
if arm_code in arm_vendors: return arm_vendors.get(arm_code, arm_code)
info["CPU implementer"] = arm_vendors[arm_code]
def raw_info_dictionary(): def _feature_set(data: Dict[str, str], key: str) -> Set[str]:
"""Returns a dictionary with information on the cpu of the current host. return set(data.get(key, "").split())
This function calls all the viable factories one after the other until
there's one that is able to produce the requested information. def detected_info() -> Microarchitecture:
"""Returns a partial Microarchitecture with information on the CPU of the current host.
This function calls all the viable factories one after the other until there's one that is
able to produce the requested information. Falls-back to a generic microarchitecture, if none
of the calls succeed.
""" """
# pylint: disable=broad-except # pylint: disable=broad-except
info = {}
for factory in INFO_FACTORY[platform.system()]: for factory in INFO_FACTORY[platform.system()]:
try: try:
info = factory() return factory()
except Exception as exc: except Exception as exc:
warnings.warn(str(exc)) warnings.warn(str(exc))
if info: return generic_microarchitecture(_machine())
adjust_raw_flags(info)
adjust_raw_vendor(info)
break
return info
def compatible_microarchitectures(info): def compatible_microarchitectures(info: Microarchitecture) -> List[Microarchitecture]:
"""Returns an unordered list of known micro-architectures that are """Returns an unordered list of known micro-architectures that are compatible with the
compatible with the info dictionary passed as argument. partial Microarchitecture passed as input.
Args:
info (dict): dictionary containing information on the host cpu
""" """
architecture_family = _machine() architecture_family = _machine()
# If a tester is not registered, be conservative and assume no known # If a tester is not registered, assume no known target is compatible with the host
# target is compatible with the host
tester = COMPATIBILITY_CHECKS.get(architecture_family, lambda x, y: False) tester = COMPATIBILITY_CHECKS.get(architecture_family, lambda x, y: False)
return [x for x in TARGETS.values() if tester(info, x)] or [ return [x for x in TARGETS.values() if tester(info, x)] or [
generic_microarchitecture(architecture_family) generic_microarchitecture(architecture_family)
@ -230,8 +306,8 @@ def compatible_microarchitectures(info):
def host(): def host():
"""Detects the host micro-architecture and returns it.""" """Detects the host micro-architecture and returns it."""
# Retrieve a dictionary with raw information on the host's cpu # Retrieve information on the host's cpu
info = raw_info_dictionary() info = detected_info()
# Get a list of possible candidates for this micro-architecture # Get a list of possible candidates for this micro-architecture
candidates = compatible_microarchitectures(info) candidates = compatible_microarchitectures(info)
@ -258,16 +334,15 @@ def sorting_fn(item):
return max(candidates, key=sorting_fn) return max(candidates, key=sorting_fn)
def compatibility_check(architecture_family): def compatibility_check(architecture_family: Union[str, Tuple[str, ...]]):
"""Decorator to register a function as a proper compatibility check. """Decorator to register a function as a proper compatibility check.
A compatibility check function takes the raw info dictionary as a first A compatibility check function takes a partial Microarchitecture object as a first argument,
argument and an arbitrary target as the second argument. It returns True and an arbitrary target Microarchitecture as the second argument. It returns True if the
if the target is compatible with the info dictionary, False otherwise. target is compatible with first argument, False otherwise.
Args: Args:
architecture_family (str or tuple): architecture family for which architecture_family: architecture family for which this test can be used
this test can be used, e.g. x86_64 or ppc64le etc.
""" """
# Turn the argument into something iterable # Turn the argument into something iterable
if isinstance(architecture_family, str): if isinstance(architecture_family, str):
@ -280,86 +355,57 @@ def decorator(func):
return decorator return decorator
@compatibility_check(architecture_family=("ppc64le", "ppc64")) @compatibility_check(architecture_family=(PPC64LE, PPC64))
def compatibility_check_for_power(info, target): def compatibility_check_for_power(info, target):
"""Compatibility check for PPC64 and PPC64LE architectures.""" """Compatibility check for PPC64 and PPC64LE architectures."""
basename = platform.machine()
generation_match = re.search(r"POWER(\d+)", info.get("cpu", ""))
try:
generation = int(generation_match.group(1))
except AttributeError:
# There might be no match under emulated environments. For instance
# emulating a ppc64le with QEMU and Docker still reports the host
# /proc/cpuinfo and not a Power
generation = 0
# We can use a target if it descends from our machine type and our # We can use a target if it descends from our machine type and our
# generation (9 for POWER9, etc) is at least its generation. # generation (9 for POWER9, etc) is at least its generation.
arch_root = TARGETS[basename] arch_root = TARGETS[_machine()]
return ( return (
target == arch_root or arch_root in target.ancestors target == arch_root or arch_root in target.ancestors
) and target.generation <= generation ) and target.generation <= info.generation
@compatibility_check(architecture_family="x86_64") @compatibility_check(architecture_family=X86_64)
def compatibility_check_for_x86_64(info, target): def compatibility_check_for_x86_64(info, target):
"""Compatibility check for x86_64 architectures.""" """Compatibility check for x86_64 architectures."""
basename = "x86_64"
vendor = info.get("vendor_id", "generic")
features = set(info.get("flags", "").split())
# We can use a target if it descends from our machine type, is from our # We can use a target if it descends from our machine type, is from our
# vendor, and we have all of its features # vendor, and we have all of its features
arch_root = TARGETS[basename] arch_root = TARGETS[X86_64]
return ( return (
(target == arch_root or arch_root in target.ancestors) (target == arch_root or arch_root in target.ancestors)
and target.vendor in (vendor, "generic") and target.vendor in (info.vendor, "generic")
and target.features.issubset(features) and target.features.issubset(info.features)
) )
@compatibility_check(architecture_family="aarch64") @compatibility_check(architecture_family=AARCH64)
def compatibility_check_for_aarch64(info, target): def compatibility_check_for_aarch64(info, target):
"""Compatibility check for AARCH64 architectures.""" """Compatibility check for AARCH64 architectures."""
basename = "aarch64" # At the moment, it's not clear how to detect compatibility with
features = set(info.get("Features", "").split())
vendor = info.get("CPU implementer", "generic")
# At the moment it's not clear how to detect compatibility with
# a specific version of the architecture # a specific version of the architecture
if target.vendor == "generic" and target.name != "aarch64": if target.vendor == "generic" and target.name != AARCH64:
return False return False
arch_root = TARGETS[basename] arch_root = TARGETS[AARCH64]
arch_root_and_vendor = arch_root == target.family and target.vendor in ( arch_root_and_vendor = arch_root == target.family and target.vendor in (
vendor, info.vendor,
"generic", "generic",
) )
# On macOS it seems impossible to get all the CPU features # On macOS it seems impossible to get all the CPU features
# with syctl info, but for ARM we can get the exact model # with syctl info, but for ARM we can get the exact model
if platform.system() == "Darwin": if platform.system() == "Darwin":
model_key = info.get("model", basename) model = TARGETS[info.name]
model = TARGETS[model_key]
return arch_root_and_vendor and (target == model or target in model.ancestors) return arch_root_and_vendor and (target == model or target in model.ancestors)
return arch_root_and_vendor and target.features.issubset(features) return arch_root_and_vendor and target.features.issubset(info.features)
@compatibility_check(architecture_family="riscv64") @compatibility_check(architecture_family=RISCV64)
def compatibility_check_for_riscv64(info, target): def compatibility_check_for_riscv64(info, target):
"""Compatibility check for riscv64 architectures.""" """Compatibility check for riscv64 architectures."""
basename = "riscv64" arch_root = TARGETS[RISCV64]
uarch = info.get("uarch")
# sifive unmatched board
if uarch == "sifive,u74-mc":
uarch = "u74mc"
# catch-all for unknown uarchs
else:
uarch = "riscv64"
arch_root = TARGETS[basename]
return (target == arch_root or arch_root in target.ancestors) and ( return (target == arch_root or arch_root in target.ancestors) and (
target == uarch or target.vendor == "generic" target.name == info.name or target.vendor == "generic"
) )

View file

@ -13,6 +13,7 @@
import archspec import archspec
import archspec.cpu.alias import archspec.cpu.alias
import archspec.cpu.schema import archspec.cpu.schema
from .alias import FEATURE_ALIASES from .alias import FEATURE_ALIASES
from .schema import LazyDictionary from .schema import LazyDictionary
@ -47,7 +48,7 @@ class Microarchitecture:
which has "broadwell" as a parent, supports running binaries which has "broadwell" as a parent, supports running binaries
optimized for "broadwell". optimized for "broadwell".
vendor (str): vendor of the micro-architecture vendor (str): vendor of the micro-architecture
features (list of str): supported CPU flags. Note that the semantic features (set of str): supported CPU flags. Note that the semantic
of the flags in this field might vary among architectures, if of the flags in this field might vary among architectures, if
at all present. For instance x86_64 processors will list all at all present. For instance x86_64 processors will list all
the flags supported by a given CPU while Arm processors will the flags supported by a given CPU while Arm processors will
@ -180,24 +181,28 @@ def generic(self):
generics = [x for x in [self] + self.ancestors if x.vendor == "generic"] generics = [x for x in [self] + self.ancestors if x.vendor == "generic"]
return max(generics, key=lambda x: len(x.ancestors)) return max(generics, key=lambda x: len(x.ancestors))
def to_dict(self, return_list_of_items=False): def to_dict(self):
"""Returns a dictionary representation of this object. """Returns a dictionary representation of this object."""
return {
"name": str(self.name),
"vendor": str(self.vendor),
"features": sorted(str(x) for x in self.features),
"generation": self.generation,
"parents": [str(x) for x in self.parents],
"compilers": self.compilers,
}
Args: @staticmethod
return_list_of_items (bool): if True returns an ordered list of def from_dict(data) -> "Microarchitecture":
items instead of the dictionary """Construct a microarchitecture from a dictionary representation."""
""" return Microarchitecture(
list_of_items = [ name=data["name"],
("name", str(self.name)), parents=[TARGETS[x] for x in data["parents"]],
("vendor", str(self.vendor)), vendor=data["vendor"],
("features", sorted(str(x) for x in self.features)), features=set(data["features"]),
("generation", self.generation), compilers=data.get("compilers", {}),
("parents", [str(x) for x in self.parents]), generation=data.get("generation", 0),
] )
if return_list_of_items:
return list_of_items
return dict(list_of_items)
def optimization_flags(self, compiler, version): def optimization_flags(self, compiler, version):
"""Returns a string containing the optimization flags that needs """Returns a string containing the optimization flags that needs
@ -271,9 +276,7 @@ def tuplify(ver):
flags = flags_fmt.format(**compiler_entry) flags = flags_fmt.format(**compiler_entry)
return flags return flags
msg = ( msg = "cannot produce optimized binary for micro-architecture '{0}' with {1}@{2}"
"cannot produce optimized binary for micro-architecture '{0}' with {1}@{2}"
)
if compiler_info: if compiler_info:
versions = [x["versions"] for x in compiler_info] versions = [x["versions"] for x in compiler_info]
msg += f' [supported compiler versions are {", ".join(versions)}]' msg += f' [supported compiler versions are {", ".join(versions)}]'
@ -289,9 +292,7 @@ def generic_microarchitecture(name):
Args: Args:
name (str): name of the micro-architecture name (str): name of the micro-architecture
""" """
return Microarchitecture( return Microarchitecture(name, parents=[], vendor="generic", features=[], compilers={})
name, parents=[], vendor="generic", features=[], compilers={}
)
def version_components(version): def version_components(version):
@ -345,9 +346,7 @@ def fill_target_from_dict(name, data, targets):
compilers = values.get("compilers", {}) compilers = values.get("compilers", {})
generation = values.get("generation", 0) generation = values.get("generation", 0)
targets[name] = Microarchitecture( targets[name] = Microarchitecture(name, parents, vendor, features, compilers, generation)
name, parents, vendor, features, compilers, generation
)
known_targets = {} known_targets = {}
data = archspec.cpu.schema.TARGETS_JSON["microarchitectures"] data = archspec.cpu.schema.TARGETS_JSON["microarchitectures"]

View file

@ -7,7 +7,9 @@
""" """
import collections.abc import collections.abc
import json import json
import os.path import os
import pathlib
from typing import Tuple
class LazyDictionary(collections.abc.MutableMapping): class LazyDictionary(collections.abc.MutableMapping):
@ -46,21 +48,65 @@ def __len__(self):
return len(self.data) return len(self.data)
def _load_json_file(json_file): #: Environment variable that might point to a directory with a user defined JSON file
json_dir = os.path.join(os.path.dirname(__file__), "..", "json", "cpu") DIR_FROM_ENVIRONMENT = "ARCHSPEC_CPU_DIR"
json_dir = os.path.abspath(json_dir)
def _factory(): #: Environment variable that might point to a directory with extensions to JSON files
filename = os.path.join(json_dir, json_file) EXTENSION_DIR_FROM_ENVIRONMENT = "ARCHSPEC_EXTENSION_CPU_DIR"
with open(filename, "r", encoding="utf-8") as file:
return json.load(file)
return _factory
def _json_file(filename: str, allow_custom: bool = False) -> Tuple[pathlib.Path, pathlib.Path]:
"""Given a filename, returns the absolute path for the main JSON file, and an
optional absolute path for an extension JSON file.
Args:
filename: filename for the JSON file
allow_custom: if True, allows overriding the location where the file resides
"""
json_dir = pathlib.Path(__file__).parent / ".." / "json" / "cpu"
if allow_custom and DIR_FROM_ENVIRONMENT in os.environ:
json_dir = pathlib.Path(os.environ[DIR_FROM_ENVIRONMENT])
json_dir = json_dir.absolute()
json_file = json_dir / filename
extension_file = None
if allow_custom and EXTENSION_DIR_FROM_ENVIRONMENT in os.environ:
extension_dir = pathlib.Path(os.environ[EXTENSION_DIR_FROM_ENVIRONMENT])
extension_dir.absolute()
extension_file = extension_dir / filename
return json_file, extension_file
def _load(json_file: pathlib.Path, extension_file: pathlib.Path):
with open(json_file, "r", encoding="utf-8") as file:
data = json.load(file)
if not extension_file or not extension_file.exists():
return data
with open(extension_file, "r", encoding="utf-8") as file:
extension_data = json.load(file)
top_level_sections = list(data.keys())
for key in top_level_sections:
if key not in extension_data:
continue
data[key].update(extension_data[key])
return data
#: In memory representation of the data in microarchitectures.json, #: In memory representation of the data in microarchitectures.json,
#: loaded on first access #: loaded on first access
TARGETS_JSON = LazyDictionary(_load_json_file("microarchitectures.json")) TARGETS_JSON = LazyDictionary(_load, *_json_file("microarchitectures.json", allow_custom=True))
#: JSON schema for microarchitectures.json, loaded on first access #: JSON schema for microarchitectures.json, loaded on first access
SCHEMA = LazyDictionary(_load_json_file("microarchitectures_schema.json")) TARGETS_JSON_SCHEMA = LazyDictionary(_load, *_json_file("microarchitectures_schema.json"))
#: Information on how to call 'cpuid' to get information on the HOST CPU
CPUID_JSON = LazyDictionary(_load, *_json_file("cpuid.json", allow_custom=True))
#: JSON schema for cpuid.json, loaded on first access
CPUID_JSON_SCHEMA = LazyDictionary(_load, *_json_file("cpuid_schema.json"))

View file

@ -9,11 +9,11 @@ language specific APIs.
Currently the repository contains the following JSON files: Currently the repository contains the following JSON files:
```console ```console
. cpu/
├── COPYRIGHT ├── cpuid.json # Contains information on CPUID calls to retrieve vendor and features on x86_64
└── cpu ├── cpuid_schema.json # Schema for the file above
   ├── microarchitectures.json # Contains information on CPU microarchitectures ├── microarchitectures.json # Contains information on CPU microarchitectures
   └── microarchitectures_schema.json # Schema for the file above └── microarchitectures_schema.json # Schema for the file above
``` ```

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,134 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Schema for microarchitecture definitions and feature aliases",
"type": "object",
"additionalProperties": false,
"properties": {
"vendor": {
"type": "object",
"additionalProperties": false,
"properties": {
"description": {
"type": "string"
},
"input": {
"type": "object",
"additionalProperties": false,
"properties": {
"eax": {
"type": "integer"
},
"ecx": {
"type": "integer"
}
}
}
}
},
"highest_extension_support": {
"type": "object",
"additionalProperties": false,
"properties": {
"description": {
"type": "string"
},
"input": {
"type": "object",
"additionalProperties": false,
"properties": {
"eax": {
"type": "integer"
},
"ecx": {
"type": "integer"
}
}
}
}
},
"flags": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"properties": {
"description": {
"type": "string"
},
"input": {
"type": "object",
"additionalProperties": false,
"properties": {
"eax": {
"type": "integer"
},
"ecx": {
"type": "integer"
}
}
},
"bits": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"properties": {
"name": {
"type": "string"
},
"register": {
"type": "string"
},
"bit": {
"type": "integer"
}
}
}
}
}
}
},
"extension-flags": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"properties": {
"description": {
"type": "string"
},
"input": {
"type": "object",
"additionalProperties": false,
"properties": {
"eax": {
"type": "integer"
},
"ecx": {
"type": "integer"
}
}
},
"bits": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"properties": {
"name": {
"type": "string"
},
"register": {
"type": "string"
},
"bit": {
"type": "integer"
}
}
}
}
}
}
}
}
}

View file

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2014 Anders Høst
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View file

@ -0,0 +1,76 @@
cpuid.py
========
Now, this is silly!
Pure Python library for accessing information about x86 processors
by querying the [CPUID](http://en.wikipedia.org/wiki/CPUID)
instruction. Well, not exactly pure Python...
It works by allocating a small piece of virtual memory, copying
a raw x86 function to that memory, giving the memory execute
permissions and then calling the memory as a function. The injected
function executes the CPUID instruction and copies the result back
to a ctypes.Structure where is can be read by Python.
It should work fine on both 32 and 64 bit versions of Windows and Linux
running x86 processors. Apple OS X and other BSD systems should also work,
not tested though...
Why?
----
For poops and giggles. Plus, having access to a low-level feature
without having to compile a C wrapper is pretty neat.
Examples
--------
Getting info with eax=0:
import cpuid
q = cpuid.CPUID()
eax, ebx, ecx, edx = q(0)
Running the files:
$ python example.py
Vendor ID : GenuineIntel
CPU name : Intel(R) Xeon(R) CPU W3550 @ 3.07GHz
Vector instructions supported:
SSE : Yes
SSE2 : Yes
SSE3 : Yes
SSSE3 : Yes
SSE4.1 : Yes
SSE4.2 : Yes
SSE4a : --
AVX : --
AVX2 : --
$ python cpuid.py
CPUID A B C D
00000000 0000000b 756e6547 6c65746e 49656e69
00000001 000106a5 00100800 009ce3bd bfebfbff
00000002 55035a01 00f0b2e4 00000000 09ca212c
00000003 00000000 00000000 00000000 00000000
00000004 00000000 00000000 00000000 00000000
00000005 00000040 00000040 00000003 00001120
00000006 00000003 00000002 00000001 00000000
00000007 00000000 00000000 00000000 00000000
00000008 00000000 00000000 00000000 00000000
00000009 00000000 00000000 00000000 00000000
0000000a 07300403 00000044 00000000 00000603
0000000b 00000000 00000000 00000095 00000000
80000000 80000008 00000000 00000000 00000000
80000001 00000000 00000000 00000001 28100800
80000002 65746e49 2952286c 6f655820 2952286e
80000003 55504320 20202020 20202020 57202020
80000004 30353533 20402020 37302e33 007a4847
80000005 00000000 00000000 00000000 00000000
80000006 00000000 00000000 01006040 00000000
80000007 00000000 00000000 00000000 00000100
80000008 00003024 00000000 00000000 00000000

View file

@ -0,0 +1,172 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Anders Høst
#
from __future__ import print_function
import platform
import os
import ctypes
from ctypes import c_uint32, c_long, c_ulong, c_size_t, c_void_p, POINTER, CFUNCTYPE
# Posix x86_64:
# Three first call registers : RDI, RSI, RDX
# Volatile registers : RAX, RCX, RDX, RSI, RDI, R8-11
# Windows x86_64:
# Three first call registers : RCX, RDX, R8
# Volatile registers : RAX, RCX, RDX, R8-11
# cdecl 32 bit:
# Three first call registers : Stack (%esp)
# Volatile registers : EAX, ECX, EDX
_POSIX_64_OPC = [
0x53, # push %rbx
0x89, 0xf0, # mov %esi,%eax
0x89, 0xd1, # mov %edx,%ecx
0x0f, 0xa2, # cpuid
0x89, 0x07, # mov %eax,(%rdi)
0x89, 0x5f, 0x04, # mov %ebx,0x4(%rdi)
0x89, 0x4f, 0x08, # mov %ecx,0x8(%rdi)
0x89, 0x57, 0x0c, # mov %edx,0xc(%rdi)
0x5b, # pop %rbx
0xc3 # retq
]
_WINDOWS_64_OPC = [
0x53, # push %rbx
0x89, 0xd0, # mov %edx,%eax
0x49, 0x89, 0xc9, # mov %rcx,%r9
0x44, 0x89, 0xc1, # mov %r8d,%ecx
0x0f, 0xa2, # cpuid
0x41, 0x89, 0x01, # mov %eax,(%r9)
0x41, 0x89, 0x59, 0x04, # mov %ebx,0x4(%r9)
0x41, 0x89, 0x49, 0x08, # mov %ecx,0x8(%r9)
0x41, 0x89, 0x51, 0x0c, # mov %edx,0xc(%r9)
0x5b, # pop %rbx
0xc3 # retq
]
_CDECL_32_OPC = [
0x53, # push %ebx
0x57, # push %edi
0x8b, 0x7c, 0x24, 0x0c, # mov 0xc(%esp),%edi
0x8b, 0x44, 0x24, 0x10, # mov 0x10(%esp),%eax
0x8b, 0x4c, 0x24, 0x14, # mov 0x14(%esp),%ecx
0x0f, 0xa2, # cpuid
0x89, 0x07, # mov %eax,(%edi)
0x89, 0x5f, 0x04, # mov %ebx,0x4(%edi)
0x89, 0x4f, 0x08, # mov %ecx,0x8(%edi)
0x89, 0x57, 0x0c, # mov %edx,0xc(%edi)
0x5f, # pop %edi
0x5b, # pop %ebx
0xc3 # ret
]
is_windows = os.name == "nt"
is_64bit = ctypes.sizeof(ctypes.c_voidp) == 8
class CPUID_struct(ctypes.Structure):
_register_names = ("eax", "ebx", "ecx", "edx")
_fields_ = [(r, c_uint32) for r in _register_names]
def __getitem__(self, item):
if item not in self._register_names:
raise KeyError(item)
return getattr(self, item)
def __repr__(self):
return "eax=0x{:x}, ebx=0x{:x}, ecx=0x{:x}, edx=0x{:x}".format(self.eax, self.ebx, self.ecx, self.edx)
class CPUID(object):
def __init__(self):
if platform.machine() not in ("AMD64", "x86_64", "x86", "i686"):
raise SystemError("Only available for x86")
if is_windows:
if is_64bit:
# VirtualAlloc seems to fail under some weird
# circumstances when ctypes.windll.kernel32 is
# used under 64 bit Python. CDLL fixes this.
self.win = ctypes.CDLL("kernel32.dll")
opc = _WINDOWS_64_OPC
else:
# Here ctypes.windll.kernel32 is needed to get the
# right DLL. Otherwise it will fail when running
# 32 bit Python on 64 bit Windows.
self.win = ctypes.windll.kernel32
opc = _CDECL_32_OPC
else:
opc = _POSIX_64_OPC if is_64bit else _CDECL_32_OPC
size = len(opc)
code = (ctypes.c_ubyte * size)(*opc)
if is_windows:
self.win.VirtualAlloc.restype = c_void_p
self.win.VirtualAlloc.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_ulong, ctypes.c_ulong]
self.addr = self.win.VirtualAlloc(None, size, 0x1000, 0x40)
if not self.addr:
raise MemoryError("Could not allocate RWX memory")
ctypes.memmove(self.addr, code, size)
else:
from mmap import (
mmap,
MAP_PRIVATE,
MAP_ANONYMOUS,
PROT_WRITE,
PROT_READ,
PROT_EXEC,
)
self.mm = mmap(
-1,
size,
flags=MAP_PRIVATE | MAP_ANONYMOUS,
prot=PROT_WRITE | PROT_READ | PROT_EXEC,
)
self.mm.write(code)
self.addr = ctypes.addressof(ctypes.c_int.from_buffer(self.mm))
func_type = CFUNCTYPE(None, POINTER(CPUID_struct), c_uint32, c_uint32)
self.func_ptr = func_type(self.addr)
def __call__(self, eax, ecx=0):
struct = self.registers_for(eax=eax, ecx=ecx)
return struct.eax, struct.ebx, struct.ecx, struct.edx
def registers_for(self, eax, ecx=0):
"""Calls cpuid with eax and ecx set as the input arguments, and returns a structure
containing eax, ebx, ecx, and edx.
"""
struct = CPUID_struct()
self.func_ptr(struct, eax, ecx)
return struct
def __del__(self):
if is_windows:
self.win.VirtualFree.restype = c_long
self.win.VirtualFree.argtypes = [c_void_p, c_size_t, c_ulong]
self.win.VirtualFree(self.addr, 0, 0x8000)
else:
self.mm.close()
if __name__ == "__main__":
def valid_inputs():
cpuid = CPUID()
for eax in (0x0, 0x80000000):
highest, _, _, _ = cpuid(eax)
while eax <= highest:
regs = cpuid(eax)
yield (eax, regs)
eax += 1
print(" ".join(x.ljust(8) for x in ("CPUID", "A", "B", "C", "D")).strip())
for eax, regs in valid_inputs():
print("%08x" % eax, " ".join("%08x" % reg for reg in regs))

View file

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Anders Høst
#
from __future__ import print_function
import struct
import cpuid
def cpu_vendor(cpu):
_, b, c, d = cpu(0)
return struct.pack("III", b, d, c).decode("utf-8")
def cpu_name(cpu):
name = "".join((struct.pack("IIII", *cpu(0x80000000 + i)).decode("utf-8")
for i in range(2, 5)))
return name.split('\x00', 1)[0]
def is_set(cpu, leaf, subleaf, reg_idx, bit):
"""
@param {leaf} %eax
@param {sublead} %ecx, 0 in most cases
@param {reg_idx} idx of [%eax, %ebx, %ecx, %edx], 0-based
@param {bit} bit of reg selected by {reg_idx}, 0-based
"""
regs = cpu(leaf, subleaf)
if (1 << bit) & regs[reg_idx]:
return "Yes"
else:
return "--"
if __name__ == "__main__":
cpu = cpuid.CPUID()
print("Vendor ID : %s" % cpu_vendor(cpu))
print("CPU name : %s" % cpu_name(cpu))
print()
print("Vector instructions supported:")
print("SSE : %s" % is_set(cpu, 1, 0, 3, 25))
print("SSE2 : %s" % is_set(cpu, 1, 0, 3, 26))
print("SSE3 : %s" % is_set(cpu, 1, 0, 2, 0))
print("SSSE3 : %s" % is_set(cpu, 1, 0, 2, 9))
print("SSE4.1 : %s" % is_set(cpu, 1, 0, 2, 19))
print("SSE4.2 : %s" % is_set(cpu, 1, 0, 2, 20))
print("SSE4a : %s" % is_set(cpu, 0x80000001, 0, 2, 6))
print("AVX : %s" % is_set(cpu, 1, 0, 2, 28))
print("AVX2 : %s" % is_set(cpu, 7, 0, 1, 5))
print("BMI1 : %s" % is_set(cpu, 7, 0, 1, 3))
print("BMI2 : %s" % is_set(cpu, 7, 0, 1, 8))
# Intel RDT CMT/MBM
print("L3 Monitoring : %s" % is_set(cpu, 0xf, 0, 3, 1))
print("L3 Occupancy : %s" % is_set(cpu, 0xf, 1, 3, 0))
print("L3 Total BW : %s" % is_set(cpu, 0xf, 1, 3, 1))
print("L3 Local BW : %s" % is_set(cpu, 0xf, 1, 3, 2))

View file

@ -10,6 +10,8 @@
import tempfile import tempfile
from typing import Dict, List, Set from typing import Dict, List, Set
import archspec.cpu
import spack.compiler import spack.compiler
import spack.operating_systems.windows_os import spack.operating_systems.windows_os
import spack.platforms import spack.platforms
@ -186,6 +188,9 @@ def __init__(self, *args, **kwargs):
# get current platform architecture and format for vcvars argument # get current platform architecture and format for vcvars argument
arch = spack.platforms.real_host().default.lower() arch = spack.platforms.real_host().default.lower()
arch = arch.replace("-", "_") arch = arch.replace("-", "_")
if str(archspec.cpu.host().family) == "x86_64":
arch = "amd64"
self.vcvars_call = VCVarsInvocation(vcvars_script_path, arch, self.msvc_version) self.vcvars_call = VCVarsInvocation(vcvars_script_path, arch, self.msvc_version)
env_cmds.append(self.vcvars_call) env_cmds.append(self.vcvars_call)
# Below is a check for a valid fortran path # Below is a check for a valid fortran path

View file

@ -102,7 +102,10 @@ def to_dict_or_value(self):
if self.microarchitecture.vendor == "generic": if self.microarchitecture.vendor == "generic":
return str(self) return str(self)
return syaml.syaml_dict(self.microarchitecture.to_dict(return_list_of_items=True)) # Get rid of compiler flag information before turning the uarch into a dict
uarch_dict = self.microarchitecture.to_dict()
uarch_dict.pop("compilers", None)
return syaml.syaml_dict(uarch_dict.items())
def __repr__(self): def __repr__(self):
cls_name = self.__class__.__name__ cls_name = self.__class__.__name__