import typing, pathlib, struct, argparse, pickle, hashlib, time from ghidra_assistant.utils.archs.arm.arm_emulator import * from ghidra_assistant.ghidra_assistant import GhidraAssistant from ghidra_assistant.concrete_device import ConcreteDevice from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.hazmat.backends import default_backend from cryptography.exceptions import InvalidSignature if typing.TYPE_CHECKING: from GA_debugger import * acces_str = { UC_MEM_READ : "UC_MEM_READ", UC_MEM_WRITE : "UC_MEM_WRITE", UC_MEM_FETCH : "UC_MEM_FETCH", UC_MEM_READ_UNMAPPED : "UC_MEM_READ_UNMAPPED", UC_MEM_WRITE_UNMAPPED : "UC_MEM_WRITE_UNMAPPED", UC_MEM_FETCH_UNMAPPED : "UC_MEM_FETCH_UNMAPPED", UC_MEM_WRITE_PROT : "UC_MEM_WRITE_PROT", UC_MEM_READ_PROT : "UC_MEM_READ_PROT", UC_MEM_FETCH_PROT : "UC_MEM_FETCH_PROT", UC_MEM_READ_AFTER : "UC_MEM_READ_AFTER", } def p8(value): return struct.pack(" None: self.emulator = emulator self.fuses_visible = 0 def read(self, address, size): if len(self.emulator.saved_hwio[address]) > 0: val = self.emulator.saved_hwio[address].pop(0) self.emulator.write_ptr(address, val) return True if address == TegraDevice.CLK_RST_CONTROLLER_MISC_CLK_ENB_0: self.emulator.write_ptr(TegraDevice.CLK_RST_CONTROLLER_MISC_CLK_ENB_0, self.fuses_visible) return True raise NotImplemented def write(self, address, data): return True if address == TegraDevice.CLK_RST_CONTROLLER_MISC_CLK_ENB_0: self.fuses_visible = data return True raise NotImplemented class FuseDevice(TegraDevice): BASE = 0x7000F000 SIZE = 0x1000 NAME = "Fuse" FUSE_ODM_INFO_0 = BASE + 0x99c FUSE_FUSEADDR_0 = BASE + 0x804 FUSE_FUSECTRL_0 = BASE + 0x800 CMD_READ = 1 CMD_IDLE = 0 FUSE_DAT = BytesIO() def __init__(self, emulator: "TegraEmulator") -> None: super().__init__(emulator) self.fuse_ctr_cmd = 0xc0040000 self.fuse_addr = 0x0 def read(self, address, size): if address == FuseDevice.FUSE_ODM_INFO_0: self.emulator.write_ptr(FuseDevice.FUSE_ODM_INFO_0, 2) elif address == FuseDevice.FUSE_FUSECTRL_0: # get last int from fuse_ctr_cmd cmd = self.fuse_ctr_cmd & 0xffffffff if cmd == FuseDevice.CMD_READ: # Handle read # Set idle, set last byte of cmd to 0 self.fuse_ctr_cmd = cmd & 0xffffff00 self.emulator.write_ptr(FuseDevice.FUSE_FUSECTRL_0, self.fuse_ctr_cmd) self.emulator.write_ptr(FuseDevice.FUSE_FUSECTRL_0, self.fuse_ctr_cmd) else: raise NotImplemented return True def write(self, address, value): if address == FuseDevice.FUSE_FUSEADDR_0: self.fuse_addr = value elif address == FuseDevice.FUSE_FUSECTRL_0: self.emulator.write_ptr(FuseDevice.FUSE_FUSECTRL_0, value) self.fuse_ctr_cmd = value else: raise NotImplemented return True pass class TimerDevice(TegraDevice): BASE = 0x60005000 SIZE = 0x1000 NAME = "Timer" READ_TIME_OFFSET = BASE + 0x10 def __init__(self, emulator: "TegraEmulator") -> None: super().__init__(emulator) def read(self, address, size): if address == TimerDevice.READ_TIME_OFFSET: val = int(time.clock_gettime_ns(0)/1000) & 0xffffffff self.emulator.write_ptr(TimerDevice.READ_TIME_OFFSET, val) return True class EmmcDevice(TegraDevice): BASE = 0x700b0000 SIZE = 0x1000 NAME = "Emmc" def __init__(self, emulator: "TegraEmulator") -> None: super().__init__(emulator) def read(self, address, size): pass def write(self, address, value): pass class CryptoDevice(TegraDevice): BASE = 0x70012000 SIZE = 0x1000 NAME = "Crypto" KEY_SLOT_SELECT = BASE + 0x31c KEY_SLOT_START = BASE + 0x320 KEY_SLOT_END = BASE + 0x330 SE_CONFIG_0 = BASE + 0x14 SE_BUSY = BASE + 0x800 SE_START = BASE + 0x8 SE_SHA_CONFIG_0 = BASE + 0x200 SE_SHA_MSG_LENGTH_0 = BASE + 0x204 SE_SHA_MSG_LENGTH_2 = BASE + 0x208 SE_SHA_MSG_LENGTH_2 = BASE + 0x20c SE_IN_LL_ADDR_0 = BASE + 0x18 SE_OUT_LL_ADDR_0 = BASE + 0x24 SE_SHA_MSG_LEFT_0 = BASE + 0x214 SE_RSA_EXP_SIZE_0 = BASE + 0x408 SE_RSA_KEY_SIZE_0 = BASE + 0x404 SE_RSA_KEYTABLE_ADDR_0 = BASE + 0x420 SE_RSA_KEY_SIZE_0_VAL_WIDTH_2048 = 3 def __init__(self, emulator: "TegraEmulator") -> None: super().__init__(emulator) self.keys = {} self.current_key = b"\x00" * 16 self.key_slot_config = 0 self.key_slot = 0 self.config = 0 def read(self, address, size): if address == CryptoDevice.SE_CONFIG_0: self.emulator.write_ptr(CryptoDevice.SE_CONFIG_0, self.config) elif address == CryptoDevice.SE_BUSY: pass else: pass def write(self, address, value): if address == CryptoDevice.KEY_SLOT_SELECT: # Commit key to key slot self.key_slot_config = value self.key_slot = p32(value)[0] # First byte is keyslot self.keys[self.key_slot] = self.current_key elif address >= CryptoDevice.KEY_SLOT_START and address <= CryptoDevice.KEY_SLOT_END: if self.key_slot not in self.keys: self.keys[self.key_slot] = b"\x00" * 16 # Keys are 16 bytes offset = address - CryptoDevice.KEY_SLOT_START self.current_key = self.current_key[:offset] + struct.pack(" None: super().__init__(init_uc) self.log_hw_access = True self.save_hwio = True if saved_hwio: self.saved_hwio = pickle.load(open(saved_hwio, "rb")) else: self.saved_hwio = {} self.hw_itm = hw_itm self.saved_blocks = {} try: self.ghidra = GhidraAssistant() except: pass def setup(self, target="bootrom"): self.target = target self.setup_memory() self.setup_registers() if not self.hw_itm: self.setup_devices() self.setup_hooks() self.apply_patches() def install_debugger(self, debugger : ConcreteDevice): self.debugger = debugger def setup_memory(self): self.bootrom_path = pathlib.Path("bootrom_t124.bin") self.bootrom = self.bootrom_path.read_bytes() self.uc.mem_map(0x100000, page_align_top(len(self.bootrom)), UC_PROT_EXEC | UC_PROT_READ) self.uc.mem_write(0x100000, self.bootrom) # map IMEM self.uc.mem_map(0x40000000, 0x40000, UC_PROT_EXEC | UC_PROT_READ | UC_PROT_WRITE) if self.target == "bootrom": pass else: self.imem_path = pathlib.Path("imem3_bct") self.imem = self.imem_path.read_bytes() self.uc.mem_write(0x40000000, self.imem) # DRAM DRAM_BASE = 0x80000000 DRAM_SIZE = 2 * GB self.uc.mem_map(DRAM_BASE, DRAM_SIZE, UC_PROT_READ | UC_PROT_WRITE | UC_PROT_EXEC) def setup_registers(self, target="bootrom"): if self.target == "bootrom": self.pc = 0x100000 | 1 self.sp = 0x4000d000 self.is_thumb = True else: self.sp = 0x4000d000 self.pc = 0x4000e000 self.is_thumb = False def setup_devices(self): self.devices = {} # self.devices['fuse'] = FuseDevice(self) # self.devices['timer'] = TimerDevice(self) # self.devices['emmc'] = EmmcDevice(self) self.devices['crypto'] = CryptoDevice(self) # Setup EMMC data self.emmc_raw_data = open("/home/eljakim/Source/tegrax1_plus/dump/nv-recovery-image-shield-tablet-lte-us-update3_1_1/blob", "rb").read()[0x112dc9f - 528:] self.devices['tegra'] = TegraDevice(self) # For all other devices def hook_unmapped(self, uc, access, address, size, value, user_data): print(f"Unmapped memory access at 0x{address:x} with size {size} and access {acces_str[access]}") pass def hook_mem_access(self, uc, access, address, size, value, user_data): # Hook all memory accesses # if self.log_hw_access: # p_info(f"{hex(self.pc)} HW access at 0x{address:x} with size {size}, value={hex(value)} and access {acces_str[access]}") if access == UC_MEM_WRITE: self.debugger.memwrite_region(address, self.uc.mem_read(address, size)) if access == UC_MEM_READ: self.uc.mem_write(address, self.debugger.memdump_region(address, size)) pass def hw_itm_handle(self, access, address, size, value): # All unmapped memory is send to the debugger if self.log_hw_access: if access == UC_MEM_READ: val = self.debugger.memdump_region(address, size) if len(val) == 4: val = struct.unpack(" {hex(value)}") try: if address == 0x70012800: # self.ghidra.ghidra.set_background_color(self.saved_blocks) sys.exit(0) pass if access == UC_MEM_WRITE: if size == 4: self.debugger.memwrite_region(address, p32(value)) # self.uc.mem_write(address, p32(value)) # self.uc.mem_write(address, self.debugger.memdump_region(address, size)) elif size == 1: self.debugger.memwrite_io(address, p8(value)) # self.uc.mem_write(address, p8(value)) # self.uc.mem_write(address, self.debugger.memdump_region(address, size)) else: raise Exception("Unhandled write!") elif access == UC_MEM_READ: if size == 1: pass self.uc.mem_write(address, self.debugger.memdump_region(address, size)) else: raise Exception("Not handled!") except Exception as e: print(e) sys.exit(0) pass return True def get_device_at(self, address): for devname in self.devices: dev = self.devices[devname] if address >= dev.BASE and address < dev.BASE + dev.SIZE: return dev return self.devices['tegra'] # raise Exception(f"No device found at address {hex(address)} pc={hex(sef.pc)}") def hw_emulation_handle(self, access, address, size, value): dev = self.get_device_at(address) print(f"Device={dev.NAME} pc={hex(self.pc)} target=0x{address:x} size={size} access={acces_str[access]}") if access == UC_MEM_READ: dev.read(address, size) elif access == UC_MEM_WRITE: dev.write(address, value) return True def sync_imem(self): ''' Syncs IMEM regions ''' if not hasattr(self, "debugger"): pass imem = self.uc.mem_read(0x40000000, 0x40000) imem_debugger = self.debugger.memdump_region(0x40000000, 0x40000) pass def hook_hw_access(self, uc, access, address, size, value, user_data): # if address == 0x70012800: # self.sync_imem() # pass if self.hw_itm: return self.hw_itm_handle(access, address, size, value) return self.hw_emulation_handle(access, address, size, value) def setup_hooks(self): # hook unmapped self.uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_FETCH_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_UNMAPPED, self.hook_unmapped) # 0x6000f000 self.uc.mem_map(0x60000000, 0x20000, UC_PROT_READ | UC_PROT_WRITE) self.uc.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, self.hook_hw_access, begin=0x60000000, end=0x60000000 + 0x10000) self.uc.mem_map(0x70000000, 0x100000, UC_PROT_READ | UC_PROT_WRITE) self.uc.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, self.hook_hw_access, begin=0x70000000, end=0x70000000 + 0x100000) #ROM # self.uc.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, self.hook_mem_access, self, 0x100000, 0x100000 + len(self.bootrom)) #IMEM access # self.uc.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, self.hook_mem_access, self, 0x40000000, 0x40000000 + 0x40000) # DRAM # self.uc.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, self.hook_mem_access, self, 0x80000000, 0x80000000 + 2 * GB) if self.target == "bootrom": self.setup_warmboot_hook() self.setup_hook_blocks() self.setup_rcm_hooks() self.setup_sdmmc_hooks() self.setup_rsa_verify_hook() else: self.setup_log_hook() self.setup_hook_blocks() # self.setup_hook_EmmcValidateResponse() def setup_coldboot_hook(self): def hook_coldboot(uc, address, size, user_data): logging.info(f"Reached coldboot target.") self.print_ctx() return True self.uc.hook_add(UC_HOOK_CODE, hook_coldboot, begin=0x0010145e, end=0x0010145e + 1) def setup_rcm_hooks(self): def hook_rcm(uc, address, size, user_data): self.R0 = 0 self.R1 = 0 return True self.uc.hook_add(UC_HOOK_CODE, hook_rcm, begin=0x00101414, end=0x00101414 + 1) def setup_warmboot_hook(self): def hook_warmboot(uc, address, size, user_data): logging.info(f"Hooking warmboot, forcing coldboot.") self.R0 = 0 return True self.uc.hook_add(UC_HOOK_CODE, hook_warmboot, begin=0x00101f3a, end=0x00101f3a + 1) def setup_rsa_verify_hook(self): def hook_rsa_signature_verify(uc, address, size, user_data): # TODO not working print(f"RSA signature verify keyslot={self.R0} keysize={self.R1} input={self.R2} message_hash={self.R3} input_size={self.R4} signature={self.R5} algo={self.R6} signature_size={self.R7}") crypto = self.devices['crypto'] crypto_key = crypto.rsa_key[:crypto.rsa_key_size // 8] signature = self.uc.mem_read(self.R7, 0x100) modulus = int.from_bytes(crypto_key, byteorder='big') public_numbers = rsa.RSAPublicNumbers(e=65537, n=modulus) public_key = public_numbers.public_key(default_backend()) try: public_key.verify( signature, b"", padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) print("Signature is valid.") except InvalidSignature: print("Signature is invalid.") # Verify signature self.PC = self.LR self.R0 = 0 return True self.uc.hook_add(UC_HOOK_CODE, hook_rsa_signature_verify, begin=0x00102f08, end=0x00102f08 + 1) def apply_patches(self): # Nop out 400101f0 to 0x40010220, maybe this is restricting access to IMEM and ROM? self.sc.mov_0_r0 = self.ks.asm("mov r0, #0", as_bytes=True)[0] # self.uc.mem_write(0x400101e4, self.sc.mov_0_r0 * ((0x40010220 - 0x400101e4) // 4)) self.sc.bx_lr = self.ks.asm("bx lr", as_bytes=True)[0] bx_lr_thumb = self.ksT.asm("bx lr", as_bytes=True)[0] movs_0_r0_thumb = self.ksT.asm("movs r0, #0", as_bytes=True)[0] # self.uc.mem_write(0x4001dfb0, self.sc.mov_0_r0 + self.sc.bx_lr) if self.target == "bootrom": #NvBootClocksIsPllStable, ret # self.uc.mem_write(0x00101730, bx_lr_thumb) # # NvBootClocksStartPll self.uc.mem_write(0x00101866, bx_lr_thumb) # NvBootClocksPllDivRstCtrl self.uc.mem_write(0x001016ce, bx_lr_thumb) #usb init? self.uc.mem_write(0x00103bf4, bx_lr_thumb) #SE engine always ready # self.uc.mem_write(0x00102b24, movs_0_r0_thumb) pass def run(self): try: self.uc.emu_start(self.pc, 0) pass except Exception as e: print(str(e)) self.print_ctx(print) pass def setup_log_hook(self): UART_LOG_HOOK = 0x4001cadc def hook_log(uc, address, size, user_data): msg = self.read_string(self.R0) try: args = msg.count(b"%") arg_types = [] offset = 0 for i in range(args): c_offset = msg[offset:].find(b"%") mtype = msg[c_offset:offset + 2] offset += c_offset + 2 arg_types.append(mtype) def read_msg_var(mtype, addr): if mtype == b"%s": return self.read_string(addr) elif mtype == b"%d": return eval('b"'+ str(addr) +'"')# As int else: return eval('b"'+ hex(addr)[2:] +'"')# As hex arg_str = [] for i in range(args): if i == 0: arg_str.append(read_msg_var(arg_types[i], self.R1)) elif i == 1: arg_str.append(read_msg_var(arg_types[i], self.R2)) elif i == 2: arg_str.append(read_msg_var(arg_types[i], self.R3)) else: break for i in range(len(arg_str)): offset = msg.find(b"%") msg = msg[:offset] + arg_str[i] + msg[offset + 2:] except Exception as e: pass print(f"{hex(self.LR)} : {msg}") if(b"Sdmmc Read failed" in msg): pass return True self.uc.hook_add(UC_HOOK_CODE, hook_log, begin=UART_LOG_HOOK, end=UART_LOG_HOOK + 1) # And patch function to just return self.uc.mem_write(UART_LOG_HOOK, self.ks.asm("bx lr", as_bytes=True)[0]) def setup_sdmmc_hooks(self): def hook_sdmmc(uc, address, size, user_data): block = self.R0 page = self.R1 destbuf = self.R2 offset = page * 0x200 dat = self.emmc_raw_data[offset:offset + 0x200] self.uc.mem_write(destbuf, dat) self.PC = self.LR return True self.uc.hook_add(UC_HOOK_CODE, hook_sdmmc, begin=NVBOOTSDMMCREADPAGE, end=NVBOOTSDMMCREADPAGE + 1) def setup_se_sha_hash_hook(self): def hook_se_sha(uc, address, size, user_data): self.saved_se_sha[self.pc] = self.get_registers() return True self.uc.hook_add(UC_HOOK_CODE, hook_se_sha, begin=NVBOOTSESHAHASH, end=NVBOOTSESHAHASH + 1) def setup_hook_blocks(self, only_blocks=False): if only_blocks: def hook_block(uc, address, size, user_data): # print(f"Block at {hex(self.LR)}") self.saved_blocks[self.LR] = self.get_registers() return True self.uc.hook_add(UC_HOOK_BLOCK, hook_block) else: def hook_all(uc, address, size, user_data): if self.pc == NVBOOTREADONEOBJECT: info(f"NvBootReadOneObject context={self.R0:04x} dest={self.R1:04x} bct_obj_dst={self.R2:04x} num_copies={self.R3}") if self.pc == 0x00104558: # Sync 0x40000100, length 0x2000 info(f"NvBootReadOneObject done dest={self.read_ptr(self.R1)}") # if self.pc == NVBOOTSDMMCREADPAGE: # # At this point we can read our on pages if requested. # info(f"SdmmcReadPage block={self.R0} page={self.R1} destbuf={self.R2:04x}") # print(f"Block at {hex(self.LR)}") if self.pc == 0x00102ee8: pass if self.pc == 0x00102f1e: pass elif self.pc == 0x00104418: pass #Validatebct elif self.pc == 0x00104452: pass #validate done self.saved_blocks[self.pc] = self.get_registers() return True self.uc.hook_add(UC_HOOK_CODE, hook_all, self) def setup_interrupt_hook(self): RAISE_INTERRUPT = 0x4001cab8 def hook_interrupt(uc, address, size, user_data): print(f"Interrupt at {self.LR}") return True self.uc.hook_add(UC_HOOK_CODE, hook_interrupt, begin=RAISE_INTERRUPT, end=RAISE_INTERRUPT + 1) def setup_hook_EmmcValidateResponse(self): self.saved_emmc_responses = {} def hook_emmc(uc, address, size, user_data): self.saved_emmc_responses[self.pc] = self.get_registers() return True self.uc.hook_add(UC_HOOK_CODE, hook_emmc, begin=0x4001dfb0, end=0x4001e160) def do_partial_emu(debugger : ConcreteDevice, real_hw=True): if real_hw: emu = TegraEmulator() emu.install_debugger(debugger) else: emu = TegraEmulator(hw_itm=False, saved_hwio="bin/t124_brom_hwio.pickle") emu.setup(target="bootrom") emu.run() if __name__ == "__main__": do_partial_emu(None, real_hw=False)