modified: npk.py

modified:   patch.py
This commit is contained in:
zyb 2024-07-20 05:38:38 +08:00
parent e75dd964ec
commit 7f54cf6098
2 changed files with 105 additions and 53 deletions

133
npk.py
View file

@ -147,27 +147,52 @@ class NpkFileContainer:
for item in self._items: for item in self._items:
yield item yield item
class Package:
def __init__(self) -> None:
self._parts:list[NpkPartItem] = []
def __iter__(self):
for part in self._parts:
yield part
class NovaPackage: def __getitem__(self, id:NpkPartID):
for part in self._parts:
if part.id == id:
return part
part = NpkPartItem(id,b'')
self._parts.append(part)
return part
class NovaPackage(Package):
NPK_MAGIC = 0xbad0f11e NPK_MAGIC = 0xbad0f11e
def __init__(self,data:bytes=b''): def __init__(self,data:bytes=b''):
self._parts:list[NpkPartItem] = [] super().__init__()
self._packages:list[Package] = []
offset = 0 offset = 0
self._has_pkg = False
while offset < len(data): while offset < len(data):
part_id,part_size = struct.unpack_from('<HI',data,offset) part_id,part_size = struct.unpack_from('<HI',data,offset)
offset += 6 offset += 6
part_data = data[offset:offset+part_size] part_data = data[offset:offset+part_size]
offset += part_size offset += part_size
if part_id == NpkPartID.NAME_INFO: if part_id == NpkPartID.PKG_FEATURES:
self._parts.append(NpkPartItem(NpkPartID(part_id),NpkNameInfo.unserialize_from(part_data))) self._has_pkg = True
# elif part_id == NpkPartID.FILE_CONTAINER:
# self._parts.append(NpkPartItem(NpkPartID(part_id),NpkFileContainer.unserialize_from(part_data)))
else:
self._parts.append(NpkPartItem(NpkPartID(part_id),part_data)) self._parts.append(NpkPartItem(NpkPartID(part_id),part_data))
continue
if self._has_pkg:
if part_id == NpkPartID.NAME_INFO:
self._packages.append(Package())
self._packages[-1]._parts.append(NpkPartItem(NpkPartID(part_id),NpkNameInfo.unserialize_from(part_data)))
else:
self._packages[-1]._parts.append(NpkPartItem(NpkPartID(part_id),part_data))
else:
if part_id == NpkPartID.NAME_INFO:
self._parts.append(NpkPartItem(NpkPartID(part_id),NpkNameInfo.unserialize_from(part_data)))
else:
self._parts.append(NpkPartItem(NpkPartID(part_id),part_data))
def get_digest(self,hash_fnc,package:Package=None)->bytes:
def get_digest(self,hash_fnc)->bytes: parts = package._parts if package else self._parts
for part in self._parts: for part in parts:
data_header = struct.pack('<HI',part.id.value,len(part.data)) data_header = struct.pack('<HI',part.id.value,len(part.data))
if part.id == NpkPartID.HEADER: if part.id == NpkPartID.HEADER:
continue continue
@ -185,44 +210,63 @@ class NovaPackage:
def sign(self,kcdsa_private_key:bytes,eddsa_private_key:bytes): def sign(self,kcdsa_private_key:bytes,eddsa_private_key:bytes):
import hashlib import hashlib
from mikro import mikro_kcdsa_sign,mikro_eddsa_sign from mikro import mikro_kcdsa_sign,mikro_eddsa_sign
if len(self[NpkPartID.SIGNATURE].data) != 20+48+64: build_time = os.environ['BUILD_TIME'] if 'BUILD_TIME' in os.environ else None
self[NpkPartID.SIGNATURE].data = b'\0'*(20+48+64) if NpkPartID.SIGNATURE in self._parts:
sha1_digest = self.get_digest(hashlib.new('SHA1')) if len(self[NpkPartID.SIGNATURE].data) != 20+48+64:
sha256_digest = self.get_digest(hashlib.new('SHA256')) self[NpkPartID.SIGNATURE].data = b'\0'*(20+48+64)
kcdsa_signature = mikro_kcdsa_sign(sha256_digest[:20],kcdsa_private_key) if build_time:
eddsa_signature = mikro_eddsa_sign(sha256_digest,eddsa_private_key) npk[NpkPartID.NAME_INFO].data._build_time = int(build_time)
self[NpkPartID.SIGNATURE].data = sha1_digest + kcdsa_signature + eddsa_signature sha1_digest = self.get_digest(hashlib.new('SHA1'))
sha256_digest = self.get_digest(hashlib.new('SHA256'))
kcdsa_signature = mikro_kcdsa_sign(sha256_digest[:20],kcdsa_private_key)
eddsa_signature = mikro_eddsa_sign(sha256_digest,eddsa_private_key)
self[NpkPartID.SIGNATURE].data = sha1_digest + kcdsa_signature + eddsa_signature
else:
for package in self._packages:
if len(package[NpkPartID.SIGNATURE].data) != 20+48+64:
package[NpkPartID.SIGNATURE].data = b'\0'*(20+48+64)
if build_time:
npk[NpkPartID.NAME_INFO].data._build_time = int(build_time)
sha1_digest = self.get_digest(hashlib.new('SHA1'),package)
sha256_digest = self.get_digest(hashlib.new('SHA256'),package)
kcdsa_signature = mikro_kcdsa_sign(sha256_digest[:20],kcdsa_private_key)
eddsa_signature = mikro_eddsa_sign(sha256_digest,eddsa_private_key)
package[NpkPartID.SIGNATURE].data = sha1_digest + kcdsa_signature + eddsa_signature
def verify(self,kcdsa_public_key:bytes,eddsa_public_key:bytes): def verify(self,kcdsa_public_key:bytes,eddsa_public_key:bytes):
import hashlib import hashlib
from mikro import mikro_kcdsa_verify,mikro_eddsa_verify from mikro import mikro_kcdsa_verify,mikro_eddsa_verify
sha1_digest = self.get_digest(hashlib.new('SHA1')) if NpkPartID.SIGNATURE in self._parts:
sha256_digest = self.get_digest(hashlib.new('SHA256')) sha1_digest = self.get_digest(hashlib.new('SHA1'))
signature = self[NpkPartID.SIGNATURE].data sha256_digest = self.get_digest(hashlib.new('SHA256'))
if sha1_digest != signature[:20]: signature = self[NpkPartID.SIGNATURE].data
return False if sha1_digest != signature[:20]:
if not mikro_kcdsa_verify(sha256_digest[:20],signature[20:68],kcdsa_public_key): return False
return False if not mikro_kcdsa_verify(sha256_digest[:20],signature[20:68],kcdsa_public_key):
if not mikro_eddsa_verify(sha256_digest,signature[68:132],eddsa_public_key): return False
return False if not mikro_eddsa_verify(sha256_digest,signature[68:132],eddsa_public_key):
return False
else:
for package in self._packages:
sha1_digest = self.get_digest(hashlib.new('SHA1'),package)
sha256_digest = self.get_digest(hashlib.new('SHA256'),package)
signature = package[NpkPartID.SIGNATURE].data
if sha1_digest != signature[:20]:
return False
if not mikro_kcdsa_verify(sha256_digest[:20],signature[20:68],kcdsa_public_key):
return False
if not mikro_eddsa_verify(sha256_digest,signature[68:132],eddsa_public_key):
return False
return True return True
def __iter__(self):
for part in self._parts:
yield part
def __getitem__(self, id:NpkPartID):
for part in self._parts:
if part.id == id:
return part
part = NpkPartItem(id,b'')
self._parts.append(part)
return part
def save(self,file): def save(self,file):
size = 0 size = 0
for part in self._parts: for part in self._parts:
size += 6 + len(part.data) size += 6 + len(part.data)
for package in self._packages:
for part in package:
size += 6 + len(part.data)
with open(file,'wb') as f: with open(file,'wb') as f:
f.write(struct.pack('<II', NovaPackage.NPK_MAGIC, size)) f.write(struct.pack('<II', NovaPackage.NPK_MAGIC, size))
for part in self._parts: for part in self._parts:
@ -231,6 +275,13 @@ class NovaPackage:
f.write(part.data) f.write(part.data)
else: else:
f.write(part.data.serialize()) f.write(part.data.serialize())
for package in self._packages:
for part in package:
f.write(struct.pack('<HI',part.id.value ,len(part.data)))
if isinstance(part.data,bytes):
f.write(part.data)
else:
f.write(part.data.serialize())
@staticmethod @staticmethod
def load(file): def load(file):
@ -263,12 +314,10 @@ if __name__=='__main__':
kcdsa_public_key = bytes.fromhex(os.environ['CUSTOM_LICENSE_PUBLIC_KEY']) kcdsa_public_key = bytes.fromhex(os.environ['CUSTOM_LICENSE_PUBLIC_KEY'])
eddsa_public_key = bytes.fromhex(os.environ['CUSTOM_NPK_SIGN_PUBLIC_KEY']) eddsa_public_key = bytes.fromhex(os.environ['CUSTOM_NPK_SIGN_PUBLIC_KEY'])
build_time = os.environ['BUILD_TIME'] if 'BUILD_TIME' in os.environ else None
if args.command =='sign': if args.command =='sign':
print(f'Signing {args.input}') print(f'Signing {args.input}')
npk = NovaPackage.load(args.input) npk = NovaPackage.load(args.input)
if build_time:
npk[NpkPartID.NAME_INFO].data._build_time = int(build_time)
npk.sign(kcdsa_private_key,eddsa_private_key) npk.sign(kcdsa_private_key,eddsa_private_key)
npk.save(args.output) npk.save(args.output)
elif args.command == 'verify': elif args.command == 'verify':
@ -284,8 +333,6 @@ if __name__=='__main__':
print(f'Creating {args.output} from {args.input}') print(f'Creating {args.output} from {args.input}')
option_npk = NovaPackage.load(args.input) option_npk = NovaPackage.load(args.input)
option_npk[NpkPartID.NAME_INFO].data.name = args.name option_npk[NpkPartID.NAME_INFO].data.name = args.name
if build_time:
option_npk[NpkPartID.NAME_INFO].data._build_time = int(build_time)
option_npk[NpkPartID.DESCRIPTION].data = args.description.encode() if args.description else args.name.encode() option_npk[NpkPartID.DESCRIPTION].data = args.description.encode() if args.description else args.name.encode()
option_npk[NpkPartID.NULL_BLOCK].data = b'' option_npk[NpkPartID.NULL_BLOCK].data = b''
option_npk[NpkPartID.SQUASHFS].data = open(args.squashfs,'rb').read() option_npk[NpkPartID.SQUASHFS].data = open(args.squashfs,'rb').read()

