231 lines
8.1 KiB
Python
231 lines
8.1 KiB
Python
#!/usr/bin/python3
|
|
|
|
# More info, please refer to https://github.com/qilingframework/qiling/pull/765
|
|
|
|
from collections import deque
|
|
from typing import Deque, Iterable, Iterator, Mapping, Tuple
|
|
|
|
from capstone import Cs, CsInsn, CS_ARCH_X86, CS_OP_IMM, CS_OP_MEM, CS_OP_REG
|
|
from capstone.x86 import X86Op
|
|
from capstone.x86_const import X86_INS_LEA, X86_REG_INVALID, X86_REG_RIP
|
|
|
|
from qiling import Qiling
|
|
|
|
# A trace record: a parsed (disassembled) instruction paired with a collection of
# (capstone register id, register value) tuples for the registers it reads.
TraceRecord = Tuple[CsInsn, Iterable[Tuple[int, int]]]
|
|
|
|
# <WORKAROUND>
|
|
def __uc2_workaround() -> Mapping[int, int]:
    """Build a translation table from Capstone x86 register ids to Unicorn ones.

    Starting from Unicorn2, Unicorn and Capstone Intel registers definitions are
    no longer aligned and cannot be used interchangeably. This temporary workaround
    pairs the register constants of both libraries by their prefix-stripped names.

    see: https://github.com/unicorn-engine/unicorn/issues/1492

    Returns:
        a mapping of capstone register id to unicorn register id, holding only
        the registers whose names are defined by both libraries
    """

    from capstone import x86_const as cs_x86_const
    from unicorn import x86_const as uc_x86_const

    def __strip_prefix(module, prefix: str) -> Mapping[str, int]:
        # collect the module attributes that carry the prefix, keyed by whatever follows it
        skip = len(prefix)

        return {name[skip:]: getattr(module, name) for name in dir(module) if name.startswith(prefix)}

    cs_by_name = __strip_prefix(cs_x86_const, 'X86_REG')
    uc_by_name = __strip_prefix(uc_x86_const, 'UC_X86_REG')

    # keep only the registers both libraries know by the same name
    return {cs_id: uc_by_name[name] for name, cs_id in cs_by_name.items() if name in uc_by_name}
|
|
|
|
# capstone register id -> unicorn register id; built once when the module is loaded
CS_UC_REGS = __uc2_workaround()
|
|
# </WORKAROUND>
|
|
|
|
def __get_trace_records(ql: Qiling, address: int, size: int, md: Cs) -> Iterator[TraceRecord]:
    """[private] Acquire trace info for the current instruction and yield as a trace record.

    A trace record is a parsed instruction paired to a list of registers and their values.

    This method might yield more than one record for a single instruction.

    Args:
        ql: qiling instance
        address: address of the instruction about to be executed
        size: instruction size in bytes, or the magic value unicorn uses to denote
            an unsupported instruction
        md: disassembler to parse the instruction with

    Yields:
        a trace record for every instruction parsed out of the inspected bytes. nothing
        is yielded if the bytes cannot be parsed
    """

    # unicorn denotes unsupported instructions by a magic size value. though these instructions
    # are not emulated, capstone can still parse them.
    if size == 0xf1f1f1f1:
        # the real size is unknown, so parse 16 bytes (enough to hold any x86 instruction)
        # and keep only the first instruction found there.
        #
        # invalid instructions produce no record at all. a default value is passed to 'next'
        # so StopIteration is not raised from within this generator, where it would have
        # been turned into a RuntimeError (see PEP 479)
        record = next(__get_trace_records(ql, address, 16, md), None)

        if record is not None:
            yield record

        return

    # a trace line is generated even for hook addresses that do not contain meaningful opcodes.
    # in that case, make it look like a nop
    if address in ql._addr_hook:
        buf = b'\x90'
    else:
        buf = ql.mem.read(address, size)

    for insn in md.disasm(buf, address):
        # BUG: insn.regs_read doesn't work well, so we use insn.regs_access()[0]
        #
        # capstone register ids are translated into unicorn ones before reading their values
        state = tuple((reg, ql.arch.regs.read(CS_UC_REGS[reg])) for reg in insn.regs_access()[0])

        yield (insn, state)
