Added custom exception classes as requested in #1. Please catch 'NPKError' and 'NPKIdError'.

This commit is contained in:
botlabsDev 2020-06-25 17:15:46 +02:00
parent 3d81964135
commit c5e6219ab4
8 changed files with 77 additions and 61 deletions

View file

@ -16,28 +16,27 @@ def extract_container(npk_file, export_folder, container_ids):
def write_to_file(file, payloads):
payloads = [payloads] if not isinstance(payloads, list) else payloads
with open(file, "wb") as _file:
for payload in payloads:
_file.write(payload)
def get_pkt_info(file) -> List:
def get_short_pkt_info(file) -> List:
return [str(file.file.name)]
def get_cnt_info(file) -> List:
return [f"Cnt:{pos:3}:{c.cnt_id_name}" for pos, c in file.pck_enumerate_cnt]
def get_full_pkt_info(file) -> List:
output = get_pkt_info(file)
output += get_cnt_info(file)
output = get_short_pkt_info(file)
output += get_short_cnt_info(file)
for cnt in file.pck_cnt_list:
output += get_full_cnt_info(cnt)
return output
def get_short_cnt_info(file) -> List:
return [f"Cnt:{pos:3}:{c.cnt_id_name}" for pos, c in file.pck_enumerate_cnt]
def get_full_cnt_info(cnt) -> List:
info = []
id_name, options = cnt.output_cnt
@ -61,3 +60,11 @@ def sha1_sum_from_binary(payloads):
sha1.update(payload)
return sha1.digest()
class NPKIdError(BaseException):
pass
class NPKError(BaseException):
pass

View file