View file

@ -266,10 +266,10 @@ def run_shell_command(command):
process = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process.stdout, process.stderr return process.stdout, process.stderr
def patch_npk_file(key_dict,kcdsa_private_key,eddsa_private_key,input_file,output_file=None):
npk = NovaPackage.load(input_file) def patch_npk_package(package,key_dict,kcdsa_private_key,eddsa_private_key):
if npk[NpkPartID.NAME_INFO].data.name == 'system': if package[NpkPartID.NAME_INFO].data.name == 'system':
file_container = NpkFileContainer.unserialize_from(npk[NpkPartID.FILE_CONTAINER].data) file_container = NpkFileContainer.unserialize_from(package[NpkPartID.FILE_CONTAINER].data)
for item in file_container: for item in file_container:
if item.name == b'boot/EFI/BOOT/BOOTX64.EFI': if item.name == b'boot/EFI/BOOT/BOOTX64.EFI':
print(f'patch {item.name} ...') print(f'patch {item.name} ...')
@ -282,11 +282,11 @@ def patch_npk_file(key_dict,kcdsa_private_key,eddsa_private_key,input_file,outpu
item.data = patch_kernel(item.data,key_dict) item.data = patch_kernel(item.data,key_dict)
open('initrd.rgz','wb').write(item.data) open('initrd.rgz','wb').write(item.data)
npk[NpkPartID.FILE_CONTAINER].data = file_container.serialize() package[NpkPartID.FILE_CONTAINER].data = file_container.serialize()
try: try:
squashfs_file = 'squashfs-root.sfs' squashfs_file = 'squashfs-root.sfs'
extract_dir = 'squashfs-root' extract_dir = 'squashfs-root'
open(squashfs_file,'wb').write(npk[NpkPartID.SQUASHFS].data) open(squashfs_file,'wb').write(package[NpkPartID.SQUASHFS].data)
print(f"extract {squashfs_file} ...") print(f"extract {squashfs_file} ...")
_, stderr = run_shell_command(f"unsquashfs -d {extract_dir} {squashfs_file}") _, stderr = run_shell_command(f"unsquashfs -d {extract_dir} {squashfs_file}")
print(stderr.decode()) print(stderr.decode())
@ -306,11 +306,16 @@ def patch_npk_file(key_dict,kcdsa_private_key,eddsa_private_key,input_file,outpu
print(e) print(e)
print(f"clean ...") print(f"clean ...")
run_shell_command(f"rm -rf {extract_dir}") run_shell_command(f"rm -rf {extract_dir}")
npk[NpkPartID.SQUASHFS].data = open(squashfs_file,'rb').read() package[NpkPartID.SQUASHFS].data = open(squashfs_file,'rb').read()
run_shell_command(f"rm -f {squashfs_file}") run_shell_command(f"rm -f {squashfs_file}")
build_time = os.environ['BUILD_TIME'] if 'BUILD_TIME' in os.environ else None
if build_time: def patch_npk_file(key_dict,kcdsa_private_key,eddsa_private_key,input_file,output_file=None):
npk[NpkPartID.NAME_INFO].data._build_time = int(os.environ['BUILD_TIME']) npk = NovaPackage.load(input_file)
if NpkPartID.NAME_INFO in npk._parts:
patch_npk_package(npk,key_dict,kcdsa_private_key,eddsa_private_key)
else:
for package in npk._packages:
patch_npk_package(package,key_dict,kcdsa_private_key,eddsa_private_key)
npk.sign(kcdsa_private_key,eddsa_private_key) npk.sign(kcdsa_private_key,eddsa_private_key)
npk.save(output_file or input_file) npk.save(output_file or input_file)