|
|
|
|
def __to_trace_line(record: TraceRecord, symsmap: Mapping[int, str] = {}) -> str:
    """[private] Transform trace info into a formatted trace line.

    Args:
        record: a trace record, as yielded by `__get_trace_records`
        symsmap: mapping of addresses to symbol names, used to resolve immediates and
            effective addresses. it is only read here, never modified

    Returns:
        a trace line made of the instruction address, bytes, mnemonic and operands,
        followed by the values of the registers it reads

    Raises:
        KeyError: a memory operand refers to a register that was not recorded in state
        RuntimeError: an operand of an unexpected type was encountered
    """

    insn, state = record

    # when the rip register is referenced from within an instruction it is expected to point
    # to the next instruction boundary. since unicorn has not executed the instruction yet
    # it uses the cpu state resulted from the previous instruction - and rip points to the
    # current instruction instead of the next one.
    #
    # here we patch rip value recorded in state to point to the next instruction boundary
    state = tuple((reg, val + insn.size if reg == X86_REG_RIP else val) for reg, val in state)

    # register values indexed by register id, for effective address calculations
    regs = dict(state)

    # memory operand sizes and their matching 'ptr' qualifiers.
    # NOTE(review): masm names 6-byte operands 'fword' and 10-byte ones 'tbyte'. the existing
    # label for 10 is kept as-is to preserve the trace output - confirm
    ptr_names = {
        1:  'byte',
        2:  'word',
        4:  'dword',
        8:  'qword',
        10: 'fword',
        16: 'xmmword',
        32: 'ymmword',
        64: 'zmmword'
    }

    def __read_reg(reg: int) -> int:
        """[internal] Read a register value from the recorded state. Only registers that were
        referenced by the current instruction can be read.
        """

        # an absent base or index register contributes nothing to the effective address.
        # a plain lookup is used so a missing register raises a KeyError that names it,
        # rather than a StopIteration that would leak into the calling generator
        return 0 if reg == X86_REG_INVALID else regs[reg]

    def __resolve(address: int) -> str:
        """[internal] Find the symbol that matches to the specified address (if any).
        """

        return symsmap.get(address, '')

    def __parse_op(op: X86Op) -> str:
        """[internal] Parse an operand and return its string representation. Indirect memory
        references will be substituted by the effective address they refer to. If the referenced
        address is associated with a symbol, it will be substituted by that symbol.
        """

        if op.type == CS_OP_REG:
            return insn.reg_name(op.value.reg) or '?'

        elif op.type == CS_OP_IMM:
            imm = op.value.imm

            return __resolve(imm) or f'{imm:#x}'

        elif op.type == CS_OP_MEM:
            mem = op.value.mem

            base = __read_reg(mem.base)
            index = __read_reg(mem.index)
            scale = mem.scale
            disp = mem.disp

            ea = base + index * scale + disp
            seg = f'{insn.reg_name(mem.segment)}:' if mem.segment else ''

            # we construct the string representation for each operand; denote memory
            # dereferences with the appropriate 'ptr' prefix. the 'lea' instruction is
            # an exception since it does not use that notation.
            if insn.id == X86_INS_LEA:
                qualifier = ''
            else:
                # an operand size we have no name for should not break tracing; in
                # that case the qualifier is just left out
                ptr = ptr_names.get(op.size)
                qualifier = f'{ptr} ptr ' if ptr else ''

            return f'{qualifier}{seg}[{__resolve(ea) or f"{ea:#x}"}]'

        # unexpected op type
        raise RuntimeError(f'unexpected operand type: {op.type}')

    operands = ', '.join(__parse_op(o) for o in insn.operands)
    reads = ', '.join(f'{insn.reg_name(reg)} = {val:#x}' for reg, val in state)

    return f'{insn.address:08x} | {insn.bytes.hex():24s} {insn.mnemonic:10} {operands:56s} | {reads}'
|
|
|
|
def enable_full_trace(ql: Qiling):
    """Turn on instruction-level tracing.

    A trace line is logged (at debug level) for every instruction, right before it gets
    executed. Each line shows the instruction itself along with the values of the
    registers it reads, with addresses resolved into symbols where possible.

    Args:
        ql: qiling instance
    """

    # detailed disassembly info is required for operands and registers inspection
    disassembler = ql.arch.disassembler
    disassembler.detail = True

    assert disassembler.arch == CS_ARCH_X86, 'currently available only for intel architecture'

    # the loader may provide a symbols map to resolve memory accesses; do without it otherwise
    symbols = getattr(ql.loader, 'symsmap', {})

    # trace lines are shown in a darker color to tell them apart from ordinary log records
    dim = "\033[2m"
    reset = "\033[0m"

    def __trace_hook(ql: Qiling, address: int, size: int):
        """[internal] Code hook callback: log a trace line for every record obtained.
        """

        for record in __get_trace_records(ql, address, size, disassembler):
            ql.log.debug(f'{dim}{__to_trace_line(record, symbols)}{reset}')

    ql.hook_code(__trace_hook)
|
|
|
|
def enable_history_trace(ql: Qiling, nrecords: int):
    """Turn on instruction-level tracing in history mode.

    To allow faster execution, trace records are only collected while the program runs:
    nothing is emitted and post-processing is deferred. When the program crashes, the
    last `nrecords` records are formatted into trace lines and logged as errors.

    Args:
        ql: qiling instance
        nrecords: number of last records to show
    """

    # detailed disassembly info is required for operands and registers inspection
    disassembler = ql.arch.disassembler
    disassembler.detail = True

    assert disassembler.arch == CS_ARCH_X86, 'currently available only for intel architecture'

    # the loader may provide a symbols map to resolve memory accesses; do without it otherwise
    symbols = getattr(ql.loader, 'symsmap', {})

    # bounded queue: once full, older records are dropped as new ones come in
    recent: Deque[TraceRecord] = deque(maxlen=nrecords)

    def __trace_hook(ql: Qiling, address: int, size: int):
        """[internal] Code hook callback: store trace records for later use.
        """

        for record in __get_trace_records(ql, address, size, disassembler):
            recent.append(record)

    ql.hook_code(__trace_hook)

    # the emulation error handler is swapped with a wrapper of our own, so the collected
    # records can be emitted when the program crashes. the original handler is kept
    # aside so the wrapper can call it.
    wrapped_handler = ql.os.emu_error

    def __emu_error(*args):
        # let the original emulation error handler do its work first
        wrapped_handler(*args)

        # only then format the collected records and emit them
        ql.log.error('History:')

        for record in recent:
            ql.log.error(__to_trace_line(record, symbols))

        ql.log.error('')

    ql.os.emu_error = __emu_error
|