Refactoring to modularize marker_utils

This commit is contained in:
JanLJL
2019-06-05 13:51:41 +02:00
parent b81a4c68d3
commit 119dc5baa9
4 changed files with 130 additions and 111 deletions

View File

@@ -1,79 +1,14 @@
#!/usr/bin/env python3
from osaca.parser import ParserAArch64v81, ParserX86ATT
# from .marker_utils import reduce_to_section
class Analyzer(object):
def __init__(self, parser_result, isa):
self.ISA = isa
if isa == 'x86':
self.parser = ParserX86ATT()
start, end = self.find_marked_kernel_x86(parser_result)
elif isa == 'AArch64':
self.parser = ParserAArch64v81()
start, end = self.find_marked_kernel_AArch64(parser_result)
if start == -1:
raise LookupError('Could not find START MARKER. Make sure it is inserted!')
if end == -1:
raise LookupError('Could not find END MARKER. Make sure it is inserted!')
self.kernel = parser_result[start:end]
def find_marked_kernel_AArch64(self, lines):
nop_bytes = ['213', '3', '32', '31']
return self.find_marked_kernel(lines, ['mov'], 'x1', [111, 222], nop_bytes)
def find_marked_kernel_x86(self, lines):
nop_bytes = ['100', '103', '144']
return self.find_marked_kernel(lines, ['mov', 'movl'], 'ebx', [111, 222], nop_bytes)
def find_marked_kernel(self, lines, mov_instr, mov_reg, mov_vals, nop_bytes):
index_start = -1
index_end = -1
for i, line in enumerate(lines):
try:
if line['instruction'] in mov_instr and lines[i + 1]['directive'] is not None:
source = line['operands']['source']
destination = line['operands']['destination']
# instruction pair matches, check for operands
if (
'immediate' in source[0]
and self.parser.normalize_imd(source[0]['immediate']) == mov_vals[0]
and 'register' in destination[0]
and self.parser.get_full_reg_name(destination[0]['register']) == mov_reg
):
# operands of first instruction match start, check for second one
match, line_count = self.match_bytes(lines, i + 1, nop_bytes)
if(match):
# return first line after the marker
index_start = i + 1 + line_count
elif (
'immediate' in source[0]
and self.parser.normalize_imd(source[0]['immediate']) == mov_vals[1]
and 'register' in destination[0]
and self.parser.get_full_reg_name(destination[0]['register']) == mov_reg
):
# operand of first instruction match end, check for second one
match, line_count = self.match_bytes(lines, i + 1, nop_bytes)
if(match):
# return line of the marker
index_end = i
except TypeError:
print(i, line)
if index_start != -1 and index_end != -1:
break
return index_start, index_end
def match_bytes(self, lines, index, byte_list):
# either all bytes are in one line or in separate ones
extracted_bytes = []
line_count = 0
while (
index < len(lines)
and lines[index]['directive'] is not None
and lines[index]['directive']['name'] == 'byte'
):
line_count += 1
extracted_bytes += lines[index]['directive']['parameters']
index += 1
if extracted_bytes[0:len(byte_list)] == byte_list:
return True, line_count
return False, -1
self.kernel = parser_result

84
osaca/marker_utils.py Executable file
View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
from osaca.parser import ParserAArch64v81, ParserX86ATT
def reduce_to_section(kernel, isa):
    """
    Cut a parsed assembly listing down to the region between the markers.

    :param kernel: parsed assembly lines that are searched for the markers
    :param isa: name of the instruction set, either ``'x86'`` or ``'AArch64'``
    :returns: slice of ``kernel`` starting at the index reported for the start
        marker and ending right before the index reported for the end marker
    :raises ValueError: if ``isa`` is neither ``'x86'`` nor ``'AArch64'``
    :raises LookupError: if the start marker or the end marker was not found
    """
    # Reject unknown ISAs up front so the rest only deals with the two finders.
    if isa not in ('x86', 'AArch64'):
        raise ValueError('ISA not supported.')
    locate = find_marked_kernel_x86 if isa == 'x86' else find_marked_kernel_AArch64
    first, last = locate(kernel)
    # The finders report -1 for a marker they could not locate.
    if first == -1:
        raise LookupError('Could not find START MARKER. Make sure it is inserted!')
    if last == -1:
        raise LookupError('Could not find END MARKER. Make sure it is inserted!')
    return kernel[first:last]
