import subprocess,lzma import struct,os,re from npk import NovaPackage,NpkPartID,NpkFileContainer def replace_chunks(old_chunks,new_chunks,data,name): pattern_parts = [re.escape(chunk) + b'(.{0,6})' for chunk in old_chunks[:-1]] pattern_parts.append(re.escape(old_chunks[-1])) pattern_bytes = b''.join(pattern_parts) pattern = re.compile(pattern_bytes, flags=re.DOTALL) def replace_match(match): replaced = b''.join([new_chunks[i] + match.group(i+1) for i in range(len(new_chunks) - 1)]) replaced += new_chunks[-1] print(f'{name} public key patched {b"".join(old_chunks)[:16].hex().upper()}...') return replaced return re.sub(pattern, replace_match, data) def replace_key(old,new,data,name=''): old_chunks = [old[i:i+4] for i in range(0, len(old), 4)] new_chunks = [new[i:i+4] for i in range(0, len(new), 4)] data = replace_chunks(old_chunks, new_chunks, data,name) key_map = [28,19,25,16,14,3,24,15,22,8,6,17,11,7,9,23,18,13,10,0,26,21,2,5,20,30,31,4,27,29,1,12,] old_chunks = [bytes([old[i]]) for i in key_map] new_chunks = [bytes([new[i]]) for i in key_map] data = replace_chunks(old_chunks, new_chunks, data,name) arch = os.getenv('ARCH') or 'x86' arch = arch.replace('-', '') if arch in ['arm64','arm']: old_chunks = [old[i:i+4] for i in range(0, len(old), 4)] new_chunks = [new[i:i+4] for i in range(0, len(new), 4)] old_bytes = old_chunks[4] + old_chunks[5] + old_chunks[2] + old_chunks[0] + old_chunks[1] + old_chunks[6] + old_chunks[7] new_bytes = new_chunks[4] + new_chunks[5] + new_chunks[2] + new_chunks[0] + new_chunks[1] + new_chunks[6] + new_chunks[7] if old_bytes in data: print(f'{name} public key patched {old[:16].hex().upper()}...') data = data.replace(old_bytes,new_bytes) old_codes = [bytes.fromhex('793583E2'),bytes.fromhex('FD3A83E2'),bytes.fromhex('193D83E2')] #0x1e400000+0xfd000+0x640 new_codes = [bytes.fromhex('FF34A0E3'),bytes.fromhex('753C83E2'),bytes.fromhex('FC3083E2')] #0xff0075fc= 0xff000000+0x7500+0xfc data = replace_chunks(old_codes, new_codes, data,name) else: def conver_chunks(data:bytes): ret = [ (data[2] << 16) | (data[1] << 8) | data[0] | ((data[3] << 24) & 0x03000000), (data[3] >> 2) | (data[4] << 6) | (data[5] << 14) | ((data[6] << 22) & 0x1C00000), (data[6] >> 3) | (data[7] << 5) | (data[8] << 13) | ((data[9] << 21) & 0x3E00000), (data[9] >> 5) | (data[10] << 3) | (data[11] << 11) | ((data[12] << 19) & 0x1F80000), (data[12] >> 6) | (data[13] << 2) | (data[14] << 10) | (data[15] << 18), data[16] | (data[17] << 8) | (data[18] << 16) | ((data[19] << 24) & 0x01000000), (data[19] >> 1) | (data[20] << 7) | (data[21] << 15) | ((data[22] << 23) & 0x03800000), (data[22] >> 3) | (data[23] << 5) | (data[24] << 13) | ((data[25] << 21) & 0x1E00000), (data[25] >> 4) | (data[26] << 4) | (data[27] << 12) | ((data[28] << 20) & 0x3F00000), (data[28] >> 6) | (data[29] << 2) | (data[30] << 10) | (data[31] << 18) ] return [struct.pack(' /dev/null | sed -n '11p' stdout,_ = run_shell_command(f"debugfs {dev} -R 'stat {file}' 2> /dev/null | sed -n '11p' ") #(0-11):1592-1603, (IND):1173, (12-15):1604-1607, (16-26):1424-1434 blocks_info = stdout.decode().strip().split(',') print(f'blocks_info : {blocks_info}') blocks = [] ind_block_id = None for block_info in blocks_info: _tmp = block_info.strip().split(':') if _tmp[0].strip() == '(IND)': ind_block_id = int(_tmp[1]) else: print(f'block_info : {block_info}') id_range = _tmp[0].strip().replace('(','').replace(')','').split('-') block_range = _tmp[1].strip().replace('(','').replace(')','').split('-') blocks += [id for id in range(int(block_range[0]),int(block_range[1])+1)] print(f' blocks : {len(blocks)} ind_block_id : {ind_block_id}') #sudo debugfs /dev/nbd0p1 -R 'cat boot/initrd.rgz' > data data,stderr = run_shell_command(f"debugfs {dev} -R 'cat {file}' 2> /dev/null") new_data = patch_kernel(data,key_dict) print(f'write block {len(blocks)} : [',end="") with open(dev,'wb') as f: for index,block_id in enumerate(blocks): print('#',end="") f.seek(block_id*BLOCK_SIZE) f.write(new_data[index*BLOCK_SIZE:(index+1)*BLOCK_SIZE]) f.flush() print(']') def patch_initrd_xz(initrd_xz:bytes,key_dict:dict,ljust=True): initrd = lzma.decompress(initrd_xz) new_initrd = initrd for old_public_key,new_public_key in key_dict.items(): new_initrd = replace_key(old_public_key,new_public_key,new_initrd,'initrd') preset = 6 new_initrd_xz = lzma.compress(new_initrd,check=lzma.CHECK_CRC32,filters=[{"id": lzma.FILTER_LZMA2, "preset": preset }] ) while len(new_initrd_xz) > len(initrd_xz) and preset < 9: print(f'preset:{preset}') print(f'new initrd xz size:{len(new_initrd_xz)}') print(f'old initrd xz size:{len(initrd_xz)}') preset += 1 new_initrd_xz = lzma.compress(new_initrd,check=lzma.CHECK_CRC32,filters=[{"id": lzma.FILTER_LZMA2, "preset": preset }] ) if len(new_initrd_xz) > len(initrd_xz): new_initrd_xz = lzma.compress(new_initrd,check=lzma.CHECK_CRC32,filters=[{"id": lzma.FILTER_LZMA2, "preset": 9 | lzma.PRESET_EXTREME,'dict_size': 32*1024*1024,"lc": 4,"lp": 0, "pb": 0,}] ) if ljust: print(f'preset:{preset}') print(f'new initrd xz size:{len(new_initrd_xz)}') print(f'old initrd xz size:{len(initrd_xz)}') print(f'ljust size:{len(initrd_xz)-len(new_initrd_xz)}') assert len(new_initrd_xz) <= len(initrd_xz),'new initrd xz size is too big' new_initrd_xz = new_initrd_xz.ljust(len(initrd_xz),b'\0') return new_initrd_xz def find_7zXZ_data(data:bytes): offset1 = 0 _data = data while b'\xFD7zXZ\x00\x00\x01' in _data: offset1 = offset1 + _data.index(b'\xFD7zXZ\x00\x00\x01') + 8 _data = _data[offset1:] offset1 -= 8 offset2 = 0 _data = data while b'\x00\x00\x00\x00\x01\x59\x5A' in _data: offset2 = offset2 + _data.index(b'\x00\x00\x00\x00\x01\x59\x5A') + 7 _data = _data[offset2:] print(f'found 7zXZ data offset:{offset1} size:{offset2-offset1}') return data[offset1:offset2] def patch_elf(data: bytes,key_dict:dict): initrd_xz = find_7zXZ_data(data) new_initrd_xz = patch_initrd_xz(initrd_xz,key_dict) return data.replace(initrd_xz,new_initrd_xz) def patch_pe(data: bytes,key_dict:dict): vmlinux_xz = find_7zXZ_data(data) vmlinux = lzma.decompress(vmlinux_xz) initrd_xz_offset = vmlinux.index(b'\xFD7zXZ\x00\x00\x01') initrd_xz_size = vmlinux[initrd_xz_offset:].index(b'\x00\x00\x00\x00\x01\x59\x5A') + 7 initrd_xz = vmlinux[initrd_xz_offset:initrd_xz_offset+initrd_xz_size] new_initrd_xz = patch_initrd_xz(initrd_xz,key_dict) new_vmlinux = vmlinux.replace(initrd_xz,new_initrd_xz) new_vmlinux_xz = lzma.compress(new_vmlinux,check=lzma.CHECK_CRC32,filters=[{"id": lzma.FILTER_LZMA2, "preset": 9,}] ) assert len(new_vmlinux_xz) <= len(vmlinux_xz),'new vmlinux xz size is too big' print(f'new vmlinux xz size:{len(new_vmlinux_xz)}') print(f'old vmlinux xz size:{len(vmlinux_xz)}') print(f'ljust size:{len(vmlinux_xz)-len(new_vmlinux_xz)}') new_vmlinux_xz = new_vmlinux_xz.ljust(len(vmlinux_xz),b'\0') new_data = data.replace(vmlinux_xz,new_vmlinux_xz) return new_data def patch_netinstall(key_dict: dict,input_file,output_file=None): netinstall = open(input_file,'rb').read() if netinstall[:2] == b'MZ': from package import check_install_package check_install_package(['pefile']) import pefile ROUTEROS_BOOT = { 129:{'arch':'power','name':'Powerboot'}, 130:{'arch':'e500','name':'e500_boot'}, 131:{'arch':'mips','name':'Mips_boot'}, 135:{'arch':'400','name':'440__boot'}, 136:{'arch':'tile','name':'tile_boot'}, 137:{'arch':'arm','name':'ARM__boot'}, 138:{'arch':'mmips','name':'MMipsBoot'}, 139:{'arch':'arm64','name':'ARM64__boot'}, 143:{'arch':'x86_64','name':'x86_64boot'} } with pefile.PE(input_file) as pe: for resource in pe.DIRECTORY_ENTRY_RESOURCE.entries: if resource.id == pefile.RESOURCE_TYPE["RT_RCDATA"]: for sub_resource in resource.directory.entries: if sub_resource.id in ROUTEROS_BOOT: bootloader = ROUTEROS_BOOT[sub_resource.id] print(f'found {bootloader["arch"]}({sub_resource.id}) bootloader') rva = sub_resource.directory.entries[0].data.struct.OffsetToData size = sub_resource.directory.entries[0].data.struct.Size data = pe.get_data(rva,size) _size = struct.unpack(' 0: for package in npk._packages: patch_npk_package(package,key_dict) else: patch_npk_package(npk,key_dict) npk.sign(kcdsa_private_key,eddsa_private_key) npk.save(output_file or input_file) if __name__ == '__main__': import argparse,os parser = argparse.ArgumentParser(description='MikroTik patcher') subparsers = parser.add_subparsers(dest="command") npk_parser = subparsers.add_parser('npk',help='patch and sign npk file') npk_parser.add_argument('input',type=str, help='Input file') npk_parser.add_argument('-O','--output',type=str,help='Output file') kernel_parser = subparsers.add_parser('kernel',help='patch kernel file') kernel_parser.add_argument('input',type=str, help='Input file') kernel_parser.add_argument('-O','--output',type=str,help='Output file') block_parser = subparsers.add_parser('block',help='patch block file') block_parser.add_argument('dev',type=str, help='block device') block_parser.add_argument('file',type=str, help='file path') netinstall_parser = subparsers.add_parser('netinstall',help='patch netinstall file') netinstall_parser.add_argument('input',type=str, help='Input file') netinstall_parser.add_argument('-O','--output',type=str,help='Output file') args = parser.parse_args() key_dict = { bytes.fromhex(os.environ['MIKRO_LICENSE_PUBLIC_KEY']):bytes.fromhex(os.environ['CUSTOM_LICENSE_PUBLIC_KEY']), bytes.fromhex(os.environ['MIKRO_NPK_SIGN_PUBLIC_KEY']):bytes.fromhex(os.environ['CUSTOM_NPK_SIGN_PUBLIC_KEY']) } kcdsa_private_key = bytes.fromhex(os.environ['CUSTOM_LICENSE_PRIVATE_KEY']) eddsa_private_key = bytes.fromhex(os.environ['CUSTOM_NPK_SIGN_PRIVATE_KEY']) if args.command =='npk': print(f'patching {args.input} ...') patch_npk_file(key_dict,kcdsa_private_key,eddsa_private_key,args.input,args.output) elif args.command == 'kernel': print(f'patching {args.input} ...') data = patch_kernel(open(args.input,'rb').read(),key_dict) open(args.output or args.input,'wb').write(data) elif args.command == 'block': print(f'patching {args.file} in {args.dev} ...') patch_block(args.dev,args.file,key_dict) elif args.command == 'netinstall': print(f'patching {args.input} ...') patch_netinstall(key_dict,args.input,args.output) else: parser.print_help()