#!/usr/bin/env python3
"""Calculate POP (Performance Optimisation and Productivity) efficiency
metrics from a set of mpiP profiling reports.

For each report the script extracts the task time statistics, derives the
POP efficiencies (parallel efficiency, load balance, communication
efficiency), computes scalabilities across the sorted report set, and
prints the results both as CSV and as a human-readable table.
"""

import argparse
import logging
import re
import sys
from operator import itemgetter

logging.basicConfig(stream=sys.stderr, level=logging.ERROR)


def parse_arguments():
    """Parse the command line: a scaling mode and one or more mpiP reports.

    Returns the argparse namespace with ``scaling`` ('strong'/'weak',
    default 'weak') and ``mpip_report`` (list of open file objects).
    """
    parser = argparse.ArgumentParser(
        description='Calculate POP metrics from a list of mpiP reports.')
    parser.add_argument('-s', '--scaling', choices=['strong', 'weak'],
                        default='weak',
                        help='Strong or weak scaling experiment')
    parser.add_argument('mpip_report', nargs='+', type=argparse.FileType('r'))
    return parser.parse_args()


def get_Nranks(fn):
    """Infer the MPI rank count from report file *fn*.

    Counts the lines that start with ``@ MPI Task Assignment`` (one per
    rank in an mpiP report).  Returns ``{"ranks": N}``, or
    ``{"ranks": None}`` when the count cannot be inferred (no matching
    lines, or the file cannot be opened).
    """
    # Pure-Python replacement for the original shell pipeline
    # (grep '^@ MPI Task Assignment' fn | sed -n '$='): same semantics,
    # but no shell quoting/injection issues with unusual file names.
    try:
        with open(fn) as fh:
            nranks = sum(1 for line in fh
                         if line.startswith("@ MPI Task Assignment"))
    except OSError:
        nranks = 0
    if nranks == 0:
        logging.warning("Cannot infer nranks from '@ MPI Task Assignment'.")
        return {"ranks": None}
    return {"ranks": nranks}


def get_times(report):
    """Parse the '@--- Task Time Statistics' section of an open mpiP report.

    Expects, in order: a separator line, a header containing ``AppTime``,
    then Max / Mean / Min / Stddev / Aggregate rows.  Returns a dict with
    keys ``"elapsed"``, ``"avg useful"``, ``"max useful"`` and
    ``"ranks_alt"`` (ranks inferred as aggregate/mean app time), or an
    empty dict if parsing fails at any step.
    """
    logging.debug("Getting times from %s", report.name)

    start_time_stats = re.compile(r"^@-+ Task Time Statistics")
    separator = re.compile(r"^-")
    header = re.compile(r".*AppTime")
    maxline = re.compile(r"^Max ")
    meanline = re.compile(r"^Mean\s+([.\d]+)\s+([.\d]+)")
    minline = re.compile(r"^Min\s+([.\d]+)\s+([.\d]+)")
    stdline = re.compile(r"^Stddev ")
    aggrline = re.compile(r"^Aggregate\s+([.\d]+)\s+([.\d]+)")

    times = dict()

    def _expect(pattern, what):
        # Read the next line and require it to match `pattern`;
        # log an error and return None otherwise.
        m = pattern.match(report.readline())
        if not m:
            logging.error("Failed to parse expected %s." % what)
        return m

    # Skip ahead to the start of the time-statistics section; if EOF is
    # reached first, the subsequent _expect calls fail and we return {}.
    line = report.readline()
    while line and not start_time_stats.match(line):
        line = report.readline()

    if not _expect(separator, "separator"):
        return times
    if not _expect(header, "header"):
        return times
    if not _expect(maxline, "Max line"):
        return times
    m = _expect(meanline, "Mean line")
    if not m:
        return times
    avg_app = float(m.group(1))
    avg_mpi = float(m.group(2))
    m = _expect(minline, "Min line")
    if not m:
        return times
    min_mpi = float(m.group(2))
    if not _expect(stdline, "Stddev line"):
        return times
    m = _expect(aggrline, "Aggregate line")
    if not m:
        return times
    aggr_app = float(m.group(1))
    logging.debug("Done times")

    elapsed = avg_app
    times["elapsed"] = elapsed
    # Useful time = wall time not spent inside MPI.
    times["avg useful"] = elapsed - avg_mpi
    # The rank with the least MPI time did the most useful work.
    times["max useful"] = elapsed - min_mpi
    # Alternative rank count: aggregate app time over mean app time.
    times["ranks_alt"] = round(aggr_app / avg_app)
    return times