def find_marked_kernel_AArch64(lines):
    """
    Search ``lines`` for the AArch64 start and end markers.

    The markers searched for are a ``mov`` of 111 (start) or 222 (end) into
    ``x1`` that is followed by the bytes 213, 3, 32, 31.

    :param lines: parsed assembly lines
    :returns: whatever :func:`find_marked_kernel` returns for these settings,
        i.e. the pair ``(index_start, index_end)``
    """
    return find_marked_kernel(
        lines,
        ParserAArch64v81(),
        mov_instr=['mov'],
        mov_reg='x1',
        mov_vals=[111, 222],
        nop_bytes=['213', '3', '32', '31'],
    )
def find_marked_kernel_x86(lines):
    """
    Search ``lines`` for the x86 start and end markers.

    The markers searched for are a ``mov``/``movl`` of 111 (start) or 222 (end)
    into ``ebx`` that is followed by the bytes 100, 103, 144.

    :param lines: parsed assembly lines
    :returns: whatever :func:`find_marked_kernel` returns for these settings,
        i.e. the pair ``(index_start, index_end)``
    """
    return find_marked_kernel(
        lines,
        ParserX86ATT(),
        mov_instr=['mov', 'movl'],
        mov_reg='ebx',
        mov_vals=[111, 222],
        nop_bytes=['100', '103', '144'],
    )
def find_marked_kernel(lines, parser, mov_instr, mov_reg, mov_vals, nop_bytes):
    """
    Locate the start and end marker inside a parsed assembly listing.

    A marker is an instruction from ``mov_instr`` that moves one of the
    immediates in ``mov_vals`` into ``mov_reg`` and is directly followed by
    ``byte`` directive(s) whose parameters begin with ``nop_bytes``.

    :param lines: parsed assembly lines; every entry is read through the keys
        ``'instruction'``, ``'operands'`` and ``'directive'``
    :param parser: object providing ``normalize_imd()`` and
        ``get_full_reg_name()`` for the operands found in ``lines``
    :param mov_instr: mnemonics accepted as the marker's move instruction
    :param mov_reg: full register name the marker writes to
    :param mov_vals: pair of immediates, ``mov_vals[0]`` identifying the start
        marker and ``mov_vals[1]`` the end marker
    :param nop_bytes: byte values expected in the directive(s) after the move
    :returns: tuple ``(index_start, index_end)``; ``index_start`` is the index
        of the first line after the start marker, ``index_end`` the index of
        the end marker's move instruction, and either is ``-1`` if not found
    """
    index_start = -1
    index_end = -1
    line_total = len(lines)
    for i, line in enumerate(lines):
        try:
            # A marker needs a line after the move; checking the bound first
            # keeps a move on the very last line from raising IndexError.
            if (
                line['instruction'] in mov_instr
                and i + 1 < line_total
                and lines[i + 1]['directive'] is not None
            ):
                source = line['operands']['source']
                destination = line['operands']['destination']
                # instruction pair matches, check for operands
                if 'immediate' in source[0]:
                    immediate = parser.normalize_imd(source[0]['immediate'])
                    is_start = immediate == mov_vals[0]
                    is_end = not is_start and immediate == mov_vals[1]
                    if (
                        (is_start or is_end)
                        and 'register' in destination[0]
                        and parser.get_full_reg_name(destination[0]['register']) == mov_reg
                    ):
                        # operands of the move match, check the bytes after it
                        match, line_count = match_bytes(lines, i + 1, nop_bytes)
                        if match and is_start:
                            # first line after the marker
                            index_start = i + 1 + line_count
                        elif match:
                            # line of the marker itself
                            index_end = i
        except TypeError:
            # Lines whose fields cannot be inspected are reported and skipped.
            # NOTE(review): this prints to stdout; consider logging instead.
            print(i, line)
        if index_start != -1 and index_end != -1:
            break
    return index_start, index_end
