diff --git a/.github/workflows/CICD.yml b/.github/workflows/CICD.yml new file mode 100644 index 0000000..5da45d1 --- /dev/null +++ b/.github/workflows/CICD.yml @@ -0,0 +1,65 @@ +name: npkpy +on: [push] + +jobs: + run-tests: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up Python 3.6 + uses: actions/setup-python@v1 + with: + python-version: 3.6 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -e . + + - name: Lint with flake8 + run: | + pip install flake8 + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + + - name: Test with pytest + run: | + pip install pytest pytest-cov codecov + pytest --cov=./ + codecov --token=${{ secrets.CODECOV_TOKEN }} + + publish-to-pypi: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@master + - name: Set up Python 3.6 + uses: actions/setup-python@v1 + with: + python-version: 3.6 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -e . + + - name: Build a binary wheel and a source tarball + run: | + python3 setup.py sdist bdist_wheel + + - name: Publish distribution + uses: pypa/gh-action-pypi-publish@master + with: + password: ${{ secrets.TEST_PYPI_PASSWORD }} + repository_url: https://test.pypi.org/legacy/ + + - name: Publish distribution to PyPI + if: startsWith(github.event.ref, 'refs/tags') + uses: pypa/gh-action-pypi-publish@master + with: + password: ${{ secrets.PYPI_PASSWORD }} diff --git a/.gitignore b/.gitignore index b6e4761..66397ab 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,9 @@ dmypy.json # Pyre type checker .pyre/ + + +## custom +.idea +virtualenv/ + diff --git a/README.md b/README.md index f51ffb6..c0a90ad 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,60 @@ -# npkpy -MikroTik npk-format unpacker. +[![Actions Status](https://github.com/botlabsDev/npkpy/workflows/Pytest/badge.svg)](https://github.com/botlabsDev/npkpy/actions) +[![codecov](https://codecov.io/gh/botlabsDev/npkpy/branch/master/graph/badge.svg?token=4ns6uIqoln)](https://codecov.io/gh/botlabsDev/npkpy) + + + +# npkPy +The npkPy package module is an unpacking tool for MikroTiks custom NPK container format. The tool is capable +to display to the content of any NPK package and to export all container. + +["NPK stands for MikroTik RouterOS upgrade package"](https://whatis.techtarget.com/fileformat/NPK-MikroTik-RouterOS-upgrade-package) +and since there is no solid unpacking tool for the format available, I want to share my approach of it. +The format, in general, is used by MikroTik to install and update the software on MikroTiks routerOs systems. + +NPK packages can be found here: [MikroTik Archive](https://mikrotik.com/download/archive) + +The code covers the ability to modify the container payload. Yet, this ability won't be available for cli. +Please be aware, that you can't create or modify __valid__ packages [since they are signed](https://forum.mikrotik.com/viewtopic.php?t=87126). + +``` +All recent packages are signed with EC-KCDSA signature, +and there's no way to create a valid npk file unless you know a secret key. +``` + +## Usage + +``` +$ npkPy is an unpacking tool for MikroTiks custom NPK container format + +optional arguments: + -h, --help show this help message and exit + +input: + --files FILES Select one or more files to process + --srcFolder SRCFOLDER + Process all NPK files found recursively in given source folder. + --glob GLOB Simple glob. Filter files from --srcFolder which match the given string. + +output: + --dstFolder DSTFOLDER + Extract container into given folder + +actions: + --showContainer List all container from selected NPK files + --exportAll Export all container from selected NPK files + --exportSquashFs Export all SquashFs container from selected NPK files + --exportZlib Export all Zlib compressed container from selected NPK files + +``` + +Common understanding: A file represents an NPK package with multiple containers. +Each container 'contains' payloads like descriptions, SquashFs images or Zlib compressed data. + +## Other unpacking tools +If npkPy does not work for you, check out older approaches of NPK unpacking tools: +* [mikrotik-npk](https://github.com/kost/mikrotik-npk) +* [npk-tools](https://github.com/rsa9000/npk-tools) + + + + diff --git a/bootstrap.sh b/bootstrap.sh new file mode 100755 index 0000000..223f13c --- /dev/null +++ b/bootstrap.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +venv=${1:-virtualenv} + +## setup virtualenv if not already exist +if [[ ! -e ${venv} ]]; then + virtualenv --python=python ${venv} + ${venv}/bin/pip install pip --upgrade + ${venv}/bin/pip install pip -r requirements.txt + ${venv}/bin/pip install pip -e . +fi + +git config user.name "botlabsDev" +git config user.email "git@botlabs.dev" +echo "--git config--" +echo -n "git user:"; git config user.name +echo -n "git email:"; git config user.email +echo "--------------" + +## start virtualenv +source ${venv}/bin/activate + + + + + diff --git a/npkpy/__init__.py b/npkpy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/npkpy/analyseNpk.py b/npkpy/analyseNpk.py new file mode 100644 index 0000000..ff69097 --- /dev/null +++ b/npkpy/analyseNpk.py @@ -0,0 +1,31 @@ +from npkpy.common import getFullPktInfo, extractContainer +from npkpy.npk.cntSquasFsImage import NPK_SQUASH_FS_IMAGE +from npkpy.npk.cntZlibCompressedData import NPK_ZLIB_COMPRESSED_DATA +from npkpy.npk.npkConstants import CNT_HANDLER + +EXPORT_FOLDER_PREFIX = "npkPyExport_" + + +def analyseNpk(opts, npkFiles): + filterContainer = [] + + if opts.showContainer: + for file in npkFiles: + print("\n".join(getFullPktInfo(file))) + + if opts.exportAll: + filterContainer = CNT_HANDLER.keys() + if opts.exportSquashFs: + filterContainer = [NPK_SQUASH_FS_IMAGE] + if opts.exportZlib: + filterContainer = [NPK_ZLIB_COMPRESSED_DATA] + + if filterContainer: + for npkFile in npkFiles: + exportFolder = opts.dstFolder / f"{EXPORT_FOLDER_PREFIX}{npkFile.file.stem}" + exportFolder.mkdir(parents=True, exist_ok=True) + + extractContainer(npkFile, exportFolder, filterContainer) + + if not next(exportFolder.iterdir(), None): + exportFolder.rmdir() diff --git a/npkpy/common.py b/npkpy/common.py new file mode 100644 index 0000000..9b38c4b --- /dev/null +++ b/npkpy/common.py @@ -0,0 +1,66 @@ +import hashlib +from pathlib import Path +from typing import List + + +def getAllNkpFiles(path, containStr=None): + return path.glob(f"**/*{containStr}*.npk" if containStr else "**/*.npk") + + +def extractContainer(npkFile, exportFolder, filterContainer): + for position, cnt in enumerate(npkFile.pck_cntList): + if cnt.cnt_id in filterContainer: + filePath = exportFolder / f"{position:03}_cnt_{cnt.cnt_idName}.raw" + writeToFile(filePath, cnt.cnt_payload) + + +def writeToFile(file, payloads): + written = 0 + if not isinstance(payloads, list): + payloads = [payloads] + + with open(file, "wb") as f: + for payload in payloads: + f.write(payload) + written += len(payload) + + +def getPktInfo(file): + return [str(file.file.name)] + + +def getCntInfo(file) -> List: + return [f"Cnt:{pos:3}:{c.cnt_idName}" for pos, c in file.pck_enumerateCnt] + + +def getFullPktInfo(file) -> List: + output = getPktInfo(file) + output += getCntInfo(file) + for cnt in file.pck_cntList: + output += getFullCntInfo(cnt) + return output + + +def getFullCntInfo(cnt) -> List: + info = [] + idName, options = cnt.output_cnt + info.append(f"{idName}") + for option in options: + info.append(f" {option}") + return info + + +def sha1sumFromFile(file: Path): + with file.open('rb') as f: + return sha1sumFromBinary(f.read()) + + +def sha1sumFromBinary(payloads): + if len(payloads) == 0: + return "" + + sha1 = hashlib.sha1() + for payload in [payloads] if not isinstance(payloads, list) else payloads: + sha1.update(payload) + + return sha1.digest() diff --git a/npkpy/main.py b/npkpy/main.py new file mode 100644 index 0000000..0a73fb1 --- /dev/null +++ b/npkpy/main.py @@ -0,0 +1,43 @@ +import argparse +from pathlib import Path + +from npkpy.analyseNpk import analyseNpk +from npkpy.common import getAllNkpFiles +from npkpy.npk.npk import Npk + + +def parseArgs(): + parser = argparse.ArgumentParser(description='npkPy is an unpacking tool for MikroTiks custom NPK container format') + + inputGroup = parser.add_argument_group("input") + inputGroup.add_argument("--files", action='append', type=Path, + help="Select one or more files to process") + inputGroup.add_argument("--srcFolder", type=Path, default=Path("."), + help="Process all NPK files found recursively in given source folder.") + + inputFilterGroup = inputGroup.add_mutually_exclusive_group() + inputFilterGroup.add_argument("--glob", type=str, default=None, + help="Simple glob. Filter files from --srcFolder which match the given string.") + + outputGroup = parser.add_argument_group("output") + outputGroup.add_argument("--dstFolder", type=Path, default=Path(".") / "exportNpk", + help="Extract container into given folder") + + actionGroup = parser.add_argument_group("actions") + exclusiveAction = actionGroup.add_mutually_exclusive_group(required=True) + exclusiveAction.add_argument("--showContainer", action="store_true", + help="List all container from selected NPK files") + exclusiveAction.add_argument("--exportAll", action="store_true", + help="Export all container from selected NPK files") + exclusiveAction.add_argument("--exportSquashFs", action="store_true", + help="Export all SquashFs container from selected NPK files") + exclusiveAction.add_argument("--exportZlib", action="store_true", + help="Export all Zlib compressed container from selected NPK files") + + return parser.parse_args() + + +def main(): + opts = parseArgs() + files = (Npk(f) for f in (opts.files if opts.files else getAllNkpFiles(opts.srcFolder, opts.glob))) + analyseNpk(opts, files) diff --git a/npkpy/npk/XCntMultiContainerList.py b/npkpy/npk/XCntMultiContainerList.py new file mode 100644 index 0000000..bdd478e --- /dev/null +++ b/npkpy/npk/XCntMultiContainerList.py @@ -0,0 +1,9 @@ +from npkpy.npk.pckRequirementsHeader import PckRequirementsHeader + +NPK_MULTICONTAINER_LIST = 20 + + +class XCnt_MultiContainerList(PckRequirementsHeader): + @property + def _regularCntId(self): + return NPK_MULTICONTAINER_LIST diff --git a/npkpy/npk/__init__.py b/npkpy/npk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/npkpy/npk/cntArchitectureTag.py b/npkpy/npk/cntArchitectureTag.py new file mode 100644 index 0000000..b73de0f --- /dev/null +++ b/npkpy/npk/cntArchitectureTag.py @@ -0,0 +1,9 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_ARCHITECTURE_TAG = 16 + + +class CntArchitectureTag(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_ARCHITECTURE_TAG diff --git a/npkpy/npk/cntBasic.py b/npkpy/npk/cntBasic.py new file mode 100644 index 0000000..764221c --- /dev/null +++ b/npkpy/npk/cntBasic.py @@ -0,0 +1,89 @@ +import logging +import struct + +BYTES_LEN_CNT_ID = 2 +BYTES_LEN_CNT_PAYLOAD_LEN = 4 + +NPK_CNT_BASIC = -1 + + +class NpkContainerBasic: + """ + 0____4____8____b____f + | | | | | + x0_|AABB|BBCC|C ..... C| + x1_|....|....|....|....| + + A = Container Identifier + B = Payload length + C = Payload + """ + + def __init__(self, data, offsetInPck): + self._data = bytearray(data) + self._offsetInPck = offsetInPck + self.modified = False + + @property + def _regularCntId(self): + return NPK_CNT_BASIC + + @property + def cnt_id(self): + cntId = struct.unpack_from(b"h", self._data, 0)[0] + if cntId != self._regularCntId: + raise RuntimeError(f"Cnt object does not represent given container typ {self._regularCntId}/{cntId}") + return cntId + + @property + def cnt_idName(self): + return str(self.__class__.__name__) + + @property + def cnt_payloadLen(self): + return (struct.unpack_from(b"I", self._data, 2))[0] + + @cnt_payloadLen.setter + def cnt_payloadLen(self, payloadLen): + logging.warning("[MODIFICATION] Please be aware that modifications can break the npk structure") + self.modified = True + struct.pack_into(b"I", self._data, 2, payloadLen) + + @property + def cnt_payload(self): + return struct.unpack_from(f"{self.cnt_payloadLen}s", self._data, 6)[0] + + @cnt_payload.setter + def cnt_payload(self, payload): + tmpLen = len(payload) + tmpHead = self._data[:2 + 4] + tmpHead += struct.pack(f"{tmpLen}s", payload) + self._data = tmpHead + self.cnt_payloadLen = tmpLen + + @property + def cnt_fullLength(self): + return BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + self.cnt_payloadLen + # return len(self._data) + + @property + def output_cnt(self): + viewLen = min(10, self.cnt_payloadLen) + + return (f"{self.cnt_idName}", [f"Cnt id: {self.cnt_id}", + f"Cnt offset: {self._offsetInPck}", + f"Cnt len: {self.cnt_fullLength}", + f"Payload len: {self.cnt_payloadLen}", + f"Payload[0:{viewLen}]: {self.cnt_payload[0:viewLen]} [...] " + ]) + + @property + def cnt_fullBinary(self): + cntId = self.cnt_id + payloadLen = self.cnt_payloadLen + + payload = struct.unpack_from(f"{self.cnt_payloadLen}s", + buffer=self._data, + offset=BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN)[0] + + return struct.pack(b"=hI", cntId, payloadLen) + payload diff --git a/npkpy/npk/cntNullBlock.py b/npkpy/npk/cntNullBlock.py new file mode 100644 index 0000000..d0a3277 --- /dev/null +++ b/npkpy/npk/cntNullBlock.py @@ -0,0 +1,9 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_NULL_BLOCK = 22 + + +class CntNullBlock(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_NULL_BLOCK diff --git a/npkpy/npk/cntSquasFsImage.py b/npkpy/npk/cntSquasFsImage.py new file mode 100644 index 0000000..362a5dc --- /dev/null +++ b/npkpy/npk/cntSquasFsImage.py @@ -0,0 +1,19 @@ +from npkpy.common import sha1sumFromBinary +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_SQUASH_FS_IMAGE = 21 + + +class CntSquashFsImage(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_SQUASH_FS_IMAGE + + @property + def cnt_payload_hash(self): + return sha1sumFromBinary(self.cnt_payload) + + @property + def output_cnt(self): + idName, options = super().output_cnt + return idName, options + [f"calc Sha1Hash: {self.cnt_payload_hash}"] diff --git a/npkpy/npk/cntSquashFsHashSignature.py b/npkpy/npk/cntSquashFsHashSignature.py new file mode 100644 index 0000000..c4ed6fe --- /dev/null +++ b/npkpy/npk/cntSquashFsHashSignature.py @@ -0,0 +1,14 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_SQUASHFS_HASH_SIGNATURE = 9 + + +class CntSquashFsHashSignature(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_SQUASHFS_HASH_SIGNATURE + + @property + def output_cnt(self): + idName, options = super().output_cnt + return idName, options + [f"Payload[-10:]: {self.cnt_payload[-10:]}"] diff --git a/npkpy/npk/cntZlibCompressedData.py b/npkpy/npk/cntZlibCompressedData.py new file mode 100644 index 0000000..954be1e --- /dev/null +++ b/npkpy/npk/cntZlibCompressedData.py @@ -0,0 +1,9 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_ZLIB_COMPRESSED_DATA = 4 + + +class CntZlibCompressedData(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_ZLIB_COMPRESSED_DATA diff --git a/npkpy/npk/npk.py b/npkpy/npk/npk.py new file mode 100644 index 0000000..c9abb59 --- /dev/null +++ b/npkpy/npk/npk.py @@ -0,0 +1,101 @@ +import struct +from pathlib import Path + +from npkpy.npk.npkConstants import CNT_HANDLER +from npkpy.npk.cntBasic import BYTES_LEN_CNT_ID, BYTES_LEN_CNT_PAYLOAD_LEN +from npkpy.npk.npkFileBasic import FileBasic + +MAGICBYTES = b"\x1e\xf1\xd0\xba" +BYTES_LEN_MAGIC_HEADER = 4 +BYTES_LEN_PCK_SIZE_LEN = 4 + + +class Npk(FileBasic): + """ + 0____4____8____b____f + | | | | | + 0_|AAAA|BBBB| C ..... | + 1_|....|....|....|....| + + + A = MAGIC BYTES (4) + B = PCK SIZE (4) + C = Begin of Container area + + """ + __cntList = None + + def __init__(self, filePath: Path): + super().__init__(filePath) + self.cntOffset = 8 + self._data = self.readDataFromFile(0, self.cntOffset) + self._checkMagicBytes(errorMsg="MagicBytes not found in Npk file") + self.pck_header = self.pck_cntList[0] + + @property + def pck_magicBytes(self): + return struct.unpack_from(b"4s", self._data, 0)[0] + + @property + def pck_payloadLen(self): + self.__pck_payloadUpdate() + payloadLen = struct.unpack_from(b"I", self._data, 4)[0] + return payloadLen + + def __pck_payloadUpdate(self): + if any(cnt.modified for cnt in self.pck_cntList): + currentSize = 0 + for cnt in self.pck_cntList: + currentSize += cnt.cnt_fullLength + cnt.modified = False + struct.pack_into(b"I", self._data, 4, currentSize) + + @property + def pck_fullSize(self): + return BYTES_LEN_MAGIC_HEADER + BYTES_LEN_PCK_SIZE_LEN + self.pck_payloadLen + + @property + def pck_fullBinary(self): + binary = MAGICBYTES + struct.pack("I", self.pck_payloadLen) + for c in self.pck_cntList: + binary += c.cnt_fullBinary + return binary + + @property + def pck_enumerateCnt(self): + for pos, c in enumerate(self.pck_cntList): + yield pos, c + + @property + def pck_cntList(self): + if not self.__cntList: + self.__cntList = self.__parseAllCnt() + return self.__cntList + + def __parseAllCnt(self): + lst = [] + offset = self.cntOffset + while offset < self.file.stat().st_size - 1: + lst.append(self.__getCnt(offset)) + offset += BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + lst[-1].cnt_payloadLen + return lst + + def __getCnt(self, offset): + cntId = struct.unpack_from("H", self.readDataFromFile(offset, 2))[0] + payloadLen = struct.unpack_from("I", self.readDataFromFile(offset + BYTES_LEN_CNT_ID, 4))[0] + pktLen = BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + payloadLen + + data = self.readDataFromFile(offset, pktLen) + if len(data) != pktLen: + raise RuntimeError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}") + try: + return CNT_HANDLER[cntId](data, offset) + except KeyError: + raise RuntimeError(f"failed with id: {cntId}\n" + f"New cnt id discovered in file: {self.file.absolute()}") + # except TypeError: + # raise RuntimeError(f"failed with id: {cntId}\n{self.file.absolute()}") + + def _checkMagicBytes(self, errorMsg): + if not self.pck_magicBytes == MAGICBYTES: + raise RuntimeError(errorMsg) diff --git a/npkpy/npk/npkConstants.py b/npkpy/npk/npkConstants.py new file mode 100644 index 0000000..2a833e6 --- /dev/null +++ b/npkpy/npk/npkConstants.py @@ -0,0 +1,54 @@ +from npkpy.npk.XCntMultiContainerList import NPK_MULTICONTAINER_LIST, XCnt_MultiContainerList +from npkpy.npk.cntArchitectureTag import NPK_ARCHITECTURE_TAG, CntArchitectureTag +from npkpy.npk.cntNullBlock import NPK_NULL_BLOCK, CntNullBlock +from npkpy.npk.pckReleaseTyp import NPK_RELEASE_TYP, PckReleaseTyp +from npkpy.npk.cntSquasFsImage import NPK_SQUASH_FS_IMAGE, CntSquashFsImage +from npkpy.npk.cntSquashFsHashSignature import NPK_SQUASHFS_HASH_SIGNATURE, CntSquashFsHashSignature +from npkpy.npk.cntZlibCompressedData import NPK_ZLIB_COMPRESSED_DATA, CntZlibCompressedData +from npkpy.npk.cntBasic import NPK_CNT_BASIC, NpkContainerBasic +from npkpy.npk.pckDescription import NPK_PCK_DESCRIPTION, PckDescription +from npkpy.npk.pckEckcdsaHash import NPK_ECKCDSA_HASH, PckEckcdsaHash +from npkpy.npk.pckHeader import NPK_PCK_HEADER, PckHeader +from npkpy.npk.pckRequirementsHeader import NPK_REQUIREMENTS_HEADER, PckRequirementsHeader +from npkpy.npk.xCntFlagB import NPK_FLAG_B, XCnt_flagB +from npkpy.npk.xCntFlagC import NPK_FLAG_C, XCnt_flagC +from npkpy.npk.xCntFlagA import NPK_FLAG_A, XCnt_flagA +from npkpy.npk.xCntMultiContainerHeader import NPK_MULTICONTAINER_HEADER, XCnt_multiContainerHeader +from npkpy.npk.xCntMpls import NPK_MPLS, XCntMpls + + + +CNT_HANDLER = { + NPK_CNT_BASIC: NpkContainerBasic, + 0: "?", + NPK_PCK_HEADER: PckHeader, + NPK_PCK_DESCRIPTION: PckDescription, + NPK_REQUIREMENTS_HEADER: PckRequirementsHeader, + NPK_ZLIB_COMPRESSED_DATA: CntZlibCompressedData, + 5: "?", + 6: "?", + NPK_FLAG_A: XCnt_flagA, + NPK_FLAG_B: XCnt_flagB, + NPK_SQUASHFS_HASH_SIGNATURE: CntSquashFsHashSignature, + 10: "?", + 11: "?", + 12: "?", + 13: "?", + 14: "?", + 15: "?", + NPK_ARCHITECTURE_TAG: CntArchitectureTag, + NPK_FLAG_C: XCnt_flagC, + NPK_MULTICONTAINER_HEADER: XCnt_multiContainerHeader, + NPK_MPLS: XCntMpls, + NPK_MULTICONTAINER_LIST: XCnt_MultiContainerList, + NPK_SQUASH_FS_IMAGE: CntSquashFsImage, + NPK_NULL_BLOCK: CntNullBlock, + NPK_ECKCDSA_HASH: PckEckcdsaHash, + NPK_RELEASE_TYP: PckReleaseTyp, + 25: "?", + 26: "?", + 27: "?", + 28: "?", + 29: "?", + 30: "?", +} diff --git a/npkpy/npk/npkFileBasic.py b/npkpy/npk/npkFileBasic.py new file mode 100644 index 0000000..1626399 --- /dev/null +++ b/npkpy/npk/npkFileBasic.py @@ -0,0 +1,47 @@ +import re +from pathlib import Path + +from npkpy.common import sha1sumFromFile + +ARCHITECTURES = ['arm', 'mipsbe', 'mipsle', 'mmips', 'ppc', 'smips', 'tile', 'x86'] + +RE_SUFFIX = '\.(npk$)' +RE_VERSION = '-(\d\.\d\d\.\d)' +RE_PROGRAM_NAME = '(^[\w-]*)-' + + +class FileBasic: + __data = None + + def __init__(self, filePath: Path): + self.file = filePath + + @property + def filename_suffix(self): + return re.search(RE_SUFFIX, self.file.name).group(1) + + @property + def filename_version(self): + return re.search(RE_VERSION, self.file.name).group(1) + + @property + def filename_architecture(self): + for a in ARCHITECTURES: + if f"_{a}_" in self.file.name: + return a + return "x86" + + @property + def filename_program(self): + name = re.search(RE_PROGRAM_NAME, self.file.name).group(1) + name.replace(f"_{self.filename_architecture}_", "") + return name + + @property + def file_hash(self): + return sha1sumFromFile(self.file) + + def readDataFromFile(self, offset, size): + with self.file.open("rb") as f: + f.seek(offset) + return bytearray(f.read(size)) diff --git a/npkpy/npk/pckDescription.py b/npkpy/npk/pckDescription.py new file mode 100644 index 0000000..80c27da --- /dev/null +++ b/npkpy/npk/pckDescription.py @@ -0,0 +1,7 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_PCK_DESCRIPTION = 2 +class PckDescription(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_PCK_DESCRIPTION diff --git a/npkpy/npk/pckEckcdsaHash.py b/npkpy/npk/pckEckcdsaHash.py new file mode 100644 index 0000000..1e2b52c --- /dev/null +++ b/npkpy/npk/pckEckcdsaHash.py @@ -0,0 +1,9 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_ECKCDSA_HASH = 23 +class PckEckcdsaHash(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_ECKCDSA_HASH + + diff --git a/npkpy/npk/pckHeader.py b/npkpy/npk/pckHeader.py new file mode 100644 index 0000000..07dc567 --- /dev/null +++ b/npkpy/npk/pckHeader.py @@ -0,0 +1,77 @@ +import datetime +import struct + +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_PCK_HEADER = 1 + + +class PckHeader(NpkContainerBasic): + """ + 0____4____8____b____f + | | | | | + x0_|AABB|BBCC|CCCC|CCCC| + x1_|CCCC|CCDE|FGHH|HH..| + x2_|....|....|....|....| + + + A = Container Identifier (2) + B = Payload length (4) + C = Program Name (16) + D = Program version: revision + E = Program version: rc + F = Program version: minor + G = Program version: major + H = Build time + I = NULL BLock / Flags + + """ + + + def __init__(self, data, offsetInPck): + super().__init__(data, offsetInPck) + self._offset = offsetInPck + self.flagOffset = 0 + + @property + def _regularCntId(self): + return NPK_PCK_HEADER + + @property + def cnt_programName(self): + # TODO: b"" - b needs to be removed! + return bytes(struct.unpack_from(b"16B", self._data, 6)).decode().rstrip('\x00') + + @property + def cnt_osVersion(self): + revision = (struct.unpack_from(b"B", self._data, 22))[0] + rc = (struct.unpack_from(b"B", self._data, 23))[0] + minor = (struct.unpack_from(b"B", self._data, 24))[0] + major = (struct.unpack_from(b"B", self._data, 25))[0] + return f"{major}.{minor}.{revision} - rc(?): {rc}" + + @property + def cnt_built_time(self): + return datetime.datetime.utcfromtimestamp(struct.unpack_from(b"I", self._data, 26)[0]) + + @property + def cnt_nullBlock(self): + return struct.unpack_from(b"4B", self._data, 30) + + @property + def cnt_flags(self): + try: + return struct.unpack_from(b"7B", self._data, 34) + except struct.error: + ## pkt with version 5.23 seems to have only four flags. + return struct.unpack_from(b"4B", self._data, 34) + + @property + def output_cnt(self): + idName, options = super().output_cnt + return (idName, options + [f"Program name: {self.cnt_programName}", + f"Os version: {self.cnt_osVersion}", + f"Created at: {self.cnt_built_time}", + f"NullBlock: {self.cnt_nullBlock}", + f"Flags: {self.cnt_flags}" + ]) diff --git a/npkpy/npk/pckReleaseTyp.py b/npkpy/npk/pckReleaseTyp.py new file mode 100644 index 0000000..453ed40 --- /dev/null +++ b/npkpy/npk/pckReleaseTyp.py @@ -0,0 +1,9 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_RELEASE_TYP = 24 + + +class PckReleaseTyp(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_RELEASE_TYP \ No newline at end of file diff --git a/npkpy/npk/pckRequirementsHeader.py b/npkpy/npk/pckRequirementsHeader.py new file mode 100644 index 0000000..f5c6929 --- /dev/null +++ b/npkpy/npk/pckRequirementsHeader.py @@ -0,0 +1,93 @@ +import struct +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_REQUIREMENTS_HEADER = 3 + + +class PckRequirementsHeader(NpkContainerBasic): + def _versionOneTwo(foo): + def check(self): + if self.cnt_structure_id > 0: + return foo(self) + return "" + + return check + + def _versionTwo(foo): + def check(self): + if self.cnt_structure_id > 1: + return foo(self) + return "" + + return check + + def __init__(self, data, offsetInPck): + super().__init__(data, offsetInPck) + self._offset = offsetInPck + + @property + def _regularCntId(self): + return NPK_REQUIREMENTS_HEADER + + @property + def cnt_structure_id(self): + return struct.unpack_from(b"H", self._data, 6)[0] + + @property + @_versionOneTwo + def cnt_programName(self): + return bytes(struct.unpack_from(b"16B", self._data, 8)).decode().rstrip('\x00') + + @property + @_versionOneTwo + def cnt_osVersionFrom(self): + revision = (struct.unpack_from(b"B", self._data, 24))[0] + rc = (struct.unpack_from(b"B", self._data, 25))[0] + minor = (struct.unpack_from(b"B", self._data, 26))[0] + major = (struct.unpack_from(b"B", self._data, 27))[0] + return f"{major}.{minor}.{revision} - rc(?): {rc}" + + @property + @_versionOneTwo + def cnt_nullBlock(self): + return struct.unpack_from(b"BBBB", self._data, 28) + + @property + @_versionOneTwo + def cnt_osVersionTo(self): + revision = (struct.unpack_from(b"B", self._data, 32))[0] + rc = (struct.unpack_from(b"B", self._data, 33))[0] + minor = (struct.unpack_from(b"B", self._data, 34))[0] + major = (struct.unpack_from(b"B", self._data, 35))[0] + return f"{major}.{minor}.{revision} - rc(?): {rc}" + + @property + @_versionTwo + def cnt_flags(self): + return struct.unpack_from(b"4B", self._data, 36) + + @property + def cnt_fullBinary(self): + id = self.cnt_id + payload_len = self.cnt_payloadLen + payload = struct.unpack_from(f"{self.cnt_payloadLen}s", self._data, 2 + 4)[0] + + # if self.cnt_structure_id > 2: + # print(self.cnt_flags) + # return struct.pack(f"HH", id, payload_len) + se + payload + return struct.pack(f"=HI", id, payload_len) + payload + + @property + def output_cnt(self): + idName, opt = super().output_cnt + options = [f"Cnt id: {self.cnt_id}", + f"StructID: {self.cnt_structure_id}", + f"Offset: {self._offset}", + f"Program name: {self.cnt_programName}", + f"Null block: {self.cnt_nullBlock}", + f"Os versionFrom: {self.cnt_osVersionFrom}", + f"Os versionTo: {self.cnt_osVersionTo}", + f"Flags: {self.cnt_flags}" + ] + + return (f"{self.cnt_idName}", opt + options) diff --git a/npkpy/npk/xCntFlagA.py b/npkpy/npk/xCntFlagA.py new file mode 100644 index 0000000..a3c5d4e --- /dev/null +++ b/npkpy/npk/xCntFlagA.py @@ -0,0 +1,14 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_FLAG_A = 7 + + +class XCnt_flagA(NpkContainerBasic): + @property + def _regularCntId(self): + return NPK_FLAG_A + + # @property + # def cnt_payload(self): + # # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n ' + diff --git a/npkpy/npk/xCntFlagB.py b/npkpy/npk/xCntFlagB.py new file mode 100644 index 0000000..26cd442 --- /dev/null +++ b/npkpy/npk/xCntFlagB.py @@ -0,0 +1,14 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_FLAG_B = 8 + + +class XCnt_flagB(NpkContainerBasic): + # TODO: found in gps-5.23-mipsbe.npk + @property + def _regularCntId(self): + return NPK_FLAG_B + + # @property + # def cnt_payload(self): + # # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n ' diff --git a/npkpy/npk/xCntFlagC.py b/npkpy/npk/xCntFlagC.py new file mode 100644 index 0000000..7559dde --- /dev/null +++ b/npkpy/npk/xCntFlagC.py @@ -0,0 +1,14 @@ +from npkpy.npk.cntBasic import NpkContainerBasic + +NPK_FLAG_C = 17 + + +class XCnt_flagC(NpkContainerBasic): + ##TODO: found in multicast-3.30-mipsbe.npk + @property + def _regularCntId(self): + return NPK_FLAG_C + + # @property + # def cnt_payload(self): + # # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n ' diff --git a/npkpy/npk/xCntMpls.py b/npkpy/npk/xCntMpls.py new file mode 100644 index 0000000..9bcd86d --- /dev/null +++ b/npkpy/npk/xCntMpls.py @@ -0,0 +1,9 @@ +from npkpy.npk.pckRequirementsHeader import PckRequirementsHeader + +NPK_MPLS = 19 + + +class XCntMpls(PckRequirementsHeader): + @property + def _regularCntId(self): + return NPK_MPLS diff --git a/npkpy/npk/xCntMultiContainerHeader.py b/npkpy/npk/xCntMultiContainerHeader.py new file mode 100644 index 0000000..7bdd44c --- /dev/null +++ b/npkpy/npk/xCntMultiContainerHeader.py @@ -0,0 +1,15 @@ +import struct + +from npkpy.npk.pckHeader import PckHeader + +NPK_MULTICONTAINER_HEADER: int = 18 + + +class XCnt_multiContainerHeader(PckHeader): + @property + def _regularCntId(self): + return NPK_MULTICONTAINER_HEADER + + @property + def cnt_flags(self): + return struct.unpack_from(b"4B", self._data, 34) diff --git a/pytest.sh b/pytest.sh new file mode 100755 index 0000000..aa7946f --- /dev/null +++ b/pytest.sh @@ -0,0 +1,11 @@ +!/bin/bash + +if [ ! -z "$1" ] +then + clear; echo "test specific file: $1" + pytest --cov=./ $1 -v +else + clear; echo "test all project files" + pytest --cov=npkpy --cov=acceptance_test -v + +fi diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..addff5d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +pytest_httpserver +urlpath +setuptools +wheel +twine +pytest +pytest-cov + diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..d6b0a75 --- /dev/null +++ b/setup.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3.8 +import setuptools +from datetime import datetime + +with open("README.md", "r") as fh: + long_description = fh.read() + +setuptools.setup( + name="npkPy", + version=f"{(datetime.now()).strftime('%Y.%m.%d.%H.%M')}", + description="npkPy is an unpacker tool for MikroTiks custom npk container format", + author='botlabsDev', + author_email='npkPy@botlabs.dev', + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/botlabsDev/npkpy", + packages=setuptools.find_packages(), + python_requires='>=3.6', + classifiers=[ + "Programming Language :: Python :: 3.6", + "License :: OSI Approved :: GNU Affero General Public License v3", + "Operating System :: OS Independent", + ], + entry_points={ + 'console_scripts': [ + "npkPy=npkpy.main:main", + # "npkDownloader=npkpy.download:main", + ], + }, + install_requires=[ + ], +) diff --git a/tests/XCntMultiContainerList_test.py b/tests/XCntMultiContainerList_test.py new file mode 100644 index 0000000..2bd9878 --- /dev/null +++ b/tests/XCntMultiContainerList_test.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from npkpy.npk.XCntMultiContainerList import XCnt_MultiContainerList +from tests.constants import DummyBasicCnt + + +class Test_xCnt_MultiContainerList(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 20) + + self.cnt = XCnt_MultiContainerList(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(20, self.cnt.cnt_id) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/cntBasic_test.py b/tests/cntBasic_test.py new file mode 100644 index 0000000..48af161 --- /dev/null +++ b/tests/cntBasic_test.py @@ -0,0 +1,118 @@ +import struct +import unittest + +from npkpy.npk.cntBasic import NpkContainerBasic +from tests.constants import DummyBasicCnt + + +class Test_npkContainerBasic(unittest.TestCase): + + def setUp(self) -> None: + self.cnt = NpkContainerBasic(DummyBasicCnt().binCnt, offsetInPck=0) + + def test_extractCntId(self): + self.assertEqual(-1, self.cnt.cnt_id) + + def test_failForWrongCntId(self): + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 999) + cnt = NpkContainerBasic(dummyCnt.binCnt, offsetInPck=0) + with self.assertRaises(RuntimeError) as e: + _ = cnt.cnt_id + self.assertEqual("Cnt object does not represent given container typ -1/999", e.exception.args[0]) + + def test_getNameOfContainerType(self): + self.assertEqual("NpkContainerBasic", self.cnt.cnt_idName) + + def test_extractPayloadLen(self): + self.assertEqual(7, self.cnt.cnt_payloadLen) + + def test_extractPayload(self): + self.assertEqual(b"Payload", self.cnt.cnt_payload) + + def test_extractCntFromGivenOffset(self): + self.assertEqual(len(DummyBasicCnt().binCnt), self.cnt.cnt_fullLength) + + def test_giveOverviewOfCnt(self): + expectedResult = ('NpkContainerBasic', [f'Cnt id: -1', + 'Cnt offset: 0', + 'Cnt len: 13', + 'Payload len: 7', + "Payload[0:7]: b'Payload' [...] "]) + self.assertEqual(expectedResult, self.cnt.output_cnt) + + def test_getFullBinaryOfContainer(self): + self.assertEqual(DummyBasicCnt().binCnt, self.cnt.cnt_fullBinary) + + +class Test_modifyNpkContainerBasic(unittest.TestCase): + def setUp(self) -> None: + self.cnt = NpkContainerBasic(DummyBasicCnt().binCnt, offsetInPck=0) + + def test_increaseCntSize(self): + origCntFullLength = self.cnt.cnt_fullLength + origPayloadLen = self.cnt.cnt_payloadLen + + self.cnt.cnt_payloadLen += 3 + + self.assertEqual(7, origPayloadLen) + self.assertEqual(10, self.cnt.cnt_payloadLen) + self.assertEqual(13, origCntFullLength) + self.assertEqual(16, self.cnt.cnt_fullLength) + + def test_decreaseCntSize(self): + origCntFullLength = self.cnt.cnt_fullLength + origPayloadLen = self.cnt.cnt_payloadLen + + self.cnt.cnt_payloadLen -= 4 + + self.assertEqual(7, origPayloadLen) + self.assertEqual(13, origCntFullLength) + self.assertEqual(3, self.cnt.cnt_payloadLen) + self.assertEqual(9, self.cnt.cnt_fullLength) + + def test_failAccessPayload_afterIncreasingPayloadLenField(self): + origPayloadLen = self.cnt.cnt_payloadLen + self.cnt.cnt_payloadLen += 3 + + self.assertEqual(origPayloadLen + 3, self.cnt.cnt_payloadLen) + with self.assertRaises(struct.error): + _ = self.cnt.cnt_payload + + def test_decreasingPayloadLenField_decreaseFullCntLenAndPayload(self): + origCntFullLength = self.cnt.cnt_fullLength + origPayloadLen = self.cnt.cnt_payloadLen + + self.cnt.cnt_payloadLen -= 4 + + self.assertEqual(origPayloadLen - 4, self.cnt.cnt_payloadLen) + self.assertEqual(origCntFullLength - 4, self.cnt.cnt_fullLength) + self.assertEqual(b"Pay", self.cnt.cnt_payload) + + def test_failDecreasingPayloadLenFieldBelowZero(self): + with self.assertRaises(struct.error) as e: + self.cnt.cnt_payloadLen -= 8 + self.assertEqual("argument out of range", e.exception.args[0]) + + def test_increasePayload_updatePayloadLen(self): + replacingPayload = b"NewTestPayload" + + self.cnt.cnt_payload = replacingPayload + + self.assertEqual(replacingPayload, self.cnt.cnt_payload) + self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen) + + def test_decreasePayload_updatePayloadLen(self): + replacingPayload = b"New" + + self.cnt.cnt_payload = replacingPayload + self.assertEqual(replacingPayload, self.cnt.cnt_payload) + self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen) + + def test_NullPayload_updatePayloadLenToZero(self): + replacingPayload = b"" + + self.cnt.cnt_payload = replacingPayload + self.assertEqual(replacingPayload, self.cnt.cnt_payload) + self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen) + diff --git a/tests/cntNullBlock_test.py b/tests/cntNullBlock_test.py new file mode 100644 index 0000000..d536a69 --- /dev/null +++ b/tests/cntNullBlock_test.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from npkpy.npk.cntNullBlock import CntNullBlock +from tests.constants import DummyBasicCnt + + +class Test_cntNullBlock(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 22) + + self.cnt = CntNullBlock(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(22, self.cnt.cnt_id) diff --git a/tests/cntSquasFsImage_test.py b/tests/cntSquasFsImage_test.py new file mode 100644 index 0000000..58f20cd --- /dev/null +++ b/tests/cntSquasFsImage_test.py @@ -0,0 +1,27 @@ +import struct +import unittest + +from npkpy.npk.cntSquasFsImage import CntSquashFsImage +from tests.constants import DummyBasicCnt + + +class Test_cntSquashFsImage(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 21) + self.cnt = CntSquashFsImage(dummyCnt.binCnt, offsetInPck=0) + + self.expectedHash = b'\xc3\x04\x15\xea\xccjYDit\xb7\x16\xef\xf5l\xf2\x82\x19\x81]' + + def test_validateCntId(self): + self.assertEqual(21, self.cnt.cnt_id) + + def test_payload_hash(self): + self.assertEqual(self.expectedHash, self.cnt.cnt_payload_hash) + + def test_giveOverviewOfCnt(self): + expected = f"calc Sha1Hash: {self.expectedHash}" + + _, cntData = self.cnt.output_cnt + + self.assertEqual(expected, cntData[-1]) diff --git a/tests/cntSquashFsHashSignature_test.py b/tests/cntSquashFsHashSignature_test.py new file mode 100644 index 0000000..5fe0f60 --- /dev/null +++ b/tests/cntSquashFsHashSignature_test.py @@ -0,0 +1,22 @@ +import struct +import unittest + +from npkpy.npk.cntSquashFsHashSignature import CntSquashFsHashSignature +from tests.constants import DummyBasicCnt + + +class Test_cntSquashFsHashSignature(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 9) + self.cnt = CntSquashFsHashSignature(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(9, self.cnt.cnt_id) + + def test_giveOverviewOfCnt(self): + expected = "Payload[-10:]: b'Payload'" + + _, cntData = self.cnt.output_cnt + + self.assertEqual(expected, cntData[-1]) diff --git a/tests/cntZlibCompressedData_test.py b/tests/cntZlibCompressedData_test.py new file mode 100644 index 0000000..072b357 --- /dev/null +++ b/tests/cntZlibCompressedData_test.py @@ -0,0 +1,17 @@ +import struct +import unittest + +from npkpy.npk.cntSquashFsHashSignature import CntSquashFsHashSignature +from tests.constants import DummyBasicCnt + + +class Test_cntSquashFsHashSignature(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 9) + + self.cnt = CntSquashFsHashSignature(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(9, self.cnt.cnt_id) + diff --git a/tests/common_test.py b/tests/common_test.py new file mode 100644 index 0000000..7a2d278 --- /dev/null +++ b/tests/common_test.py @@ -0,0 +1,102 @@ +import tempfile +import unittest +from pathlib import Path + +from npkpy.common import getPktInfo, getCntInfo, getAllNkpFiles +from npkpy.npk.npk import Npk + + +class Common_Test(unittest.TestCase): + + def setUp(self) -> None: + self.npkFile = Path(tempfile.NamedTemporaryFile(suffix=".npk").name) + self.npkFile.write_bytes(Path("tests/testData/gps-6.45.6.npk").read_bytes()) + self.npk = Npk(self.npkFile) + + def tearDown(self) -> None: + if self.npkFile.exists(): + self.npkFile.unlink() + + def test_getBasicPktInfo(self): + self.assertEqual([(str(self.npkFile.name))], getPktInfo(self.npk)) + + def test_getBasicCntInfo(self): + self.assertEqual(['Cnt: 0:PckHeader', + 'Cnt: 1:PckReleaseTyp', + 'Cnt: 2:CntArchitectureTag', + 'Cnt: 3:PckDescription', + 'Cnt: 4:PckEckcdsaHash', + 'Cnt: 5:PckRequirementsHeader', + 'Cnt: 6:CntNullBlock', + 'Cnt: 7:CntSquashFsImage', + 'Cnt: 8:CntSquashFsHashSignature', + 'Cnt: 9:CntArchitectureTag'], getCntInfo(self.npk), ) + + def test_getFullPktInfo(self): + pass + + # result = getFullPktInfo(self.npk) + # + # self.assertEqual(['Cnt: 0:PckHeader', + # 'Cnt: 1:PckReleaseTyp', + # 'Cnt: 2:PckArchitectureTag', + # 'Cnt: 3:PckDescription', + # 'Cnt: 4:PckSha1Hash', + # 'Cnt: 5:PckRequirementsHeader', + # 'Cnt: 6:PckNullBlock', + # 'Cnt: 7:PckSquashFsImage', + # 'Cnt: 8:PckSquashFsHashSignature', + # 'Cnt: 9:PckArchitectureTag'], result) + + def test_getFullCntInfo(self): + pass + + +class Test_findNpkFiles(unittest.TestCase): + + def setUp(self) -> None: + self.tmpPath = Path(tempfile.TemporaryDirectory().name) + self.tmpPath.mkdir(parents=True) + self.expectedFiles = [] + + def tearDown(self) -> None: + for file in self.tmpPath.rglob("*"): + if file.is_file(): + file.unlink() + for folder in sorted(self.tmpPath.rglob("*"), key=lambda _path: str(_path.absolute()).count("/"), reverse=True): + folder.rmdir() + self.tmpPath.rmdir() + + def test_findMultipleNpkFiles_inFolder(self): + self.addExistingFiles(["fileA.npk", "fileB.npk", "fileC.npk"]) + + self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath))) + + def test_findMultipleNpkFilesRecursive(self): + self.addExistingFiles(["fileA.npk", "subB/fileB.npk", "subB/subC/fileC.npk"]) + + self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath))) + + def test_selectOnlyNpkFiles(self): + self.addExistingFiles(["fileA.npk", "fileB.exe", "fileC.txt"]) + + self.expectedFiles = [self.tmpPath / "fileA.npk"] + + self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath))) + + def test_globOnlyNpkFilesFittingPattern(self): + self.addExistingFiles(["fi__pattern__leA.npk", "fileB.npk", "fi__pattern__leC.exe", "fileD.exe"]) + + self.expectedFiles = [self.tmpPath / "fi__pattern__leA.npk"] + + self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath, containStr="__pattern__"))) + + def assertExistingFiles(self, result): + self.assertEqual(self.expectedFiles, result) + + def addExistingFiles(self, files): + for file in files: + f = self.tmpPath / file + f.parent.mkdir(parents=True, exist_ok=True) + f.touch() + self.expectedFiles.append(f) diff --git a/tests/constants.py b/tests/constants.py new file mode 100644 index 0000000..94fcd06 --- /dev/null +++ b/tests/constants.py @@ -0,0 +1,89 @@ +# HEADER +import struct + +from npkpy.npk.pckHeader import NPK_PCK_HEADER + +MAGICBYTES = b"\x1e\xf1\xd0\xba" + +PCKSIZE = struct.pack("I", 28) +MAGIC_AND_SIZE = MAGICBYTES + PCKSIZE + +# OPENING ARCHITECTURE_TAG +SET_HEADER_TAG_ID = struct.pack("H", NPK_PCK_HEADER) # b"\x01\x00" +SET_HEADER_TAG_PAYLOAD = bytes("NAME OF PROGRAM".encode()) +SET_HEADER_TAG_SIZE = struct.pack("I", len(SET_HEADER_TAG_PAYLOAD)) + +CNT_SET_ARCHITECTURE_TAG = SET_HEADER_TAG_ID + \ + SET_HEADER_TAG_SIZE + \ + SET_HEADER_TAG_PAYLOAD + +# CLOSING ARCHITECTURE_TAG +CLOSING_ARCHITECTURE_TAG_ID = struct.pack("H", 1) # b"\x01\x00" +CLOSING_ARCHITECTURE_TAG_PAYLOAD = struct.pack("s", b"I") +CLOSING_ARCHITECTURE_TAG_SIZE = struct.pack("I", len(CLOSING_ARCHITECTURE_TAG_PAYLOAD)) + +CNT_CLOSING_ARCHITECTURE_TAG = CLOSING_ARCHITECTURE_TAG_ID + \ + CLOSING_ARCHITECTURE_TAG_SIZE + \ + CLOSING_ARCHITECTURE_TAG_PAYLOAD + +# MINIMAL_NPK_PAKAGE + + +MINIMAL_NPK_PACKAGE = MAGIC_AND_SIZE + \ + CNT_SET_ARCHITECTURE_TAG + \ + CNT_CLOSING_ARCHITECTURE_TAG + + +def getDummyNpkBinary(payload=None): + if not payload: + payload = DummyHeaderCnt().binHeaderCntA + pckPayload = payload + pckLen = struct.pack("I", len(pckPayload)) + npkBinary = MAGICBYTES + pckLen + pckPayload + return npkBinary + + +class DummyBasicCnt: + _00_cnt_id = struct.pack("h", -1) + _01_cnt_payload_len = struct.pack("I", 7) + _02_cnt_payload = struct.pack("7s", b"Payload") + + @property + def binCnt(self): + return self._00_cnt_id + \ + self._01_cnt_payload_len + \ + self._02_cnt_payload + + +class DummyHeaderCnt: + _00_cnt_id = struct.pack("H", 1) + _01_cnt_payload_len = struct.pack("I", 35) + _02_cnt_programName = struct.pack("16s", b"01234567890abcdef") + _03_cnt_versionRevision = struct.pack("B", 3) + _04_cnt_versionRc = struct.pack("B", 4) + _05_cnt_versionMinor = struct.pack("B", 2) + _06_cnt_versionMajor = struct.pack("B", 1) + _07_cnt_buildTime = struct.pack("I", 1) + _08_cnt_nullBock = struct.pack("I", 0) + _09a_cnt_flagsA = struct.pack("7B", 0, 0, 0, 0, 0, 0, 0) + # _09b_cnt_flagsB = struct.pack("4B", 0, 0, 0, 0) + + @property + def binHeaderCntA(self): + return self._binBasicHeaderCnt + self._09a_cnt_flagsA + + # @property + # def binHeaderCntB(self): + # return self._binBasicHeaderCnt + self._09b_cnt_flagsB + + @property + def _binBasicHeaderCnt(self): + return self._00_cnt_id + \ + self._01_cnt_payload_len + \ + self._02_cnt_programName + \ + self._03_cnt_versionRevision + \ + self._04_cnt_versionRc + \ + self._05_cnt_versionMinor + \ + self._06_cnt_versionMajor + \ + self._07_cnt_buildTime + \ + self._08_cnt_nullBock diff --git a/tests/npkConstants_test.py b/tests/npkConstants_test.py new file mode 100644 index 0000000..a819527 --- /dev/null +++ b/tests/npkConstants_test.py @@ -0,0 +1,14 @@ +import unittest + +from npkpy.npk.npkConstants import CNT_HANDLER +from tests.constants import DummyBasicCnt + + +class Test_npkConstants(unittest.TestCase): + + def test_validateAssignment_DictIdIsContainerId(self): + for cnt_id, cnt_class in CNT_HANDLER.items(): + if cnt_class != "?": + cnt = cnt_class(DummyBasicCnt().binCnt, 0) + self.assertEqual(cnt_id, cnt._regularCntId, + msg=f"{cnt_id}!={cnt._regularCntId}") diff --git a/tests/npkFileBasic_test.py b/tests/npkFileBasic_test.py new file mode 100644 index 0000000..e2be133 --- /dev/null +++ b/tests/npkFileBasic_test.py @@ -0,0 +1,21 @@ +import unittest +from pathlib import Path + +from npkpy.npk.npkFileBasic import FileBasic + + +class FileInfo_Test(unittest.TestCase): + def setUp(self) -> None: + self.file = FileBasic(Path("advanced-tools-6.41.3.npk")) + + def test_file(self): + self.assertEqual(Path("advanced-tools-6.41.3.npk"), self.file.file) + + def test_versionName(self): + self.assertEqual("6.41.3", self.file.filename_version) + + def test_programName(self): + self.assertEqual("advanced-tools", self.file.filename_program) + + def test_programSuffix(self): + self.assertEqual("npk", self.file.filename_suffix) diff --git a/tests/npkParsing_gpsFile_test.py b/tests/npkParsing_gpsFile_test.py new file mode 100644 index 0000000..bfe215d --- /dev/null +++ b/tests/npkParsing_gpsFile_test.py @@ -0,0 +1,162 @@ +import datetime +import unittest +from pathlib import Path + +from npkpy.npk.npk import Npk, MAGICBYTES + + +class GpsFile_Test(unittest.TestCase): + def setUp(self) -> None: + self.npkFile = Path("tests/testData/gps-6.45.6.npk") + self.npk = Npk(self.npkFile) + self.cnt = self.npk.pck_cntList + + +class ParseGpsNpkFile_Test(GpsFile_Test): + + def test_fileInfos(self): + self.assertEqual('gps', self.npk.filename_program) + self.assertEqual('6.45.6', self.npk.filename_version) + self.assertEqual('npk', self.npk.filename_suffix) + self.assertEqual('x86', self.npk.filename_architecture) + self.assertEqual(b'\xc6\x16\xf0\x9d~lS\xa7z\xba}.\xe5\xa6w=\xe9\xb4S\xe7', self.npk.file_hash) + + def test_NpkHeader(self): + self.assertEqual(MAGICBYTES, self.npk.pck_magicBytes) + self.assertEqual(53321, self.npk.pck_payloadLen) + self.assertEqual(53329, self.npk.pck_fullSize) + + def test_PckHeader(self): + self.assertEqual(1, self.npk.pck_header.cnt_id) + self.assertEqual("PckHeader", self.npk.pck_header.cnt_idName) + self.assertEqual(36, self.npk.pck_header.cnt_payloadLen) + self.assertEqual(b'gps\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06f-\x06\x97gw]' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00', self.npk.pck_header.cnt_payload) + + self.assertEqual("gps", self.npk.pck_header.cnt_programName) + self.assertEqual("6.45.6 - rc(?): 102", self.npk.pck_header.cnt_osVersion) + self.assertEqual(datetime.datetime(2019, 9, 10, 9, 6, 31), self.npk.pck_header.cnt_built_time) + self.assertEqual((0, 0, 0, 0), self.npk.pck_header.cnt_nullBlock) + self.assertEqual((0, 0, 0, 0, 2, 0, 0), self.npk.pck_header.cnt_flags) + + def test_ReleaseTyp(self): + cnt = self.cnt[1] + + self.assertEqual(24, cnt.cnt_id) + self.assertEqual(50, cnt._offsetInPck) + self.assertEqual("PckReleaseTyp", cnt.cnt_idName) + self.assertEqual(6, cnt.cnt_payloadLen) + self.assertEqual(b"stable", cnt.cnt_payload) + self.assertEqual(12, cnt.cnt_fullLength) + + def test_PckArchitectureTag(self): + cnt = self.cnt[2] + + self.assertEqual(16, cnt.cnt_id) + self.assertEqual(62, cnt._offsetInPck) + self.assertEqual("CntArchitectureTag", cnt.cnt_idName) + self.assertEqual(4, cnt.cnt_payloadLen) + self.assertEqual(b"i386", cnt.cnt_payload) + self.assertEqual(10, cnt.cnt_fullLength) + + def test_PckDescription(self): + cnt = self.cnt[3] + + self.assertEqual(2, cnt.cnt_id) + self.assertEqual(72, cnt._offsetInPck) + self.assertEqual("PckDescription", cnt.cnt_idName) + self.assertEqual(25, cnt.cnt_payloadLen) + self.assertEqual(b'Provides support for GPS.', cnt.cnt_payload) + self.assertEqual(31, cnt.cnt_fullLength) + + def test_PckHash(self): + cnt = self.cnt[4] + + self.assertEqual(23, cnt.cnt_id) + self.assertEqual(103, cnt._offsetInPck) + self.assertEqual("PckEckcdsaHash", cnt.cnt_idName) + self.assertEqual(40, cnt.cnt_payloadLen) + self.assertEqual(b'1a7d206bbfe626c55aa6d2d2caabb6a5a990f13d', cnt.cnt_payload) + self.assertEqual(46, cnt.cnt_fullLength) + + def test_PckRequirementsHeader(self): + cnt = self.cnt[5] + + self.assertEqual(3, cnt.cnt_id) + self.assertEqual(149, cnt._offsetInPck) + self.assertEqual("PckRequirementsHeader", cnt.cnt_idName) + self.assertEqual(1, cnt.cnt_structure_id) + self.assertEqual(34, cnt.cnt_payloadLen) + self.assertEqual('system', cnt.cnt_programName) + self.assertEqual((0, 0, 0, 0), cnt.cnt_nullBlock) + self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_osVersionFrom) + self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_osVersionTo) + self.assertEqual("", cnt.cnt_flags) + self.assertEqual(40, cnt.cnt_fullLength) + + def test_PckNullBlock(self): + cnt = self.cnt[6] + + self.assertEqual(22, cnt.cnt_id) + self.assertEqual(189, cnt._offsetInPck) + self.assertEqual("CntNullBlock", cnt.cnt_idName) + self.assertEqual(3895, cnt.cnt_payloadLen) + self.assertEqual(b'\x00' * 3895, cnt.cnt_payload) + self.assertEqual(3901, cnt.cnt_fullLength) + + def test_PckSquashFsImage(self): + cnt = self.cnt[7] + + self.assertEqual(21, cnt.cnt_id) + self.assertEqual(4090, cnt._offsetInPck) + self.assertEqual("CntSquashFsImage", cnt.cnt_idName) + self.assertEqual(49152, cnt.cnt_payloadLen) + self.assertEqual(b'hsqs', cnt.cnt_payload[0:4]) + self.assertEqual(49158, cnt.cnt_fullLength) + + def test_PckSquashFsHashSignature(self): + cnt = self.cnt[8] + + self.assertEqual(9, cnt.cnt_id) + self.assertEqual(53248, cnt._offsetInPck) + self.assertEqual("CntSquashFsHashSignature", cnt.cnt_idName) + self.assertEqual(68, cnt.cnt_payloadLen) + self.assertEqual(b'\x8e\xa2\xb1\x8e\xf7n\xef355', cnt.cnt_payload[0:10]) + self.assertEqual(74, cnt.cnt_fullLength) + + def test_parseGpsFilxe_PckArchitectureTag_Closing(self): + cnt = self.cnt[9] + + self.assertEqual(16, cnt.cnt_id) + self.assertEqual(53322, cnt._offsetInPck) + self.assertEqual("CntArchitectureTag", cnt.cnt_idName) + self.assertEqual(1, cnt.cnt_payloadLen) + self.assertEqual(b'I', cnt.cnt_payload[0:10]) + self.assertEqual(7, cnt.cnt_fullLength) + + def test_checkStructure(self): + self.assertEqual(10, len(self.npk.pck_cntList)) + self.assertEqual([1, 24, 16, 2, 23, 3, 22, 21, 9, 16], list(cnt.cnt_id for cnt in self.npk.pck_cntList)) + + +class WriteModifiedGpsFile_Test(GpsFile_Test): + def test_modify_PckRequirementsHeader(self): + x = b"\x03\x00\x22\x00\x00\x00\x01\x00\x73\x79\x73\x74\x65\x6d\x00\x00" + \ + b"\x00\x00\x00\x00\x00\x00\x00\x00\x06\x66\x2d\x06\x00\x00\x00\x00" + \ + b"\x06\x66\x2d\x06\x00\x00\x00\x00" + + cnt = None + for c in self.cnt: + if c.cnt_id == 3: + cnt = c + break + + self.assertEqual(x, cnt.cnt_fullBinary) + + def test_createFile_changePayloadTwice(self): + oldPayload = self.npk.pck_header.cnt_payload + + self.npk.pck_header.cnt_payload = b"A" + self.npk.pck_header.cnt_payload = oldPayload + + self.assertEqual(Npk(self.npkFile).file.read_bytes(), self.npk.pck_fullBinary) diff --git a/tests/npk_test.py b/tests/npk_test.py new file mode 100644 index 0000000..d275030 --- /dev/null +++ b/tests/npk_test.py @@ -0,0 +1,70 @@ +import struct +import tempfile +import unittest +from pathlib import Path + +from npkpy.npk.npk import Npk +from npkpy.npk.pckHeader import PckHeader +from tests.constants import DummyHeaderCnt, MAGICBYTES, getDummyNpkBinary + + +class Test_npkClass(unittest.TestCase): + + def setUp(self) -> None: + self.npkFile = Path(tempfile.NamedTemporaryFile(suffix=".npk").name) + self.npkFile.write_bytes(getDummyNpkBinary()) + + def test_fileIsNoNpkFile(self): + self.npkFile.write_bytes(b"NoMagicBytesAtHeadOfFile") + + with self.assertRaises(RuntimeError) as e: + _ = Npk(self.npkFile).pck_magicBytes + self.assertEqual(e.exception.args[0], "MagicBytes not found in Npk file") + + def test_npkFileIsCorrupt_fileCorruptException(self): + self.npkFile.write_bytes(MAGICBYTES + b"CorruptFile") + + with self.assertRaises(RuntimeError) as e: + _ = Npk(self.npkFile).pck_cntList + self.assertEqual(e.exception.args[0], + f"File maybe corrupted. Please download again. File: {self.npkFile.absolute()}") + + def test_extractMagicBytes(self): + self.assertEqual(MAGICBYTES, Npk(self.npkFile).pck_magicBytes) + + def test_extractLenOfNpkPayload_propagatedSizeIsValid(self): + self.assertEqual(len(DummyHeaderCnt().binHeaderCntA), Npk(self.npkFile).pck_payloadLen) + + def test_calculatePckFullSize_equalsFileSize(self): + self.assertEqual(self.npkFile.stat().st_size, Npk(self.npkFile).pck_fullSize) + + def test_getNpkBinary_equalsOriginalBinary(self): + npkBinary = self.npkFile.read_bytes() + + self.assertEqual(npkBinary, Npk(self.npkFile).pck_fullBinary) + + def test_getEnumeratedListOfCntInNpk(self): + cntList = list(Npk(self.npkFile).pck_enumerateCnt) + cntId, cnt = cntList[0] + + self.assertEqual(1, len(cntList)) + self.assertEqual(0, cntId) + self.assertTrue(isinstance(cnt, PckHeader)) + + def test_getAllCnt_returnAsList(self): + cntList = Npk(self.npkFile).pck_cntList + + self.assertEqual(1, len(cntList)) + self.assertTrue(isinstance(cntList[0], PckHeader)) + + def test_getAllCnt_exceptionWithUnknownCntInNpk(self): + unknownCnt = DummyHeaderCnt() + unknownCnt._00_cnt_id = struct.pack("H", 999) + self.npkFile.write_bytes(getDummyNpkBinary(payload=unknownCnt.binHeaderCntA)) + + with self.assertRaises(RuntimeError) as e: + _ = Npk(self.npkFile).pck_cntList + self.assertEqual(e.exception.args[0], f"failed with id: 999\n" + f"New cnt id discovered in file: {self.npkFile.absolute()}") + + diff --git a/tests/pckDescription_test.py b/tests/pckDescription_test.py new file mode 100644 index 0000000..6940367 --- /dev/null +++ b/tests/pckDescription_test.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from npkpy.npk.pckDescription import PckDescription +from tests.constants import DummyBasicCnt + + +class Test_pckDescription(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 2) + + self.cnt = PckDescription(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(2, self.cnt.cnt_id) diff --git a/tests/pckEckcdsaHash_test.py b/tests/pckEckcdsaHash_test.py new file mode 100644 index 0000000..2c3b548 --- /dev/null +++ b/tests/pckEckcdsaHash_test.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from npkpy.npk.pckEckcdsaHash import PckEckcdsaHash +from tests.constants import DummyBasicCnt + + +class Test_pckEckcdsaHash(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 23) + + self.cnt = PckEckcdsaHash(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(23, self.cnt.cnt_id) diff --git a/tests/pckReleaseTyp_test.py b/tests/pckReleaseTyp_test.py new file mode 100644 index 0000000..12c6cd1 --- /dev/null +++ b/tests/pckReleaseTyp_test.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from npkpy.npk.pckReleaseTyp import PckReleaseTyp +from tests.constants import DummyBasicCnt + + +class Test_pckReleaseTyp(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 24) + + self.cnt = PckReleaseTyp(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(24, self.cnt.cnt_id) diff --git a/tests/testData/gps-6.45.6.npk b/tests/testData/gps-6.45.6.npk new file mode 100644 index 0000000..9c190e2 Binary files /dev/null and b/tests/testData/gps-6.45.6.npk differ diff --git a/tests/testData/gps-6.45.6.result b/tests/testData/gps-6.45.6.result new file mode 100644 index 0000000..65f3b21 --- /dev/null +++ b/tests/testData/gps-6.45.6.result @@ -0,0 +1,86 @@ +gps-6.45.6.npk +Cnt: 0:PckHeader +Cnt: 1:PckReleaseTyp +Cnt: 2:CntArchitectureTag +Cnt: 3:PckDescription +Cnt: 4:PckEckcdsaHash +Cnt: 5:PckRequirementsHeader +Cnt: 6:CntNullBlock +Cnt: 7:CntSquashFsImage +Cnt: 8:CntSquashFsHashSignature +Cnt: 9:CntArchitectureTag +PckHeader + Cnt id: 1 + Cnt offset: 8 + Cnt len: 42 + Payload len: 36 + Payload[0:10]: b'gps\x00\x00\x00\x00\x00\x00\x00' [...] + Program name: gps + Os version: 6.45.6 - rc(?): 102 + Created at: 2019-09-10 09:06:31 + NullBlock: (0, 0, 0, 0) + Flags: (0, 0, 0, 0, 2, 0, 0) +PckReleaseTyp + Cnt id: 24 + Cnt offset: 50 + Cnt len: 12 + Payload len: 6 + Payload[0:6]: b'stable' [...] +CntArchitectureTag + Cnt id: 16 + Cnt offset: 62 + Cnt len: 10 + Payload len: 4 + Payload[0:4]: b'i386' [...] +PckDescription + Cnt id: 2 + Cnt offset: 72 + Cnt len: 31 + Payload len: 25 + Payload[0:10]: b'Provides s' [...] +PckEckcdsaHash + Cnt id: 23 + Cnt offset: 103 + Cnt len: 46 + Payload len: 40 + Payload[0:10]: b'1a7d206bbf' [...] +PckRequirementsHeader + Cnt id: 3 + Cnt offset: 149 + Cnt len: 40 + Payload len: 34 + Payload[0:10]: b'\x01\x00system\x00\x00' [...] + Cnt id: 3 + StructID: 1 + Offset: 149 + Program name: system + Null block: (0, 0, 0, 0) + Os versionFrom: 6.45.6 - rc(?): 102 + Os versionTo: 6.45.6 - rc(?): 102 + Flags: +CntNullBlock + Cnt id: 22 + Cnt offset: 189 + Cnt len: 3901 + Payload len: 3895 + Payload[0:10]: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' [...] +CntSquashFsImage + Cnt id: 21 + Cnt offset: 4090 + Cnt len: 49158 + Payload len: 49152 + Payload[0:10]: b'hsqs\x15\x00\x00\x00\xa2m' [...] + calc Sha1Hash: b'c\x033\x82{\x9a$er\x92=o\x8a\xfeL-Q\x02hW' +CntSquashFsHashSignature + Cnt id: 9 + Cnt offset: 53248 + Cnt len: 74 + Payload len: 68 + Payload[0:10]: b'\x8e\xa2\xb1\x8e\xf7n\xef355' [...] + Payload[-10:]: b"Y\x9e'\xac\xccw\x91\x06]\t" +CntArchitectureTag + Cnt id: 16 + Cnt offset: 53322 + Cnt len: 7 + Payload len: 1 + Payload[0:1]: b'I' [...] diff --git a/tests/xCntFlagA_test.py b/tests/xCntFlagA_test.py new file mode 100644 index 0000000..5dd36f9 --- /dev/null +++ b/tests/xCntFlagA_test.py @@ -0,0 +1,17 @@ +import struct +import unittest + +from npkpy.npk.xCntFlagA import XCnt_flagA +from tests.constants import DummyBasicCnt + + +class Test_xCnt_flagB(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 7) + + self.cnt = XCnt_flagA(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(7, self.cnt.cnt_id) + diff --git a/tests/xCntFlagB_test.py b/tests/xCntFlagB_test.py new file mode 100644 index 0000000..e3d1d82 --- /dev/null +++ b/tests/xCntFlagB_test.py @@ -0,0 +1,18 @@ +import struct +import unittest + +from npkpy.npk.xCntFlagB import XCnt_flagB +from tests.constants import DummyBasicCnt + + +class Test_xCnt_flagB(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 8) + + self.cnt = XCnt_flagB(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(8, self.cnt.cnt_id) + + diff --git a/tests/xCntFlagC_test.py b/tests/xCntFlagC_test.py new file mode 100644 index 0000000..f9b0f10 --- /dev/null +++ b/tests/xCntFlagC_test.py @@ -0,0 +1,17 @@ +import struct +import unittest + +from npkpy.npk.xCntFlagC import XCnt_flagC +from tests.constants import DummyBasicCnt + + +class Test_xCnt_flagC(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 17) + + self.cnt = XCnt_flagC(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(17, self.cnt.cnt_id) + diff --git a/tests/xCntMpls_test.py b/tests/xCntMpls_test.py new file mode 100644 index 0000000..6758a3f --- /dev/null +++ b/tests/xCntMpls_test.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from npkpy.npk.xCntMpls import XCntMpls +from tests.constants import DummyBasicCnt + + +class Test_xCntMpls(unittest.TestCase): + def setUp(self) -> None: + dummyCnt = DummyBasicCnt() + dummyCnt._00_cnt_id = struct.pack("h", 19) + + self.cnt = XCntMpls(dummyCnt.binCnt, offsetInPck=0) + + def test_validateCntId(self): + self.assertEqual(19, self.cnt.cnt_id) diff --git a/tests_acceptance_test/__init__.py b/tests_acceptance_test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests_acceptance_test/acceptance_test.py b/tests_acceptance_test/acceptance_test.py new file mode 100644 index 0000000..8418bcf --- /dev/null +++ b/tests_acceptance_test/acceptance_test.py @@ -0,0 +1,66 @@ +import subprocess +import tempfile +import unittest +from pathlib import Path + + +class Test_npkPy(unittest.TestCase): + + def setUp(self) -> None: + # TODO: create DummyPkg and replace gps-6.45.6.npk + self.npkFile = Path("tests/testData/gps-6.45.6.npk") + self.pathToNpk = str(self.npkFile.absolute()) + self.npkContainerList = Path("tests/testData/gps-6.45.6.result").read_text() + self.dstFolder = Path(tempfile.mkdtemp()) + + def tearDown(self) -> None: + [f.unlink() for f in self.dstFolder.rglob("*") if f.is_file()] + [f.rmdir() for f in self.dstFolder.rglob("*")] + self.dstFolder.rmdir() + + def test_showAllContainersFromNpkPkg(self): + cmd = ["npkPy", "--file", self.pathToNpk, "--showContainer"] + output = runCmdInTerminal(cmd) + self.assertEqual(self.npkContainerList, output) + + def test_exportAllContainerFromNpk(self): + cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportAll"] + + runCmdInTerminal(cmd) + + exportedContainer = sorted(str(f.relative_to(self.dstFolder)) for f in self.dstFolder.rglob('*')) + self.assertEqual(['npkPyExport_gps-6.45.6', + 'npkPyExport_gps-6.45.6/000_cnt_PckHeader.raw', + 'npkPyExport_gps-6.45.6/001_cnt_PckReleaseTyp.raw', + 'npkPyExport_gps-6.45.6/002_cnt_CntArchitectureTag.raw', + 'npkPyExport_gps-6.45.6/003_cnt_PckDescription.raw', + 'npkPyExport_gps-6.45.6/004_cnt_PckEckcdsaHash.raw', + 'npkPyExport_gps-6.45.6/005_cnt_PckRequirementsHeader.raw', + 'npkPyExport_gps-6.45.6/006_cnt_CntNullBlock.raw', + 'npkPyExport_gps-6.45.6/007_cnt_CntSquashFsImage.raw', + 'npkPyExport_gps-6.45.6/008_cnt_CntSquashFsHashSignature.raw', + 'npkPyExport_gps-6.45.6/009_cnt_CntArchitectureTag.raw'], exportedContainer) + + def test_extractSquashFsContainerFromNpk(self): + cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportSquashFs"] + + runCmdInTerminal(cmd) + + self.assertContainerExtracted(['npkPyExport_gps-6.45.6', + 'npkPyExport_gps-6.45.6/007_cnt_CntSquashFsImage.raw']) + + # + def test_extractZlibContainerFromNpk_NonExisitngNotExtracted(self): + cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportZlib"] + + runCmdInTerminal(cmd) + + self.assertContainerExtracted([]) + + def assertContainerExtracted(self, expectedFiles): + extractedContainer = sorted(str(f.relative_to(self.dstFolder)) for f in self.dstFolder.rglob('*')) + self.assertEqual(expectedFiles, extractedContainer) + + +def runCmdInTerminal(cmd): + return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.decode("UTF-8") diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/download_all_packages.py b/tools/download_all_packages.py new file mode 100755 index 0000000..34b7817 --- /dev/null +++ b/tools/download_all_packages.py @@ -0,0 +1,128 @@ +import socket +import sys +import urllib.request +import zipfile +from datetime import datetime +from pathlib import Path +from time import sleep +from urllib.error import HTTPError, URLError +from zipfile import ZipFile + +from urlpath import URL + + +class MikroTikDownloader: + + def __init__(self, dstPath): + + self.dstPath = Path(dstPath) + self.sleepTime = 1 + socket.setdefaulttimeout(10) + + def downloadAll(self, urls): + missingFiles = self._determineMissingFiles(urls) + for link in missingFiles: + unpackFile(self._downloadFile(link)) + + def _downloadFile(self, url): + def _progress(count, block_size, total_size): + progress = (float(count * block_size) / float(total_size) * 100.0) + if int(progress) > 100: + raise RuntimeError("AccessDenied") + sys.stderr.write(self._msg(url) + ': [%.1f%%] ' % progress) + sys.stderr.flush() + + targetFile = self._convertToLocalFilePath(url) + targetFile.parent.mkdir(exist_ok=True, parents=True) + + while True: + try: + urllib.request.urlretrieve(f"http:{url.resolve()}", targetFile, _progress) + self.msg(url, "[100.0%]") + self.dec_sleep() + break + except HTTPError as e: + self.msg(url, f"[failed] (Error code: {e.code})") + break + except socket.timeout: + self.msg(url, "[failed] (Timeout)") + self.inc_sleep() + except RuntimeError as e: + self.msg(url, f"[failed] ([{e}])") + if targetFile.exists(): + targetFile.unlink() + break + except URLError as e: + self.msg(url, f"[failed] (Error code: {e})") + self.inc_sleep() + + return targetFile + + def inc_sleep(self): + sleep(self.sleepTime) + self.sleepTime += 1 + + def dec_sleep(self): + if self.sleepTime > 1: + self.sleepTime -= 1 + + def _determineMissingFiles(self, urls): + for url in urls: + if not self._convertToLocalFilePath(url).exists(): + yield url + else: + self.msg(url, "[ok]") + + def msg(self, url, msg): + print(f"{self._msg(url)}: {msg}") + + def _msg(self, url): + return f"\r[{datetime.now().time()}] Download {url.name:35}" + + def _convertToLocalFilePath(self, link: URL): + pktName = arc = version = "" + elements = link.stem.split("-") + count = len(elements) + if count == 2: + pktName, version = elements + if count == 3: + pktName, arc, version = elements + return self.dstPath / pktName / arc / version / link.name + + +def fetchWebsite(url): + with urllib.request.urlopen(url) as response: + return response.read().decode("utf-8") + + +def extractDownloadLinks(tHtml, filterString): + return (URL(line.split('"')[1]) for line in tHtml.split() if filterString in line) + + +def filterLinks(tLinks, filters): + return (link for link in tLinks if + any(True for name, suffix in filters + if name in link.stem and + suffix in link.suffix + ) + ) + + +def unpackFile(file: Path): + if file.exists() and file.suffix == ".zip": + try: + with ZipFile(file, 'r') as zipObj: + zipObj.extractall(file.parent) + except zipfile.BadZipFile: + file.unlink() + + +if __name__ == "__main__": + html = fetchWebsite("https://mikrotik.com/download/archive") + links = extractDownloadLinks(html, "download.mikrotik.com") + selectedLinks = filterLinks(links, [("all_packages", ".zip"), + (".", ".npk") + ] + ) + mtd = MikroTikDownloader("./downloads") + mtd.downloadAll(selectedLinks) diff --git a/tools/download_all_packages_test.py b/tools/download_all_packages_test.py new file mode 100644 index 0000000..7f2f056 --- /dev/null +++ b/tools/download_all_packages_test.py @@ -0,0 +1,105 @@ +import tempfile +import unittest +from pathlib import Path +from zipfile import ZipFile + +from pytest_httpserver import HTTPServer +from urlpath import URL + +from tools.download_all_packages import fetchWebsite, extractDownloadLinks, filterLinks, \ + unpackFile + + +class Test_MikroTikDownloader(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + print("setup Class!") + cls.s = HTTPServer() + cls.s.start() + cls.testUrl = f"http://localhost:{cls.s.port}" + + # cls.s.expect_request("/download/archive").respond_with_data(createArchiveWebsite(cls.testUrl)) + # cls.s.expect_request("/routeros/6.36.4/all_packages-mipsbe-6.36.4.zip").respond_with_data(b"data", + + # content_type="application/zip") + + @classmethod + def tearDownClass(cls) -> None: + cls.s.stop() + + def setUp(self) -> None: + self.s.clear() + + def test_downloadWebsite(self): + self.s.expect_request("/").respond_with_data(b"testData") + + result = fetchWebsite(f"http://localhost:{self.s.port}") + + self.assertEqual("testData", result) + + def test_extractDownloadLinks(self): + testData = "dummyTest \n" + testData += "program-arch-version.zip>\n" + testData += "dummyTest \n" + + result = extractDownloadLinks(testData, "filterString.com") + + self.assertEqual([URL("//filterString.com/prod/vers/programA-arch-version.zip")], list(result)) + + def test_filterForSpecificLinks_basedOnSuffix(self): + links = [URL("//filterString.com/prod/vers/programA-arch-version.zip"), + URL("//filterString.com/prod/vers/programB-arch-version.WRONGSUFFIX")] + + result = filterLinks(links, [("program", ".zip")]) + + self.assertEqual([URL("//filterString.com/prod/vers/programA-arch-version.zip")], list(result)) + + def test_filterForSpecificLinks_basedOnProgram(self): + links = [URL("//filterString.com/prod/vers/programA-arch-version.exe"), + URL("//filterString.com/prod/vers/programB-arch-version.zip")] + + result = filterLinks(links, ([("programB", ".zip")])) + + self.assertEqual([URL("//filterString.com/prod/vers/programB-arch-version.zip")], list(result)) + + def test_dontExtractZip_fileDontExist(self): + file = Path(tempfile.NamedTemporaryFile(suffix="WRONGSUFFIX").name) + + unpackFile(file) + + def test_dontExtractZip_fileHasNoZipSuffix(self): + file = Path(tempfile.NamedTemporaryFile(suffix="WRONGSUFFIX").name) + file.touch() + + unpackFile(file) + + def test_extractZip_validatePayload(self): + zipFile, origTxtFile = _createTxtInZipFile(txtPayload="TEST DATA") + unzipedFile = Path(f"{origTxtFile.parent}/{origTxtFile.absolute()}") + + unpackFile(zipFile) + + self.assertTrue(zipFile.exists()) + self.assertTrue(unzipedFile.exists()) + self.assertEqual("TEST DATA", unzipedFile.read_text()) + + +def _createTxtInZipFile(txtPayload): + txtFile = Path(tempfile.NamedTemporaryFile(suffix=".txt").name) + zipFile = Path(tempfile.NamedTemporaryFile(suffix=".zip").name) + with txtFile.open("w") as f: + f.write(txtPayload) + with ZipFile(zipFile, 'w') as zipObj: + zipObj.write(txtFile.absolute()) + return zipFile, txtFile + + +def createArchiveWebsite(testUrl): + return f""" + +
+
all_packages-mipsbe-6.36.4> + mipsbe + + """ diff --git a/tools/npkModify_test.py b/tools/npkModify_test.py new file mode 100644 index 0000000..678a0a1 --- /dev/null +++ b/tools/npkModify_test.py @@ -0,0 +1,97 @@ +import tempfile +import unittest +from pathlib import Path + +from tests.constants import MINIMAL_NPK_PACKAGE +from npkpy.npk.npk import Npk + + +class BasicNpkTestRequirements(unittest.TestCase): + def setUp(self) -> None: + self.testNpk = Path(tempfile.NamedTemporaryFile().name) + self.testNpk.write_bytes(MINIMAL_NPK_PACKAGE) + self.npk = Npk(self.testNpk) + + def tearDown(self) -> None: + self.testNpk.unlink() + + +class ParseTestPackage_Test(BasicNpkTestRequirements): + def test_minimalNPK_parseHeader(self): + self.assertEqual(b"\x1e\xf1\xd0\xba", self.npk.pck_magicBytes) + self.assertEqual(28, self.npk.pck_payloadLen) + + def test_minimalNPK_parseAllContainer(self): + listOfCnt = self.npk.pck_cntList + + self.assertEqual(1, listOfCnt[0].cnt_id) + self.assertEqual(15, listOfCnt[0].cnt_payloadLen) + self.assertEqual(b"NAME OF PROGRAM", listOfCnt[0].cnt_payload) + self.assertEqual(1, listOfCnt[1].cnt_id) + self.assertEqual(1, listOfCnt[1].cnt_payloadLen) + self.assertEqual(b"I", listOfCnt[1].cnt_payload) + + +class ModifyPayload_Test(BasicNpkTestRequirements): + + def setUp(self) -> None: + super().setUp() + self.cnt = self.npk.pck_cntList[0] + + def test_emptyPayload_emptyContainer(self): + self.cnt.cnt_payload = b"" + + self.assertEqual(1, self.cnt.cnt_id) + self.assertEqual(0, self.cnt.cnt_payloadLen) + self.assertEqual(b"", self.cnt.cnt_payload) + self.assertEqual(6, self.cnt.cnt_fullLength) + + self.assertEqual(13, self.npk.pck_payloadLen) + self.assertEqual(21, self.npk.pck_fullSize) + + def test_dontchangePayloadSize_recalculateContainerKeepSize(self): + self.cnt.cnt_payload = b"PROGRAM OF NAME" + + self.assertEqual(1, self.cnt.cnt_id) + self.assertEqual(15, self.cnt.cnt_payloadLen) + self.assertEqual(b"PROGRAM OF NAME", self.cnt.cnt_payload) + self.assertEqual(21, self.cnt.cnt_fullLength) + + self.assertEqual(28, self.npk.pck_payloadLen) + self.assertEqual(36, self.npk.pck_fullSize) + + def test_increasePayloadLen_recalculateContainerSizeBigger(self): + self.cnt.cnt_payload = b"NEW NAME OF PROGRAM" + + self.assertEqual(1, self.cnt.cnt_id) + self.assertEqual(19, self.cnt.cnt_payloadLen) + self.assertEqual(b"NEW NAME OF PROGRAM", self.cnt.cnt_payload) + self.assertEqual(25, self.cnt.cnt_fullLength) + + self.assertEqual(32, self.npk.pck_payloadLen) + self.assertEqual(40, self.npk.pck_fullSize) + + def test_decreasePayloadLen_recalculateContainerSmaller(self): + self.cnt.cnt_payload = b"SHORT NAME" + + self.assertEqual(1, self.cnt.cnt_id) + self.assertEqual(10, self.cnt.cnt_payloadLen) + self.assertEqual(b"SHORT NAME", self.cnt.cnt_payload) + self.assertEqual(16, self.cnt.cnt_fullLength) + + self.assertEqual(23, self.npk.pck_payloadLen) + self.assertEqual(31, self.npk.pck_fullSize) + + +class WriteModifiedFile_Test(BasicNpkTestRequirements): + def setUp(self) -> None: + super().setUp() + self.cnt = self.npk.pck_cntList[0] + + def test_createFile_withoutModification(self): + self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_fullBinary) + + def test_createFile_changePayloadTwice(self): + self.cnt.cnt_payload = b"A" + self.cnt.cnt_payload = b"NAME OF PROGRAM" + self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_fullBinary) diff --git a/tools/sections.py b/tools/sections.py new file mode 100644 index 0000000..eb5ee37 --- /dev/null +++ b/tools/sections.py @@ -0,0 +1,51 @@ +from collections import Counter +from pathlib import Path +from typing import Tuple + +from more_itertools import peekable + + +def getBinaryFromFile(file: Path): + with file.open("rb") as f: + data = f.read() + for d in data: + yield d + +def findDiffs(lastFile, file): + bLastFile = peekable(getBinaryFromFile(lastFile)) + bfile = peekable(getBinaryFromFile(file)) + + sections = [] + counter = -1 + hasChanged = (bLastFile.peek() == bfile.peek()) + sectionTracker = dict({True: 0, False: 0}) + + while True: + try: + while (next(bLastFile) == next(bfile)) is hasChanged: + counter += 1 + except StopIteration: + sections.append((max(sectionTracker.values()), counter, not hasChanged)) + break + + sectionTracker[hasChanged] = counter + 1 + hasChanged = not hasChanged + sections.append((sectionTracker[hasChanged], counter, hasChanged)) + counter += 1 + + return sections + + +def findSections(filesDict: dict) -> Tuple[str, Counter]: + cTotal = Counter() + for (program, versionFiles) in filesDict.items(): + lastFile = None + c = Counter() + for (version, file) in versionFiles.items(): + if lastFile is not None: + diffs = findDiffs(lastFile.file, file.file) + c.update(diffs) + cTotal.update(diffs) + lastFile = file + yield program, c + yield "__total", cTotal diff --git a/tools/sections_test.py b/tools/sections_test.py new file mode 100644 index 0000000..c08668e --- /dev/null +++ b/tools/sections_test.py @@ -0,0 +1,43 @@ +import unittest +from pathlib import Path +from tempfile import NamedTemporaryFile + +from tools.sections import findDiffs + + +class FindDiffs_Test(unittest.TestCase): + def setUp(self) -> None: + self.fileA = Path(NamedTemporaryFile(delete=False).name) + self.fileB = Path(NamedTemporaryFile(delete=False).name) + self.fileC = Path(NamedTemporaryFile(delete=False).name) + + def tearDown(self) -> None: + self.fileA.unlink() + self.fileB.unlink() + self.fileC.unlink() + + def test_filesEqual_minimumFile(self): + self.assertFileDiff("a", "a", [(0, 0, False)]) + + def test_filesUnequal_minimumFile(self): + self.assertFileDiff("a", "b", [(0, 0, True)]) + + def test_filesEqual_equalSize(self): + self.assertFileDiff("aabbcc", "aabbcc", [(0, 5, False)]) + + def test_diffInCenter_equalSize(self): + self.assertFileDiff("aabbcc", "aa cc", [(0, 1, False), (2, 3, True), (4, 5, False)]) + + def test_differInSize(self): + self.assertFileDiff("aa", "aabb", [(0, 1, False)]) + + def assertFileDiff(self, contentA, contentB, result): + writeFile(self.fileA, contentA) + writeFile(self.fileB, contentB) + + self.assertEqual(result, findDiffs(self.fileA, self.fileB)) + + +def writeFile(file, content): + with file.open("w") as f: + f.write(content)