add deployment workflow to pypi.org

This commit is contained in:
botlabsDev 2020-06-07 18:12:16 +02:00
parent b3971d2fd3
commit de282a39e7
62 changed files with 2335 additions and 2 deletions

65
.github/workflows/CICD.yml vendored Normal file
View file

@ -0,0 +1,65 @@
name: npkpy
on: [push]
jobs:
run-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up Python 3.6
uses: actions/setup-python@v1
with:
python-version: 3.6
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -e .
- name: Lint with flake8
run: |
pip install flake8
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pip install pytest pytest-cov codecov
pytest --cov=./
codecov --token=${{ secrets.CODECOV_TOKEN }}
publish-to-pypi:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: Set up Python 3.6
uses: actions/setup-python@v1
with:
python-version: 3.6
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -e .
- name: Build a binary wheel and a source tarball
run: |
python3 setup.py sdist bdist_wheel
- name: Publish distribution
uses: pypa/gh-action-pypi-publish@master
with:
password: ${{ secrets.TEST_PYPI_PASSWORD }}
repository_url: https://test.pypi.org/legacy/
- name: Publish distribution to PyPI
if: startsWith(github.event.ref, 'refs/tags')
uses: pypa/gh-action-pypi-publish@master
with:
password: ${{ secrets.PYPI_PASSWORD }}

6
.gitignore vendored
View file

@ -127,3 +127,9 @@ dmypy.json
# Pyre type checker # Pyre type checker
.pyre/ .pyre/
## custom
.idea
virtualenv/

View file