def match_bytes(lines, index, byte_list):
    """
    Check whether the ``byte`` directives starting at ``index`` spell ``byte_list``.

    The bytes may all sit in one directive or be spread over several
    consecutive ones. Collection stops as soon as ``len(byte_list)`` values
    have been gathered, so ``byte`` directives that follow a complete marker
    are not counted as part of it.

    :param lines: parsed assembly lines; every entry is read through the key
        ``'directive'``, whose value is ``None`` or offers ``'name'`` and
        ``'parameters'``
    :param index: index in ``lines`` of the first candidate directive
    :param byte_list: list of byte values to look for
    :returns: ``(True, n)`` with ``n`` the number of directive lines consumed
        if the collected values begin with ``byte_list``, otherwise
        ``(False, -1)``
    """
    extracted_bytes = []
    line_count = 0
    wanted = len(byte_list)
    while (
        len(extracted_bytes) < wanted
        and index < len(lines)
        and lines[index]['directive'] is not None
        and lines[index]['directive']['name'] == 'byte'
    ):
        line_count += 1
        extracted_bytes += lines[index]['directive']['parameters']
        index += 1
    # A single directive may carry more values than needed, hence the slice.
    if extracted_bytes[:wanted] == byte_list:
        return True, line_count
    return False, -1

View File

@@ -8,7 +8,7 @@ suite = unittest.TestLoader().loadTestsFromNames(
[
'test_parser_x86att',
'test_parser_AArch64v81',
'test_analyzer'
'test_marker_utils'
]
)

View File

