Files
OSACA/tests/test_db_interface.py

191 lines
7.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Unit tests for DB interface
"""
import os
import unittest
from io import StringIO
import osaca.db_interface as dbi
from osaca.db_interface import sanity_check, _get_full_instruction_name
from osaca.semantics import MachineModel
from osaca.parser import InstructionForm
from osaca.parser.memory import MemoryOperand
from osaca.parser.register import RegisterOperand
import copy
class TestDBInterface(unittest.TestCase):
@classmethod
def setUpClass(self):
sample_entry = InstructionForm(
mnemonic="DoItRightAndDoItFast",
operands=[
MemoryOperand(offset="imd", base="gpr", index="gpr", scale=8),
RegisterOperand(name="xmm"),
],
throughput=1.25,
latency=125,
uops=6,
)
self.entry_csx = copy.copy(sample_entry)
self.entry_tx2 = copy.copy(sample_entry)
self.entry_zen1 = copy.copy(sample_entry)
self.entry_csx.port_pressure = [1.25, 0, 1.25, 0.5, 0.5, 0.5, 0.5, 0, 1.25, 1.25, 0]
self.entry_csx.port_pressure = [[5, "0156"], [1, "23"], [1, ["2D", "3D"]]]
self.entry_tx2.port_pressure = [2.5, 2.5, 0, 0, 0.5, 0.5]
self.entry_tx2.port_pressure = [[5, "01"], [1, "45"]]
self.entry_tx2.operands[1].name = None
self.entry_tx2.operands[1].prefix = "x"
self.entry_zen1.port_pressure = [1, 1, 1, 1, 0, 1, 0, 0, 0, 0.5, 1, 0.5, 1]
self.entry_zen1.port_pressure = [
[4, "0123"],
[1, "4"],
[1, "89"],
[2, ["8D", "9D"]],
]
###########
# Tests
###########
def test_add_single_entry(self):
mm_csx = MachineModel("csx")
mm_tx2 = MachineModel("tx2")
mm_zen1 = MachineModel("zen1")
num_entries_csx = len(mm_csx["instruction_forms"])
num_entries_tx2 = len(mm_tx2["instruction_forms"])
num_entries_zen1 = len(mm_zen1["instruction_forms"])
mm_csx.set_instruction_entry(self.entry_csx)
mm_tx2.set_instruction_entry(self.entry_tx2)
mm_zen1.set_instruction_entry(InstructionForm(mnemonic="empty_operation"))
num_entries_csx = len(mm_csx["instruction_forms"]) - num_entries_csx
num_entries_tx2 = len(mm_tx2["instruction_forms"]) - num_entries_tx2
num_entries_zen1 = len(mm_zen1["instruction_forms"]) - num_entries_zen1
self.assertEqual(num_entries_csx, 1)
self.assertEqual(num_entries_tx2, 1)
self.assertEqual(num_entries_zen1, 1)
def test_invalid_add(self):
entry = InstructionForm()
with self.assertRaises(KeyError):
MachineModel("csx").set_instruction_entry(entry)
with self.assertRaises(TypeError):
MachineModel("csx").set_instruction()
def test_sanity_check(self):
output = StringIO()
# non-verbose
sanity_check("csx", verbose=False, internet_check=False, output_file=output)
sanity_check("tx2", verbose=False, internet_check=False, output_file=output)
sanity_check("zen1", verbose=False, internet_check=False, output_file=output)
# verbose
sanity_check("csx", verbose=True, internet_check=False, output_file=output)
sanity_check("tx2", verbose=True, internet_check=False, output_file=output)
sanity_check("zen1", verbose=True, internet_check=False, output_file=output)
def test_ibench_import(self):
# only check import without dumping the DB file (takes too much time)
with open(self._find_file("ibench_import_x86.dat")) as input_file:
input_data = input_file.readlines()
entries = dbi._get_ibench_output(input_data, "x86")
self.assertEqual(len(entries), 3)
for _, e in entries.items():
self.assertIsNotNone(e.throughput)
self.assertIsNotNone(e.latency)
with open(self._find_file("ibench_import_aarch64.dat")) as input_file:
input_data = input_file.readlines()
entries = dbi._get_ibench_output(input_data, "aarch64")
self.assertEqual(len(entries), 4)
for _, e in entries.items():
self.assertIsNotNone(e.throughput)
self.assertIsNotNone(e.latency)
def test_asmbench_import(self):
# only check import without dumping the DB file (takes too much time)
with open(self._find_file("asmbench_import_x86.dat")) as input_file:
input_data = input_file.readlines()
entries = dbi._get_asmbench_output(input_data, "x86")
self.assertEqual(len(entries), 3)
for _, e in entries.items():
self.assertIsNotNone(e.throughput)
self.assertIsNotNone(e.latency)
with open(self._find_file("asmbench_import_aarch64.dat")) as input_file:
input_data = input_file.readlines()
entries = dbi._get_asmbench_output(input_data, "aarch64")
self.assertEqual(len(entries), 4)
for _, e in entries.items():
self.assertIsNotNone(e.throughput)
self.assertIsNotNone(e.latency)
# remove empty line => no import since broken format
del input_data[3]
entries = dbi._get_asmbench_output(input_data, "aarch64")
self.assertEqual(len(entries), 0)
with self.assertRaises(ValueError):
dbi.import_benchmark_output(
"csx", "invalid_bench_type", self._find_file("asmbench_import_x86.dat")
)
with self.assertRaises(AssertionError):
dbi.import_benchmark_output("csx", "ibench", "invalid_file")
def test_online_scraping(self):
# addpd -- suspicious instruction, normal URL
instr_1 = ["addpd", (True, "(r) (r,w)")]
self.assertEqual(dbi._scrape_from_felixcloutier(instr_1[0]), instr_1[1])
# movpd -- not suspicious,
instr_2 = ["movapd", (False, "(r) (w)")]
self.assertEqual(dbi._scrape_from_felixcloutier(instr_2[0]), instr_2[1])
# vfmadd132pd -- only in combined view with 213/231.
# No 2-operand version, therefore, empty string
instr_3 = ["vfmadd132pd", (True, "")]
self.assertEqual(dbi._scrape_from_felixcloutier(instr_3[0]), instr_3[1])
def test_human_readable_instr_name(self):
instr_form_x86 = dict(
name="vaddpd",
operands=[
RegisterOperand(name="xmm"),
RegisterOperand(name="xmm"),
RegisterOperand(name="xmm"),
],
)
instr_form_arm = dict(
name="fadd",
operands=[
RegisterOperand(prefix="v", shape="s"),
RegisterOperand(prefix="v", shape="s"),
RegisterOperand(prefix="v", shape="s"),
],
)
# test full instruction name
self.assertEqual(
_get_full_instruction_name(instr_form_x86),
"vaddpd register(name:xmm),register(name:xmm),register(name:xmm)",
)
self.assertEqual(
_get_full_instruction_name(instr_form_arm),
"fadd register(prefix:v,shape:s),register(prefix:v,shape:s),"
+ "register(prefix:v,shape:s)",
)
##################
# Helper functions
##################
@staticmethod
def _find_file(name):
testdir = os.path.dirname(__file__)
name = os.path.join(testdir, "test_files", name)
assert os.path.exists(name)
return name
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TestDBInterface)
unittest.TextTestRunner(verbosity=2, buffer=True).run(suite)