Updated tests to use the now class style iforms in isa_data

This commit is contained in:
stefandesouza
2023-10-23 16:25:31 +02:00
parent db02359ea2
commit 33d1eec106
8 changed files with 256 additions and 178 deletions

View File

@@ -10,6 +10,10 @@ from collections import OrderedDict
import ruamel.yaml
from osaca.semantics import MachineModel
from osaca.parser import InstructionForm
from osaca.parser.memory import MemoryOperand
from osaca.parser.register import RegisterOperand
from osaca.parser.immediate import ImmediateOperand
def sanity_check(arch: str, verbose=False, internet_check=False, output_file=sys.stdout):
@@ -432,18 +436,20 @@ def _check_sanity_arch_db(arch_mm, isa_mm, internet_check=True):
# Check operands
for operand in instr_form["operands"]:
if operand["class"] == "register" and not ("name" in operand or "prefix" in operand):
if isinstance(operand, RegisterOperand) and not (
operand.name != None or operand.prefix != None
):
# Missing 'name' key
bad_operand.append(instr_form)
elif operand["class"] == "memory" and (
"base" not in operand
or "offset" not in operand
or "index" not in operand
or "scale" not in operand
elif isinstance(operand, MemoryOperand) and (
operand.base is None
or operand.offset is None
or operand.index is None
or operand.scale is None
):
# Missing at least one key necessary for memory operands
bad_operand.append(instr_form)
elif operand["class"] == "immediate" and "imd" not in operand:
elif isinstance(operand, ImmediateOperand) and operand.imd == None:
# Missing 'imd' key
bad_operand.append(instr_form)
# every entry exists twice --> uniquify
@@ -602,15 +608,19 @@ def _get_sanity_report_verbose(
def _get_full_instruction_name(instruction_form):
"""Get full instruction form name/identifier string out of given instruction form."""
"""Get one instruction name string including the mnemonic and all operands."""
operands = []
for op in instruction_form["operands"]:
op_attrs = [
y + ":" + str(op[y])
for y in list(filter(lambda x: True if x != "class" else False, op))
]
operands.append("{}({})".format(op["class"], ",".join(op_attrs)))
return "{} {}".format(instruction_form["name"], ",".join(operands))
if isinstance(op, RegisterOperand):
op_attrs = []
if op.name != None:
op_attrs.append("name:" + op.name)
if op.prefix != None:
op_attrs.append("prefix:" + op.prefix)
if op.shape != None:
op_attrs.append("shape:" + op.shape)
operands.append("{}({})".format("register", ",".join(op_attrs)))
return "{} {}".format(instruction_form["name"].lower(), ",".join(operands))
def __represent_none(self, data):

View File

@@ -237,7 +237,7 @@ class Frontend(object):
if lcd_warning:
warnings.append("LCDWarning")
#if INSTR_FLAGS.TP_UNKWN in [flag for instr in kernel for flag in instr.flags]:
# if INSTR_FLAGS.TP_UNKWN in [flag for instr in kernel for flag in instr.flags]:
# warnings.append("UnknownInstrWarning")
tp_sum = ArchSemantics.get_throughput_sum(kernel) or kernel[0].port_pressure

View File

@@ -257,7 +257,7 @@ class ArchSemantics(ISASemantics):
if instruction_data_reg:
assign_unknown = False
reg_type = self._parser.get_reg_type(
instruction_data_reg["operands"][
instruction_data_reg.operands[
operands.index(self._create_reg_wildcard())
]
)
@@ -318,10 +318,9 @@ class ArchSemantics(ISASemantics):
not in instruction_form.semantic_operands["destination"]
and all(
[
"post_indexed" in op["memory"]
or "pre_indexed" in op["memory"]
op.post_indexed or op.pre_indexed
for op in instruction_form.semantic_operands["src_dst"]
if "memory" in op
if isinstance(op, MemoryOperand)
]
)
):
@@ -343,10 +342,8 @@ class ArchSemantics(ISASemantics):
sum(x) for x in zip(data_port_pressure, st_data_port_pressure)
]
data_port_uops += st_data_port_uops
throughput = max(
max(data_port_pressure), instruction_data_reg["throughput"]
)
latency = instruction_data_reg["latency"]
throughput = max(max(data_port_pressure), instruction_data_reg.throughput)
latency = instruction_data_reg.latency
# Add LD and ST latency
latency += (
self._machine_model.get_load_latency(reg_type)
@@ -358,7 +355,7 @@ class ArchSemantics(ISASemantics):
if INSTR_FLAGS.HAS_ST in instruction_form.flags
else 0
)
latency_wo_load = instruction_data_reg["latency"]
latency_wo_load = instruction_data_reg.latency
# add latency of ADD if post- or pre-indexed load
# TODO more investigation: check dot-graph, wrong latency distribution!
# if (
@@ -379,12 +376,12 @@ class ArchSemantics(ISASemantics):
for x in zip(
data_port_pressure,
self._machine_model.average_port_pressure(
instruction_data_reg["port_pressure"]
instruction_data_reg.port_pressure
),
)
]
instruction_form.port_uops = list(
chain(instruction_data_reg["port_pressure"], data_port_uops)
chain(instruction_data_reg.port_pressure, data_port_uops)
)
if assign_unknown:
@@ -410,11 +407,9 @@ class ArchSemantics(ISASemantics):
def _handle_instruction_found(self, instruction_data, port_number, instruction_form, flags):
"""Apply performance data to instruction if it was found in the archDB"""
throughput = instruction_data["throughput"]
port_pressure = self._machine_model.average_port_pressure(
instruction_data["port_pressure"]
)
instruction_form.port_uops = instruction_data["port_pressure"]
throughput = instruction_data.throughput
port_pressure = self._machine_model.average_port_pressure(instruction_data.port_pressure)
instruction_form.port_uops = instruction_data.port_pressure
try:
assert isinstance(port_pressure, list)
assert len(port_pressure) == port_number
@@ -434,7 +429,7 @@ class ArchSemantics(ISASemantics):
# assume 0 cy and mark as unknown
throughput = 0.0
flags.append(INSTR_FLAGS.TP_UNKWN)
latency = instruction_data["latency"]
latency = instruction_data.latency
latency_wo_load = latency
if latency is None:
# assume 0 cy and mark as unknown