@ -1,2 +1,60 @@
# npkpy [![Actions Status](https://github.com/botlabsDev/npkpy/workflows/Pytest/badge.svg)](https://github.com/botlabsDev/npkpy/actions)
MikroTik npk-format unpacker. [![codecov](https://codecov.io/gh/botlabsDev/npkpy/branch/master/graph/badge.svg?token=4ns6uIqoln)](https://codecov.io/gh/botlabsDev/npkpy)
# npkPy
The npkPy package module is an unpacking tool for MikroTiks custom NPK container format. The tool is capable
to display to the content of any NPK package and to export all container.
["NPK stands for MikroTik RouterOS upgrade package"](https://whatis.techtarget.com/fileformat/NPK-MikroTik-RouterOS-upgrade-package)
and since there is no solid unpacking tool for the format available, I want to share my approach of it.
The format, in general, is used by MikroTik to install and update the software on MikroTiks routerOs systems.
NPK packages can be found here: [MikroTik Archive](https://mikrotik.com/download/archive)
The code covers the ability to modify the container payload. Yet, this ability won't be available for cli.
Please be aware, that you can't create or modify __valid__ packages [since they are signed](https://forum.mikrotik.com/viewtopic.php?t=87126).
```
All recent packages are signed with EC-KCDSA signature,
and there's no way to create a valid npk file unless you know a secret key.
```
## Usage
```
$ npkPy is an unpacking tool for MikroTiks custom NPK container format
optional arguments:
-h, --help show this help message and exit
input:
--files FILES Select one or more files to process
--srcFolder SRCFOLDER
Process all NPK files found recursively in given source folder.
--glob GLOB Simple glob. Filter files from --srcFolder which match the given string.
output:
--dstFolder DSTFOLDER
Extract container into given folder
actions:
--showContainer List all container from selected NPK files
--exportAll Export all container from selected NPK files
--exportSquashFs Export all SquashFs container from selected NPK files
--exportZlib Export all Zlib compressed container from selected NPK files
```
Common understanding: A file represents an NPK package with multiple containers.
Each container 'contains' payloads like descriptions, SquashFs images or Zlib compressed data.
## Other unpacking tools
If npkPy does not work for you, check out older approaches of NPK unpacking tools:
* [mikrotik-npk](https://github.com/kost/mikrotik-npk)
* [npk-tools](https://github.com/rsa9000/npk-tools)

26
bootstrap.sh Executable file
View file

@ -0,0 +1,26 @@
#!/bin/bash
venv=${1:-virtualenv}
## setup virtualenv if not already exist
if [[ ! -e ${venv} ]]; then
virtualenv --python=python ${venv}
${venv}/bin/pip install pip --upgrade
${venv}/bin/pip install pip -r requirements.txt
${venv}/bin/pip install pip -e .
fi
git config user.name "botlabsDev"
git config user.email "git@botlabs.dev"
echo "--git config--"
echo -n "git user:"; git config user.name
echo -n "git email:"; git config user.email
echo "--------------"
## start virtualenv
source ${venv}/bin/activate

0
npkpy/__init__.py Normal file
View file

31
npkpy/analyseNpk.py Normal file
View file

@ -0,0 +1,31 @@
from npkpy.common import getFullPktInfo, extractContainer
from npkpy.npk.cntSquasFsImage import NPK_SQUASH_FS_IMAGE
from npkpy.npk.cntZlibCompressedData import NPK_ZLIB_COMPRESSED_DATA
from npkpy.npk.npkConstants import CNT_HANDLER
EXPORT_FOLDER_PREFIX = "npkPyExport_"
def analyseNpk(opts, npkFiles):
filterContainer = []
if opts.showContainer:
for file in npkFiles:
print("\n".join(getFullPktInfo(file)))
if opts.exportAll:
filterContainer = CNT_HANDLER.keys()
if opts.exportSquashFs:
filterContainer = [NPK_SQUASH_FS_IMAGE]
if opts.exportZlib:
filterContainer = [NPK_ZLIB_COMPRESSED_DATA]
if filterContainer:
for npkFile in npkFiles:
exportFolder = opts.dstFolder / f"{EXPORT_FOLDER_PREFIX}{npkFile.file.stem}"
exportFolder.mkdir(parents=True, exist_ok=True)
extractContainer(npkFile, exportFolder, filterContainer)
if not next(exportFolder.iterdir(), None):
exportFolder.rmdir()

66
npkpy/common.py Normal file
View file

@ -0,0 +1,66 @@
import hashlib
from pathlib import Path
from typing import List
def getAllNkpFiles(path, containStr=None):
return path.glob(f"**/*{containStr}*.npk" if containStr else "**/*.npk")
def extractContainer(npkFile, exportFolder, filterContainer):
for position, cnt in enumerate(npkFile.pck_cntList):
if cnt.cnt_id in filterContainer:
filePath = exportFolder / f"{position:03}_cnt_{cnt.cnt_idName}.raw"
writeToFile(filePath, cnt.cnt_payload)
def writeToFile(file, payloads):
written = 0
if not isinstance(payloads, list):
payloads = [payloads]
with open(file, "wb") as f:
for payload in payloads:
f.write(payload)
written += len(payload)
def getPktInfo(file):
return [str(file.file.name)]
def getCntInfo(file) -> List:
return [f"Cnt:{pos:3}:{c.cnt_idName}" for pos, c in file.pck_enumerateCnt]
def getFullPktInfo(file) -> List:
output = getPktInfo(file)
output += getCntInfo(file)
for cnt in file.pck_cntList:
output += getFullCntInfo(cnt)
return output
def getFullCntInfo(cnt) -> List:
info = []
idName, options = cnt.output_cnt
info.append(f"{idName}")
for option in options:
info.append(f" {option}")
return info
def sha1sumFromFile(file: Path):
with file.open('rb') as f:
return sha1sumFromBinary(f.read())
def sha1sumFromBinary(payloads):
if len(payloads) == 0:
return "<empty>"
sha1 = hashlib.sha1()
for payload in [payloads] if not isinstance(payloads, list) else payloads:
sha1.update(payload)
return sha1.digest()

43
npkpy/main.py Normal file
View file

@ -0,0 +1,43 @@
import argparse
from pathlib import Path
from npkpy.analyseNpk import analyseNpk
from npkpy.common import getAllNkpFiles
from npkpy.npk.npk import Npk
def parseArgs():
parser = argparse.ArgumentParser(description='npkPy is an unpacking tool for MikroTiks custom NPK container format')
inputGroup = parser.add_argument_group("input")
inputGroup.add_argument("--files", action='append', type=Path,
help="Select one or more files to process")
inputGroup.add_argument("--srcFolder", type=Path, default=Path("."),
help="Process all NPK files found recursively in given source folder.")
inputFilterGroup = inputGroup.add_mutually_exclusive_group()
inputFilterGroup.add_argument("--glob", type=str, default=None,
help="Simple glob. Filter files from --srcFolder which match the given string.")
outputGroup = parser.add_argument_group("output")
outputGroup.add_argument("--dstFolder", type=Path, default=Path(".") / "exportNpk",
help="Extract container into given folder")
actionGroup = parser.add_argument_group("actions")
exclusiveAction = actionGroup.add_mutually_exclusive_group(required=True)
exclusiveAction.add_argument("--showContainer", action="store_true",
help="List all container from selected NPK files")
exclusiveAction.add_argument("--exportAll", action="store_true",
help="Export all container from selected NPK files")
exclusiveAction.add_argument("--exportSquashFs", action="store_true",
help="Export all SquashFs container from selected NPK files")
exclusiveAction.add_argument("--exportZlib", action="store_true",
help="Export all Zlib compressed container from selected NPK files")
return parser.parse_args()
def main():
opts = parseArgs()
files = (Npk(f) for f in (opts.files if opts.files else getAllNkpFiles(opts.srcFolder, opts.glob)))
analyseNpk(opts, files)

View file

@ -0,0 +1,9 @@
from npkpy.npk.pckRequirementsHeader import PckRequirementsHeader
NPK_MULTICONTAINER_LIST = 20
class XCnt_MultiContainerList(PckRequirementsHeader):
@property
def _regularCntId(self):
return NPK_MULTICONTAINER_LIST

0
npkpy/npk/__init__.py Normal file
View file

View file

@ -0,0 +1,9 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_ARCHITECTURE_TAG = 16
class CntArchitectureTag(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_ARCHITECTURE_TAG

89
npkpy/npk/cntBasic.py Normal file
View file

@ -0,0 +1,89 @@
import logging
import struct
BYTES_LEN_CNT_ID = 2
BYTES_LEN_CNT_PAYLOAD_LEN = 4
NPK_CNT_BASIC = -1
class NpkContainerBasic:
"""
0____4____8____b____f
| | | | |
x0_|AABB|BBCC|C ..... C|
x1_|....|....|....|....|
A = Container Identifier
B = Payload length
C = Payload
"""
def __init__(self, data, offsetInPck):
self._data = bytearray(data)
self._offsetInPck = offsetInPck
self.modified = False
@property
def _regularCntId(self):
return NPK_CNT_BASIC
@property
def cnt_id(self):
cntId = struct.unpack_from(b"h", self._data, 0)[0]
if cntId != self._regularCntId:
raise RuntimeError(f"Cnt object does not represent given container typ {self._regularCntId}/{cntId}")
return cntId
@property
def cnt_idName(self):
return str(self.__class__.__name__)
@property
def cnt_payloadLen(self):
return (struct.unpack_from(b"I", self._data, 2))[0]
@cnt_payloadLen.setter
def cnt_payloadLen(self, payloadLen):
logging.warning("[MODIFICATION] Please be aware that modifications can break the npk structure")
self.modified = True
struct.pack_into(b"I", self._data, 2, payloadLen)
@property
def cnt_payload(self):
return struct.unpack_from(f"{self.cnt_payloadLen}s", self._data, 6)[0]
@cnt_payload.setter
def cnt_payload(self, payload):
tmpLen = len(payload)
tmpHead = self._data[:2 + 4]
tmpHead += struct.pack(f"{tmpLen}s", payload)
self._data = tmpHead
self.cnt_payloadLen = tmpLen
@property
def cnt_fullLength(self):
return BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + self.cnt_payloadLen
# return len(self._data)
@property
def output_cnt(self):
viewLen = min(10, self.cnt_payloadLen)
return (f"{self.cnt_idName}", [f"Cnt id: {self.cnt_id}",
f"Cnt offset: {self._offsetInPck}",
f"Cnt len: {self.cnt_fullLength}",
f"Payload len: {self.cnt_payloadLen}",
f"Payload[0:{viewLen}]: {self.cnt_payload[0:viewLen]} [...] "
])
@property
def cnt_fullBinary(self):
cntId = self.cnt_id
payloadLen = self.cnt_payloadLen
payload = struct.unpack_from(f"{self.cnt_payloadLen}s",
buffer=self._data,
offset=BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN)[0]
return struct.pack(b"=hI", cntId, payloadLen) + payload

View file

@ -0,0 +1,9 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_NULL_BLOCK = 22
class CntNullBlock(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_NULL_BLOCK

View file

@ -0,0 +1,19 @@
from npkpy.common import sha1sumFromBinary
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_SQUASH_FS_IMAGE = 21
class CntSquashFsImage(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_SQUASH_FS_IMAGE
@property
def cnt_payload_hash(self):
return sha1sumFromBinary(self.cnt_payload)
@property
def output_cnt(self):
idName, options = super().output_cnt
return idName, options + [f"calc Sha1Hash: {self.cnt_payload_hash}"]

View file

@ -0,0 +1,14 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_SQUASHFS_HASH_SIGNATURE = 9
class CntSquashFsHashSignature(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_SQUASHFS_HASH_SIGNATURE
@property
def output_cnt(self):
idName, options = super().output_cnt
return idName, options + [f"Payload[-10:]: {self.cnt_payload[-10:]}"]

View file

@ -0,0 +1,9 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_ZLIB_COMPRESSED_DATA = 4
class CntZlibCompressedData(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_ZLIB_COMPRESSED_DATA

101
npkpy/npk/npk.py Normal file
View file

@ -0,0 +1,101 @@
import struct
from pathlib import Path
from npkpy.npk.npkConstants import CNT_HANDLER
from npkpy.npk.cntBasic import BYTES_LEN_CNT_ID, BYTES_LEN_CNT_PAYLOAD_LEN
from npkpy.npk.npkFileBasic import FileBasic
MAGICBYTES = b"\x1e\xf1\xd0\xba"
BYTES_LEN_MAGIC_HEADER = 4
BYTES_LEN_PCK_SIZE_LEN = 4
class Npk(FileBasic):
"""
0____4____8____b____f
| | | | |
0_|AAAA|BBBB| C ..... |
1_|....|....|....|....|
A = MAGIC BYTES (4)
B = PCK SIZE (4)
C = Begin of Container area
"""
__cntList = None
def __init__(self, filePath: Path):
super().__init__(filePath)
self.cntOffset = 8
self._data = self.readDataFromFile(0, self.cntOffset)
self._checkMagicBytes(errorMsg="MagicBytes not found in Npk file")
self.pck_header = self.pck_cntList[0]
@property
def pck_magicBytes(self):
return struct.unpack_from(b"4s", self._data, 0)[0]
@property
def pck_payloadLen(self):
self.__pck_payloadUpdate()
payloadLen = struct.unpack_from(b"I", self._data, 4)[0]
return payloadLen
def __pck_payloadUpdate(self):
if any(cnt.modified for cnt in self.pck_cntList):
currentSize = 0
for cnt in self.pck_cntList:
currentSize += cnt.cnt_fullLength
cnt.modified = False
struct.pack_into(b"I", self._data, 4, currentSize)
@property
def pck_fullSize(self):
return BYTES_LEN_MAGIC_HEADER + BYTES_LEN_PCK_SIZE_LEN + self.pck_payloadLen
@property
def pck_fullBinary(self):
binary = MAGICBYTES + struct.pack("I", self.pck_payloadLen)
for c in self.pck_cntList:
binary += c.cnt_fullBinary
return binary
@property
def pck_enumerateCnt(self):
for pos, c in enumerate(self.pck_cntList):
yield pos, c
@property
def pck_cntList(self):
if not self.__cntList:
self.__cntList = self.__parseAllCnt()
return self.__cntList
def __parseAllCnt(self):
lst = []
offset = self.cntOffset
while offset < self.file.stat().st_size - 1:
lst.append(self.__getCnt(offset))
offset += BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + lst[-1].cnt_payloadLen
return lst
def __getCnt(self, offset):
cntId = struct.unpack_from("H", self.readDataFromFile(offset, 2))[0]
payloadLen = struct.unpack_from("I", self.readDataFromFile(offset + BYTES_LEN_CNT_ID, 4))[0]
pktLen = BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + payloadLen
data = self.readDataFromFile(offset, pktLen)
if len(data) != pktLen:
raise RuntimeError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}")
try:
return CNT_HANDLER[cntId](data, offset)
except KeyError:
raise RuntimeError(f"failed with id: {cntId}\n"
f"New cnt id discovered in file: {self.file.absolute()}")
# except TypeError:
# raise RuntimeError(f"failed with id: {cntId}\n{self.file.absolute()}")
def _checkMagicBytes(self, errorMsg):
if not self.pck_magicBytes == MAGICBYTES:
raise RuntimeError(errorMsg)

54
npkpy/npk/npkConstants.py Normal file
View file

@ -0,0 +1,54 @@
from npkpy.npk.XCntMultiContainerList import NPK_MULTICONTAINER_LIST, XCnt_MultiContainerList
from npkpy.npk.cntArchitectureTag import NPK_ARCHITECTURE_TAG, CntArchitectureTag
from npkpy.npk.cntNullBlock import NPK_NULL_BLOCK, CntNullBlock
from npkpy.npk.pckReleaseTyp import NPK_RELEASE_TYP, PckReleaseTyp
from npkpy.npk.cntSquasFsImage import NPK_SQUASH_FS_IMAGE, CntSquashFsImage
from npkpy.npk.cntSquashFsHashSignature import NPK_SQUASHFS_HASH_SIGNATURE, CntSquashFsHashSignature
from npkpy.npk.cntZlibCompressedData import NPK_ZLIB_COMPRESSED_DATA, CntZlibCompressedData
from npkpy.npk.cntBasic import NPK_CNT_BASIC, NpkContainerBasic
from npkpy.npk.pckDescription import NPK_PCK_DESCRIPTION, PckDescription
from npkpy.npk.pckEckcdsaHash import NPK_ECKCDSA_HASH, PckEckcdsaHash
from npkpy.npk.pckHeader import NPK_PCK_HEADER, PckHeader
from npkpy.npk.pckRequirementsHeader import NPK_REQUIREMENTS_HEADER, PckRequirementsHeader
from npkpy.npk.xCntFlagB import NPK_FLAG_B, XCnt_flagB
from npkpy.npk.xCntFlagC import NPK_FLAG_C, XCnt_flagC
from npkpy.npk.xCntFlagA import NPK_FLAG_A, XCnt_flagA
from npkpy.npk.xCntMultiContainerHeader import NPK_MULTICONTAINER_HEADER, XCnt_multiContainerHeader
from npkpy.npk.xCntMpls import NPK_MPLS, XCntMpls
CNT_HANDLER = {
NPK_CNT_BASIC: NpkContainerBasic,
0: "?",
NPK_PCK_HEADER: PckHeader,
NPK_PCK_DESCRIPTION: PckDescription,
NPK_REQUIREMENTS_HEADER: PckRequirementsHeader,
NPK_ZLIB_COMPRESSED_DATA: CntZlibCompressedData,
5: "?",
6: "?",
NPK_FLAG_A: XCnt_flagA,
NPK_FLAG_B: XCnt_flagB,
NPK_SQUASHFS_HASH_SIGNATURE: CntSquashFsHashSignature,
10: "?",
11: "?",
12: "?",
13: "?",
14: "?",
15: "?",
NPK_ARCHITECTURE_TAG: CntArchitectureTag,
NPK_FLAG_C: XCnt_flagC,
NPK_MULTICONTAINER_HEADER: XCnt_multiContainerHeader,
NPK_MPLS: XCntMpls,
NPK_MULTICONTAINER_LIST: XCnt_MultiContainerList,
NPK_SQUASH_FS_IMAGE: CntSquashFsImage,
NPK_NULL_BLOCK: CntNullBlock,
NPK_ECKCDSA_HASH: PckEckcdsaHash,
NPK_RELEASE_TYP: PckReleaseTyp,
25: "?",
26: "?",
27: "?",
28: "?",
29: "?",
30: "?",
}

47
npkpy/npk/npkFileBasic.py Normal file
View file

@ -0,0 +1,47 @@
import re
from pathlib import Path
from npkpy.common import sha1sumFromFile
ARCHITECTURES = ['arm', 'mipsbe', 'mipsle', 'mmips', 'ppc', 'smips', 'tile', 'x86']
RE_SUFFIX = '\.(npk$)'
RE_VERSION = '-(\d\.\d\d\.\d)'
RE_PROGRAM_NAME = '(^[\w-]*)-'
class FileBasic:
__data = None
def __init__(self, filePath: Path):
self.file = filePath
@property
def filename_suffix(self):
return re.search(RE_SUFFIX, self.file.name).group(1)
@property
def filename_version(self):
return re.search(RE_VERSION, self.file.name).group(1)
@property
def filename_architecture(self):
for a in ARCHITECTURES:
if f"_{a}_" in self.file.name:
return a
return "x86"
@property
def filename_program(self):
name = re.search(RE_PROGRAM_NAME, self.file.name).group(1)
name.replace(f"_{self.filename_architecture}_", "")
return name
@property
def file_hash(self):
return sha1sumFromFile(self.file)
def readDataFromFile(self, offset, size):
with self.file.open("rb") as f:
f.seek(offset)
return bytearray(f.read(size))

View file

@ -0,0 +1,7 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_PCK_DESCRIPTION = 2
class PckDescription(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_PCK_DESCRIPTION

View file

@ -0,0 +1,9 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_ECKCDSA_HASH = 23
class PckEckcdsaHash(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_ECKCDSA_HASH

77
npkpy/npk/pckHeader.py Normal file
View file

@ -0,0 +1,77 @@
import datetime
import struct
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_PCK_HEADER = 1
class PckHeader(NpkContainerBasic):
"""
0____4____8____b____f
| | | | |
x0_|AABB|BBCC|CCCC|CCCC|
x1_|CCCC|CCDE|FGHH|HH..|
x2_|....|....|....|....|
A = Container Identifier (2)
B = Payload length (4)
C = Program Name (16)
D = Program version: revision
E = Program version: rc
F = Program version: minor
G = Program version: major
H = Build time
I = NULL BLock / Flags
"""
def __init__(self, data, offsetInPck):
super().__init__(data, offsetInPck)
self._offset = offsetInPck
self.flagOffset = 0
@property
def _regularCntId(self):
return NPK_PCK_HEADER
@property
def cnt_programName(self):
# TODO: b"" - b needs to be removed!
return bytes(struct.unpack_from(b"16B", self._data, 6)).decode().rstrip('\x00')
@property
def cnt_osVersion(self):
revision = (struct.unpack_from(b"B", self._data, 22))[0]
rc = (struct.unpack_from(b"B", self._data, 23))[0]
minor = (struct.unpack_from(b"B", self._data, 24))[0]
major = (struct.unpack_from(b"B", self._data, 25))[0]
return f"{major}.{minor}.{revision} - rc(?): {rc}"
@property
def cnt_built_time(self):
return datetime.datetime.utcfromtimestamp(struct.unpack_from(b"I", self._data, 26)[0])
@property
def cnt_nullBlock(self):
return struct.unpack_from(b"4B", self._data, 30)
@property
def cnt_flags(self):
try:
return struct.unpack_from(b"7B", self._data, 34)
except struct.error:
## pkt with version 5.23 seems to have only four flags.
return struct.unpack_from(b"4B", self._data, 34)
@property
def output_cnt(self):
idName, options = super().output_cnt
return (idName, options + [f"Program name: {self.cnt_programName}",
f"Os version: {self.cnt_osVersion}",
f"Created at: {self.cnt_built_time}",
f"NullBlock: {self.cnt_nullBlock}",
f"Flags: {self.cnt_flags}"
])

View file

@ -0,0 +1,9 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_RELEASE_TYP = 24
class PckReleaseTyp(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_RELEASE_TYP

View file

@ -0,0 +1,93 @@
import struct
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_REQUIREMENTS_HEADER = 3
class PckRequirementsHeader(NpkContainerBasic):
def _versionOneTwo(foo):
def check(self):
if self.cnt_structure_id > 0:
return foo(self)
return "<not available for version 0>"
return check
def _versionTwo(foo):
def check(self):
if self.cnt_structure_id > 1:
return foo(self)
return "<not available for version 0,1>"
return check
def __init__(self, data, offsetInPck):
super().__init__(data, offsetInPck)
self._offset = offsetInPck
@property
def _regularCntId(self):
return NPK_REQUIREMENTS_HEADER
@property
def cnt_structure_id(self):
return struct.unpack_from(b"H", self._data, 6)[0]
@property
@_versionOneTwo
def cnt_programName(self):
return bytes(struct.unpack_from(b"16B", self._data, 8)).decode().rstrip('\x00')
@property
@_versionOneTwo
def cnt_osVersionFrom(self):
revision = (struct.unpack_from(b"B", self._data, 24))[0]
rc = (struct.unpack_from(b"B", self._data, 25))[0]
minor = (struct.unpack_from(b"B", self._data, 26))[0]
major = (struct.unpack_from(b"B", self._data, 27))[0]
return f"{major}.{minor}.{revision} - rc(?): {rc}"
@property
@_versionOneTwo
def cnt_nullBlock(self):
return struct.unpack_from(b"BBBB", self._data, 28)
@property
@_versionOneTwo
def cnt_osVersionTo(self):
revision = (struct.unpack_from(b"B", self._data, 32))[0]
rc = (struct.unpack_from(b"B", self._data, 33))[0]
minor = (struct.unpack_from(b"B", self._data, 34))[0]
major = (struct.unpack_from(b"B", self._data, 35))[0]
return f"{major}.{minor}.{revision} - rc(?): {rc}"
@property
@_versionTwo
def cnt_flags(self):
return struct.unpack_from(b"4B", self._data, 36)
@property
def cnt_fullBinary(self):
id = self.cnt_id
payload_len = self.cnt_payloadLen
payload = struct.unpack_from(f"{self.cnt_payloadLen}s", self._data, 2 + 4)[0]
# if self.cnt_structure_id > 2:
# print(self.cnt_flags)
# return struct.pack(f"HH", id, payload_len) + se + payload
return struct.pack(f"=HI", id, payload_len) + payload
@property
def output_cnt(self):
idName, opt = super().output_cnt
options = [f"Cnt id: {self.cnt_id}",
f"StructID: {self.cnt_structure_id}",
f"Offset: {self._offset}",
f"Program name: {self.cnt_programName}",
f"Null block: {self.cnt_nullBlock}",
f"Os versionFrom: {self.cnt_osVersionFrom}",
f"Os versionTo: {self.cnt_osVersionTo}",
f"Flags: {self.cnt_flags}"
]
return (f"{self.cnt_idName}", opt + options)

14
npkpy/npk/xCntFlagA.py Normal file
View file

@ -0,0 +1,14 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_FLAG_A = 7
class XCnt_flagA(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_FLAG_A
# @property
# def cnt_payload(self):
# # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n '

14
npkpy/npk/xCntFlagB.py Normal file
View file

@ -0,0 +1,14 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_FLAG_B = 8
class XCnt_flagB(NpkContainerBasic):
# TODO: found in gps-5.23-mipsbe.npk
@property
def _regularCntId(self):
return NPK_FLAG_B
# @property
# def cnt_payload(self):
# # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n '

14
npkpy/npk/xCntFlagC.py Normal file
View file

@ -0,0 +1,14 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_FLAG_C = 17
class XCnt_flagC(NpkContainerBasic):
##TODO: found in multicast-3.30-mipsbe.npk
@property
def _regularCntId(self):
return NPK_FLAG_C
# @property
# def cnt_payload(self):
# # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n '

9
npkpy/npk/xCntMpls.py Normal file
View file

@ -0,0 +1,9 @@
from npkpy.npk.pckRequirementsHeader import PckRequirementsHeader
NPK_MPLS = 19
class XCntMpls(PckRequirementsHeader):
@property
def _regularCntId(self):
return NPK_MPLS

View file

@ -0,0 +1,15 @@
import struct
from npkpy.npk.pckHeader import PckHeader
NPK_MULTICONTAINER_HEADER: int = 18
class XCnt_multiContainerHeader(PckHeader):
@property
def _regularCntId(self):
return NPK_MULTICONTAINER_HEADER
@property
def cnt_flags(self):
return struct.unpack_from(b"4B", self._data, 34)

11
pytest.sh Executable file
View file

@ -0,0 +1,11 @@
!/bin/bash
if [ ! -z "$1" ]
then
clear; echo "test specific file: $1"
pytest --cov=./ $1 -v
else
clear; echo "test all project files"
pytest --cov=npkpy --cov=acceptance_test -v
fi

8
requirements.txt Normal file
View file

@ -0,0 +1,8 @@
pytest_httpserver
urlpath
setuptools
wheel
twine
pytest
pytest-cov

32
setup.py Executable file
View file

@ -0,0 +1,32 @@
#!/usr/bin/env python3.8
import setuptools
from datetime import datetime
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="npkPy",
version=f"{(datetime.now()).strftime('%Y.%m.%d.%H.%M')}",
description="npkPy is an unpacker tool for MikroTiks custom npk container format",
author='botlabsDev',
author_email='npkPy@botlabs.dev',
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/botlabsDev/npkpy",
packages=setuptools.find_packages(),
python_requires='>=3.6',
classifiers=[
"Programming Language :: Python :: 3.6",
"License :: OSI Approved :: GNU Affero General Public License v3",
"Operating System :: OS Independent",
],
entry_points={
'console_scripts': [
"npkPy=npkpy.main:main",
# "npkDownloader=npkpy.download:main",
],
},
install_requires=[
],
)

View file

@ -0,0 +1,16 @@
import struct
import unittest
from npkpy.npk.XCntMultiContainerList import XCnt_MultiContainerList
from tests.constants import DummyBasicCnt
class Test_xCnt_MultiContainerList(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 20)
self.cnt = XCnt_MultiContainerList(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(20, self.cnt.cnt_id)

0
tests/__init__.py Normal file
View file

118
tests/cntBasic_test.py Normal file
View file

@ -0,0 +1,118 @@
import struct
import unittest
from npkpy.npk.cntBasic import NpkContainerBasic
from tests.constants import DummyBasicCnt
class Test_npkContainerBasic(unittest.TestCase):
def setUp(self) -> None:
self.cnt = NpkContainerBasic(DummyBasicCnt().binCnt, offsetInPck=0)
def test_extractCntId(self):
self.assertEqual(-1, self.cnt.cnt_id)
def test_failForWrongCntId(self):
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 999)
cnt = NpkContainerBasic(dummyCnt.binCnt, offsetInPck=0)
with self.assertRaises(RuntimeError) as e:
_ = cnt.cnt_id
self.assertEqual("Cnt object does not represent given container typ -1/999", e.exception.args[0])
def test_getNameOfContainerType(self):
self.assertEqual("NpkContainerBasic", self.cnt.cnt_idName)
def test_extractPayloadLen(self):
self.assertEqual(7, self.cnt.cnt_payloadLen)
def test_extractPayload(self):
self.assertEqual(b"Payload", self.cnt.cnt_payload)
def test_extractCntFromGivenOffset(self):
self.assertEqual(len(DummyBasicCnt().binCnt), self.cnt.cnt_fullLength)
def test_giveOverviewOfCnt(self):
expectedResult = ('NpkContainerBasic', [f'Cnt id: -1',
'Cnt offset: 0',
'Cnt len: 13',
'Payload len: 7',
"Payload[0:7]: b'Payload' [...] "])
self.assertEqual(expectedResult, self.cnt.output_cnt)
def test_getFullBinaryOfContainer(self):
self.assertEqual(DummyBasicCnt().binCnt, self.cnt.cnt_fullBinary)
class Test_modifyNpkContainerBasic(unittest.TestCase):
def setUp(self) -> None:
self.cnt = NpkContainerBasic(DummyBasicCnt().binCnt, offsetInPck=0)
def test_increaseCntSize(self):
origCntFullLength = self.cnt.cnt_fullLength
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen += 3
self.assertEqual(7, origPayloadLen)
self.assertEqual(10, self.cnt.cnt_payloadLen)
self.assertEqual(13, origCntFullLength)
self.assertEqual(16, self.cnt.cnt_fullLength)
def test_decreaseCntSize(self):
origCntFullLength = self.cnt.cnt_fullLength
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen -= 4
self.assertEqual(7, origPayloadLen)
self.assertEqual(13, origCntFullLength)
self.assertEqual(3, self.cnt.cnt_payloadLen)
self.assertEqual(9, self.cnt.cnt_fullLength)
def test_failAccessPayload_afterIncreasingPayloadLenField(self):
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen += 3
self.assertEqual(origPayloadLen + 3, self.cnt.cnt_payloadLen)
with self.assertRaises(struct.error):
_ = self.cnt.cnt_payload
def test_decreasingPayloadLenField_decreaseFullCntLenAndPayload(self):
origCntFullLength = self.cnt.cnt_fullLength
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen -= 4
self.assertEqual(origPayloadLen - 4, self.cnt.cnt_payloadLen)
self.assertEqual(origCntFullLength - 4, self.cnt.cnt_fullLength)
self.assertEqual(b"Pay", self.cnt.cnt_payload)
def test_failDecreasingPayloadLenFieldBelowZero(self):
with self.assertRaises(struct.error) as e:
self.cnt.cnt_payloadLen -= 8
self.assertEqual("argument out of range", e.exception.args[0])
def test_increasePayload_updatePayloadLen(self):
replacingPayload = b"NewTestPayload"
self.cnt.cnt_payload = replacingPayload
self.assertEqual(replacingPayload, self.cnt.cnt_payload)
self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen)
def test_decreasePayload_updatePayloadLen(self):
replacingPayload = b"New"
self.cnt.cnt_payload = replacingPayload
self.assertEqual(replacingPayload, self.cnt.cnt_payload)
self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen)
def test_NullPayload_updatePayloadLenToZero(self):
replacingPayload = b""
self.cnt.cnt_payload = replacingPayload
self.assertEqual(replacingPayload, self.cnt.cnt_payload)
self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen)

View file

@ -0,0 +1,16 @@
import struct
import unittest
from npkpy.npk.cntNullBlock import CntNullBlock
from tests.constants import DummyBasicCnt
class Test_cntNullBlock(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 22)
self.cnt = CntNullBlock(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(22, self.cnt.cnt_id)

View file

@ -0,0 +1,27 @@
import struct
import unittest
from npkpy.npk.cntSquasFsImage import CntSquashFsImage
from tests.constants import DummyBasicCnt
class Test_cntSquashFsImage(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 21)
self.cnt = CntSquashFsImage(dummyCnt.binCnt, offsetInPck=0)
self.expectedHash = b'\xc3\x04\x15\xea\xccjYDit\xb7\x16\xef\xf5l\xf2\x82\x19\x81]'
def test_validateCntId(self):
self.assertEqual(21, self.cnt.cnt_id)
def test_payload_hash(self):
self.assertEqual(self.expectedHash, self.cnt.cnt_payload_hash)
def test_giveOverviewOfCnt(self):
expected = f"calc Sha1Hash: {self.expectedHash}"
_, cntData = self.cnt.output_cnt
self.assertEqual(expected, cntData[-1])

View file

@ -0,0 +1,22 @@
import struct
import unittest
from npkpy.npk.cntSquashFsHashSignature import CntSquashFsHashSignature
from tests.constants import DummyBasicCnt
class Test_cntSquashFsHashSignature(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 9)
self.cnt = CntSquashFsHashSignature(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(9, self.cnt.cnt_id)
def test_giveOverviewOfCnt(self):
expected = "Payload[-10:]: b'Payload'"
_, cntData = self.cnt.output_cnt
self.assertEqual(expected, cntData[-1])

View file

@ -0,0 +1,17 @@
import struct
import unittest
from npkpy.npk.cntSquashFsHashSignature import CntSquashFsHashSignature
from tests.constants import DummyBasicCnt
class Test_cntSquashFsHashSignature(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 9)
self.cnt = CntSquashFsHashSignature(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(9, self.cnt.cnt_id)

102
tests/common_test.py Normal file
View file

@ -0,0 +1,102 @@
import tempfile
import unittest
from pathlib import Path
from npkpy.common import getPktInfo, getCntInfo, getAllNkpFiles
from npkpy.npk.npk import Npk
class Common_Test(unittest.TestCase):
def setUp(self) -> None:
self.npkFile = Path(tempfile.NamedTemporaryFile(suffix=".npk").name)
self.npkFile.write_bytes(Path("tests/testData/gps-6.45.6.npk").read_bytes())
self.npk = Npk(self.npkFile)
def tearDown(self) -> None:
if self.npkFile.exists():
self.npkFile.unlink()
def test_getBasicPktInfo(self):
self.assertEqual([(str(self.npkFile.name))], getPktInfo(self.npk))
def test_getBasicCntInfo(self):
self.assertEqual(['Cnt: 0:PckHeader',
'Cnt: 1:PckReleaseTyp',
'Cnt: 2:CntArchitectureTag',
'Cnt: 3:PckDescription',
'Cnt: 4:PckEckcdsaHash',
'Cnt: 5:PckRequirementsHeader',
'Cnt: 6:CntNullBlock',
'Cnt: 7:CntSquashFsImage',
'Cnt: 8:CntSquashFsHashSignature',
'Cnt: 9:CntArchitectureTag'], getCntInfo(self.npk), )
def test_getFullPktInfo(self):
pass
# result = getFullPktInfo(self.npk)
#
# self.assertEqual(['Cnt: 0:PckHeader',
# 'Cnt: 1:PckReleaseTyp',
# 'Cnt: 2:PckArchitectureTag',
# 'Cnt: 3:PckDescription',
# 'Cnt: 4:PckSha1Hash',
# 'Cnt: 5:PckRequirementsHeader',
# 'Cnt: 6:PckNullBlock',
# 'Cnt: 7:PckSquashFsImage',
# 'Cnt: 8:PckSquashFsHashSignature',
# 'Cnt: 9:PckArchitectureTag'], result)
def test_getFullCntInfo(self):
pass
class Test_findNpkFiles(unittest.TestCase):
def setUp(self) -> None:
self.tmpPath = Path(tempfile.TemporaryDirectory().name)
self.tmpPath.mkdir(parents=True)
self.expectedFiles = []
def tearDown(self) -> None:
for file in self.tmpPath.rglob("*"):
if file.is_file():
file.unlink()
for folder in sorted(self.tmpPath.rglob("*"), key=lambda _path: str(_path.absolute()).count("/"), reverse=True):
folder.rmdir()
self.tmpPath.rmdir()
def test_findMultipleNpkFiles_inFolder(self):
self.addExistingFiles(["fileA.npk", "fileB.npk", "fileC.npk"])
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath)))
def test_findMultipleNpkFilesRecursive(self):
self.addExistingFiles(["fileA.npk", "subB/fileB.npk", "subB/subC/fileC.npk"])
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath)))
def test_selectOnlyNpkFiles(self):
self.addExistingFiles(["fileA.npk", "fileB.exe", "fileC.txt"])
self.expectedFiles = [self.tmpPath / "fileA.npk"]
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath)))
def test_globOnlyNpkFilesFittingPattern(self):
self.addExistingFiles(["fi__pattern__leA.npk", "fileB.npk", "fi__pattern__leC.exe", "fileD.exe"])
self.expectedFiles = [self.tmpPath / "fi__pattern__leA.npk"]
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath, containStr="__pattern__")))
def assertExistingFiles(self, result):
self.assertEqual(self.expectedFiles, result)
def addExistingFiles(self, files):
for file in files:
f = self.tmpPath / file
f.parent.mkdir(parents=True, exist_ok=True)
f.touch()
self.expectedFiles.append(f)

89
tests/constants.py Normal file
View file

@ -0,0 +1,89 @@
# HEADER
import struct
from npkpy.npk.pckHeader import NPK_PCK_HEADER
MAGICBYTES = b"\x1e\xf1\xd0\xba"
PCKSIZE = struct.pack("I", 28)
MAGIC_AND_SIZE = MAGICBYTES + PCKSIZE
# OPENING ARCHITECTURE_TAG
SET_HEADER_TAG_ID = struct.pack("H", NPK_PCK_HEADER) # b"\x01\x00"
SET_HEADER_TAG_PAYLOAD = bytes("NAME OF PROGRAM".encode())
SET_HEADER_TAG_SIZE = struct.pack("I", len(SET_HEADER_TAG_PAYLOAD))
CNT_SET_ARCHITECTURE_TAG = SET_HEADER_TAG_ID + \
SET_HEADER_TAG_SIZE + \
SET_HEADER_TAG_PAYLOAD
# CLOSING ARCHITECTURE_TAG
CLOSING_ARCHITECTURE_TAG_ID = struct.pack("H", 1) # b"\x01\x00"
CLOSING_ARCHITECTURE_TAG_PAYLOAD = struct.pack("s", b"I")
CLOSING_ARCHITECTURE_TAG_SIZE = struct.pack("I", len(CLOSING_ARCHITECTURE_TAG_PAYLOAD))
CNT_CLOSING_ARCHITECTURE_TAG = CLOSING_ARCHITECTURE_TAG_ID + \
CLOSING_ARCHITECTURE_TAG_SIZE + \
CLOSING_ARCHITECTURE_TAG_PAYLOAD
# MINIMAL_NPK_PAKAGE
MINIMAL_NPK_PACKAGE = MAGIC_AND_SIZE + \
CNT_SET_ARCHITECTURE_TAG + \
CNT_CLOSING_ARCHITECTURE_TAG
def getDummyNpkBinary(payload=None):
if not payload:
payload = DummyHeaderCnt().binHeaderCntA
pckPayload = payload
pckLen = struct.pack("I", len(pckPayload))
npkBinary = MAGICBYTES + pckLen + pckPayload
return npkBinary
class DummyBasicCnt:
_00_cnt_id = struct.pack("h", -1)
_01_cnt_payload_len = struct.pack("I", 7)
_02_cnt_payload = struct.pack("7s", b"Payload")
@property
def binCnt(self):
return self._00_cnt_id + \
self._01_cnt_payload_len + \
self._02_cnt_payload
class DummyHeaderCnt:
_00_cnt_id = struct.pack("H", 1)
_01_cnt_payload_len = struct.pack("I", 35)
_02_cnt_programName = struct.pack("16s", b"01234567890abcdef")
_03_cnt_versionRevision = struct.pack("B", 3)
_04_cnt_versionRc = struct.pack("B", 4)
_05_cnt_versionMinor = struct.pack("B", 2)
_06_cnt_versionMajor = struct.pack("B", 1)
_07_cnt_buildTime = struct.pack("I", 1)
_08_cnt_nullBock = struct.pack("I", 0)
_09a_cnt_flagsA = struct.pack("7B", 0, 0, 0, 0, 0, 0, 0)
# _09b_cnt_flagsB = struct.pack("4B", 0, 0, 0, 0)
@property
def binHeaderCntA(self):
return self._binBasicHeaderCnt + self._09a_cnt_flagsA
# @property
# def binHeaderCntB(self):
# return self._binBasicHeaderCnt + self._09b_cnt_flagsB
@property
def _binBasicHeaderCnt(self):
return self._00_cnt_id + \
self._01_cnt_payload_len + \
self._02_cnt_programName + \
self._03_cnt_versionRevision + \
self._04_cnt_versionRc + \
self._05_cnt_versionMinor + \
self._06_cnt_versionMajor + \
self._07_cnt_buildTime + \
self._08_cnt_nullBock

View file

@ -0,0 +1,14 @@
import unittest
from npkpy.npk.npkConstants import CNT_HANDLER
from tests.constants import DummyBasicCnt
class Test_npkConstants(unittest.TestCase):
def test_validateAssignment_DictIdIsContainerId(self):
for cnt_id, cnt_class in CNT_HANDLER.items():
if cnt_class != "?":
cnt = cnt_class(DummyBasicCnt().binCnt, 0)
self.assertEqual(cnt_id, cnt._regularCntId,
msg=f"{cnt_id}!={cnt._regularCntId}")

View file

@ -0,0 +1,21 @@
import unittest
from pathlib import Path
from npkpy.npk.npkFileBasic import FileBasic
class FileInfo_Test(unittest.TestCase):
def setUp(self) -> None:
self.file = FileBasic(Path("advanced-tools-6.41.3.npk"))
def test_file(self):
self.assertEqual(Path("advanced-tools-6.41.3.npk"), self.file.file)
def test_versionName(self):
self.assertEqual("6.41.3", self.file.filename_version)
def test_programName(self):
self.assertEqual("advanced-tools", self.file.filename_program)
def test_programSuffix(self):
self.assertEqual("npk", self.file.filename_suffix)

View file

@ -0,0 +1,162 @@
import datetime
import unittest
from pathlib import Path
from npkpy.npk.npk import Npk, MAGICBYTES
class GpsFile_Test(unittest.TestCase):
def setUp(self) -> None:
self.npkFile = Path("tests/testData/gps-6.45.6.npk")
self.npk = Npk(self.npkFile)
self.cnt = self.npk.pck_cntList
class ParseGpsNpkFile_Test(GpsFile_Test):
def test_fileInfos(self):
self.assertEqual('gps', self.npk.filename_program)
self.assertEqual('6.45.6', self.npk.filename_version)
self.assertEqual('npk', self.npk.filename_suffix)
self.assertEqual('x86', self.npk.filename_architecture)
self.assertEqual(b'\xc6\x16\xf0\x9d~lS\xa7z\xba}.\xe5\xa6w=\xe9\xb4S\xe7', self.npk.file_hash)
def test_NpkHeader(self):
self.assertEqual(MAGICBYTES, self.npk.pck_magicBytes)
self.assertEqual(53321, self.npk.pck_payloadLen)
self.assertEqual(53329, self.npk.pck_fullSize)
def test_PckHeader(self):
self.assertEqual(1, self.npk.pck_header.cnt_id)
self.assertEqual("PckHeader", self.npk.pck_header.cnt_idName)
self.assertEqual(36, self.npk.pck_header.cnt_payloadLen)
self.assertEqual(b'gps\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06f-\x06\x97gw]'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00', self.npk.pck_header.cnt_payload)
self.assertEqual("gps", self.npk.pck_header.cnt_programName)
self.assertEqual("6.45.6 - rc(?): 102", self.npk.pck_header.cnt_osVersion)
self.assertEqual(datetime.datetime(2019, 9, 10, 9, 6, 31), self.npk.pck_header.cnt_built_time)
self.assertEqual((0, 0, 0, 0), self.npk.pck_header.cnt_nullBlock)
self.assertEqual((0, 0, 0, 0, 2, 0, 0), self.npk.pck_header.cnt_flags)
def test_ReleaseTyp(self):
cnt = self.cnt[1]
self.assertEqual(24, cnt.cnt_id)
self.assertEqual(50, cnt._offsetInPck)
self.assertEqual("PckReleaseTyp", cnt.cnt_idName)
self.assertEqual(6, cnt.cnt_payloadLen)
self.assertEqual(b"stable", cnt.cnt_payload)
self.assertEqual(12, cnt.cnt_fullLength)
def test_PckArchitectureTag(self):
cnt = self.cnt[2]
self.assertEqual(16, cnt.cnt_id)
self.assertEqual(62, cnt._offsetInPck)
self.assertEqual("CntArchitectureTag", cnt.cnt_idName)
self.assertEqual(4, cnt.cnt_payloadLen)
self.assertEqual(b"i386", cnt.cnt_payload)
self.assertEqual(10, cnt.cnt_fullLength)
def test_PckDescription(self):
cnt = self.cnt[3]
self.assertEqual(2, cnt.cnt_id)
self.assertEqual(72, cnt._offsetInPck)
self.assertEqual("PckDescription", cnt.cnt_idName)
self.assertEqual(25, cnt.cnt_payloadLen)
self.assertEqual(b'Provides support for GPS.', cnt.cnt_payload)
self.assertEqual(31, cnt.cnt_fullLength)
def test_PckHash(self):
cnt = self.cnt[4]
self.assertEqual(23, cnt.cnt_id)
self.assertEqual(103, cnt._offsetInPck)
self.assertEqual("PckEckcdsaHash", cnt.cnt_idName)
self.assertEqual(40, cnt.cnt_payloadLen)
self.assertEqual(b'1a7d206bbfe626c55aa6d2d2caabb6a5a990f13d', cnt.cnt_payload)
self.assertEqual(46, cnt.cnt_fullLength)
def test_PckRequirementsHeader(self):
cnt = self.cnt[5]
self.assertEqual(3, cnt.cnt_id)
self.assertEqual(149, cnt._offsetInPck)
self.assertEqual("PckRequirementsHeader", cnt.cnt_idName)
self.assertEqual(1, cnt.cnt_structure_id)
self.assertEqual(34, cnt.cnt_payloadLen)
self.assertEqual('system', cnt.cnt_programName)
self.assertEqual((0, 0, 0, 0), cnt.cnt_nullBlock)
self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_osVersionFrom)
self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_osVersionTo)
self.assertEqual("<not available for version 0,1>", cnt.cnt_flags)
self.assertEqual(40, cnt.cnt_fullLength)
def test_PckNullBlock(self):
cnt = self.cnt[6]
self.assertEqual(22, cnt.cnt_id)
self.assertEqual(189, cnt._offsetInPck)
self.assertEqual("CntNullBlock", cnt.cnt_idName)
self.assertEqual(3895, cnt.cnt_payloadLen)
self.assertEqual(b'\x00' * 3895, cnt.cnt_payload)
self.assertEqual(3901, cnt.cnt_fullLength)
def test_PckSquashFsImage(self):
cnt = self.cnt[7]
self.assertEqual(21, cnt.cnt_id)
self.assertEqual(4090, cnt._offsetInPck)
self.assertEqual("CntSquashFsImage", cnt.cnt_idName)
self.assertEqual(49152, cnt.cnt_payloadLen)
self.assertEqual(b'hsqs', cnt.cnt_payload[0:4])
self.assertEqual(49158, cnt.cnt_fullLength)
def test_PckSquashFsHashSignature(self):
cnt = self.cnt[8]
self.assertEqual(9, cnt.cnt_id)
self.assertEqual(53248, cnt._offsetInPck)
self.assertEqual("CntSquashFsHashSignature", cnt.cnt_idName)
self.assertEqual(68, cnt.cnt_payloadLen)
self.assertEqual(b'\x8e\xa2\xb1\x8e\xf7n\xef355', cnt.cnt_payload[0:10])
self.assertEqual(74, cnt.cnt_fullLength)
def test_parseGpsFilxe_PckArchitectureTag_Closing(self):
cnt = self.cnt[9]
self.assertEqual(16, cnt.cnt_id)
self.assertEqual(53322, cnt._offsetInPck)
self.assertEqual("CntArchitectureTag", cnt.cnt_idName)
self.assertEqual(1, cnt.cnt_payloadLen)
self.assertEqual(b'I', cnt.cnt_payload[0:10])
self.assertEqual(7, cnt.cnt_fullLength)
def test_checkStructure(self):
self.assertEqual(10, len(self.npk.pck_cntList))
self.assertEqual([1, 24, 16, 2, 23, 3, 22, 21, 9, 16], list(cnt.cnt_id for cnt in self.npk.pck_cntList))
class WriteModifiedGpsFile_Test(GpsFile_Test):
def test_modify_PckRequirementsHeader(self):
x = b"\x03\x00\x22\x00\x00\x00\x01\x00\x73\x79\x73\x74\x65\x6d\x00\x00" + \
b"\x00\x00\x00\x00\x00\x00\x00\x00\x06\x66\x2d\x06\x00\x00\x00\x00" + \
b"\x06\x66\x2d\x06\x00\x00\x00\x00"
cnt = None
for c in self.cnt:
if c.cnt_id == 3:
cnt = c
break
self.assertEqual(x, cnt.cnt_fullBinary)
def test_createFile_changePayloadTwice(self):
oldPayload = self.npk.pck_header.cnt_payload
self.npk.pck_header.cnt_payload = b"A"
self.npk.pck_header.cnt_payload = oldPayload
self.assertEqual(Npk(self.npkFile).file.read_bytes(), self.npk.pck_fullBinary)

