rigol_scope/rigol_kg.py
2024-01-17 23:11:23 +01:00

389 lines
11 KiB
Python

import base64
import binascii
import os.path
import re
import socket
import struct
import sys
import time
import zlib
import requests
from tqdm import tqdm
from hashlib import sha256
from tabulate import tabulate
import xxtea
from struct import pack
from ecdsa import BRAINPOOLP256r1, SigningKey, VerifyingKey
FRAM_MD5 = 'aadc292fe4063a7ac392e3c3dde51e84'
FRAM_OFFSETS = [3120, 3204, 3206, 3207]
RESP_PAT = re.compile(rb'admin:\n(.+)\nOK\n', re.MULTILINE | re.DOTALL)
SERIAL_PAT = re.compile(rb'RIGOL TECHNOLOGIES\$(.+?)\$(.+?)\$(.+?)\n\$(.+?)\$(.+?)\$(.+?)\$(.+?)$')
OPTIONS = ['BW1T2', 'BW1T3', 'BW1T5', 'BW2T3', 'BW2T5',
'BW3T5', 'MSO', '2RL', '5RL', 'BND', 'COMP',
'EMBD', 'AUTO', 'FLEX', 'AUDIO', 'SENSOR',
'AERO', 'ARINC', 'DG', 'JITTER', 'MASK',
'PWR', 'DVM', 'CTR', 'EDK', '4CH', 'BW07T1',
'BW07T2', 'BW07T3', 'BW07T5']
PRIV_PATH = 'priv.pem'
KEY1 = b''.join(pack('<I', x) for x in [0x03920001, 0x08410841, 0x18C32104, 0x318639C7])
KEY2 = b''.join(pack('<I', x) for x in [0x478AA887, 0x99A85895, 0x1770078, 0x87888798])
def decrypt_xxtea1(buf):
dec = xxtea.decrypt(buf, KEY1, padding=False)
return dec
def decrypt_xxtea2(buf):
dec = xxtea.decrypt(buf, KEY2, padding=False)
return dec
def encrypt_xxtea1(buf):
delta = len(buf) % 4
if delta:
buf += b'\x00' * (4 - delta)
enc = xxtea.encrypt(buf, KEY1, padding=False)
return enc
def encrypt_xxtea2(buf):
delta = len(buf) % 4
if delta:
buf += b'\x00' * (4 - delta)
enc = xxtea.encrypt(buf, KEY2, padding=False)
return enc
def sign_option(opt):
bb = bytearray()
bb.extend(opt['model'].encode())
bb.extend(opt['serial'].encode())
bb.extend(opt['option'].encode())
bb.extend(opt['version'].encode())
bb.append(0x00)
bb.append(0x00)
buf = bytes(bb)
dig = sha256(buf).digest()
prev_key = True
if not os.path.exists(PRIV_PATH):
sk = SigningKey.generate(curve=BRAINPOOLP256r1, hashfunc=sha256)
with open(PRIV_PATH, 'wb') as w:
w.write(sk.to_pem())
prev_key = False
else:
with open(PRIV_PATH) as f:
sk = SigningKey.from_pem(f.read())
sign = sk.sign_digest_deterministic(dig)
vk = sk.verifying_key
vkk = b'04' + vk.to_string().hex().upper().encode()
vk = VerifyingKey.from_string(binascii.unhexlify(vkk), curve=BRAINPOOLP256r1)
assert vk.verify_digest(sign, dig)
return binascii.hexlify(sign).upper(), vkk, prev_key
BLOCK_HDR_FMT = '<IiIiI'
def calc_crc32(buf):
return zlib.crc32(buf) & 0xFFFFFFFF
def get_dw(buf, off):
dw = struct.unpack_from('<I', buf, off)[0]
return dw, off + 4
def get_dws(buf, off):
dw = struct.unpack_from('<i', buf, off)[0]
return dw, off + 4
def get_data(buf, off, size):
block = buf[off:off + size]
return block, off + size
def read_block(buf, off):
start = off
id_, id_neg, data_size, data_size_neg, crc32 = struct.unpack_from(BLOCK_HDR_FMT, buf, off)
off += struct.calcsize(BLOCK_HDR_FMT)
assert ((id_ + id_neg) == 0) and ((data_size + data_size_neg) == 0)
block_data, off = get_data(buf, off, data_size)
crc32_real = calc_crc32(block_data)
assert crc32_real == crc32
return {
'offset': start,
'id': id_,
'data': block_data,
'crc32': crc32_real
}, off
def neg(val):
return -1 * val
def write_block(buf, off, block, block_data):
block_len = len(block_data)
assert len(block['data']) == block_len
crc32 = calc_crc32(block_data)
struct.pack_into(BLOCK_HDR_FMT, buf, off + block['offset'], block['id'], neg(block['id']), block_len,
neg(block_len), crc32)
block_data_off = off + block['offset'] + struct.calcsize(BLOCK_HDR_FMT)
buf[block_data_off:block_data_off + block_len] = block_data
def replace_cfram_key(cfram, new_key):
buf = cfram[0x100:]
off = 0
full_size, off = get_dw(buf, off)
full_size_neg, off = get_dws(buf, off)
assert (full_size + full_size_neg) == 0
items = {}
while off < full_size:
block, off = read_block(buf, off)
# print('%04X: id=%04d, data_sz=%04d, data=%s, crc32=%08X' % (
# block['offset'], block['id'], len(block['data']), binascii.hexlify(block['data']).decode(), block['crc32']))
items[block['id']] = block
# print('last_off: 0x%04X' % off)
data = bytearray(cfram)
pub_key = items[4512]
write_block(data, 0x100, pub_key, new_key)
return bytes(data)
def exec_rigol_cmd(ip_addr, cmd, need_res=True):
while True:
res = requests.post('http://%s/cgi-bin/changepwd.cgi' % ip_addr, data={'pass0': '', 'pass1': '; %s # "' % cmd})
if res.status_code == 500:
continue
body = res.content
m = RESP_PAT.match(body)
if m is None and need_res:
continue
if need_res:
grp = m.group(1)
return grp.decode()
else:
return None
def read_cfram_data(ip_addr):
print('Reading CFRAM...')
cfram = bytearray()
i = 0
with tqdm(total=0x2000) as pb:
while i < 0x2000:
cmd = '/rigol/tools/fram -r %0x' % i
res = exec_rigol_cmd(ip_addr, cmd)
if res is None:
continue
bb = binascii.unhexlify(res.replace(',', ''))
cfram.extend(bb)
i += 0x10
pb.update(0x10)
print('Reading CFRAM done.\n')
return bytes(cfram)
def read_rigol_model_serial(ip_addr):
res = requests.post('http://%s/cgi-bin/welcome.cgi' % ip_addr)
body = res.content
m = SERIAL_PAT.match(body)
if m is None:
return None
model = m.group(1).decode()
ser = m.group(2).decode()
ver = m.group(3).decode()
mac = m.group(4).decode()
print('Model: %s\nSerial: %s\nVersion: %s\nMAC: %s\n' % (model, ser, ver, mac))
return model, ser
def apply_new_key(ip_addr, new_cfram, new_key):
print('Applying new CFRAM...')
exec_rigol_cmd(ip_addr, 'echo -n -e \'\\x03\' > /tmp/byte1', need_res=False)
exec_rigol_cmd(ip_addr, 'echo -n -e \'\\x3d\' > /tmp/byte2', need_res=False)
exec_rigol_cmd(ip_addr, 'echo -n -e \'\\x5b\' > /tmp/byte3', need_res=False)
exec_rigol_cmd(ip_addr, 'echo -n -e \'\\xe5\' > /tmp/byte4', need_res=False)
exec_rigol_cmd(ip_addr, 'cp /rigol/tools/fram /rigol/tools/fram01', need_res=False)
exec_rigol_cmd(ip_addr, 'chmod +x /rigol/tools/fram01', need_res=False)
exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte1 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[0], need_res=False)
exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte2 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[1], need_res=False)
exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte3 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[2], need_res=False)
exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte4 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[3], need_res=False)
with tqdm(total=len(new_cfram)) as pb:
for i, b in enumerate(new_cfram):
exec_rigol_cmd(ip_addr, '/rigol/tools/fram01 -w %x %02x' % (i, b), need_res=False)
pb.update(1)
print('New CFRAM applied.\n')
exec_rigol_cmd(ip_addr, 'cp /rigol/data/Key.data /rigol/data/Key.data.bak', need_res=False)
print('Key.data backup created.')
exec_rigol_cmd(ip_addr, 'echo -n %s | base64 -d > /rigol/data/Key.data' % base64.b64encode(new_key).decode(), need_res=False)
print('New Key.data applied.')
def check_fram_tool(ip_addr):
res = exec_rigol_cmd(ip_addr, 'md5sum /rigol/tools/fram')
res = res.split(' ')[0]
if res != FRAM_MD5:
print('Different /rigol/tools/fram hash. You have to recalc FRAM_OFFSETS!')
exit(-1)
print('/rigol/tools/fram is OK!\n')
def deactivate_option(ip_addr, code, line):
print('Deactivating: %s...' % code, end=' ')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip_addr, 5555))
s.sendall(b':SYST:OPT:UNIN %s\n' % line)
print('deactivated.')
s.close()
def activate_option(ip_addr, code, line):
print('Activating: %s...' % code, end=' ')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
s.connect((ip_addr, 5555))
s.sendall(b':SYST:OPT:INST %s\n' % line)
s.sendall(b':SYST:OPT:STAT? %s\n' % code.encode())
try:
res = s.recv(2)
res = res.rstrip(b'\n')
if res == b'0':
print('not', end=' ')
print('activated.')
except TimeoutError:
print('unavailable.')
finally:
s.close()
def get_unavail_options(ip_addr):
items = []
while True:
res = requests.post('http://%s/cgi-bin/options.cgi' % ip_addr)
if res.status_code == 500:
continue
body = res.content.decode()
items = body.split('#')
break
table = [['Code', 'Status', 'Description']]
actives = []
for item in items:
row = item.split('$')
table.append(row)
if row[1] == 'Forever':
actives.append(row[0])
print(tabulate(table, headers='firstrow', tablefmt='fancy_grid'))
return list(set(OPTIONS) - set(actives))
def main(ip_addr):
unavails = get_unavail_options(ip_addr)
check_fram_tool(ip_addr)
model, ser = read_rigol_model_serial(ip_addr)
opts = []
key_hex = None
prev_key = True
for option in unavails:
opt_sign, key_hex, prev_key = sign_option({
'model': model,
'serial': ser,
'option': option,
'version': '1.0'
})
opts.append((option, opt_sign))
# Set this to False to force overwrite the key
# prev_key = False
cfram = None
if not prev_key:
cfram = read_cfram_data(ip_addr)
new_key = b'brainpoolP256r1;%s' % key_hex
# ELHER necessary due to different key on my scope.
# if len(new_key) < 388:
# new_key += (388 - len(new_key)) * b"\xaa"
new_key = encrypt_xxtea1(new_key)
# new_key =
# new_key = encrypt_xxtea2(b'brainpoolP256r1;%s' % key_hex)
if not prev_key:
new_cfram = replace_cfram_key(cfram, new_key)
apply_new_key(ip_addr, new_cfram, new_key)
model = model[:-2] + '00'
for opt in opts:
code = opt[0].encode()
activate_option(ip_addr, code.decode(), b'%s-%s@%s' % (model.encode(), code, opt[1]))
get_unavail_options(ip_addr)
if __name__ == '__main__':
if len(sys.argv) < 2:
print('Usage: python rigol_kg.py 192.168.1.1')
main(sys.argv[1])