diff --git a/npkpy/common.py b/npkpy/common.py index cc42948..d6973ec 100644 --- a/npkpy/common.py +++ b/npkpy/common.py @@ -16,28 +16,27 @@ def extract_container(npk_file, export_folder, container_ids): def write_to_file(file, payloads): payloads = [payloads] if not isinstance(payloads, list) else payloads - with open(file, "wb") as _file: for payload in payloads: _file.write(payload) -def get_pkt_info(file) -> List: +def get_short_pkt_info(file) -> List: return [str(file.file.name)] -def get_cnt_info(file) -> List: - return [f"Cnt:{pos:3}:{c.cnt_id_name}" for pos, c in file.pck_enumerate_cnt] - - def get_full_pkt_info(file) -> List: - output = get_pkt_info(file) - output += get_cnt_info(file) + output = get_short_pkt_info(file) + output += get_short_cnt_info(file) for cnt in file.pck_cnt_list: output += get_full_cnt_info(cnt) return output +def get_short_cnt_info(file) -> List: + return [f"Cnt:{pos:3}:{c.cnt_id_name}" for pos, c in file.pck_enumerate_cnt] + + def get_full_cnt_info(cnt) -> List: info = [] id_name, options = cnt.output_cnt @@ -61,3 +60,11 @@ def sha1_sum_from_binary(payloads): sha1.update(payload) return sha1.digest() + + +class NPKIdError(BaseException): + pass + + +class NPKError(BaseException): + pass diff --git a/npkpy/main.py b/npkpy/main.py index bb33d60..41e553e 100644 --- a/npkpy/main.py +++ b/npkpy/main.py @@ -10,30 +10,34 @@ def parse_args(): parser = argparse.ArgumentParser(description='npkPy is an unpacking tool for MikroTiks custom NPK container format') input_group = parser.add_argument_group("input") - input_group.add_argument("--files", action='append', type=Path, - help="Select one or more files to process") - input_group.add_argument("--src-folder", type=Path, default=Path("."), + input_group.add_argument("--files", + action='append', type=Path, help="Select one or more files to process") + + input_group.add_argument("--src-folder", + type=Path, default=Path("."), help="Process all NPK files found recursively in given source folder.") input_filter_group = input_group.add_mutually_exclusive_group() - input_filter_group.add_argument("--glob", type=str, default=None, + input_filter_group.add_argument("--glob", + type=str, default=None, help="Simple glob. Filter files from --srcFolder which match the given string.") output_group = parser.add_argument_group("output") - output_group.add_argument("--dst-folder", type=Path, default=Path(".") / "exportNpk", + output_group.add_argument("--dst-folder", + type=Path, default=Path(".") / "exportNpk", help="Extract container into given folder") action_group = parser.add_argument_group("actions") exclusive_action = action_group.add_mutually_exclusive_group(required=True) - exclusive_action.add_argument("--show-container", action="store_true", - help="List all container from selected NPK files") - exclusive_action.add_argument("--export-all", action="store_true", - help="Export all container from selected NPK files") + exclusive_action.add_argument("--show-container", + action="store_true", help="List all container from selected NPK files") + exclusive_action.add_argument("--export-all", + action="store_true", help="Export all container from selected NPK files") exclusive_action.add_argument("--export-squashfs", action="store_true", help="Export all SquashFs container from selected NPK files") - exclusive_action.add_argument("--export-zlib", action="store_true", + exclusive_action.add_argument("--export-zlib", + action="store_true", help="Export all Zlib compressed container from selected NPK files") - return parser.parse_args() diff --git a/npkpy/npk/cnt_basic.py b/npkpy/npk/cnt_basic.py index 58fe336..9fcd3a8 100644 --- a/npkpy/npk/cnt_basic.py +++ b/npkpy/npk/cnt_basic.py @@ -1,6 +1,8 @@ import logging import struct +from npkpy.common import NPKError + BYTES_LEN_CNT_ID = 2 BYTES_LEN_CNT_PAYLOAD_LEN = 4 @@ -32,7 +34,7 @@ class CntBasic: def cnt_id(self): cnt_id = struct.unpack_from(b"h", self._data, 0)[0] if cnt_id != self._regular_cnt_id: - raise RuntimeError(f"Cnt object does not represent given container typ {self._regular_cnt_id}/{cnt_id}") + raise NPKError(f"Cnt object does not represent given container typ {self._regular_cnt_id}/{cnt_id}") return cnt_id @property diff --git a/npkpy/npk/npk.py b/npkpy/npk/npk.py index 26346c2..d62836e 100644 --- a/npkpy/npk/npk.py +++ b/npkpy/npk/npk.py @@ -1,6 +1,7 @@ import struct from pathlib import Path +from npkpy.common import NPKError, NPKIdError from npkpy.npk.npk_constants import CNT_HANDLER from npkpy.npk.cnt_basic import BYTES_LEN_CNT_ID, BYTES_LEN_CNT_PAYLOAD_LEN from npkpy.npk.npk_file_basic import FileBasic @@ -9,20 +10,21 @@ MAGIC_BYTES = b"\x1e\xf1\xd0\xba" BYTES_LEN_MAGIC_HEADER = 4 BYTES_LEN_PCK_SIZE_LEN = 4 +""" + 0____4____8____b____f + | | | | | +0_|AAAA|BBBB| C ..... | +1_|....|....|....|....| + + +A = MAGIC BYTES (4) +B = PCK SIZE (4) +C = Begin of Container area + +""" + class Npk(FileBasic): - """ - 0____4____8____b____f - | | | | | - 0_|AAAA|BBBB| C ..... | - 1_|....|....|....|....| - - - A = MAGIC BYTES (4) - B = PCK SIZE (4) - C = Begin of Container area - - """ __cnt_list = None def __init__(self, file_path: Path): @@ -34,12 +36,12 @@ class Npk(FileBasic): @property def pck_magic_bytes(self): - return struct.unpack_from(b"4s", self._data, 0)[0] + return struct.unpack_from("4s", self._data, 0)[0] @property def pck_payload_len(self): self.__pck_payload_size_update() - payload_len = struct.unpack_from(b"I", self._data, 4)[0] + payload_len = struct.unpack_from("I", self._data, 4)[0] return payload_len def __pck_payload_size_update(self): @@ -48,7 +50,7 @@ class Npk(FileBasic): for cnt in self.pck_cnt_list: current_size += cnt.cnt_full_length cnt.modified = False - struct.pack_into(b"I", self._data, 4, current_size) + struct.pack_into("I", self._data, 4, current_size) @property def pck_full_size(self): @@ -87,15 +89,14 @@ class Npk(FileBasic): data = self.read_data_from_file(offset, pkt_len) if len(data) != pkt_len: - raise RuntimeError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}") + raise NPKError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}") try: return CNT_HANDLER[cnt_id](data, offset) except KeyError: - raise RuntimeError(f"failed with id: {cnt_id}\n" - f"New cnt id discovered in file: {self.file.absolute()}") - # except TypeError: - # raise RuntimeError(f"failed with id: {cnt_id}\n{self.file.absolute()}") + raise NPKIdError(f"Failed with cnt id: {cnt_id}\n" + f"New cnt id discovered in file: {self.file.absolute()}") + def _check_magic_bytes(self, error_msg): if not self.pck_magic_bytes == MAGIC_BYTES: - raise RuntimeError(error_msg) + raise NPKError(error_msg) diff --git a/tests/cnt_basic_test.py b/tests/cnt_basic_test.py index 1166421..d349a2e 100644 --- a/tests/cnt_basic_test.py +++ b/tests/cnt_basic_test.py @@ -1,6 +1,7 @@ import struct import unittest +from npkpy.common import NPKError from npkpy.npk.cnt_basic import CntBasic from tests.constants import DummyBasicCnt @@ -17,7 +18,7 @@ class Test_CntBasic(unittest.TestCase): dummy_cnt = DummyBasicCnt() dummy_cnt._00_cnt_id = struct.pack("h", 999) cnt = CntBasic(dummy_cnt.cnt_full_binary, offset_in_pck=0) - with self.assertRaises(RuntimeError) as _exception: + with self.assertRaises(NPKError) as _exception: _ = cnt.cnt_id self.assertEqual("Cnt object does not represent given container typ -1/999", _exception.exception.args[0]) diff --git a/tests/common_test.py b/tests/common_test.py index 52d7bf2..b9dbc7a 100644 --- a/tests/common_test.py +++ b/tests/common_test.py @@ -3,7 +3,7 @@ import unittest from pathlib import Path, PosixPath from unittest.mock import Mock -from npkpy.common import get_pkt_info, get_cnt_info, get_all_nkp_files, write_to_file, extract_container, \ +from npkpy.common import get_short_pkt_info, get_short_cnt_info, get_all_nkp_files, write_to_file, extract_container, \ get_full_cnt_info, get_full_pkt_info, sha1_sum_from_binary, sha1_sum_from_file from npkpy.npk.cnt_basic import CntBasic from npkpy.npk.npk import Npk @@ -101,12 +101,12 @@ class Common_Test(unittest.TestCase): npkFile = Mock() npkFile.file = self.file - self.assertEqual([self.file.name], get_pkt_info(npkFile)) + self.assertEqual([self.file.name], get_short_pkt_info(npkFile)) def test_getBasicCntInfo(self): self.file.write_bytes(get_dummy_npk_binary()) - self.assertEqual(['Cnt: 0:PckHeader'], get_cnt_info(Npk(self.file))) + self.assertEqual(['Cnt: 0:PckHeader'], get_short_cnt_info(Npk(self.file))) def test_extractPayloadFromCnt_createFilesWithPayload(self): self.file.write_bytes(get_dummy_npk_binary()) diff --git a/tests/constants.py b/tests/constants.py index 93a830e..4610de9 100644 --- a/tests/constants.py +++ b/tests/constants.py @@ -3,10 +3,10 @@ import struct from npkpy.npk.pck_header import NPK_PCK_HEADER -MAGICBYTES = b"\x1e\xf1\xd0\xba" +MAGIC_BYTES = b"\x1e\xf1\xd0\xba" PCKSIZE = struct.pack("I", 28) -MAGIC_AND_SIZE = MAGICBYTES + PCKSIZE +MAGIC_AND_SIZE = MAGIC_BYTES + PCKSIZE # OPENING ARCHITECTURE_TAG SET_HEADER_TAG_ID = struct.pack("H", NPK_PCK_HEADER) # b"\x01\x00" @@ -32,12 +32,12 @@ MINIMAL_NPK_PACKAGE = MAGIC_AND_SIZE + \ CNT_CLOSING_ARCHITECTURE_TAG -def get_dummy_npk_binary(payload=None): - if not payload: - payload = DummyHeaderCnt().get_binary - pckPayload = payload +def get_dummy_npk_binary(cnt=None): + if not cnt: + cnt = DummyHeaderCnt().get_binary + pckPayload = cnt pckLen = struct.pack("I", len(pckPayload)) - npkBinary = MAGICBYTES + pckLen + pckPayload + npkBinary = MAGIC_BYTES + pckLen + pckPayload return npkBinary @@ -134,8 +134,8 @@ class DummyRequirementsHeader: class DummyBasicCnt: _00_cnt_id = struct.pack("h", -1) - _01_cnt_payload_len = struct.pack("I", 7) _02_cnt_payload = struct.pack("7s", b"Payload") + _01_cnt_payload_len = struct.pack("I", len(_02_cnt_payload)) @property def cnt_full_binary(self): diff --git a/tests/npk_test.py b/tests/npk_test.py index d90dd51..e720424 100644 --- a/tests/npk_test.py +++ b/tests/npk_test.py @@ -3,9 +3,10 @@ import tempfile import unittest from pathlib import Path +from npkpy.common import NPKError, NPKIdError from npkpy.npk.npk import Npk from npkpy.npk.pck_header import PckHeader -from tests.constants import DummyHeaderCnt, MAGICBYTES, get_dummy_npk_binary +from tests.constants import DummyHeaderCnt, MAGIC_BYTES, get_dummy_npk_binary class Test_npkClass(unittest.TestCase): @@ -17,20 +18,20 @@ class Test_npkClass(unittest.TestCase): def test_fileIsNoNpkFile(self): self.npkFile.write_bytes(b"NoMagicBytesAtHeadOfFile") - with self.assertRaises(RuntimeError) as e: + with self.assertRaises(NPKError) as e: _ = Npk(self.npkFile).pck_magic_bytes self.assertEqual(e.exception.args[0], "Magic bytes not found in Npk file") def test_npkFileIsCorrupt_fileCorruptException(self): - self.npkFile.write_bytes(MAGICBYTES + b"CorruptFile") + self.npkFile.write_bytes(MAGIC_BYTES + b"CorruptFile") - with self.assertRaises(RuntimeError) as e: + with self.assertRaises(NPKError) as e: _ = Npk(self.npkFile).pck_cnt_list self.assertEqual(e.exception.args[0], f"File maybe corrupted. Please download again. File: {self.npkFile.absolute()}") def test_extractMagicBytes(self): - self.assertEqual(MAGICBYTES, Npk(self.npkFile).pck_magic_bytes) + self.assertEqual(MAGIC_BYTES, Npk(self.npkFile).pck_magic_bytes) def test_extractLenOfNpkPayload_propagatedSizeIsValid(self): self.assertEqual(len(DummyHeaderCnt().get_binary), Npk(self.npkFile).pck_payload_len) @@ -60,9 +61,9 @@ class Test_npkClass(unittest.TestCase): def test_getAllCnt_exceptionWithUnknownCntInNpk(self): unknownCnt = DummyHeaderCnt() unknownCnt._00_cnt_id = struct.pack("H", 999) - self.npkFile.write_bytes(get_dummy_npk_binary(payload=unknownCnt.get_binary)) + self.npkFile.write_bytes(get_dummy_npk_binary(cnt=unknownCnt.get_binary)) - with self.assertRaises(RuntimeError) as e: + with self.assertRaises(NPKIdError) as e: _ = Npk(self.npkFile).pck_cnt_list - self.assertEqual(e.exception.args[0], f"failed with id: 999\n" + self.assertEqual(e.exception.args[0], f"Failed with cnt id: 999\n" f"New cnt id discovered in file: {self.npkFile.absolute()}")