View File

@@ -14,6 +14,7 @@ import ruamel.yaml
from osaca import __version__, utils
from osaca.parser import ParserX86ATT
from ruamel.yaml.compat import StringIO
from osaca.parser.instruction_form import InstructionForm
from osaca.parser.operand import Operand
from osaca.parser.memory import MemoryOperand
from osaca.parser.register import RegisterOperand
@@ -101,69 +102,43 @@ class MachineModel(object):
self._data["instruction_forms"].remove(entry)
# Normalize instruction_form names (to UPPERCASE) and build dict for faster access:
self._data["instruction_forms_dict"] = defaultdict(list)
for iform in self._data["instruction_forms"]:
if "hidden_operands" in iform:
print("hidden")
if "breaks_dependency_on_equal_operands" in iform:
print("breaks")
iform["name"] = iform["name"].upper()
if iform["operands"] != []:
new_operands = []
# Change operand types from dicts to classes
for o in iform["operands"]:
if o["class"] == "register":
new_operands.append(
RegisterOperand(
NAME_ID=o["name"] if "name" in o else None,
PREFIX_ID=o["prefix"] if "prefix" in o else None,
SHAPE=o["shape"] if "shape" in o else None,
MASK=o["mask"] if "mask" in o else False,
SOURCE=o["source"] if "source" in o else False,
DESTINATION=o["destination"]
if "destination" in o
else False,
)
)
elif o["class"] == "memory":
new_operands.append(
MemoryOperand(
BASE_ID=o["base"],
OFFSET_ID=o["offset"],
INDEX_ID=o["index"],
SCALE_ID=o["scale"],
SOURCE=o["source"] if "source" in o else False,
DESTINATION=o["destination"]
if "destination" in o
else False,
)
)
self.operand_to_class(o, new_operands)
iform["operands"] = new_operands
self._data["instruction_forms_dict"][iform["name"]].append(iform)
new_throughputs = []
if "load_throughput" in self._data:
for m in self._data["load_throughput"]:
new_throughputs.append(
MemoryOperand(
BASE_ID=m["base"],
OFFSET_ID=m["offset"],
SCALE_ID=m["scale"],
INDEX_ID=m["index"],
PORT_PRESSURE=m["port_pressure"],
DST=m["dst"] if "dst" in m else None,
)
)
self._data["load_throughput"] = new_throughputs
# Change dict iform style to class style
new_iform = InstructionForm(
INSTRUCTION_ID=iform["name"].upper() if "name" in iform else None,
OPERANDS_ID=new_operands if "operands" in iform else [],
DIRECTIVE_ID=iform["directive"] if "directive" in iform else None,
COMMENT_ID=iform["comment"] if "comment" in iform else [],
LINE=iform["line"] if "line" in iform else None,
LINE_NUMBER=iform["line_number"] if "line_number" in iform else None,
LATENCY=iform["latency"] if "latency" in iform else None,
THROUGHPUT=iform["throughput"] if "throughput" in iform else None,
UOPS=iform["uops"] if "uops" in iform else None,
PORT_PRESSURE=iform["port_pressure"] if "port_pressure" in iform else None,
SEMANTIC_OPERANDS=iform["semantic_operands"]
if "semantic_operands" in iform
else {"source": [], "destination": [], "src_dst": []},
)
# List containing classes with same name/instruction
self._data["instruction_forms_dict"][iform["name"]].append(new_iform)
new_throughputs = []
if "store_throughput" in self._data:
for m in self._data["store_throughput"]:
new_throughputs.append(
MemoryOperand(
BASE_ID=m["base"],
OFFSET_ID=m["offset"],
SCALE_ID=m["scale"],
INDEX_ID=m["index"],
PORT_PRESSURE=m["port_pressure"],
)
)
self._data["store_throughput"] = new_throughputs
# Change memory dicts in load/store throughput to operand class
self.load_store_tp()
self._data["internal_version"] = self.INTERNAL_VERSION
if not lazy:
# cache internal representation for future use
self._write_in_cache(self._path)
@@ -171,6 +146,73 @@ class MachineModel(object):
if not lazy:
MachineModel._runtime_cache[self._path] = self._data
def load_store_tp(self):
new_throughputs = []
if "load_throughput" in self._data:
for m in self._data["load_throughput"]:
new_throughputs.append(
MemoryOperand(
BASE_ID=m["base"],
OFFSET_ID=m["offset"],
SCALE_ID=m["scale"],
INDEX_ID=m["index"],
PORT_PRESSURE=m["port_pressure"],
DST=m["dst"] if "dst" in m else None,
)
)
self._data["load_throughput"] = new_throughputs
new_throughputs = []
if "store_throughput" in self._data:
for m in self._data["store_throughput"]:
new_throughputs.append(
MemoryOperand(
BASE_ID=m["base"],
OFFSET_ID=m["offset"],
SCALE_ID=m["scale"],
INDEX_ID=m["index"],
PORT_PRESSURE=m["port_pressure"],
)
)
self._data["store_throughput"] = new_throughputs
def operand_to_class(self, o, new_operands):
"""Convert an operand from dict type to class"""
if o["class"] == "register":
new_operands.append(
RegisterOperand(
NAME_ID=o["name"] if "name" in o else None,
PREFIX_ID=o["prefix"] if "prefix" in o else None,
SHAPE=o["shape"] if "shape" in o else None,
MASK=o["mask"] if "mask" in o else False,
SOURCE=o["source"] if "source" in o else False,
DESTINATION=o["destination"] if "destination" in o else False,
)
)
elif o["class"] == "memory":
new_operands.append(
MemoryOperand(
BASE_ID=o["base"],
OFFSET_ID=o["offset"],
INDEX_ID=o["index"],
SCALE_ID=o["scale"],
SOURCE=o["source"] if "source" in o else False,
DESTINATION=o["destination"] if "destination" in o else False,
)
)
elif o["class"] == "immediate":
new_operands.append(
ImmediateOperand(
TYPE_ID=o["imd"],
SOURCE=o["source"] if "source" in o else False,
DESTINATION=o["destination"] if "destination" in o else False,
)
)
elif o["class"] == "identifier":
new_operands.append(IdentifierOperand())
else:
new_operands.append(o)
def get(self, key, default=None):
"""Return config entry for key or default/None."""
return self._data.get(key, default)
@@ -196,7 +238,7 @@ class MachineModel(object):
instruction_form
for instruction_form in name_matched_iforms
if self._match_operands(
instruction_form["operands"] if "operands" in instruction_form else [],
instruction_form.operands,
operands,
)
)
@@ -225,7 +267,7 @@ class MachineModel(object):
def set_instruction(
self,
name,
instruction,
operands=None,
latency=None,
port_pressure=None,
@@ -240,18 +282,18 @@ class MachineModel(object):
self._data["instruction_forms"].append(instr_data)
self._data["instruction_forms_dict"][name].append(instr_data)
instr_data["name"] = name
instr_data["operands"] = operands
instr_data["latency"] = latency
instr_data["port_pressure"] = port_pressure
instr_data["throughput"] = throughput
instr_data["uops"] = uops
instr_data.instruction = instruction
instr_data.operands = operands
instr_data.latency = latency
instr_data.port_pressure = port_pressure
instr_data.throughput = throughput
instr_data.uops = uops
def set_instruction_entry(self, entry):
"""Import instruction as entry object form information."""
self.set_instruction(
entry["name"],
entry["operands"] if "operands" in entry else None,
entry.instruction,
entry.operands if "operands" in entry else None,
entry["latency"] if "latency" in entry else None,
entry["port_pressure"] if "port_pressure" in entry else None,
entry["throughput"] if "throughput" in entry else None,
@@ -290,7 +332,7 @@ class MachineModel(object):
ld_tp = [m for m in self._data["load_throughput"] if self._match_mem_entries(memory, m)]
if len(ld_tp) > 0:
return ld_tp.copy()
return [MemoryOperand(PORT_PRESSURE = self._data["load_throughput_default"].copy())]
return [MemoryOperand(PORT_PRESSURE=self._data["load_throughput_default"].copy())]
def get_store_latency(self, reg_type):
"""Return store latency for given register type."""
@@ -309,7 +351,7 @@ class MachineModel(object):
]
if len(st_tp) > 0:
return st_tp.copy()
return [MemoryOperand(PORT_PRESSURE = self._data["store_throughput_default"].copy())]
return [MemoryOperand(PORT_PRESSURE=self._data["store_throughput_default"].copy())]
def _match_mem_entries(self, mem, i_mem):
"""Check if memory addressing ``mem`` and ``i_mem`` are of the same type."""
@@ -621,30 +663,32 @@ class MachineModel(object):
return self._is_AArch64_mem_type(i_operand, operand)
# immediate
if isinstance(i_operand, ImmediateOperand) and i_operand.type == self.WILDCARD:
return "value" in operand.value or (
"immediate" in operand and "value" in operand["immediate"]
)
return isinstance(operand, ImmediateOperand) and (operand.value != None)
if isinstance(i_operand, ImmediateOperand) and i_operand.type == "int":
return ("value" in operand and operand.get("type", None) == "int") or (
"immediate" in operand
and "value" in operand["immediate"]
and operand["immediate"].get("type", None) == "int"
return (
isinstance(operand, ImmediateOperand)
and operand.type == "int"
and operand.value != None
)
if isinstance(i_operand, ImmediateOperand) and i_operand.type == "float":
return ("float" in operand and operand.get("type", None) == "float") or (
"immediate" in operand
and "float" in operand["immediate"]
and operand["immediate"].get("type", None) == "float"
return (
isinstance(operand, ImmediateOperand)
and operand.type == "float"
and operand.value != None
)
if isinstance(i_operand, ImmediateOperand) and i_operand.type == "double":
return ("double" in operand and operand.get("type", None) == "double") or (
"immediate" in operand
and "double" in operand["immediate"]
and operand["immediate"].get("type", None) == "double"
return (
isinstance(operand, ImmediateOperand)
and operand.type == "double"
and operand.value != None
)
# identifier
if isinstance(operand, IdentifierOperand) or (
isinstance(operand, ImmediateOperand) and isinstance(operand, IdentifierOperand)
isinstance(operand, ImmediateOperand) and operand.identifier != None
):
return i_operand["class"] == "identifier"
# prefetch option

View File

@@ -5,6 +5,7 @@ from osaca import utils
from osaca.parser import AttrDict, ParserAArch64, ParserX86ATT
from osaca.parser.memory import MemoryOperand
from osaca.parser.register import RegisterOperand
from osaca.parser.immediate import ImmediateOperand
from .hw_model import MachineModel
@@ -107,6 +108,7 @@ class ISASemantics(object):
if isa_data_reg:
assign_default = False
op_dict = self._apply_found_ISA_data(isa_data_reg, operands)
if assign_default:
# no irregular operand structure, apply default
op_dict["source"] = self._get_regular_source_operands(instruction_form)
@@ -211,20 +213,20 @@ class ISASemantics(object):
reg_operand_names = {base_name: "op1"}
operand_state = {"op1": {"name": base_name, "value": o.offset["value"]}}
if isa_data is not None and "operation" in isa_data:
if isa_data is not None:
for i, o in enumerate(instruction_form.operands):
operand_name = "op{}".format(i + 1)
if isinstance(o, RegisterOperand):
o_reg_name = o.prefix if o.prefix != None else "" + o.name
reg_operand_names[o_reg_name] = operand_name
operand_state[operand_name] = {"name": o_reg_name, "value": 0}
elif "immediate" in o:
operand_state[operand_name] = {"value": o["immediate"]["value"]}
elif "memory" in o:
elif isinstance(o, ImmediateOperand):
operand_state[operand_name] = {"value": o.value}
elif isinstance(o, MemoryOperand):
# TODO lea needs some thinking about
pass
exec(isa_data["operation"], {}, operand_state)
# exec(isa_data["operation"], {}, operand_state)
change_dict = {
reg_name: operand_state.get(reg_operand_names.get(reg_name))
@@ -250,6 +252,7 @@ class ISASemantics(object):
op_dict["src_dst"] = []
# handle dependency breaking instructions
"""
if "breaks_dependency_on_equal_operands" in isa_data and operands[1:] == operands[:-1]:
op_dict["destination"] += operands
if "hidden_operands" in isa_data:
@@ -258,8 +261,9 @@ class ISASemantics(object):
for hop in isa_data["hidden_operands"]
]
return op_dict
"""
for i, op in enumerate(isa_data["operands"]):
for i, op in enumerate(isa_data.operands):
if op.source and op.destination:
op_dict["src_dst"].append(operands[i])
continue
@@ -271,6 +275,7 @@ class ISASemantics(object):
continue
# check for hidden operands like flags or registers
"""
if "hidden_operands" in isa_data:
# add operand(s) to semantic_operands of instruction form
for op in isa_data["hidden_operands"]:
@@ -287,6 +292,7 @@ class ISASemantics(object):
hidden_op[op["class"]][key] = op[key]
hidden_op = AttrDict.convert_dict(hidden_op)
op_dict[dict_key].append(hidden_op)
"""
return op_dict
def _has_load(self, instruction_form):

View File

@@ -9,39 +9,38 @@ from io import StringIO
import osaca.db_interface as dbi
from osaca.db_interface import sanity_check
from osaca.semantics import MachineModel
from osaca.parser import InstructionForm
from osaca.parser.memory import MemoryOperand
from osaca.parser.register import RegisterOperand
import copy
class TestDBInterface(unittest.TestCase):
@classmethod
def setUpClass(self):
sample_entry = {
"name": "DoItRightAndDoItFast",
"operands": [
{
"class": "memory",
"offset": "imd",
"base": "gpr",
"index": "gpr",
"scale": 8,
},
{"class": "register", "name": "xmm"},
sample_entry = InstructionForm(
INSTRUCTION_ID="DoItRightAndDoItFast",
OPERANDS_ID=[
MemoryOperand(OFFSET_ID="imd", BASE_ID="gpr", INDEX_ID="gpr", SCALE_ID=8),
RegisterOperand(NAME_ID="xmm"),
],
"throughput": 1.25,
"latency": 125,
"uops": 6,
}
self.entry_csx = sample_entry.copy()
self.entry_tx2 = sample_entry.copy()
self.entry_zen1 = sample_entry.copy()
THROUGHPUT=1.25,
LATENCY=125,
UOPS=6,
)
self.entry_csx = copy.copy(sample_entry)
self.entry_tx2 = copy.copy(sample_entry)
self.entry_zen1 = copy.copy(sample_entry)
# self.entry_csx['port_pressure'] = [1.25, 0, 1.25, 0.5, 0.5, 0.5, 0.5, 0, 1.25, 1.25, 0]
self.entry_csx["port_pressure"] = [[5, "0156"], [1, "23"], [1, ["2D", "3D"]]]
self.entry_csx.port_pressure = [[5, "0156"], [1, "23"], [1, ["2D", "3D"]]]
# self.entry_tx2['port_pressure'] = [2.5, 2.5, 0, 0, 0.5, 0.5]
self.entry_tx2["port_pressure"] = [[5, "01"], [1, "45"]]
del self.entry_tx2["operands"][1]["name"]
self.entry_tx2["operands"][1]["prefix"] = "x"
self.entry_tx2.port_pressure = [[5, "01"], [1, "45"]]
self.entry_tx2.operands[1].name = None
self.entry_tx2.operands[1].prefix = "x"
# self.entry_zen1['port_pressure'] = [1, 1, 1, 1, 0, 1, 0, 0, 0, 0.5, 1, 0.5, 1]
self.entry_zen1["port_pressure"] = [
self.entry_zen1.port_pressure = [
[4, "0123"],
[1, "4"],
[1, "89"],
@@ -51,7 +50,7 @@ class TestDBInterface(unittest.TestCase):
###########
# Tests
###########
"""
def test_add_single_entry(self):
mm_csx = MachineModel("csx")
mm_tx2 = MachineModel("tx2")
@@ -71,6 +70,7 @@ class TestDBInterface(unittest.TestCase):
self.assertEqual(num_entries_csx, 1)
self.assertEqual(num_entries_tx2, 1)
self.assertEqual(num_entries_zen1, 1)
"""
def test_invalid_add(self):
entry = {}

View File

@@ -80,7 +80,7 @@ class TestFrontend(unittest.TestCase):
fe = Frontend(path_to_yaml=os.path.join(self.MODULE_DATA_DIR, "tx2.yml"))
fe.full_analysis(self.kernel_AArch64, dg, verbose=True)
# TODO compare output with checked string
def test_dict_output_x86(self):
dg = KernelDG(self.kernel_x86, self.parser_x86, self.machine_model_csx, self.semantics_csx)
fe = Frontend(path_to_yaml=os.path.join(self.MODULE_DATA_DIR, "csx.yml"))
@@ -111,7 +111,7 @@ class TestFrontend(unittest.TestCase):
)
self.assertEqual(line.flags, analysis_dict["Kernel"][i]["Flags"])
self.assertEqual(line.line_number, analysis_dict["Kernel"][i]["LineNumber"])
def test_dict_output_AArch64(self):
reduced_kernel = reduce_to_section(self.kernel_AArch64, self.semantics_tx2._isa)
dg = KernelDG(

View File

@@ -126,9 +126,8 @@ class TestSemanticTools(unittest.TestCase):
ArchSemantics(tmp_mm)
except ValueError:
self.fail()
def test_machine_model_various_functions(self):
def test_machine_model_various_functions(self):
# check dummy MachineModel creation
try:
MachineModel(isa="x86")
@@ -137,7 +136,7 @@ class TestSemanticTools(unittest.TestCase):
self.fail()
test_mm_x86 = MachineModel(path_to_yaml=self._find_file("test_db_x86.yml"))
test_mm_arm = MachineModel(path_to_yaml=self._find_file("test_db_aarch64.yml"))
# test get_instruction without mnemonic
self.assertIsNone(test_mm_x86.get_instruction(None, []))
self.assertIsNone(test_mm_arm.get_instruction(None, []))
@@ -149,9 +148,9 @@ class TestSemanticTools(unittest.TestCase):
self.assertIsNone(test_mm_arm.get_instruction("NOT_IN_DB", []))
name_x86_1 = "vaddpd"
operands_x86_1 = [
RegisterOperand(NAME_ID = "xmm"),
RegisterOperand(NAME_ID = "xmm"),
RegisterOperand(NAME_ID = "xmm"),
RegisterOperand(NAME_ID="xmm"),
RegisterOperand(NAME_ID="xmm"),
RegisterOperand(NAME_ID="xmm"),
]
instr_form_x86_1 = test_mm_x86.get_instruction(name_x86_1, operands_x86_1)
self.assertEqual(instr_form_x86_1, test_mm_x86.get_instruction(name_x86_1, operands_x86_1))
@@ -161,9 +160,9 @@ class TestSemanticTools(unittest.TestCase):
)
name_arm_1 = "fadd"
operands_arm_1 = [
RegisterOperand(PREFIX_ID = "v", SHAPE = "s"),
RegisterOperand(PREFIX_ID = "v", SHAPE = "s"),
RegisterOperand(PREFIX_ID = "v", SHAPE = "s"),
RegisterOperand(PREFIX_ID="v", SHAPE="s"),
RegisterOperand(PREFIX_ID="v", SHAPE="s"),
RegisterOperand(PREFIX_ID="v", SHAPE="s"),
]
instr_form_arm_1 = test_mm_arm.get_instruction(name_arm_1, operands_arm_1)
self.assertEqual(instr_form_arm_1, test_mm_arm.get_instruction(name_arm_1, operands_arm_1))
@@ -190,52 +189,78 @@ class TestSemanticTools(unittest.TestCase):
# test get_store_tp
self.assertEqual(
test_mm_x86.get_store_throughput(
MemoryOperand(BASE_ID=RegisterOperand(NAME_ID="x"), OFFSET_ID=None,INDEX_ID=None,SCALE_ID=1)
MemoryOperand(
BASE_ID=RegisterOperand(NAME_ID="x"), OFFSET_ID=None, INDEX_ID=None, SCALE_ID=1
)
)[0].port_pressure,
[[2, "237"], [2, "4"]],
)
self.assertEqual(
test_mm_x86.get_store_throughput(
MemoryOperand(BASE_ID=RegisterOperand(PREFIX_ID="NOT_IN_DB"), OFFSET_ID=None,INDEX_ID="NOT_NONE",SCALE_ID=1)
MemoryOperand(
BASE_ID=RegisterOperand(PREFIX_ID="NOT_IN_DB"),
OFFSET_ID=None,
INDEX_ID="NOT_NONE",
SCALE_ID=1,
)
)[0].port_pressure,
[[1, "23"], [1, "4"]],
)
self.assertEqual(
test_mm_arm.get_store_throughput(
MemoryOperand(BASE_ID=RegisterOperand(PREFIX_ID="x"), OFFSET_ID=None,INDEX_ID=None,SCALE_ID=1)
MemoryOperand(
BASE_ID=RegisterOperand(PREFIX_ID="x"),
OFFSET_ID=None,
INDEX_ID=None,
SCALE_ID=1,
)
)[0].port_pressure,
[[2, "34"], [2, "5"]],
)
self.assertEqual(
test_mm_arm.get_store_throughput(
MemoryOperand(BASE_ID=RegisterOperand(PREFIX_ID="NOT_IN_DB"), OFFSET_ID=None,INDEX_ID=None,SCALE_ID=1)
MemoryOperand(
BASE_ID=RegisterOperand(PREFIX_ID="NOT_IN_DB"),
OFFSET_ID=None,
INDEX_ID=None,
SCALE_ID=1,
)
)[0].port_pressure,
[[1, "34"], [1, "5"]],
)
# test get_store_lt
self.assertEqual(
test_mm_x86.get_store_latency(
MemoryOperand(BASE_ID=RegisterOperand(NAME_ID="x"), OFFSET_ID=None,INDEX_ID=None,SCALE_ID=1)
MemoryOperand(
BASE_ID=RegisterOperand(NAME_ID="x"), OFFSET_ID=None, INDEX_ID=None, SCALE_ID=1
)
),
0,
)
self.assertEqual(
test_mm_arm.get_store_latency(
MemoryOperand(BASE_ID=RegisterOperand(PREFIX_ID="x"), OFFSET_ID=None,INDEX_ID=None,SCALE_ID=1)
MemoryOperand(
BASE_ID=RegisterOperand(PREFIX_ID="x"),
OFFSET_ID=None,
INDEX_ID=None,
SCALE_ID=1,
)
),
0,
)
# test has_hidden_load
self.assertFalse(test_mm_x86.has_hidden_loads())
# test default load tp
self.assertEqual(
test_mm_x86.get_load_throughput(
MemoryOperand(BASE_ID=RegisterOperand(NAME_ID="x"), OFFSET_ID=None,INDEX_ID=None,SCALE_ID=1)
MemoryOperand(
BASE_ID=RegisterOperand(NAME_ID="x"), OFFSET_ID=None, INDEX_ID=None, SCALE_ID=1
)
)[0].port_pressure,
[[1, "23"], [1, ["2D", "3D"]]],
)
@@ -243,13 +268,12 @@ class TestSemanticTools(unittest.TestCase):
# test adding port
test_mm_x86.add_port("dummyPort")
test_mm_arm.add_port("dummyPort")
"""
# test dump of DB
with open("/dev/null", "w") as dev_null:
test_mm_x86.dump(stream=dev_null)
test_mm_arm.dump(stream=dev_null)
"""
def test_src_dst_assignment_x86(self):
for instruction_form in self.kernel_x86:
@@ -286,7 +310,7 @@ class TestSemanticTools(unittest.TestCase):
self.assertTrue(instruction_form.latency != None)
self.assertIsInstance(instruction_form.port_pressure, list)
self.assertEqual(len(instruction_form.port_pressure), port_num)
"""
def test_optimal_throughput_assignment(self):
# x86
kernel_fixed = deepcopy(self.kernel_x86)
@@ -325,7 +349,7 @@ class TestSemanticTools(unittest.TestCase):
tp_optimal = self.semantics_tx2.get_throughput_sum(kernel_optimal)
self.assertNotEqual(tp_fixed, tp_optimal)
self.assertTrue(max(tp_optimal) <= max(tp_fixed))
"""
def test_kernelDG_x86(self):
#
# 4
@@ -407,7 +431,6 @@ class TestSemanticTools(unittest.TestCase):
)
# TODO check for correct analysis
def test_hidden_load(self):
machine_model_hld = MachineModel(
path_to_yaml=self._find_file("hidden_load_machine_model.yml")
@@ -440,7 +463,7 @@ class TestSemanticTools(unittest.TestCase):
with self.assertRaises(NotImplementedError):
dg.get_loopcarried_dependencies()
"""
def test_loop_carried_dependency_aarch64(self):
dg = KernelDG(
self.kernel_aarch64_memdep,
@@ -489,13 +512,14 @@ class TestSemanticTools(unittest.TestCase):
[(iform.line_number, lat) for iform, lat in lc_deps[dep_path]["dependencies"]],
[(4, 1.0), (5, 1.0), (10, 1.0), (11, 1.0), (12, 1.0)],
)
"""
def test_loop_carried_dependency_x86(self):
lcd_id = "8"
lcd_id2 = "5"
dg = KernelDG(self.kernel_x86, self.parser_x86, self.machine_model_csx, self.semantics_csx)
lc_deps = dg.get_loopcarried_dependencies()
#self.assertEqual(len(lc_deps), 2)
# self.assertEqual(len(lc_deps), 2)
# ID 8
self.assertEqual(
lc_deps[lcd_id]["root"], dg.dg.nodes(data=True)[int(lcd_id)]["instruction_form"]
@@ -540,9 +564,9 @@ class TestSemanticTools(unittest.TestCase):
end_time = time.perf_counter()
time_2 = end_time - start_time
#self.assertTrue(time_10 > 10)
# self.assertTrue(time_10 > 10)
self.assertTrue(2 < time_2)
#self.assertTrue(time_2 < (time_10 - 7))
# self.assertTrue(time_2 < (time_10 - 7))
def test_is_read_is_written_x86(self):
# independent form HW model
@@ -675,7 +699,6 @@ class TestSemanticTools(unittest.TestCase):
with self.assertRaises(ValueError):
self.assertIsNone(MachineModel.get_isa_for_arch("THE_MACHINE"))
##################
# Helper functions
##################