feat: добавил возможность писать сообщения об ошибках к исключениям виратуальной машинки
This commit is contained in:
322
src/vm.py
322
src/vm.py
@ -1,322 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import ClassVar, Callable
|
||||
from ctypes import c_uint32, c_int32
|
||||
import struct
|
||||
from optable import OPCODES, OpcodeDescription, OpL, OpA, OpF, OpD
|
||||
from enum import IntFlag, Enum, auto
|
||||
|
||||
WORD_SIZE: int = 4
|
||||
|
||||
class VMFlags(IntFlag):
|
||||
# if last instruction is big, then you should
|
||||
# skip more memory on step
|
||||
AFTER_BRANCH = auto()
|
||||
EXPANDED_INSTR = auto()
|
||||
|
||||
class Condition:
|
||||
def __init__(self, cond: int):
|
||||
self.i: bool = bool(cond & (1 << 3))
|
||||
self.v: bool = bool(cond & (1 << 2))
|
||||
self.n: bool = bool(cond & (1 << 1))
|
||||
self.z: bool = bool(cond & (1 << 0))
|
||||
|
||||
class VMCC(IntFlag):
|
||||
OVERFLOW = 1 << 2
|
||||
NEGATIVE = 1 << 1
|
||||
ZERO = 1 << 0
|
||||
|
||||
@dataclass
|
||||
class Breakpoint(Exception):
|
||||
address: int
|
||||
|
||||
class VMExceptionType(Enum):
|
||||
END_OF_MEM = auto()
|
||||
INVALID_OPCODE = auto()
|
||||
|
||||
@dataclass
|
||||
class VMException(Exception):
|
||||
cause: VMExceptionType
|
||||
pc: int
|
||||
|
||||
@dataclass
|
||||
class VM:
|
||||
instr_callbacks: ClassVar[dict[OpcodeDescription, Callable]]
|
||||
mem: bytearray
|
||||
cc: VMCC
|
||||
pc: c_uint32
|
||||
registers: list[c_int32]
|
||||
breakpoints: set[int]
|
||||
_vm_flags: VMFlags
|
||||
|
||||
def __init__(self, mem: bytearray):
|
||||
self.mem: bytearray = mem
|
||||
self.cc: VMCC = VMCC(0)
|
||||
self.pc: c_uint32 = c_uint32(0)
|
||||
self.registers: list[c_int32] = [c_int32(0) for _ in range(256)]
|
||||
self.breakpoints: set[int] = set()
|
||||
self._vm_flags: VMFlags = VMFlags(0)
|
||||
self.__init_callbacks__()
|
||||
|
||||
def __init_callbacks__(self):
|
||||
VM.instr_callbacks = {
|
||||
# ariphmetic
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.ADD):
|
||||
self._math_callback_gen(lambda lhs, rhs: lhs + rhs),
|
||||
|
||||
OpD(OpF.QUICK, OpL.MATH, OpA.ADD):
|
||||
self._math_quick_callback_gen(lambda lhs, rhs: lhs + rhs),
|
||||
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.SUB):
|
||||
self._math_callback_gen(lambda lhs, rhs: lhs - rhs),
|
||||
|
||||
OpD(OpF.QUICK, OpL.MATH, OpA.SUB):
|
||||
self._math_quick_callback_gen(lambda lhs, rhs: lhs - rhs),
|
||||
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.MUL):
|
||||
self._math_callback_gen(lambda lhs, rhs: lhs * rhs),
|
||||
|
||||
OpD(OpF.QUICK, OpL.MATH, OpA.MUL):
|
||||
self._math_quick_callback_gen(lambda lhs, rhs: lhs * rhs),
|
||||
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.DIV):
|
||||
self._math_callback_gen(lambda lhs, rhs: lhs // rhs),
|
||||
|
||||
OpD(OpF.QUICK, OpL.MATH, OpA.DIV):
|
||||
self._math_quick_callback_gen(lambda lhs, rhs: lhs // rhs),
|
||||
|
||||
# logical ops
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.AND):
|
||||
self._math_callback_gen(lambda lhs, rhs: lhs & rhs),
|
||||
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.OR):
|
||||
self._math_callback_gen(lambda lhs, rhs: lhs | rhs),
|
||||
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.XOR):
|
||||
self._math_callback_gen(lambda lhs, rhs: lhs ^ rhs),
|
||||
|
||||
OpD(OpF.UNEXPANDED, OpL.MATH, OpA.MASK):
|
||||
self._math_callback_gen(
|
||||
lambda lhs, rhs: lhs & ((~rhs + (1 << 32)) % (1 << 32))
|
||||
),
|
||||
|
||||
# block 2
|
||||
OpD(OpF(0), OpL.MEM, OpA.LOAD):
|
||||
self._load_callback,
|
||||
OpD(OpF.QUICK, OpL.MEM, OpA.LOAD):
|
||||
self._load_callback,
|
||||
OpD(OpF(0), OpL.MEM, OpA.STORE):
|
||||
self._store_callback,
|
||||
OpD(OpF.QUICK, OpL.MEM, OpA.STORE):
|
||||
self._store_callback,
|
||||
|
||||
# block 3
|
||||
OpD(OpF(0), OpL.BRANCH, OpA.BRANCH):
|
||||
self._branch_callback,
|
||||
OpD(OpF.QUICK, OpL.BRANCH, OpA.BRANCH):
|
||||
self._branch_callback,
|
||||
OpD(OpF(0), OpL.BRANCH, OpA.IND_BRANCH):
|
||||
self._branch_indexed_callback
|
||||
}
|
||||
|
||||
def step(self) -> None:
|
||||
"""
|
||||
Make one step (only step into)
|
||||
"""
|
||||
if self._to_raw_bytes_offset(self.pc) > len(self.mem) - WORD_SIZE:
|
||||
raise VMException(
|
||||
VMExceptionType.END_OF_MEM,
|
||||
self.pc.value
|
||||
)
|
||||
opcode, *_ = instr = self._fetch_instr()
|
||||
opdesc = self._get_opcode_desc(opcode)
|
||||
args: tuple[int, ...] = self._parse_instr_fields(bytes(instr))
|
||||
if not (OpF.UNEXPANDED in opdesc.flags or
|
||||
OpF.QUICK in opdesc.flags):
|
||||
extra = struct.unpack(">i", self._fetch_instr())[0]
|
||||
args = (*args, extra)
|
||||
self._run_callback(opdesc, args)
|
||||
self._vm_flags &= ~(VMFlags.EXPANDED_INSTR)
|
||||
self._vm_flags &= ~(VMFlags.AFTER_BRANCH)
|
||||
|
||||
def continue_(self) -> None:
|
||||
"""
|
||||
Continue from current breakpoint
|
||||
"""
|
||||
while self._to_raw_bytes_offset(self.pc.value) < len(self.mem):
|
||||
if self.pc.value in self.breakpoints:
|
||||
raise Breakpoint(self.pc.value)
|
||||
self.step()
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Run from very beginning
|
||||
"""
|
||||
self.pc = c_uint32(0)
|
||||
while (self._to_raw_bytes_offset(self.pc.value) < len(self.mem)):
|
||||
self.continue_()
|
||||
|
||||
def _fetch_instr(self) -> bytearray:
|
||||
"""
|
||||
Берет из памяти следущее слово, возвращает его
|
||||
|
||||
ВАЖНО: инкрементирует счетчик команд на каждом вызове
|
||||
"""
|
||||
addr = self._to_raw_bytes_offset(self.pc)
|
||||
instr = self.mem[addr:addr+WORD_SIZE]
|
||||
self._increment_pc()
|
||||
return instr
|
||||
|
||||
def _get_opcode_desc(self, opcode: int):
|
||||
"""
|
||||
Обертка над обращением к таблице опкодов из файлика optable.py.
|
||||
Нужна чтобы прокидывать исключения виртуальной машины, а не
|
||||
KeyError
|
||||
"""
|
||||
if not opcode in OPCODES:
|
||||
raise VMException(
|
||||
VMExceptionType.INVALID_OPCODE,
|
||||
self.pc.value
|
||||
)
|
||||
return OPCODES[opcode]
|
||||
|
||||
def _to_raw_bytes_offset(self, pointer: int | c_uint32 | c_int32) -> int:
|
||||
"""
|
||||
Переводит адресс из указателей в рамках процессора, которые
|
||||
являются 32-битными, в формат смещения в сырых байтах в памяти
|
||||
"""
|
||||
if isinstance(pointer, int):
|
||||
return pointer * WORD_SIZE
|
||||
return pointer.value * WORD_SIZE
|
||||
|
||||
def _parse_instr_fields(
|
||||
self,
|
||||
instr: bytes
|
||||
) -> tuple[int, int, int, int]:
|
||||
return struct.unpack(">BBBb", instr)
|
||||
|
||||
def _run_callback(
|
||||
self,
|
||||
opdesc: OpcodeDescription,
|
||||
args: tuple[int, ...]
|
||||
) -> None:
|
||||
if not (OpF.QUICK in opdesc.flags or OpF.UNEXPANDED in opdesc.flags):
|
||||
self._vm_flags |= VMFlags.EXPANDED_INSTR
|
||||
if opdesc.layout == OpL.MATH:
|
||||
assert len(args) == 4
|
||||
_, r3, r1, r2_or_i8 = args
|
||||
# поскольку этот колбэк сгенерирован,
|
||||
# ему необходимо в явном виде указывать
|
||||
# аргумент self
|
||||
self.instr_callbacks[opdesc](
|
||||
self, r3, r1, r2_or_i8
|
||||
)
|
||||
if opdesc.layout == OpL.MEM:
|
||||
if OpF.QUICK in opdesc.flags:
|
||||
assert len(args) == 4
|
||||
_, r3, r1, i8 = args
|
||||
self.instr_callbacks[opdesc](
|
||||
r3, r1, i8
|
||||
)
|
||||
else:
|
||||
assert len(args) == 5
|
||||
_, r3, r1, _, disp = args
|
||||
self.instr_callbacks[opdesc](
|
||||
r3, r1, disp
|
||||
)
|
||||
|
||||
if opdesc.layout == OpL.BRANCH:
|
||||
if OpF.QUICK in opdesc.flags:
|
||||
assert len(args) == 4
|
||||
_, cond, _, i8 = args
|
||||
self.instr_callbacks[opdesc](
|
||||
cond, i8
|
||||
)
|
||||
elif opdesc.action == OpA.IND_BRANCH:
|
||||
assert len(args) == 5
|
||||
_, cond, r1, _, disp = args
|
||||
self.instr_callbacks[opdesc](
|
||||
cond, r1, disp
|
||||
)
|
||||
else:
|
||||
assert len(args) == 5
|
||||
_, cond, _, _, disp = args
|
||||
self.instr_callbacks[opdesc](
|
||||
cond, disp
|
||||
)
|
||||
|
||||
def _math_callback_gen(
|
||||
self,
|
||||
operation: Callable[[int, int], int]
|
||||
) -> Callable[["VM", int, int ,int], None]:
|
||||
"""
|
||||
Поскольку математические операции конструируются
|
||||
по одному шаблону, я завел функцию высшего порядка,
|
||||
которая будет их генерировать
|
||||
"""
|
||||
def callback(self, r3: int, r1: int, r2: int):
|
||||
lhs = self.registers[r1].value
|
||||
rhs = self.registers[r2].value
|
||||
self.registers[r3] = c_int32(operation(lhs, rhs))
|
||||
|
||||
return callback
|
||||
|
||||
def _math_quick_callback_gen(
|
||||
self,
|
||||
operation: Callable[[int, int], int]
|
||||
) -> Callable[["VM", int, int ,int], None]:
|
||||
"""
|
||||
Поскольку математические операции конструируются
|
||||
по одному шаблону, я завел функцию высшего порядка,
|
||||
которая будет их генерировать.
|
||||
|
||||
В отлиие от _math_callback_gen, эта уже предназначена для
|
||||
операций с пометкой QUICK
|
||||
"""
|
||||
def callback(self, r3: int, r1: int, i8: int) -> None:
|
||||
self.cc = VMCC(0)
|
||||
lhs = self.registers[r1].value
|
||||
result = operation(lhs, i8)
|
||||
if result < 0:
|
||||
self.cc |= VMCC.NEGATIVE
|
||||
elif result == 0:
|
||||
self.cc |= VMCC.ZERO
|
||||
# самая дорогая проверка на переполнение)
|
||||
try:
|
||||
struct.pack('i', result)
|
||||
except struct.error:
|
||||
self.cc |= VMCC.OVERFLOW
|
||||
self.registers[r3] = c_int32(result)
|
||||
return callback
|
||||
|
||||
def _load_callback(self, r3: int, r1: int, disp: int) -> None:
|
||||
addr = self._to_raw_bytes_offset(self.registers[r1].value + disp)
|
||||
self.registers[r3] = c_int32(
|
||||
struct.unpack(
|
||||
">i", self.mem[addr:addr+WORD_SIZE])[0]
|
||||
)
|
||||
|
||||
def _store_callback(self, r3: int, r1: int, disp: int) -> None:
|
||||
addr = self._to_raw_bytes_offset(self.registers[r1].value + disp)
|
||||
self.mem[addr:addr+WORD_SIZE] = struct.pack(
|
||||
">i",
|
||||
self.registers[r3].value
|
||||
)
|
||||
|
||||
def _branch_callback(self, cond: int, disp: int) -> None:
|
||||
c = Condition(cond)
|
||||
vm_c = Condition(self.cc)
|
||||
if (c.v & vm_c.v) | (c.n & vm_c.n) | (c.z & vm_c.z) == c.i:
|
||||
self._vm_flags = VMFlags.AFTER_BRANCH
|
||||
self.pc = c_uint32(self.pc.value + disp)
|
||||
|
||||
def _branch_indexed_callback(self, cond: int, r1: int, disp: int) -> None:
|
||||
c = Condition(cond)
|
||||
vm_c = Condition(self.cc.value)
|
||||
if (c.v & vm_c.v) | (c.n & vm_c.n) | (c.z & vm_c.z) == c.i:
|
||||
self._vm_flags = VMFlags.AFTER_BRANCH
|
||||
addr = self.registers[r1].value + disp
|
||||
self.pc = c_uint32(addr)
|
||||
|
||||
def _increment_pc(self):
|
||||
self.pc = c_uint32(self.pc.value + 1)
|
||||
|
||||
Reference in New Issue
Block a user