@@ -1,15 +1,15 @@
#!/usr/bin/env python3
"""
Unit tests for Analyzer object
Unit tests for IACA/OSACA marker utilities
"""
import os
import unittest
from osaca.analyzer import Analyzer
from osaca.marker_utils import reduce_to_section
from osaca.parser import ParserAArch64v81, ParserX86ATT
class TestAnalyzer(unittest.TestCase):
class TestMarkerUtils(unittest.TestCase):
def setUp(self):
self.parser_AArch = ParserAArch64v81()
self.parser_x86 = ParserX86ATT()
@@ -25,16 +25,16 @@ class TestAnalyzer(unittest.TestCase):
#################
def test_marker_detection_AArch64(self):
analyzer = Analyzer(self.parsed_AArch, 'AArch64')
self.assertEquals(len(analyzer.kernel), 138)
self.assertEquals(analyzer.kernel[0].line_number, 307)
self.assertEquals(analyzer.kernel[-1].line_number, 444)
kernel = reduce_to_section(self.parsed_AArch, 'AArch64')
self.assertEquals(len(kernel), 138)
self.assertEquals(kernel[0].line_number, 307)
self.assertEquals(kernel[-1].line_number, 444)
def test_marker_detection_x86(self):
analyzer = Analyzer(self.parsed_x86, 'x86')
self.assertEquals(len(analyzer.kernel), 9)
self.assertEquals(analyzer.kernel[0].line_number, 146)
self.assertEquals(analyzer.kernel[-1].line_number, 154)
kernel = reduce_to_section(self.parsed_x86, 'x86')
self.assertEquals(len(kernel), 9)
self.assertEquals(kernel[0].line_number, 146)
self.assertEquals(kernel[-1].line_number, 154)
def test_marker_matching_AArch64(self):
# preparation
@@ -97,8 +97,8 @@ class TestAnalyzer(unittest.TestCase):
bytes_end=bytes_var_2,
):
sample_parsed = self.parser_AArch.parse_file(sample_code)
analyzer = Analyzer(sample_parsed, 'AArch64')
self.assertEquals(len(analyzer.kernel), kernel_length)
sample_kernel = reduce_to_section(sample_parsed, 'AArch64')
self.assertEquals(len(sample_kernel), kernel_length)
kernel_start = len(
list(
filter(
@@ -109,7 +109,7 @@ class TestAnalyzer(unittest.TestCase):
parsed_kernel = self.parser_AArch.parse_file(
kernel, start_line=kernel_start
)
self.assertEquals(analyzer.kernel, parsed_kernel)
self.assertEquals(sample_kernel, parsed_kernel)
def test_marker_matching_x86(self):
# preparation
@@ -159,8 +159,8 @@ class TestAnalyzer(unittest.TestCase):
bytes_end=bytes_var_2,
):
sample_parsed = self.parser_x86.parse_file(sample_code)
analyzer = Analyzer(sample_parsed, 'x86')
self.assertEquals(len(analyzer.kernel), kernel_length)
sample_kernel = reduce_to_section(sample_parsed, 'x86')
self.assertEquals(len(sample_kernel), kernel_length)
kernel_start = len(
list(
filter(
@@ -171,7 +171,7 @@ class TestAnalyzer(unittest.TestCase):
parsed_kernel = self.parser_x86.parse_file(
kernel, start_line=kernel_start
)
self.assertEquals(analyzer.kernel, parsed_kernel)
self.assertEquals(sample_kernel, parsed_kernel)
def test_marker_special_cases_AArch(self):
bytes_line = '.byte 213,3,32,31\n'
@@ -190,46 +190,46 @@ class TestAnalyzer(unittest.TestCase):
# marker directly at the beginning
code_beginning = mov_start + bytes_line + kernel + mov_end + bytes_line + epilogue
beginning_parsed = self.parser_AArch.parse_file(code_beginning)
analyzer = Analyzer(beginning_parsed, 'AArch64')
self.assertEquals(len(analyzer.kernel), kernel_length)
test_kernel = reduce_to_section(beginning_parsed, 'AArch64')
self.assertEquals(len(test_kernel), kernel_length)
kernel_start = len(list(filter(None, (mov_start + bytes_line).split('\n'))))
parsed_kernel = self.parser_AArch.parse_file(kernel, start_line=kernel_start)
self.assertEquals(analyzer.kernel, parsed_kernel)
self.assertEquals(test_kernel, parsed_kernel)
# marker at the end
code_end = prologue + mov_start + bytes_line + kernel + mov_end + bytes_line + epilogue
end_parsed = self.parser_AArch.parse_file(code_end)
analyzer = Analyzer(end_parsed, 'AArch64')
self.assertEquals(len(analyzer.kernel), kernel_length)
test_kernel = reduce_to_section(end_parsed, 'AArch64')
self.assertEquals(len(test_kernel), kernel_length)
kernel_start = len(list(filter(None, (prologue + mov_start + bytes_line).split('\n'))))
parsed_kernel = self.parser_AArch.parse_file(kernel, start_line=kernel_start)
self.assertEquals(analyzer.kernel, parsed_kernel)
self.assertEquals(test_kernel, parsed_kernel)
# no kernel
code_empty = prologue + mov_start + bytes_line + mov_end + bytes_line + epilogue
empty_parsed = self.parser_AArch.parse_file(code_empty)
analyzer = Analyzer(empty_parsed, 'AArch64')
self.assertEquals(len(analyzer.kernel), 0)
test_kernel = reduce_to_section(empty_parsed, 'AArch64')
self.assertEquals(len(test_kernel), 0)
kernel_start = len(list(filter(None, (prologue + mov_start + bytes_line).split('\n'))))
self.assertEquals(analyzer.kernel, [])
self.assertEquals(test_kernel, [])
# no start marker
code_no_start = prologue + bytes_line + kernel + mov_end + bytes_line + epilogue
no_start_parsed = self.parser_AArch.parse_file(code_no_start)
with self.assertRaises(LookupError):
analyzer = Analyzer(no_start_parsed, 'AArch64')
reduce_to_section(no_start_parsed, 'AArch64')
# no end marker
code_no_end = prologue + mov_start + bytes_line + kernel + mov_end + epilogue
no_end_parsed = self.parser_AArch.parse_file(code_no_end)
with self.assertRaises(LookupError):
analyzer = Analyzer(no_end_parsed, 'AArch64')
reduce_to_section(no_end_parsed, 'AArch64')
# no marker at all
code_no_marker = prologue + kernel + epilogue
no_marker_parsed = self.parser_AArch.parse_file(code_no_marker)
with self.assertRaises(LookupError):
analyzer = Analyzer(no_marker_parsed, 'AArch64')
reduce_to_section(no_marker_parsed, 'AArch64')
def test_marker_special_cases_x86(self):
bytes_line = '.byte 100\n.byte 103\n.byte 144\n'
@@ -249,46 +249,46 @@ class TestAnalyzer(unittest.TestCase):
# marker directly at the beginning
code_beginning = mov_start + bytes_line + kernel + mov_end + bytes_line + epilogue
beginning_parsed = self.parser_x86.parse_file(code_beginning)
analyzer = Analyzer(beginning_parsed, 'x86')
self.assertEquals(len(analyzer.kernel), kernel_length)
test_kernel = reduce_to_section(beginning_parsed, 'x86')
self.assertEquals(len(test_kernel), kernel_length)
kernel_start = len(list(filter(None, (mov_start + bytes_line).split('\n'))))
parsed_kernel = self.parser_x86.parse_file(kernel, start_line=kernel_start)
self.assertEquals(analyzer.kernel, parsed_kernel)
self.assertEquals(test_kernel, parsed_kernel)
# marker at the end
code_end = prologue + mov_start + bytes_line + kernel + mov_end + bytes_line + epilogue
end_parsed = self.parser_x86.parse_file(code_end)
analyzer = Analyzer(end_parsed, 'x86')
self.assertEquals(len(analyzer.kernel), kernel_length)
test_kernel = reduce_to_section(end_parsed, 'x86')
self.assertEquals(len(test_kernel), kernel_length)
kernel_start = len(list(filter(None, (prologue + mov_start + bytes_line).split('\n'))))
parsed_kernel = self.parser_x86.parse_file(kernel, start_line=kernel_start)
self.assertEquals(analyzer.kernel, parsed_kernel)
self.assertEquals(test_kernel, parsed_kernel)
# no kernel
code_empty = prologue + mov_start + bytes_line + mov_end + bytes_line + epilogue
empty_parsed = self.parser_x86.parse_file(code_empty)
analyzer = Analyzer(empty_parsed, 'x86')
self.assertEquals(len(analyzer.kernel), 0)
test_kernel = reduce_to_section(empty_parsed, 'x86')
self.assertEquals(len(test_kernel), 0)
kernel_start = len(list(filter(None, (prologue + mov_start + bytes_line).split('\n'))))
self.assertEquals(analyzer.kernel, [])
self.assertEquals(test_kernel, [])
# no start marker
code_no_start = prologue + bytes_line + kernel + mov_end + bytes_line + epilogue
no_start_parsed = self.parser_x86.parse_file(code_no_start)
with self.assertRaises(LookupError):
analyzer = Analyzer(no_start_parsed, 'x86')
reduce_to_section(no_start_parsed, 'x86')
# no end marker
code_no_end = prologue + mov_start + bytes_line + kernel + mov_end + epilogue
no_end_parsed = self.parser_x86.parse_file(code_no_end)
with self.assertRaises(LookupError):
analyzer = Analyzer(no_end_parsed, 'x86')
reduce_to_section(no_end_parsed, 'x86')
# no marker at all
code_no_marker = prologue + kernel + epilogue
no_marker_parsed = self.parser_x86.parse_file(code_no_marker)
with self.assertRaises(LookupError):
analyzer = Analyzer(no_marker_parsed, 'x86')
reduce_to_section(no_marker_parsed, 'x86')
##################
# Helper functions
@@ -303,5 +303,5 @@ class TestAnalyzer(unittest.TestCase):
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(TestAnalyzer)
suite = unittest.TestLoader().loadTestsFromTestCase(TestMarkerUtils)
unittest.TextTestRunner(verbosity=2).run(suite)