70
tests/npk_test.py Normal file
View file

@ -0,0 +1,70 @@
import struct
import tempfile
import unittest
from pathlib import Path
from npkpy.npk.npk import Npk
from npkpy.npk.pckHeader import PckHeader
from tests.constants import DummyHeaderCnt, MAGICBYTES, getDummyNpkBinary
class Test_npkClass(unittest.TestCase):
def setUp(self) -> None:
self.npkFile = Path(tempfile.NamedTemporaryFile(suffix=".npk").name)
self.npkFile.write_bytes(getDummyNpkBinary())
def test_fileIsNoNpkFile(self):
self.npkFile.write_bytes(b"NoMagicBytesAtHeadOfFile")
with self.assertRaises(RuntimeError) as e:
_ = Npk(self.npkFile).pck_magicBytes
self.assertEqual(e.exception.args[0], "MagicBytes not found in Npk file")
def test_npkFileIsCorrupt_fileCorruptException(self):
self.npkFile.write_bytes(MAGICBYTES + b"CorruptFile")
with self.assertRaises(RuntimeError) as e:
_ = Npk(self.npkFile).pck_cntList
self.assertEqual(e.exception.args[0],
f"File maybe corrupted. Please download again. File: {self.npkFile.absolute()}")
def test_extractMagicBytes(self):
self.assertEqual(MAGICBYTES, Npk(self.npkFile).pck_magicBytes)
def test_extractLenOfNpkPayload_propagatedSizeIsValid(self):
self.assertEqual(len(DummyHeaderCnt().binHeaderCntA), Npk(self.npkFile).pck_payloadLen)
def test_calculatePckFullSize_equalsFileSize(self):
self.assertEqual(self.npkFile.stat().st_size, Npk(self.npkFile).pck_fullSize)
def test_getNpkBinary_equalsOriginalBinary(self):
npkBinary = self.npkFile.read_bytes()
self.assertEqual(npkBinary, Npk(self.npkFile).pck_fullBinary)
def test_getEnumeratedListOfCntInNpk(self):
cntList = list(Npk(self.npkFile).pck_enumerateCnt)
cntId, cnt = cntList[0]
self.assertEqual(1, len(cntList))
self.assertEqual(0, cntId)
self.assertTrue(isinstance(cnt, PckHeader))
def test_getAllCnt_returnAsList(self):
cntList = Npk(self.npkFile).pck_cntList
self.assertEqual(1, len(cntList))
self.assertTrue(isinstance(cntList[0], PckHeader))
def test_getAllCnt_exceptionWithUnknownCntInNpk(self):
unknownCnt = DummyHeaderCnt()
unknownCnt._00_cnt_id = struct.pack("H", 999)
self.npkFile.write_bytes(getDummyNpkBinary(payload=unknownCnt.binHeaderCntA))
with self.assertRaises(RuntimeError) as e:
_ = Npk(self.npkFile).pck_cntList
self.assertEqual(e.exception.args[0], f"failed with id: 999\n"
f"New cnt id discovered in file: {self.npkFile.absolute()}")