@ -10,30 +10,34 @@ def parse_args():
parser = argparse.ArgumentParser(description='npkPy is an unpacking tool for MikroTiks custom NPK container format')
input_group = parser.add_argument_group("input")
input_group.add_argument("--files", action='append', type=Path,
help="Select one or more files to process")
input_group.add_argument("--src-folder", type=Path, default=Path("."),
input_group.add_argument("--files",
action='append', type=Path, help="Select one or more files to process")
input_group.add_argument("--src-folder",
type=Path, default=Path("."),
help="Process all NPK files found recursively in given source folder.")
input_filter_group = input_group.add_mutually_exclusive_group()
input_filter_group.add_argument("--glob", type=str, default=None,
input_filter_group.add_argument("--glob",
type=str, default=None,
help="Simple glob. Filter files from --srcFolder which match the given string.")
output_group = parser.add_argument_group("output")
output_group.add_argument("--dst-folder", type=Path, default=Path(".") / "exportNpk",
output_group.add_argument("--dst-folder",
type=Path, default=Path(".") / "exportNpk",
help="Extract container into given folder")
action_group = parser.add_argument_group("actions")
exclusive_action = action_group.add_mutually_exclusive_group(required=True)
exclusive_action.add_argument("--show-container", action="store_true",
help="List all container from selected NPK files")
exclusive_action.add_argument("--export-all", action="store_true",
help="Export all container from selected NPK files")
exclusive_action.add_argument("--show-container",
action="store_true", help="List all container from selected NPK files")
exclusive_action.add_argument("--export-all",
action="store_true", help="Export all container from selected NPK files")
exclusive_action.add_argument("--export-squashfs", action="store_true",
help="Export all SquashFs container from selected NPK files")
exclusive_action.add_argument("--export-zlib", action="store_true",
exclusive_action.add_argument("--export-zlib",
action="store_true",
help="Export all Zlib compressed container from selected NPK files")
return parser.parse_args()

View file

@ -1,6 +1,8 @@
import logging
import struct
from npkpy.common import NPKError
BYTES_LEN_CNT_ID = 2
BYTES_LEN_CNT_PAYLOAD_LEN = 4
@ -32,7 +34,7 @@ class CntBasic:
def cnt_id(self):
cnt_id = struct.unpack_from(b"h", self._data, 0)[0]
if cnt_id != self._regular_cnt_id:
raise RuntimeError(f"Cnt object does not represent given container typ {self._regular_cnt_id}/{cnt_id}")
raise NPKError(f"Cnt object does not represent given container typ {self._regular_cnt_id}/{cnt_id}")
return cnt_id
@property

View file

@ -1,6 +1,7 @@
import struct
from pathlib import Path
from npkpy.common import NPKError, NPKIdError
from npkpy.npk.npk_constants import CNT_HANDLER
from npkpy.npk.cnt_basic import BYTES_LEN_CNT_ID, BYTES_LEN_CNT_PAYLOAD_LEN
from npkpy.npk.npk_file_basic import FileBasic
@ -9,20 +10,21 @@ MAGIC_BYTES = b"\x1e\xf1\xd0\xba"
BYTES_LEN_MAGIC_HEADER = 4
BYTES_LEN_PCK_SIZE_LEN = 4
"""
0____4____8____b____f
| | | | |
0_|AAAA|BBBB| C ..... |
1_|....|....|....|....|
A = MAGIC BYTES (4)
B = PCK SIZE (4)
C = Begin of Container area
"""
class Npk(FileBasic):
"""
0____4____8____b____f
| | | | |
0_|AAAA|BBBB| C ..... |
1_|....|....|....|....|
A = MAGIC BYTES (4)
B = PCK SIZE (4)
C = Begin of Container area
"""
__cnt_list = None
def __init__(self, file_path: Path):
@ -34,12 +36,12 @@ class Npk(FileBasic):
@property
def pck_magic_bytes(self):
return struct.unpack_from(b"4s", self._data, 0)[0]
return struct.unpack_from("4s", self._data, 0)[0]
@property
def pck_payload_len(self):
self.__pck_payload_size_update()
payload_len = struct.unpack_from(b"I", self._data, 4)[0]
payload_len = struct.unpack_from("I", self._data, 4)[0]
return payload_len
def __pck_payload_size_update(self):
@ -48,7 +50,7 @@ class Npk(FileBasic):
for cnt in self.pck_cnt_list:
current_size += cnt.cnt_full_length
cnt.modified = False
struct.pack_into(b"I", self._data, 4, current_size)
struct.pack_into("I", self._data, 4, current_size)
@property
def pck_full_size(self):
@ -87,15 +89,14 @@ class Npk(FileBasic):
data = self.read_data_from_file(offset, pkt_len)
if len(data) != pkt_len:
raise RuntimeError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}")
raise NPKError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}")
try:
return CNT_HANDLER[cnt_id](data, offset)
except KeyError:
raise RuntimeError(f"failed with id: {cnt_id}\n"
f"New cnt id discovered in file: {self.file.absolute()}")
# except TypeError:
# raise RuntimeError(f"failed with id: {cnt_id}\n{self.file.absolute()}")
raise NPKIdError(f"Failed with cnt id: {cnt_id}\n"
f"New cnt id discovered in file: {self.file.absolute()}")
def _check_magic_bytes(self, error_msg):
if not self.pck_magic_bytes == MAGIC_BYTES:
raise RuntimeError(error_msg)
raise NPKError(error_msg)

View file

@ -1,6 +1,7 @@
import struct
import unittest
from npkpy.common import NPKError
from npkpy.npk.cnt_basic import CntBasic
from tests.constants import DummyBasicCnt
@ -17,7 +18,7 @@ class Test_CntBasic(unittest.TestCase):
dummy_cnt = DummyBasicCnt()
dummy_cnt._00_cnt_id = struct.pack("h", 999)
cnt = CntBasic(dummy_cnt.cnt_full_binary, offset_in_pck=0)
with self.assertRaises(RuntimeError) as _exception:
with self.assertRaises(NPKError) as _exception:
_ = cnt.cnt_id
self.assertEqual("Cnt object does not represent given container typ -1/999", _exception.exception.args[0])

View file

@ -3,7 +3,7 @@ import unittest
from pathlib import Path, PosixPath
from unittest.mock import Mock
from npkpy.common import get_pkt_info, get_cnt_info, get_all_nkp_files, write_to_file, extract_container, \
from npkpy.common import get_short_pkt_info, get_short_cnt_info, get_all_nkp_files, write_to_file, extract_container, \
get_full_cnt_info, get_full_pkt_info, sha1_sum_from_binary, sha1_sum_from_file
from npkpy.npk.cnt_basic import CntBasic
from npkpy.npk.npk import Npk
@ -101,12 +101,12 @@ class Common_Test(unittest.TestCase):
npkFile = Mock()
npkFile.file = self.file
self.assertEqual([self.file.name], get_pkt_info(npkFile))
self.assertEqual([self.file.name], get_short_pkt_info(npkFile))
def test_getBasicCntInfo(self):
self.file.write_bytes(get_dummy_npk_binary())
self.assertEqual(['Cnt: 0:PckHeader'], get_cnt_info(Npk(self.file)))
self.assertEqual(['Cnt: 0:PckHeader'], get_short_cnt_info(Npk(self.file)))
def test_extractPayloadFromCnt_createFilesWithPayload(self):
self.file.write_bytes(get_dummy_npk_binary())

View file

@ -3,10 +3,10 @@ import struct
from npkpy.npk.pck_header import NPK_PCK_HEADER
MAGICBYTES = b"\x1e\xf1\xd0\xba"
MAGIC_BYTES = b"\x1e\xf1\xd0\xba"
PCKSIZE = struct.pack("I", 28)
MAGIC_AND_SIZE = MAGICBYTES + PCKSIZE
MAGIC_AND_SIZE = MAGIC_BYTES + PCKSIZE
# OPENING ARCHITECTURE_TAG
SET_HEADER_TAG_ID = struct.pack("H", NPK_PCK_HEADER) # b"\x01\x00"
@ -32,12 +32,12 @@ MINIMAL_NPK_PACKAGE = MAGIC_AND_SIZE + \
CNT_CLOSING_ARCHITECTURE_TAG
def get_dummy_npk_binary(payload=None):
if not payload:
payload = DummyHeaderCnt().get_binary
pckPayload = payload
def get_dummy_npk_binary(cnt=None):
if not cnt:
cnt = DummyHeaderCnt().get_binary
pckPayload = cnt
pckLen = struct.pack("I", len(pckPayload))
npkBinary = MAGICBYTES + pckLen + pckPayload
npkBinary = MAGIC_BYTES + pckLen + pckPayload
return npkBinary
@ -134,8 +134,8 @@ class DummyRequirementsHeader:
class DummyBasicCnt:
_00_cnt_id = struct.pack("h", -1)
_01_cnt_payload_len = struct.pack("I", 7)
_02_cnt_payload = struct.pack("7s", b"Payload")
_01_cnt_payload_len = struct.pack("I", len(_02_cnt_payload))
@property
def cnt_full_binary(self):

View file

@ -3,9 +3,10 @@ import tempfile
import unittest
from pathlib import Path
from npkpy.common import NPKError, NPKIdError
from npkpy.npk.npk import Npk
from npkpy.npk.pck_header import PckHeader
from tests.constants import DummyHeaderCnt, MAGICBYTES, get_dummy_npk_binary
from tests.constants import DummyHeaderCnt, MAGIC_BYTES, get_dummy_npk_binary
class Test_npkClass(unittest.TestCase):
@ -17,20 +18,20 @@ class Test_npkClass(unittest.TestCase):
def test_fileIsNoNpkFile(self):
self.npkFile.write_bytes(b"NoMagicBytesAtHeadOfFile")
with self.assertRaises(RuntimeError) as e:
with self.assertRaises(NPKError) as e:
_ = Npk(self.npkFile).pck_magic_bytes
self.assertEqual(e.exception.args[0], "Magic bytes not found in Npk file")
def test_npkFileIsCorrupt_fileCorruptException(self):
self.npkFile.write_bytes(MAGICBYTES + b"CorruptFile")
self.npkFile.write_bytes(MAGIC_BYTES + b"CorruptFile")
with self.assertRaises(RuntimeError) as e:
with self.assertRaises(NPKError) as e:
_ = Npk(self.npkFile).pck_cnt_list
self.assertEqual(e.exception.args[0],
f"File maybe corrupted. Please download again. File: {self.npkFile.absolute()}")
def test_extractMagicBytes(self):
self.assertEqual(MAGICBYTES, Npk(self.npkFile).pck_magic_bytes)
self.assertEqual(MAGIC_BYTES, Npk(self.npkFile).pck_magic_bytes)
def test_extractLenOfNpkPayload_propagatedSizeIsValid(self):
self.assertEqual(len(DummyHeaderCnt().get_binary), Npk(self.npkFile).pck_payload_len)
@ -60,9 +61,9 @@ class Test_npkClass(unittest.TestCase):
def test_getAllCnt_exceptionWithUnknownCntInNpk(self):
unknownCnt = DummyHeaderCnt()
unknownCnt._00_cnt_id = struct.pack("H", 999)
self.npkFile.write_bytes(get_dummy_npk_binary(payload=unknownCnt.get_binary))
self.npkFile.write_bytes(get_dummy_npk_binary(cnt=unknownCnt.get_binary))
with self.assertRaises(RuntimeError) as e:
with self.assertRaises(NPKIdError) as e:
_ = Npk(self.npkFile).pck_cnt_list
self.assertEqual(e.exception.args[0], f"failed with id: 999\n"
self.assertEqual(e.exception.args[0], f"Failed with cnt id: 999\n"
f"New cnt id discovered in file: {self.npkFile.absolute()}")