from dataclasses import dataclass from typing import ClassVar, Callable from ctypes import c_uint32, c_int32 import struct from optable import OPCODES, OpcodeDescription, OpL, OpA, OpF, OpD from enum import IntFlag, Enum, auto WORD_SIZE: int = 4 class VMFlags(IntFlag): # if last instruction is big, then you should # skip more memory on step AFTER_BRANCH = auto() EXPANDED_INSTR = auto() class Condition: def __init__(self, cond: int): self.i: bool = bool(cond & (1 << 3)) self.v: bool = bool(cond & (1 << 2)) self.n: bool = bool(cond & (1 << 1)) self.z: bool = bool(cond & (1 << 0)) class VMCC(IntFlag): OVERFLOW = 1 << 2 NEGATIVE = 1 << 1 ZERO = 1 << 0 @dataclass class Breakpoint(Exception): address: int class VMExceptionType(Enum): END_OF_MEM = auto() INVALID_OPCODE = auto() @dataclass class VMException(Exception): cause: VMExceptionType pc: int message: str = "" @dataclass class VM: instr_callbacks: ClassVar[dict[OpcodeDescription, Callable]] mem: bytearray cc: VMCC pc: c_uint32 registers: list[c_int32] breakpoints: set[int] _vm_flags: VMFlags def __init__(self, mem: bytearray): self.mem: bytearray = mem self.cc: VMCC = VMCC(0) self.pc: c_uint32 = c_uint32(0) self.registers: list[c_int32] = [c_int32(0) for _ in range(256)] self.breakpoints: set[int] = set() self._vm_flags: VMFlags = VMFlags(0) self.__init_callbacks__() def __init_callbacks__(self): VM.instr_callbacks = { # ariphmetic OpD(OpF.UNEXPANDED, OpL.MATH, OpA.ADD): self._math_callback_gen(lambda lhs, rhs: lhs + rhs), OpD(OpF.QUICK, OpL.MATH, OpA.ADD): self._math_quick_callback_gen(lambda lhs, rhs: lhs + rhs), OpD(OpF.UNEXPANDED, OpL.MATH, OpA.SUB): self._math_callback_gen(lambda lhs, rhs: lhs - rhs), OpD(OpF.QUICK, OpL.MATH, OpA.SUB): self._math_quick_callback_gen(lambda lhs, rhs: lhs - rhs), OpD(OpF.UNEXPANDED, OpL.MATH, OpA.MUL): self._math_callback_gen(lambda lhs, rhs: lhs * rhs), OpD(OpF.QUICK, OpL.MATH, OpA.MUL): self._math_quick_callback_gen(lambda lhs, rhs: lhs * rhs), OpD(OpF.UNEXPANDED, OpL.MATH, OpA.DIV): self._math_callback_gen(lambda lhs, rhs: lhs // rhs), OpD(OpF.QUICK, OpL.MATH, OpA.DIV): self._math_quick_callback_gen(lambda lhs, rhs: lhs // rhs), # logical ops OpD(OpF.UNEXPANDED, OpL.MATH, OpA.AND): self._math_callback_gen(lambda lhs, rhs: lhs & rhs), OpD(OpF.UNEXPANDED, OpL.MATH, OpA.OR): self._math_callback_gen(lambda lhs, rhs: lhs | rhs), OpD(OpF.UNEXPANDED, OpL.MATH, OpA.XOR): self._math_callback_gen(lambda lhs, rhs: lhs ^ rhs), OpD(OpF.UNEXPANDED, OpL.MATH, OpA.MASK): self._math_callback_gen( lambda lhs, rhs: lhs & ((~rhs + (1 << 32)) % (1 << 32)) ), # block 2 OpD(OpF(0), OpL.MEM, OpA.LOAD): self._load_callback, OpD(OpF.QUICK, OpL.MEM, OpA.LOAD): self._load_callback, OpD(OpF(0), OpL.MEM, OpA.STORE): self._store_callback, OpD(OpF.QUICK, OpL.MEM, OpA.STORE): self._store_callback, # block 3 OpD(OpF(0), OpL.BRANCH, OpA.BRANCH): self._branch_callback, OpD(OpF.QUICK, OpL.BRANCH, OpA.BRANCH): self._branch_callback, OpD(OpF(0), OpL.BRANCH, OpA.IND_BRANCH): self._branch_indexed_callback } def step(self) -> None: """ Make one step (only step into) """ if self._to_raw_bytes_offset(self.pc) > len(self.mem) - WORD_SIZE: raise VMException( VMExceptionType.END_OF_MEM, self.pc.value, "couldn't perform step because end of memory occured" ) opcode, *_ = instr = self._fetch_instr() opdesc = self._get_opcode_desc(opcode) args: tuple[int, ...] = self._parse_instr_fields(bytes(instr)) if not (OpF.UNEXPANDED in opdesc.flags or OpF.QUICK in opdesc.flags): extra = struct.unpack(">i", self._fetch_instr())[0] args = (*args, extra) self._run_callback(opdesc, args) self._vm_flags &= ~(VMFlags.EXPANDED_INSTR) self._vm_flags &= ~(VMFlags.AFTER_BRANCH) def continue_(self) -> None: """ Continue from current breakpoint """ while self._to_raw_bytes_offset(self.pc.value) < len(self.mem): if self.pc.value in self.breakpoints: raise Breakpoint(self.pc.value) self.step() def run(self) -> None: """ Run from very beginning """ self.pc = c_uint32(0) while (self._to_raw_bytes_offset(self.pc.value) < len(self.mem)): self.continue_() def _fetch_instr(self) -> bytearray: """ Берет из памяти следущее слово, возвращает его ВАЖНО: инкрементирует счетчик команд на каждом вызове """ addr = self._to_raw_bytes_offset(self.pc) instr = self.mem[addr:addr+WORD_SIZE] self._increment_pc() return instr def _get_opcode_desc(self, opcode: int): """ Обертка над обращением к таблице опкодов из файлика optable.py. Нужна чтобы прокидывать исключения виртуальной машины, а не KeyError """ if not opcode in OPCODES: raise VMException( VMExceptionType.INVALID_OPCODE, self.pc.value, f"Couldn't resolve an opcode {hex(opcode)}" ) return OPCODES[opcode] def _to_raw_bytes_offset(self, pointer: int | c_uint32 | c_int32) -> int: """ Переводит адресс из указателей в рамках процессора, которые являются 32-битными, в формат смещения в сырых байтах в памяти """ if isinstance(pointer, int): return pointer * WORD_SIZE return pointer.value * WORD_SIZE def _parse_instr_fields( self, instr: bytes ) -> tuple[int, int, int, int]: return struct.unpack(">BBBb", instr) def _run_callback( self, opdesc: OpcodeDescription, args: tuple[int, ...] ) -> None: if not (OpF.QUICK in opdesc.flags or OpF.UNEXPANDED in opdesc.flags): self._vm_flags |= VMFlags.EXPANDED_INSTR if opdesc.layout == OpL.MATH: assert len(args) == 4 _, r3, r1, r2_or_i8 = args # поскольку этот колбэк сгенерирован, # ему необходимо в явном виде указывать # аргумент self self.instr_callbacks[opdesc]( self, r3, r1, r2_or_i8 ) if opdesc.layout == OpL.MEM: if OpF.QUICK in opdesc.flags: assert len(args) == 4 _, r3, r1, i8 = args self.instr_callbacks[opdesc]( r3, r1, i8 ) else: assert len(args) == 5 _, r3, r1, _, disp = args self.instr_callbacks[opdesc]( r3, r1, disp ) if opdesc.layout == OpL.BRANCH: if OpF.QUICK in opdesc.flags: assert len(args) == 4 _, cond, _, i8 = args self.instr_callbacks[opdesc]( cond, i8 ) elif opdesc.action == OpA.IND_BRANCH: assert len(args) == 5 _, cond, r1, _, disp = args self.instr_callbacks[opdesc]( cond, r1, disp ) else: assert len(args) == 5 _, cond, _, _, disp = args self.instr_callbacks[opdesc]( cond, disp ) def _math_callback_gen( self, operation: Callable[[int, int], int] ) -> Callable[["VM", int, int ,int], None]: """ Поскольку математические операции конструируются по одному шаблону, я завел функцию высшего порядка, которая будет их генерировать """ def callback(self, r3: int, r1: int, r2: int): lhs = self.registers[r1].value rhs = self.registers[r2].value self.registers[r3] = c_int32(operation(lhs, rhs)) return callback def _math_quick_callback_gen( self, operation: Callable[[int, int], int] ) -> Callable[["VM", int, int ,int], None]: """ Поскольку математические операции конструируются по одному шаблону, я завел функцию высшего порядка, которая будет их генерировать. В отлиие от _math_callback_gen, эта уже предназначена для операций с пометкой QUICK """ def callback(self, r3: int, r1: int, i8: int) -> None: self.cc = VMCC(0) lhs = self.registers[r1].value result = operation(lhs, i8) if result < 0: self.cc |= VMCC.NEGATIVE elif result == 0: self.cc |= VMCC.ZERO # самая дорогая проверка на переполнение) try: struct.pack('i', result) except struct.error: self.cc |= VMCC.OVERFLOW self.registers[r3] = c_int32(result) return callback def _load_callback(self, r3: int, r1: int, disp: int) -> None: addr = self._to_raw_bytes_offset(self.registers[r1].value + disp) self.registers[r3] = c_int32( struct.unpack( ">i", self.mem[addr:addr+WORD_SIZE])[0] ) def _store_callback(self, r3: int, r1: int, disp: int) -> None: addr = self._to_raw_bytes_offset(self.registers[r1].value + disp) self.mem[addr:addr+WORD_SIZE] = struct.pack( ">i", self.registers[r3].value ) def _branch_callback(self, cond: int, disp: int) -> None: c = Condition(cond) vm_c = Condition(self.cc) if (c.v & vm_c.v) | (c.n & vm_c.n) | (c.z & vm_c.z) == c.i: self._vm_flags = VMFlags.AFTER_BRANCH self.pc = c_uint32(self.pc.value + disp) def _branch_indexed_callback(self, cond: int, r1: int, disp: int) -> None: c = Condition(cond) vm_c = Condition(self.cc.value) if (c.v & vm_c.v) | (c.n & vm_c.n) | (c.z & vm_c.z) == c.i: self._vm_flags = VMFlags.AFTER_BRANCH addr = self.registers[r1].value + disp self.pc = c_uint32(addr) def _increment_pc(self): self.pc = c_uint32(self.pc.value + 1)