View file

@ -0,0 +1,16 @@
import struct
import unittest
from npkpy.npk.pckDescription import PckDescription
from tests.constants import DummyBasicCnt
class Test_pckDescription(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 2)
self.cnt = PckDescription(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(2, self.cnt.cnt_id)

View file

@ -0,0 +1,16 @@
import struct
import unittest
from npkpy.npk.pckEckcdsaHash import PckEckcdsaHash
from tests.constants import DummyBasicCnt
class Test_pckEckcdsaHash(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 23)
self.cnt = PckEckcdsaHash(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(23, self.cnt.cnt_id)

View file

@ -0,0 +1,16 @@
import struct
import unittest
from npkpy.npk.pckReleaseTyp import PckReleaseTyp
from tests.constants import DummyBasicCnt
class Test_pckReleaseTyp(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 24)
self.cnt = PckReleaseTyp(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(24, self.cnt.cnt_id)

Binary file not shown.

View file

@ -0,0 +1,86 @@
gps-6.45.6.npk
Cnt: 0:PckHeader
Cnt: 1:PckReleaseTyp
Cnt: 2:CntArchitectureTag
Cnt: 3:PckDescription
Cnt: 4:PckEckcdsaHash
Cnt: 5:PckRequirementsHeader
Cnt: 6:CntNullBlock
Cnt: 7:CntSquashFsImage
Cnt: 8:CntSquashFsHashSignature
Cnt: 9:CntArchitectureTag
PckHeader
Cnt id: 1
Cnt offset: 8
Cnt len: 42
Payload len: 36
Payload[0:10]: b'gps\x00\x00\x00\x00\x00\x00\x00' [...]
Program name: gps
Os version: 6.45.6 - rc(?): 102
Created at: 2019-09-10 09:06:31
NullBlock: (0, 0, 0, 0)
Flags: (0, 0, 0, 0, 2, 0, 0)
PckReleaseTyp
Cnt id: 24
Cnt offset: 50
Cnt len: 12
Payload len: 6
Payload[0:6]: b'stable' [...]
CntArchitectureTag
Cnt id: 16
Cnt offset: 62
Cnt len: 10
Payload len: 4
Payload[0:4]: b'i386' [...]
PckDescription
Cnt id: 2
Cnt offset: 72
Cnt len: 31
Payload len: 25
Payload[0:10]: b'Provides s' [...]
PckEckcdsaHash
Cnt id: 23
Cnt offset: 103
Cnt len: 46
Payload len: 40
Payload[0:10]: b'1a7d206bbf' [...]
PckRequirementsHeader
Cnt id: 3
Cnt offset: 149
Cnt len: 40
Payload len: 34
Payload[0:10]: b'\x01\x00system\x00\x00' [...]
Cnt id: 3
StructID: 1
Offset: 149
Program name: system
Null block: (0, 0, 0, 0)
Os versionFrom: 6.45.6 - rc(?): 102
Os versionTo: 6.45.6 - rc(?): 102
Flags: <not available for version 0,1>
CntNullBlock
Cnt id: 22
Cnt offset: 189
Cnt len: 3901
Payload len: 3895
Payload[0:10]: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' [...]
CntSquashFsImage
Cnt id: 21
Cnt offset: 4090
Cnt len: 49158
Payload len: 49152
Payload[0:10]: b'hsqs\x15\x00\x00\x00\xa2m' [...]
calc Sha1Hash: b'c\x033\x82{\x9a$er\x92=o\x8a\xfeL-Q\x02hW'
CntSquashFsHashSignature
Cnt id: 9
Cnt offset: 53248
Cnt len: 74
Payload len: 68
Payload[0:10]: b'\x8e\xa2\xb1\x8e\xf7n\xef355' [...]
Payload[-10:]: b"Y\x9e'\xac\xccw\x91\x06]\t"
CntArchitectureTag
Cnt id: 16
Cnt offset: 53322
Cnt len: 7
Payload len: 1
Payload[0:1]: b'I' [...]

17
tests/xCntFlagA_test.py Normal file
View file

@ -0,0 +1,17 @@
import struct
import unittest
from npkpy.npk.xCntFlagA import XCnt_flagA
from tests.constants import DummyBasicCnt
class Test_xCnt_flagB(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 7)
self.cnt = XCnt_flagA(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(7, self.cnt.cnt_id)

18
tests/xCntFlagB_test.py Normal file
View file

@ -0,0 +1,18 @@
import struct
import unittest
from npkpy.npk.xCntFlagB import XCnt_flagB
from tests.constants import DummyBasicCnt
class Test_xCnt_flagB(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 8)
self.cnt = XCnt_flagB(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(8, self.cnt.cnt_id)

17
tests/xCntFlagC_test.py Normal file
View file

@ -0,0 +1,17 @@
import struct
import unittest
from npkpy.npk.xCntFlagC import XCnt_flagC
from tests.constants import DummyBasicCnt
class Test_xCnt_flagC(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 17)
self.cnt = XCnt_flagC(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(17, self.cnt.cnt_id)

16
tests/xCntMpls_test.py Normal file
View file

@ -0,0 +1,16 @@
import struct
import unittest
from npkpy.npk.xCntMpls import XCntMpls
from tests.constants import DummyBasicCnt
class Test_xCntMpls(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 19)
self.cnt = XCntMpls(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(19, self.cnt.cnt_id)

View file

View file

@ -0,0 +1,66 @@
import subprocess
import tempfile
import unittest
from pathlib import Path
class Test_npkPy(unittest.TestCase):
def setUp(self) -> None:
# TODO: create DummyPkg and replace gps-6.45.6.npk
self.npkFile = Path("tests/testData/gps-6.45.6.npk")
self.pathToNpk = str(self.npkFile.absolute())
self.npkContainerList = Path("tests/testData/gps-6.45.6.result").read_text()
self.dstFolder = Path(tempfile.mkdtemp())
def tearDown(self) -> None:
[f.unlink() for f in self.dstFolder.rglob("*") if f.is_file()]
[f.rmdir() for f in self.dstFolder.rglob("*")]
self.dstFolder.rmdir()
def test_showAllContainersFromNpkPkg(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--showContainer"]
output = runCmdInTerminal(cmd)
self.assertEqual(self.npkContainerList, output)
def test_exportAllContainerFromNpk(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportAll"]
runCmdInTerminal(cmd)
exportedContainer = sorted(str(f.relative_to(self.dstFolder)) for f in self.dstFolder.rglob('*'))
self.assertEqual(['npkPyExport_gps-6.45.6',
'npkPyExport_gps-6.45.6/000_cnt_PckHeader.raw',
'npkPyExport_gps-6.45.6/001_cnt_PckReleaseTyp.raw',
'npkPyExport_gps-6.45.6/002_cnt_CntArchitectureTag.raw',
'npkPyExport_gps-6.45.6/003_cnt_PckDescription.raw',
'npkPyExport_gps-6.45.6/004_cnt_PckEckcdsaHash.raw',
'npkPyExport_gps-6.45.6/005_cnt_PckRequirementsHeader.raw',
'npkPyExport_gps-6.45.6/006_cnt_CntNullBlock.raw',
'npkPyExport_gps-6.45.6/007_cnt_CntSquashFsImage.raw',
'npkPyExport_gps-6.45.6/008_cnt_CntSquashFsHashSignature.raw',
'npkPyExport_gps-6.45.6/009_cnt_CntArchitectureTag.raw'], exportedContainer)
def test_extractSquashFsContainerFromNpk(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportSquashFs"]
runCmdInTerminal(cmd)
self.assertContainerExtracted(['npkPyExport_gps-6.45.6',
'npkPyExport_gps-6.45.6/007_cnt_CntSquashFsImage.raw'])
#
def test_extractZlibContainerFromNpk_NonExisitngNotExtracted(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportZlib"]
runCmdInTerminal(cmd)
self.assertContainerExtracted([])
def assertContainerExtracted(self, expectedFiles):
extractedContainer = sorted(str(f.relative_to(self.dstFolder)) for f in self.dstFolder.rglob('*'))
self.assertEqual(expectedFiles, extractedContainer)
def runCmdInTerminal(cmd):
return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.decode("UTF-8")

0
tools/__init__.py Normal file
View file

128
tools/download_all_packages.py Executable file
View file

@ -0,0 +1,128 @@
import socket
import sys
import urllib.request
import zipfile
from datetime import datetime
from pathlib import Path
from time import sleep
from urllib.error import HTTPError, URLError
from zipfile import ZipFile
from urlpath import URL
class MikroTikDownloader:
def __init__(self, dstPath):
self.dstPath = Path(dstPath)
self.sleepTime = 1
socket.setdefaulttimeout(10)
def downloadAll(self, urls):
missingFiles = self._determineMissingFiles(urls)
for link in missingFiles:
unpackFile(self._downloadFile(link))
def _downloadFile(self, url):
def _progress(count, block_size, total_size):
progress = (float(count * block_size) / float(total_size) * 100.0)
if int(progress) > 100:
raise RuntimeError("AccessDenied")
sys.stderr.write(self._msg(url) + ': [%.1f%%] ' % progress)
sys.stderr.flush()
targetFile = self._convertToLocalFilePath(url)
targetFile.parent.mkdir(exist_ok=True, parents=True)
while True:
try:
urllib.request.urlretrieve(f"http:{url.resolve()}", targetFile, _progress)
self.msg(url, "[100.0%]")
self.dec_sleep()
break
except HTTPError as e:
self.msg(url, f"[failed] (Error code: {e.code})")
break
except socket.timeout:
self.msg(url, "[failed] (Timeout)")
self.inc_sleep()
except RuntimeError as e:
self.msg(url, f"[failed] ([{e}])")
if targetFile.exists():
targetFile.unlink()
break
except URLError as e:
self.msg(url, f"[failed] (Error code: {e})")
self.inc_sleep()
return targetFile
def inc_sleep(self):
sleep(self.sleepTime)
self.sleepTime += 1
def dec_sleep(self):
if self.sleepTime > 1:
self.sleepTime -= 1
def _determineMissingFiles(self, urls):
for url in urls:
if not self._convertToLocalFilePath(url).exists():
yield url
else:
self.msg(url, "[ok]")
def msg(self, url, msg):
print(f"{self._msg(url)}: {msg}")
def _msg(self, url):
return f"\r[{datetime.now().time()}] Download {url.name:35}"
def _convertToLocalFilePath(self, link: URL):
pktName = arc = version = ""
elements = link.stem.split("-")
count = len(elements)
if count == 2:
pktName, version = elements
if count == 3:
pktName, arc, version = elements
return self.dstPath / pktName / arc / version / link.name
def fetchWebsite(url):
with urllib.request.urlopen(url) as response:
return response.read().decode("utf-8")
def extractDownloadLinks(tHtml, filterString):
return (URL(line.split('"')[1]) for line in tHtml.split() if filterString in line)
def filterLinks(tLinks, filters):
return (link for link in tLinks if
any(True for name, suffix in filters
if name in link.stem and
suffix in link.suffix
)
)
def unpackFile(file: Path):
if file.exists() and file.suffix == ".zip":
try:
with ZipFile(file, 'r') as zipObj:
zipObj.extractall(file.parent)
except zipfile.BadZipFile:
file.unlink()
if __name__ == "__main__":
html = fetchWebsite("https://mikrotik.com/download/archive")
links = extractDownloadLinks(html, "download.mikrotik.com")
selectedLinks = filterLinks(links, [("all_packages", ".zip"),
(".", ".npk")
]
)
mtd = MikroTikDownloader("./downloads")
mtd.downloadAll(selectedLinks)

View file

@ -0,0 +1,105 @@
import tempfile
import unittest
from pathlib import Path
from zipfile import ZipFile
from pytest_httpserver import HTTPServer
from urlpath import URL
from tools.download_all_packages import fetchWebsite, extractDownloadLinks, filterLinks, \
unpackFile
class Test_MikroTikDownloader(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
print("setup Class!")
cls.s = HTTPServer()
cls.s.start()
cls.testUrl = f"http://localhost:{cls.s.port}"
# cls.s.expect_request("/download/archive").respond_with_data(createArchiveWebsite(cls.testUrl))
# cls.s.expect_request("/routeros/6.36.4/all_packages-mipsbe-6.36.4.zip").respond_with_data(b"data",
# content_type="application/zip")
@classmethod
def tearDownClass(cls) -> None:
cls.s.stop()
def setUp(self) -> None:
self.s.clear()
def test_downloadWebsite(self):
self.s.expect_request("/").respond_with_data(b"testData")
result = fetchWebsite(f"http://localhost:{self.s.port}")
self.assertEqual("testData", result)
def test_extractDownloadLinks(self):
testData = "dummyTest \n"
testData += "<td><a href=\"//filterString.com/prod/vers/programA-arch-version.zip\">program-arch-version.zip>\n"
testData += "dummyTest \n"
result = extractDownloadLinks(testData, "filterString.com")
self.assertEqual([URL("//filterString.com/prod/vers/programA-arch-version.zip")], list(result))
def test_filterForSpecificLinks_basedOnSuffix(self):
links = [URL("//filterString.com/prod/vers/programA-arch-version.zip"),
URL("//filterString.com/prod/vers/programB-arch-version.WRONGSUFFIX")]
result = filterLinks(links, [("program", ".zip")])
self.assertEqual([URL("//filterString.com/prod/vers/programA-arch-version.zip")], list(result))
def test_filterForSpecificLinks_basedOnProgram(self):
links = [URL("//filterString.com/prod/vers/programA-arch-version.exe"),
URL("//filterString.com/prod/vers/programB-arch-version.zip")]
result = filterLinks(links, ([("programB", ".zip")]))
self.assertEqual([URL("//filterString.com/prod/vers/programB-arch-version.zip")], list(result))
def test_dontExtractZip_fileDontExist(self):
file = Path(tempfile.NamedTemporaryFile(suffix="WRONGSUFFIX").name)
unpackFile(file)
def test_dontExtractZip_fileHasNoZipSuffix(self):
file = Path(tempfile.NamedTemporaryFile(suffix="WRONGSUFFIX").name)
file.touch()
unpackFile(file)
def test_extractZip_validatePayload(self):
zipFile, origTxtFile = _createTxtInZipFile(txtPayload="TEST DATA")
unzipedFile = Path(f"{origTxtFile.parent}/{origTxtFile.absolute()}")
unpackFile(zipFile)
self.assertTrue(zipFile.exists())
self.assertTrue(unzipedFile.exists())
self.assertEqual("TEST DATA", unzipedFile.read_text())
def _createTxtInZipFile(txtPayload):
txtFile = Path(tempfile.NamedTemporaryFile(suffix=".txt").name)
zipFile = Path(tempfile.NamedTemporaryFile(suffix=".zip").name)
with txtFile.open("w") as f:
f.write(txtPayload)
with ZipFile(zipFile, 'w') as zipObj:
zipObj.write(txtFile.absolute())
return zipFile, txtFile
def createArchiveWebsite(testUrl):
return f"""
<tr>
<td width='40px'><div class='icon zip'></div></td>
<td><a href=\"//{testUrl}/routeros/6.36.4/all_packages-mipsbe-6.36.4.zip\">all_packages-mipsbe-6.36.4>
<td class='pl'>mipsbe</td>
</tr>
"""

97
tools/npkModify_test.py Normal file
View file

@ -0,0 +1,97 @@
import tempfile
import unittest
from pathlib import Path
from tests.constants import MINIMAL_NPK_PACKAGE
from npkpy.npk.npk import Npk
class BasicNpkTestRequirements(unittest.TestCase):
def setUp(self) -> None:
self.testNpk = Path(tempfile.NamedTemporaryFile().name)
self.testNpk.write_bytes(MINIMAL_NPK_PACKAGE)
self.npk = Npk(self.testNpk)
def tearDown(self) -> None:
self.testNpk.unlink()
class ParseTestPackage_Test(BasicNpkTestRequirements):
def test_minimalNPK_parseHeader(self):
self.assertEqual(b"\x1e\xf1\xd0\xba", self.npk.pck_magicBytes)
self.assertEqual(28, self.npk.pck_payloadLen)
def test_minimalNPK_parseAllContainer(self):
listOfCnt = self.npk.pck_cntList
self.assertEqual(1, listOfCnt[0].cnt_id)
self.assertEqual(15, listOfCnt[0].cnt_payloadLen)
self.assertEqual(b"NAME OF PROGRAM", listOfCnt[0].cnt_payload)
self.assertEqual(1, listOfCnt[1].cnt_id)
self.assertEqual(1, listOfCnt[1].cnt_payloadLen)
self.assertEqual(b"I", listOfCnt[1].cnt_payload)
class ModifyPayload_Test(BasicNpkTestRequirements):
def setUp(self) -> None:
super().setUp()
self.cnt = self.npk.pck_cntList[0]
def test_emptyPayload_emptyContainer(self):
self.cnt.cnt_payload = b""
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(0, self.cnt.cnt_payloadLen)
self.assertEqual(b"", self.cnt.cnt_payload)
self.assertEqual(6, self.cnt.cnt_fullLength)
self.assertEqual(13, self.npk.pck_payloadLen)
self.assertEqual(21, self.npk.pck_fullSize)
def test_dontchangePayloadSize_recalculateContainerKeepSize(self):
self.cnt.cnt_payload = b"PROGRAM OF NAME"
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(15, self.cnt.cnt_payloadLen)
self.assertEqual(b"PROGRAM OF NAME", self.cnt.cnt_payload)
self.assertEqual(21, self.cnt.cnt_fullLength)
self.assertEqual(28, self.npk.pck_payloadLen)
self.assertEqual(36, self.npk.pck_fullSize)
def test_increasePayloadLen_recalculateContainerSizeBigger(self):
self.cnt.cnt_payload = b"NEW NAME OF PROGRAM"
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(19, self.cnt.cnt_payloadLen)
self.assertEqual(b"NEW NAME OF PROGRAM", self.cnt.cnt_payload)
self.assertEqual(25, self.cnt.cnt_fullLength)
self.assertEqual(32, self.npk.pck_payloadLen)
self.assertEqual(40, self.npk.pck_fullSize)
def test_decreasePayloadLen_recalculateContainerSmaller(self):
self.cnt.cnt_payload = b"SHORT NAME"
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(10, self.cnt.cnt_payloadLen)
self.assertEqual(b"SHORT NAME", self.cnt.cnt_payload)
self.assertEqual(16, self.cnt.cnt_fullLength)
self.assertEqual(23, self.npk.pck_payloadLen)
self.assertEqual(31, self.npk.pck_fullSize)
class WriteModifiedFile_Test(BasicNpkTestRequirements):
def setUp(self) -> None:
super().setUp()
self.cnt = self.npk.pck_cntList[0]
def test_createFile_withoutModification(self):
self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_fullBinary)
def test_createFile_changePayloadTwice(self):
self.cnt.cnt_payload = b"A"
self.cnt.cnt_payload = b"NAME OF PROGRAM"
self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_fullBinary)

51
tools/sections.py Normal file
View file

@ -0,0 +1,51 @@
from collections import Counter
from pathlib import Path
from typing import Tuple
from more_itertools import peekable
def getBinaryFromFile(file: Path):
with file.open("rb") as f:
data = f.read()
for d in data:
yield d
def findDiffs(lastFile, file):
bLastFile = peekable(getBinaryFromFile(lastFile))
bfile = peekable(getBinaryFromFile(file))
sections = []
counter = -1
hasChanged = (bLastFile.peek() == bfile.peek())
sectionTracker = dict({True: 0, False: 0})
while True:
try:
while (next(bLastFile) == next(bfile)) is hasChanged:
counter += 1
except StopIteration:
sections.append((max(sectionTracker.values()), counter, not hasChanged))
break
sectionTracker[hasChanged] = counter + 1
hasChanged = not hasChanged
sections.append((sectionTracker[hasChanged], counter, hasChanged))
counter += 1
return sections
def findSections(filesDict: dict) -> Tuple[str, Counter]:
cTotal = Counter()
for (program, versionFiles) in filesDict.items():
lastFile = None
c = Counter()
for (version, file) in versionFiles.items():
if lastFile is not None:
diffs = findDiffs(lastFile.file, file.file)
c.update(diffs)
cTotal.update(diffs)
lastFile = file
yield program, c
yield "__total", cTotal

43
tools/sections_test.py Normal file
View file

@ -0,0 +1,43 @@
import unittest
from pathlib import Path
from tempfile import NamedTemporaryFile
from tools.sections import findDiffs
class FindDiffs_Test(unittest.TestCase):
def setUp(self) -> None:
self.fileA = Path(NamedTemporaryFile(delete=False).name)
self.fileB = Path(NamedTemporaryFile(delete=False).name)
self.fileC = Path(NamedTemporaryFile(delete=False).name)
def tearDown(self) -> None:
self.fileA.unlink()
self.fileB.unlink()
self.fileC.unlink()
def test_filesEqual_minimumFile(self):
self.assertFileDiff("a", "a", [(0, 0, False)])
def test_filesUnequal_minimumFile(self):
self.assertFileDiff("a", "b", [(0, 0, True)])
def test_filesEqual_equalSize(self):
self.assertFileDiff("aabbcc", "aabbcc", [(0, 5, False)])
def test_diffInCenter_equalSize(self):
self.assertFileDiff("aabbcc", "aa cc", [(0, 1, False), (2, 3, True), (4, 5, False)])
def test_differInSize(self):
self.assertFileDiff("aa", "aabb", [(0, 1, False)])
def assertFileDiff(self, contentA, contentB, result):
writeFile(self.fileA, contentA)
writeFile(self.fileB, contentB)
self.assertEqual(result, findDiffs(self.fileA, self.fileB))
def writeFile(file, content):
with file.open("w") as f:
f.write(content)