def efficiencies(report):
    """Compute POP efficiencies (PE, LB, CE) for one open report.

    Returns a dict combining the rank count, the parsed times, and the
    derived efficiencies.
    """
    metrics = dict()
    metrics.update(get_Nranks(report.name))
    metrics.update(get_times(report))
    if metrics["ranks"] is None:
        # Fall back to the rank count inferred from the time statistics.
        metrics["ranks"] = metrics["ranks_alt"]
    metrics["PE"] = metrics["avg useful"] / metrics["elapsed"]
    metrics["LB"] = metrics["avg useful"] / metrics["max useful"]
    metrics["CE"] = metrics["max useful"] / metrics["elapsed"]
    return metrics


def scalabilities(metrics, strong):
    """Add computation scalability (CScal) and global efficiency (GE).

    The first entry of *metrics* (assumed sorted by ranks) is the
    reference run.  For strong scaling the reference useful time is
    additionally weighted by the rank ratio.  Mutates and returns
    *metrics*.
    """
    ref = metrics[0]
    nranks_ref = ref["ranks"]
    useful_ref = ref["avg useful"]
    for m in metrics:
        scal = useful_ref / m["avg useful"]
        if strong:
            scal *= nranks_ref / m["ranks"]
        m["CScal"] = scal
        m["GE"] = m["PE"] * scal
    return metrics


def dump(metrics, stream=sys.stdout, sep=",", keys=None):
    """Write *metrics* as separated values with a '# '-prefixed header.

    *keys* selects and orders the emitted columns; the default covers all
    computed metrics.
    """
    if not keys:
        keys = ["ranks", "elapsed", "avg useful", "max useful",
                "GE", "PE", "LB", "CE", "CScal"]
    header = "# " + sep.join(keys) + "\n"
    stream.write(header)
    for m in metrics:
        line = sep.join([str(m[key]) for key in keys]) + "\n"
        stream.write(line)


def pretty_print(metrics, stream=sys.stdout):
    """Render *metrics* as a fixed-width table, one column per run."""
    descriptions = {
        "GE": " GE ",
        "PE": " PE ",
        "LB": " LB ",
        "CE": " CE ",
        "TrE": " TrE ",
        "SerR": " SerE ",
        "CScal": " CScal ",
        "elapsed": " Elapsed time ",
        "avg useful": " Average useful ",
        "max useful": " Max useful "
    }
    eff_keys = ["GE", "PE", "LB", "CE"]
    scal_keys = ["CScal"]
    other_keys = ["elapsed", "avg useful", "max useful"]
    separator = "|"
    # NOTE(review): width_desc is taken from the "GE" label; rows whose
    # description is longer will widen their line — confirm the intended
    # label padding.
    width_desc = len(descriptions["GE"])
    width_col = 7
    width_separator = len(separator)
    ncols = len(metrics)
    skip = "-" * (width_desc + (width_separator + width_col) * ncols
                  + width_separator)
    table = ""
    table += skip + "\n"
    # Header row: rank counts, centered.
    table += " " * width_desc + separator
    for col in metrics:
        table += f'{col["ranks"]:^{width_col}d}' + separator
    table += "\n"
    table += skip + "\n"
    for key in eff_keys:
        line = descriptions[key] + separator
        for col in metrics:
            line += f'{col[key]:^{width_col}.2f}' + separator
        table += line + "\n"
    for key in scal_keys:
        line = descriptions[key] + separator
        for col in metrics:
            line += f'{col[key]:^{width_col}.2f}' + separator
        table += line + "\n"
    table += skip + "\n"
    for key in other_keys:
        line = descriptions[key] + separator
        for col in metrics:
            line += f'{col[key]:^{width_col}.2g}' + separator
        table += line + "\n"
    table += skip + "\n"
    stream.write(table)


def print_scaling(scaling, stream=sys.stdout):
    """Report which scaling mode was used for the CScal computation."""
    stream.write(f'CScal was calculated for {scaling} scaling.\n')


def main():
    """Entry point: parse arguments, compute and print all metrics."""
    args = parse_arguments()
    metrics = list()
    # Compute parallel efficiencies for each input report.
    for report in args.mpip_report:
        print("Processing file %s" % report.name)
        metrics.append(efficiencies(report))
    # Sort metrics by ranks so the smallest run is the reference.
    metrics.sort(key=itemgetter('ranks'))
    # Compute scalabilities relative to the reference run.
    metrics = scalabilities(metrics, strong=args.scaling == 'strong')
    dump(metrics)
    pretty_print(metrics)
    print_scaling(args.scaling)


if __name__ == "__main__":
    main()