import base64 import binascii import os.path import re import socket import struct import sys import time import zlib import requests from tqdm import tqdm from hashlib import sha256 from tabulate import tabulate import xxtea from struct import pack from ecdsa import BRAINPOOLP256r1, SigningKey, VerifyingKey FRAM_MD5 = 'aadc292fe4063a7ac392e3c3dde51e84' FRAM_OFFSETS = [3120, 3204, 3206, 3207] RESP_PAT = re.compile(rb'admin:\n(.+)\nOK\n', re.MULTILINE | re.DOTALL) SERIAL_PAT = re.compile(rb'RIGOL TECHNOLOGIES\$(.+?)\$(.+?)\$(.+?)\n\$(.+?)\$(.+?)\$(.+?)\$(.+?)$') OPTIONS = ['BW1T2', 'BW1T3', 'BW1T5', 'BW2T3', 'BW2T5', 'BW3T5', 'MSO', '2RL', '5RL', 'BND', 'COMP', 'EMBD', 'AUTO', 'FLEX', 'AUDIO', 'SENSOR', 'AERO', 'ARINC', 'DG', 'JITTER', 'MASK', 'PWR', 'DVM', 'CTR', 'EDK', '4CH', 'BW07T1', 'BW07T2', 'BW07T3', 'BW07T5'] PRIV_PATH = 'priv.pem' KEY1 = b''.join(pack(' /tmp/byte1', need_res=False) exec_rigol_cmd(ip_addr, 'echo -n -e \'\\x3d\' > /tmp/byte2', need_res=False) exec_rigol_cmd(ip_addr, 'echo -n -e \'\\x5b\' > /tmp/byte3', need_res=False) exec_rigol_cmd(ip_addr, 'echo -n -e \'\\xe5\' > /tmp/byte4', need_res=False) exec_rigol_cmd(ip_addr, 'cp /rigol/tools/fram /rigol/tools/fram01', need_res=False) exec_rigol_cmd(ip_addr, 'chmod +x /rigol/tools/fram01', need_res=False) exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte1 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[0], need_res=False) exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte2 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[1], need_res=False) exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte3 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[2], need_res=False) exec_rigol_cmd(ip_addr, 'dd if=/tmp/byte4 of=/rigol/tools/fram01 obs=1 seek=%d conv=notrunc' % FRAM_OFFSETS[3], need_res=False) with tqdm(total=len(new_cfram)) as pb: for i, b in enumerate(new_cfram): exec_rigol_cmd(ip_addr, '/rigol/tools/fram01 -w %x %02x' % (i, b), need_res=False) pb.update(1) print('New CFRAM applied.\n') exec_rigol_cmd(ip_addr, 'cp /rigol/data/Key.data /rigol/data/Key.data.bak', need_res=False) print('Key.data backup created.') exec_rigol_cmd(ip_addr, 'echo -n %s | base64 -d > /rigol/data/Key.data' % base64.b64encode(new_key).decode(), need_res=False) print('New Key.data applied.') def check_fram_tool(ip_addr): res = exec_rigol_cmd(ip_addr, 'md5sum /rigol/tools/fram') res = res.split(' ')[0] if res != FRAM_MD5: print('Different /rigol/tools/fram hash. You have to recalc FRAM_OFFSETS!') exit(-1) print('/rigol/tools/fram is OK!\n') def deactivate_option(ip_addr, code, line): print('Deactivating: %s...' % code, end=' ') s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((ip_addr, 5555)) s.sendall(b':SYST:OPT:UNIN %s\n' % line) print('deactivated.') s.close() def activate_option(ip_addr, code, line): print('Activating: %s...' % code, end=' ') s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(1) s.connect((ip_addr, 5555)) s.sendall(b':SYST:OPT:INST %s\n' % line) s.sendall(b':SYST:OPT:STAT? %s\n' % code.encode()) try: res = s.recv(2) res = res.rstrip(b'\n') if res == b'0': print('not', end=' ') print('activated.') except TimeoutError: print('unavailable.') finally: s.close() def get_unavail_options(ip_addr): items = [] while True: res = requests.post('http://%s/cgi-bin/options.cgi' % ip_addr) if res.status_code == 500: continue body = res.content.decode() items = body.split('#') break table = [['Code', 'Status', 'Description']] actives = [] for item in items: row = item.split('$') table.append(row) if row[1] == 'Forever': actives.append(row[0]) print(tabulate(table, headers='firstrow', tablefmt='fancy_grid')) return list(set(OPTIONS) - set(actives)) def main(ip_addr): unavails = get_unavail_options(ip_addr) check_fram_tool(ip_addr) model, ser = read_rigol_model_serial(ip_addr) opts = [] key_hex = None prev_key = True for option in unavails: opt_sign, key_hex, prev_key = sign_option({ 'model': model, 'serial': ser, 'option': option, 'version': '1.0' }) opts.append((option, opt_sign)) # Set this to False to force overwrite the key # prev_key = False cfram = None if not prev_key: cfram = read_cfram_data(ip_addr) new_key = b'brainpoolP256r1;%s' % key_hex # ELHER necessary due to different key on my scope. # if len(new_key) < 388: # new_key += (388 - len(new_key)) * b"\xaa" new_key = encrypt_xxtea1(new_key) # new_key = # new_key = encrypt_xxtea2(b'brainpoolP256r1;%s' % key_hex) if not prev_key: new_cfram = replace_cfram_key(cfram, new_key) apply_new_key(ip_addr, new_cfram, new_key) model = model[:-2] + '00' for opt in opts: code = opt[0].encode() activate_option(ip_addr, code.decode(), b'%s-%s@%s' % (model.encode(), code, opt[1])) get_unavail_options(ip_addr) if __name__ == '__main__': if len(sys.argv) < 2: print('Usage: python rigol_kg.py 192.168.1.1') main(sys.argv[1])