frontend tests now use new OO style, removed AttrDict usage

This commit is contained in:
stefandesouza
2023-10-22 16:43:46 +02:00
parent 6384ea2e18
commit db02359ea2
3 changed files with 81 additions and 82 deletions

View File

@@ -8,7 +8,6 @@ import re
from datetime import datetime as dt from datetime import datetime as dt
from osaca.semantics import INSTR_FLAGS, ArchSemantics, KernelDG, MachineModel from osaca.semantics import INSTR_FLAGS, ArchSemantics, KernelDG, MachineModel
from osaca.parser import AttrDict
def _get_version(*file_paths): def _get_version(*file_paths):
@@ -54,7 +53,7 @@ class Frontend(object):
:type instruction_form: `dict` :type instruction_form: `dict`
:returns: `True` if comment line, `False` otherwise :returns: `True` if comment line, `False` otherwise
""" """
return instruction_form["comment"] is not None and instruction_form["instruction"] is None return instruction_form.comment is not None and instruction_form.instruction is None
def throughput_analysis(self, kernel, show_lineno=False, show_cmnts=True): def throughput_analysis(self, kernel, show_lineno=False, show_cmnts=True):
""" """
@@ -82,14 +81,14 @@ class Frontend(object):
s += separator + "\n" s += separator + "\n"
for instruction_form in kernel: for instruction_form in kernel:
line = "{:4d} {} {} {}".format( line = "{:4d} {} {} {}".format(
instruction_form["line_number"], instruction_form.line_number,
self._get_port_pressure( self._get_port_pressure(
instruction_form["port_pressure"], port_len, separator=sep_list instruction_form.port_pressure, port_len, separator=sep_list
), ),
self._get_flag_symbols(instruction_form["flags"]) self._get_flag_symbols(instruction_form.flags)
if instruction_form["instruction"] is not None if instruction_form.instruction is not None
else " ", else " ",
instruction_form["line"].strip().replace("\t", " "), instruction_form.line.strip().replace("\t", " "),
) )
line = line if show_lineno else col_sep + col_sep.join(line.split(col_sep)[1:]) line = line if show_lineno else col_sep + col_sep.join(line.split(col_sep)[1:])
if show_cmnts is False and self._is_comment(instruction_form): if show_cmnts is False and self._is_comment(instruction_form):
@@ -113,20 +112,20 @@ class Frontend(object):
for instruction_form in cp_kernel: for instruction_form in cp_kernel:
s += ( s += (
"{:4d} {} {:4.1f} {}{}{} {}".format( "{:4d} {} {:4.1f} {}{}{} {}".format(
instruction_form["line_number"], instruction_form.line_number,
separator, separator,
instruction_form["latency_cp"], instruction_form.latency_cp,
separator, separator,
"X" if INSTR_FLAGS.LT_UNKWN in instruction_form["flags"] else " ", "X" if INSTR_FLAGS.LT_UNKWN in instruction_form.flags else " ",
separator, separator,
instruction_form["line"], instruction_form.line,
) )
) + "\n" ) + "\n"
s += ( s += (
"\n{:4} {} {:4.1f}".format( "\n{:4} {} {:4.1f}".format(
" " * max([len(str(instr_form["line_number"])) for instr_form in cp_kernel]), " " * max([len(str(instr_form.line_number)) for instr_form in cp_kernel]),
" " * len(separator), " " * len(separator),
sum([instr_form["latency_cp"] for instr_form in cp_kernel]), sum([instr_form.latency_cp for instr_form in cp_kernel]),
) )
) + "\n" ) + "\n"
return s return s
@@ -151,9 +150,9 @@ class Frontend(object):
separator, separator,
dep_dict[dep]["latency"], dep_dict[dep]["latency"],
separator, separator,
dep_dict[dep]["root"]["line"].strip(), dep_dict[dep]["root"].line.strip(),
separator, separator,
[node["line_number"] for node, lat in dep_dict[dep]["dependencies"]], [node.line_number for node, lat in dep_dict[dep]["dependencies"]],
) )
return s return s
@@ -238,10 +237,10 @@ class Frontend(object):
if lcd_warning: if lcd_warning:
warnings.append("LCDWarning") warnings.append("LCDWarning")
if INSTR_FLAGS.TP_UNKWN in [flag for instr in kernel for flag in instr["flags"]]: #if INSTR_FLAGS.TP_UNKWN in [flag for instr in kernel for flag in instr.flags]:
warnings.append("UnknownInstrWarning") # warnings.append("UnknownInstrWarning")
tp_sum = ArchSemantics.get_throughput_sum(kernel) or kernel[0]["port_pressure"] tp_sum = ArchSemantics.get_throughput_sum(kernel) or kernel[0].port_pressure
cp_kernel = kernel_dg.get_critical_path() cp_kernel = kernel_dg.get_critical_path()
dep_dict = kernel_dg.get_loopcarried_dependencies() dep_dict = kernel_dg.get_loopcarried_dependencies()
@@ -254,31 +253,31 @@ class Frontend(object):
"Warnings": warnings, "Warnings": warnings,
"Kernel": [ "Kernel": [
{ {
"Line": re.sub(r"\s+", " ", x["line"].strip()), "Line": re.sub(r"\s+", " ", x.line.strip()),
"LineNumber": x["line_number"], "LineNumber": x.line_number,
"Flags": list(x["flags"]), "Flags": list(x.flags),
"Instruction": x["instruction"], "Instruction": x.instruction,
"Operands": AttrDict.get_dict(x["operands"]), "Operands": x.operands,
"SemanticOperands": AttrDict.get_dict(x["semantic_operands"]), "SemanticOperands": x.semantic_operands,
"Label": x["label"], "Label": x.label,
"Directive": x["directive"], "Directive": x.directive,
"Latency": float(x["latency"]), "Latency": float(x.latency),
"LatencyCP": float(x["latency_cp"]), "LatencyCP": float(x.latency_cp),
"LatencyLCD": float(x["latency_lcd"]), "LatencyLCD": float(x.latency_lcd),
"Throughput": float(x["throughput"]), "Throughput": float(x.throughput),
"LatencyWithoutLoad": float(x["latency_wo_load"]), "LatencyWithoutLoad": float(x.latency_wo_load),
"PortPressure": { "PortPressure": {
self._machine_model.get_ports()[i]: v self._machine_model.get_ports()[i]: v
for i, v in enumerate(x["port_pressure"]) for i, v in enumerate(x.port_pressure)
}, },
"PortUops": [ "PortUops": [
{ {
"Ports": list(y[1]), "Ports": list(y[1]),
"Cycles": y[0], "Cycles": y[0],
} }
for y in x["port_uops"] for y in x.port_uops
], ],
"Comment": x["comment"], "Comment": x.comment,
} }
for x in kernel for x in kernel
], ],
@@ -286,7 +285,7 @@ class Frontend(object):
"PortPressure": { "PortPressure": {
self._machine_model.get_ports()[i]: v for i, v in enumerate(tp_sum) self._machine_model.get_ports()[i]: v for i, v in enumerate(tp_sum)
}, },
"CriticalPath": sum([x["latency_cp"] for x in cp_kernel]), "CriticalPath": sum([x.latency_cp for x in cp_kernel]),
"LCD": lcd_sum, "LCD": lcd_sum,
}, },
"Target": { "Target": {
@@ -325,7 +324,7 @@ class Frontend(object):
# Separator for ports # Separator for ports
separator = "-" * sum([x + 3 for x in port_len]) + "-" separator = "-" * sum([x + 3 for x in port_len]) + "-"
# ... for line numbers # ... for line numbers
separator += "--" + len(str(kernel[-1]["line_number"])) * "-" separator += "--" + len(str(kernel[-1].line_number)) * "-"
col_sep = "|" col_sep = "|"
# for LCD/CP column # for LCD/CP column
separator += "-" * (2 * 6 + len(col_sep)) + "-" * len(col_sep) + "--" separator += "-" * (2 * 6 + len(col_sep)) + "-" * len(col_sep) + "--"
@@ -333,14 +332,14 @@ class Frontend(object):
headline = "Port pressure in cycles" headline = "Port pressure in cycles"
headline_str = "{{:^{}}}".format(len(separator)) headline_str = "{{:^{}}}".format(len(separator))
# Prepare CP/LCD variable # Prepare CP/LCD variable
cp_lines = [x["line_number"] for x in cp_kernel] cp_lines = [x.line_number for x in cp_kernel]
lcd_sum = 0.0 lcd_sum = 0.0
lcd_lines = {} lcd_lines = {}
if dep_dict: if dep_dict:
longest_lcd = max(dep_dict, key=lambda ln: dep_dict[ln]["latency"]) longest_lcd = max(dep_dict, key=lambda ln: dep_dict[ln]["latency"])
lcd_sum = dep_dict[longest_lcd]["latency"] lcd_sum = dep_dict[longest_lcd]["latency"]
lcd_lines = { lcd_lines = {
instr["line_number"]: lat for instr, lat in dep_dict[longest_lcd]["dependencies"] instr.line_number: lat for instr, lat in dep_dict[longest_lcd]["dependencies"]
} }
port_line = ( port_line = (
@@ -354,31 +353,31 @@ class Frontend(object):
for instruction_form in kernel: for instruction_form in kernel:
if show_cmnts is False and self._is_comment(instruction_form): if show_cmnts is False and self._is_comment(instruction_form):
continue continue
line_number = instruction_form["line_number"] line_number = instruction_form.line_number
used_ports = [list(uops[1]) for uops in instruction_form["port_uops"]] used_ports = [list(uops[1]) for uops in instruction_form.port_uops]
used_ports = list(set([p for uops_ports in used_ports for p in uops_ports])) used_ports = list(set([p for uops_ports in used_ports for p in uops_ports]))
s += "{:4d} {}{} {} {}\n".format( s += "{:4d} {}{} {} {}\n".format(
line_number, line_number,
self._get_port_pressure( self._get_port_pressure(
instruction_form["port_pressure"], port_len, used_ports, sep_list instruction_form.port_pressure, port_len, used_ports, sep_list
), ),
self._get_lcd_cp_ports( self._get_lcd_cp_ports(
instruction_form["line_number"], instruction_form.line_number,
cp_kernel if line_number in cp_lines else None, cp_kernel if line_number in cp_lines else None,
lcd_lines.get(line_number), lcd_lines.get(line_number),
), ),
self._get_flag_symbols(instruction_form["flags"]) self._get_flag_symbols(instruction_form.flags)
if instruction_form["instruction"] is not None if instruction_form.instruction is not None
else " ", else " ",
instruction_form["line"].strip().replace("\t", " "), instruction_form.line.strip().replace("\t", " "),
) )
s += "\n" s += "\n"
# check for unknown instructions and throw warning if called without --ignore-unknown # check for unknown instructions and throw warning if called without --ignore-unknown
if not ignore_unknown and INSTR_FLAGS.TP_UNKWN in [ if not ignore_unknown and INSTR_FLAGS.TP_UNKWN in [
flag for instr in kernel for flag in instr["flags"] flag for instr in kernel for flag in instr.flags
]: ]:
num_missing = len( num_missing = len(
[instr["flags"] for instr in kernel if INSTR_FLAGS.TP_UNKWN in instr["flags"]] [instr.flags for instr in kernel if INSTR_FLAGS.TP_UNKWN in instr.flags]
) )
s += self._missing_instruction_error(num_missing) s += self._missing_instruction_error(num_missing)
else: else:
@@ -386,8 +385,8 @@ class Frontend(object):
tp_sum = ArchSemantics.get_throughput_sum(kernel) tp_sum = ArchSemantics.get_throughput_sum(kernel)
# if ALL instructions are unknown, take a line of 0s # if ALL instructions are unknown, take a line of 0s
if not tp_sum: if not tp_sum:
tp_sum = kernel[0]["port_pressure"] tp_sum = kernel[0].port_pressure
cp_sum = sum([x["latency_cp"] for x in cp_kernel]) cp_sum = sum([x.latency_cp for x in cp_kernel])
s += ( s += (
lineno_filler lineno_filler
+ self._get_port_pressure(tp_sum, port_len, separator=" ") + self._get_port_pressure(tp_sum, port_len, separator=" ")
@@ -500,14 +499,14 @@ class Frontend(object):
def _get_node_by_lineno(self, lineno, kernel): def _get_node_by_lineno(self, lineno, kernel):
"""Returns instruction form from kernel by its line number.""" """Returns instruction form from kernel by its line number."""
nodes = [instr for instr in kernel if instr["line_number"] == lineno] nodes = [instr for instr in kernel if instr.line_number == lineno]
return nodes[0] if len(nodes) > 0 else None return nodes[0] if len(nodes) > 0 else None
def _get_lcd_cp_ports(self, line_number, cp_dg, dep_lat, separator="|"): def _get_lcd_cp_ports(self, line_number, cp_dg, dep_lat, separator="|"):
"""Returns the CP and LCD line for one instruction.""" """Returns the CP and LCD line for one instruction."""
lat_cp = lat_lcd = "" lat_cp = lat_lcd = ""
if cp_dg: if cp_dg:
lat_cp = float(self._get_node_by_lineno(line_number, cp_dg)["latency_cp"]) lat_cp = float(self._get_node_by_lineno(line_number, cp_dg).latency_cp)
if dep_lat is not None: if dep_lat is not None:
lat_lcd = float(dep_lat) lat_lcd = float(dep_lat)
return "{} {:>4} {} {:>4} {}".format(separator, lat_cp, separator, lat_lcd, separator) return "{} {:>4} {} {:>4} {}".format(separator, lat_cp, separator, lat_lcd, separator)
@@ -516,7 +515,7 @@ class Frontend(object):
"""Returns the maximal length needed to print all throughputs of the kernel.""" """Returns the maximal length needed to print all throughputs of the kernel."""
port_len = [4 for x in self._machine_model.get_ports()] port_len = [4 for x in self._machine_model.get_ports()]
for instruction_form in kernel: for instruction_form in kernel:
for i, port in enumerate(instruction_form["port_pressure"]): for i, port in enumerate(instruction_form.port_pressure):
if len("{:.2f}".format(port)) > port_len[i]: if len("{:.2f}".format(port)) > port_len[i]:
port_len[i] = len("{:.2f}".format(port)) port_len[i] = len("{:.2f}".format(port))
return port_len return port_len

View File

@@ -230,18 +230,18 @@ class KernelDG(nx.DiGraph):
longest_path = nx.algorithms.dag.dag_longest_path(self.dg, weight="latency") longest_path = nx.algorithms.dag.dag_longest_path(self.dg, weight="latency")
# TODO verify that we can remove the next two lince due to earlier initialization # TODO verify that we can remove the next two lince due to earlier initialization
for line_number in longest_path: for line_number in longest_path:
self._get_node_by_lineno(int(line_number))["latency_cp"] = 0 self._get_node_by_lineno(int(line_number)).latency_cp = 0
# set cp latency to instruction # set cp latency to instruction
path_latency = 0.0 path_latency = 0.0
for s, d in nx.utils.pairwise(longest_path): for s, d in nx.utils.pairwise(longest_path):
node = self._get_node_by_lineno(int(s)) node = self._get_node_by_lineno(int(s))
node["latency_cp"] = self.dg.edges[(s, d)]["latency"] node.latency_cp = self.dg.edges[(s, d)]["latency"]
path_latency += node["latency_cp"] path_latency += node.latency_cp
# add latency for last instruction # add latency for last instruction
node = self._get_node_by_lineno(int(longest_path[-1])) node = self._get_node_by_lineno(int(longest_path[-1]))
node["latency_cp"] = node["latency"] node.latency_cp = node.latency
if max_latency_instr["latency"] > path_latency: if max_latency_instr.latency > path_latency:
max_latency_instr["latency_cp"] = float(max_latency_instr["latency"]) max_latency_instr.latency_cp = float(max_latency_instr.latency)
return [max_latency_instr] return [max_latency_instr]
else: else:
return [x for x in self.kernel if x.line_number in longest_path] return [x for x in self.kernel if x.line_number in longest_path]

View File

@@ -80,7 +80,7 @@ class TestFrontend(unittest.TestCase):
fe = Frontend(path_to_yaml=os.path.join(self.MODULE_DATA_DIR, "tx2.yml")) fe = Frontend(path_to_yaml=os.path.join(self.MODULE_DATA_DIR, "tx2.yml"))
fe.full_analysis(self.kernel_AArch64, dg, verbose=True) fe.full_analysis(self.kernel_AArch64, dg, verbose=True)
# TODO compare output with checked string # TODO compare output with checked string
def test_dict_output_x86(self): def test_dict_output_x86(self):
dg = KernelDG(self.kernel_x86, self.parser_x86, self.machine_model_csx, self.semantics_csx) dg = KernelDG(self.kernel_x86, self.parser_x86, self.machine_model_csx, self.semantics_csx)
fe = Frontend(path_to_yaml=os.path.join(self.MODULE_DATA_DIR, "csx.yml")) fe = Frontend(path_to_yaml=os.path.join(self.MODULE_DATA_DIR, "csx.yml"))
@@ -89,29 +89,29 @@ class TestFrontend(unittest.TestCase):
self.assertEqual("csx", analysis_dict["Header"]["Architecture"]) self.assertEqual("csx", analysis_dict["Header"]["Architecture"])
self.assertEqual(len(analysis_dict["Warnings"]), 0) self.assertEqual(len(analysis_dict["Warnings"]), 0)
for i, line in enumerate(self.kernel_x86): for i, line in enumerate(self.kernel_x86):
self.assertEqual(line["throughput"], analysis_dict["Kernel"][i]["Throughput"]) self.assertEqual(line.throughput, analysis_dict["Kernel"][i]["Throughput"])
self.assertEqual(line["latency"], analysis_dict["Kernel"][i]["Latency"]) self.assertEqual(line.latency, analysis_dict["Kernel"][i]["Latency"])
self.assertEqual( self.assertEqual(
line["latency_wo_load"], analysis_dict["Kernel"][i]["LatencyWithoutLoad"] line.latency_wo_load, analysis_dict["Kernel"][i]["LatencyWithoutLoad"]
) )
self.assertEqual(line["latency_cp"], analysis_dict["Kernel"][i]["LatencyCP"]) self.assertEqual(line.latency_cp, analysis_dict["Kernel"][i]["LatencyCP"])
self.assertEqual(line["instruction"], analysis_dict["Kernel"][i]["Instruction"]) self.assertEqual(line.instruction, analysis_dict["Kernel"][i]["Instruction"])
self.assertEqual(len(line["operands"]), len(analysis_dict["Kernel"][i]["Operands"])) self.assertEqual(len(line.operands), len(analysis_dict["Kernel"][i]["Operands"]))
self.assertEqual( self.assertEqual(
len(line["semantic_operands"]["source"]), len(line.semantic_operands["source"]),
len(analysis_dict["Kernel"][i]["SemanticOperands"]["source"]), len(analysis_dict["Kernel"][i]["SemanticOperands"]["source"]),
) )
self.assertEqual( self.assertEqual(
len(line["semantic_operands"]["destination"]), len(line.semantic_operands["destination"]),
len(analysis_dict["Kernel"][i]["SemanticOperands"]["destination"]), len(analysis_dict["Kernel"][i]["SemanticOperands"]["destination"]),
) )
self.assertEqual( self.assertEqual(
len(line["semantic_operands"]["src_dst"]), len(line.semantic_operands["src_dst"]),
len(analysis_dict["Kernel"][i]["SemanticOperands"]["src_dst"]), len(analysis_dict["Kernel"][i]["SemanticOperands"]["src_dst"]),
) )
self.assertEqual(line["flags"], analysis_dict["Kernel"][i]["Flags"]) self.assertEqual(line.flags, analysis_dict["Kernel"][i]["Flags"])
self.assertEqual(line["line_number"], analysis_dict["Kernel"][i]["LineNumber"]) self.assertEqual(line.line_number, analysis_dict["Kernel"][i]["LineNumber"])
def test_dict_output_AArch64(self): def test_dict_output_AArch64(self):
reduced_kernel = reduce_to_section(self.kernel_AArch64, self.semantics_tx2._isa) reduced_kernel = reduce_to_section(self.kernel_AArch64, self.semantics_tx2._isa)
dg = KernelDG( dg = KernelDG(
@@ -126,28 +126,28 @@ class TestFrontend(unittest.TestCase):
self.assertEqual("tx2", analysis_dict["Header"]["Architecture"]) self.assertEqual("tx2", analysis_dict["Header"]["Architecture"])
self.assertEqual(len(analysis_dict["Warnings"]), 0) self.assertEqual(len(analysis_dict["Warnings"]), 0)
for i, line in enumerate(reduced_kernel): for i, line in enumerate(reduced_kernel):
self.assertEqual(line["throughput"], analysis_dict["Kernel"][i]["Throughput"]) self.assertEqual(line.throughput, analysis_dict["Kernel"][i]["Throughput"])
self.assertEqual(line["latency"], analysis_dict["Kernel"][i]["Latency"]) self.assertEqual(line.latency, analysis_dict["Kernel"][i]["Latency"])
self.assertEqual( self.assertEqual(
line["latency_wo_load"], analysis_dict["Kernel"][i]["LatencyWithoutLoad"] line.latency_wo_load, analysis_dict["Kernel"][i]["LatencyWithoutLoad"]
) )
self.assertEqual(line["latency_cp"], analysis_dict["Kernel"][i]["LatencyCP"]) self.assertEqual(line.latency_cp, analysis_dict["Kernel"][i]["LatencyCP"])
self.assertEqual(line["instruction"], analysis_dict["Kernel"][i]["Instruction"]) self.assertEqual(line.instruction, analysis_dict["Kernel"][i]["Instruction"])
self.assertEqual(len(line["operands"]), len(analysis_dict["Kernel"][i]["Operands"])) self.assertEqual(len(line.operands), len(analysis_dict["Kernel"][i]["Operands"]))
self.assertEqual( self.assertEqual(
len(line["semantic_operands"]["source"]), len(line.semantic_operands["source"]),
len(analysis_dict["Kernel"][i]["SemanticOperands"]["source"]), len(analysis_dict["Kernel"][i]["SemanticOperands"]["source"]),
) )
self.assertEqual( self.assertEqual(
len(line["semantic_operands"]["destination"]), len(line.semantic_operands["destination"]),
len(analysis_dict["Kernel"][i]["SemanticOperands"]["destination"]), len(analysis_dict["Kernel"][i]["SemanticOperands"]["destination"]),
) )
self.assertEqual( self.assertEqual(
len(line["semantic_operands"]["src_dst"]), len(line.semantic_operands["src_dst"]),
len(analysis_dict["Kernel"][i]["SemanticOperands"]["src_dst"]), len(analysis_dict["Kernel"][i]["SemanticOperands"]["src_dst"]),
) )
self.assertEqual(line["flags"], analysis_dict["Kernel"][i]["Flags"]) self.assertEqual(line.flags, analysis_dict["Kernel"][i]["Flags"])
self.assertEqual(line["line_number"], analysis_dict["Kernel"][i]["LineNumber"]) self.assertEqual(line.line_number, analysis_dict["Kernel"][i]["LineNumber"])
################## ##################
# Helper functions # Helper functions