Compare commits

...

No commits in common. "firstTag" and "master" have entirely different histories.

94 changed files with 1741 additions and 1351 deletions

View file

@ -1,65 +0,0 @@
name: npkpy
on: [push]
jobs:
run-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up Python 3.6
uses: actions/setup-python@v1
with:
python-version: 3.6
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -e .
- name: Lint with flake8
run: |
pip install flake8
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pip install pytest pytest-cov codecov
pytest --cov=./
codecov --token=${{ secrets.CODECOV_TOKEN }}
publish-to-pypi:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: Set up Python 3.6
uses: actions/setup-python@v1
with:
python-version: 3.6
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -e .
- name: Build a binary wheel and a source tarball
run: |
python3 setup.py sdist bdist_wheel
- name: Publish distribution
uses: pypa/gh-action-pypi-publish@master
with:
password: ${{ secrets.TEST_PYPI_PASSWORD }}
repository_url: https://test.pypi.org/legacy/
- name: Publish distribution to PyPI
if: startsWith(github.event.ref, 'refs/tags')
uses: pypa/gh-action-pypi-publish@master
with:
password: ${{ secrets.PYPI_PASSWORD }}

71
.github/workflows/cicd.yml vendored Normal file
View file

@ -0,0 +1,71 @@
name: CI/CD
on:
release:
push:
schedule:
- cron: '0 2 * * *' # run at 2 AM UTC
jobs:
tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/setup-python@v1
with:
python-version: 3.6
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
pip install -e .
- name: Pylint
run: |
pip install pytest pytest-cov codecov
pylint --rcfile=.pylintrc npkpy
- name: Pytest and Coverage
run: |
pip install pytest pytest-cov codecov
pytest --cov=npkpy --cov=acceptance_test
- name: Publish to codecov
run: |
codecov --token=${{ secrets.CODECOV_TOKEN }}
release:
runs-on: ubuntu-latest
needs: tests
if: github.event_name == 'release' || github.event_name == 'push'
steps:
- uses: actions/checkout@v1
- uses: actions/setup-python@v1
with:
python-version: 3.6
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
pip install -e .
- name: Build a binary wheel and a source tarball
run: |
python3 setup.py sdist bdist_wheel
- name: Release npkPy to PyPI
# if: github.event_name == 'release'
uses: pypa/gh-action-pypi-publish@master
with:
password: ${{ secrets.PYPI_PASSWORD }}
#- name: Release npkPy to test.pypi.org
# uses: pypa/gh-action-pypi-publish@master
# with:
# password: ${{ secrets.TEST_PYPI_PASSWORD }}
# repository_url: https://test.pypi.org/legacy/

13
.pylintrc Normal file
View file

@ -0,0 +1,13 @@
[MESSAGES CONTROL]
disable=superfluous-parens,
missing-class-docstring,
missing-module-docstring,
missing-function-docstring,
fixme,
not-callable,
C0103,
W0212,
[MISCELLANEOUS]
max-line-length=120

View file

@ -5,7 +5,7 @@
# npkPy
The npkPy package module is an unpacking tool for MikroTiks custom NPK container format. The tool is capable
to display to the content of any NPK package and to export all container.
to display the content of any NPK package and to export all container.
["NPK stands for MikroTik RouterOS upgrade package"](https://whatis.techtarget.com/fileformat/NPK-MikroTik-RouterOS-upgrade-package)
and since there is no solid unpacking tool for the format available, I want to share my approach of it.
@ -21,6 +21,12 @@ All recent packages are signed with EC-KCDSA signature,
and there's no way to create a valid npk file unless you know a secret key.
```
## Installation
```
pip install npkPy
```
## Usage
```

View file

@ -6,12 +6,12 @@ venv=${1:-virtualenv}
if [[ ! -e ${venv} ]]; then
virtualenv --python=python ${venv}
${venv}/bin/pip install pip --upgrade
${venv}/bin/pip install pip -r requirements.txt
${venv}/bin/pip install pip -r requirements-dev.txt
${venv}/bin/pip install pip -e .
fi
git config user.name "botlabsDev"
git config user.email "git@botlabs.dev"
git config user.email "54632107+botlabsDev@users.noreply.github.com"
echo "--git config--"
echo -n "git user:"; git config user.name
echo -n "git email:"; git config user.email

View file

@ -1,31 +0,0 @@
from npkpy.common import getFullPktInfo, extractContainer
from npkpy.npk.cntSquasFsImage import NPK_SQUASH_FS_IMAGE
from npkpy.npk.cntZlibCompressedData import NPK_ZLIB_COMPRESSED_DATA
from npkpy.npk.npkConstants import CNT_HANDLER
EXPORT_FOLDER_PREFIX = "npkPyExport_"
def analyseNpk(opts, npkFiles):
filterContainer = []
if opts.showContainer:
for file in npkFiles:
print("\n".join(getFullPktInfo(file)))
if opts.exportAll:
filterContainer = CNT_HANDLER.keys()
if opts.exportSquashFs:
filterContainer = [NPK_SQUASH_FS_IMAGE]
if opts.exportZlib:
filterContainer = [NPK_ZLIB_COMPRESSED_DATA]
if filterContainer:
for npkFile in npkFiles:
exportFolder = opts.dstFolder / f"{EXPORT_FOLDER_PREFIX}{npkFile.file.stem}"
exportFolder.mkdir(parents=True, exist_ok=True)
extractContainer(npkFile, exportFolder, filterContainer)
if not next(exportFolder.iterdir(), None):
exportFolder.rmdir()

32
npkpy/analyse_npk.py Normal file
View file

@ -0,0 +1,32 @@
from npkpy.common import get_full_pkt_info, extract_container
from npkpy.npk.cnt_squasfs_image import NPK_SQUASH_FS_IMAGE
from npkpy.npk.cnt_zlib_compressed_data import NPK_ZLIB_COMPRESSED_DATA
from npkpy.npk.npk_constants import CNT_HANDLER
EXPORT_FOLDER_PREFIX = "npkPyExport_"
def analyse_npk(opts, npk_files):
filter_container = []
if opts.show_container:
for file in npk_files:
print("\n".join(get_full_pkt_info(file)))
if opts.export_all:
print("export all!!")
filter_container = CNT_HANDLER.keys()
if opts.export_squashfs:
filter_container = [NPK_SQUASH_FS_IMAGE]
if opts.export_zlib:
filter_container = [NPK_ZLIB_COMPRESSED_DATA]
if filter_container:
for npk_file in npk_files:
export_folder = opts.dst_folder / f"{EXPORT_FOLDER_PREFIX}{npk_file.file.stem}"
export_folder.mkdir(parents=True, exist_ok=True)
extract_container(npk_file, export_folder, filter_container)
if not next(export_folder.iterdir(), None):
export_folder.rmdir()

View file

@ -3,64 +3,72 @@ from pathlib import Path
from typing import List
def getAllNkpFiles(path, containStr=None):
return path.glob(f"**/*{containStr}*.npk" if containStr else "**/*.npk")
def get_all_nkp_files(path, contain_str=None):
return path.glob(f"**/*{contain_str}*.npk" if contain_str else "**/*.npk")
def extractContainer(npkFile, exportFolder, filterContainer):
for position, cnt in enumerate(npkFile.pck_cntList):
if cnt.cnt_id in filterContainer:
filePath = exportFolder / f"{position:03}_cnt_{cnt.cnt_idName}.raw"
writeToFile(filePath, cnt.cnt_payload)
def extract_container(npk_file, export_folder, container_ids):
for position, cnt in enumerate(npk_file.pck_cnt_list):
if cnt.cnt_id in container_ids:
file_path = export_folder / f"{position:03}_cnt_{cnt.cnt_id_name}.raw"
write_to_file(file_path, cnt.cnt_payload)
def writeToFile(file, payloads):
written = 0
if not isinstance(payloads, list):
payloads = [payloads]
with open(file, "wb") as f:
def write_to_file(file, payloads):
payloads = [payloads] if not isinstance(payloads, list) else payloads
with open(file, "wb") as _file:
for payload in payloads:
f.write(payload)
written += len(payload)
_file.write(payload)
def getPktInfo(file):
def get_short_pkt_info(file) -> List:
return [str(file.file.name)]
def getCntInfo(file) -> List:
return [f"Cnt:{pos:3}:{c.cnt_idName}" for pos, c in file.pck_enumerateCnt]
def getFullPktInfo(file) -> List:
output = getPktInfo(file)
output += getCntInfo(file)
for cnt in file.pck_cntList:
output += getFullCntInfo(cnt)
def get_full_pkt_info(file) -> List:
output = get_short_pkt_info(file)
output += get_short_cnt_info(file)
for cnt in file.pck_cnt_list:
output += get_full_cnt_info(cnt)
return output
def getFullCntInfo(cnt) -> List:
def get_short_cnt_info(file) -> List:
return [f"Cnt:{pos:3}:{c.cnt_id_name}" for pos, c in file.pck_enumerate_cnt]
def get_full_cnt_info(cnt) -> List:
info = []
idName, options = cnt.output_cnt
info.append(f"{idName}")
id_name, options = cnt.output_cnt
info.append(f"{id_name}")
for option in options:
info.append(f" {option}")
return info
def sha1sumFromFile(file: Path):
with file.open('rb') as f:
return sha1sumFromBinary(f.read())
def sha1_sum_from_file(file: Path):
with file.open('rb') as _file:
return sha1_sum_from_binary(_file.read())
def sha1sumFromBinary(payloads):
def sha1_sum_from_binary(payloads):
if len(payloads) == 0:
return "<empty>"
return b"<empty>"
sha1 = hashlib.sha1()
for payload in [payloads] if not isinstance(payloads, list) else payloads:
sha1.update(payload)
return sha1.digest()
class NPKIdError(BaseException):
pass
class NPKMagicBytesError(BaseException):
pass
class NPKError(BaseException):
pass

View file

@ -1,43 +1,47 @@
import argparse
from pathlib import Path
from npkpy.analyseNpk import analyseNpk
from npkpy.common import getAllNkpFiles
from npkpy.analyse_npk import analyse_npk
from npkpy.common import get_all_nkp_files
from npkpy.npk.npk import Npk
def parseArgs():
def parse_args():
parser = argparse.ArgumentParser(description='npkPy is an unpacking tool for MikroTiks custom NPK container format')
inputGroup = parser.add_argument_group("input")
inputGroup.add_argument("--files", action='append', type=Path,
help="Select one or more files to process")
inputGroup.add_argument("--srcFolder", type=Path, default=Path("."),
help="Process all NPK files found recursively in given source folder.")
input_group = parser.add_argument_group("input")
input_group.add_argument("--files",
action='append', type=Path, help="Select one or more files to process")
inputFilterGroup = inputGroup.add_mutually_exclusive_group()
inputFilterGroup.add_argument("--glob", type=str, default=None,
help="Simple glob. Filter files from --srcFolder which match the given string.")
input_group.add_argument("--src-folder",
type=Path, default=Path("."),
help="Process all NPK files found recursively in given source folder.")
outputGroup = parser.add_argument_group("output")
outputGroup.add_argument("--dstFolder", type=Path, default=Path(".") / "exportNpk",
help="Extract container into given folder")
input_filter_group = input_group.add_mutually_exclusive_group()
input_filter_group.add_argument("--glob",
type=str, default=None,
help="Simple glob. Filter files from --srcFolder which match the given string.")
actionGroup = parser.add_argument_group("actions")
exclusiveAction = actionGroup.add_mutually_exclusive_group(required=True)
exclusiveAction.add_argument("--showContainer", action="store_true",
help="List all container from selected NPK files")
exclusiveAction.add_argument("--exportAll", action="store_true",
help="Export all container from selected NPK files")
exclusiveAction.add_argument("--exportSquashFs", action="store_true",
help="Export all SquashFs container from selected NPK files")
exclusiveAction.add_argument("--exportZlib", action="store_true",
help="Export all Zlib compressed container from selected NPK files")
output_group = parser.add_argument_group("output")
output_group.add_argument("--dst-folder",
type=Path, default=Path(".") / "exportNpk",
help="Extract container into given folder")
action_group = parser.add_argument_group("actions")
exclusive_action = action_group.add_mutually_exclusive_group(required=True)
exclusive_action.add_argument("--show-container",
action="store_true", help="List all container from selected NPK files")
exclusive_action.add_argument("--export-all",
action="store_true", help="Export all container from selected NPK files")
exclusive_action.add_argument("--export-squashfs", action="store_true",
help="Export all SquashFs container from selected NPK files")
exclusive_action.add_argument("--export-zlib",
action="store_true",
help="Export all Zlib compressed container from selected NPK files")
return parser.parse_args()
def main():
opts = parseArgs()
files = (Npk(f) for f in (opts.files if opts.files else getAllNkpFiles(opts.srcFolder, opts.glob)))
analyseNpk(opts, files)
opts = parse_args()
files = (Npk(f) for f in (opts.files if opts.files else get_all_nkp_files(opts.src_folder, opts.glob)))
analyse_npk(opts, files)

View file

@ -1,9 +0,0 @@
from npkpy.npk.pckRequirementsHeader import PckRequirementsHeader
NPK_MULTICONTAINER_LIST = 20
class XCnt_MultiContainerList(PckRequirementsHeader):
@property
def _regularCntId(self):
return NPK_MULTICONTAINER_LIST

View file

@ -1,9 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_ARCHITECTURE_TAG = 16
class CntArchitectureTag(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_ARCHITECTURE_TAG

View file

@ -1,89 +0,0 @@
import logging
import struct
BYTES_LEN_CNT_ID = 2
BYTES_LEN_CNT_PAYLOAD_LEN = 4
NPK_CNT_BASIC = -1
class NpkContainerBasic:
"""
0____4____8____b____f
| | | | |
x0_|AABB|BBCC|C ..... C|
x1_|....|....|....|....|
A = Container Identifier
B = Payload length
C = Payload
"""
def __init__(self, data, offsetInPck):
self._data = bytearray(data)
self._offsetInPck = offsetInPck
self.modified = False
@property
def _regularCntId(self):
return NPK_CNT_BASIC
@property
def cnt_id(self):
cntId = struct.unpack_from(b"h", self._data, 0)[0]
if cntId != self._regularCntId:
raise RuntimeError(f"Cnt object does not represent given container typ {self._regularCntId}/{cntId}")
return cntId
@property
def cnt_idName(self):
return str(self.__class__.__name__)
@property
def cnt_payloadLen(self):
return (struct.unpack_from(b"I", self._data, 2))[0]
@cnt_payloadLen.setter
def cnt_payloadLen(self, payloadLen):
logging.warning("[MODIFICATION] Please be aware that modifications can break the npk structure")
self.modified = True
struct.pack_into(b"I", self._data, 2, payloadLen)
@property
def cnt_payload(self):
return struct.unpack_from(f"{self.cnt_payloadLen}s", self._data, 6)[0]
@cnt_payload.setter
def cnt_payload(self, payload):
tmpLen = len(payload)
tmpHead = self._data[:2 + 4]
tmpHead += struct.pack(f"{tmpLen}s", payload)
self._data = tmpHead
self.cnt_payloadLen = tmpLen
@property
def cnt_fullLength(self):
return BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + self.cnt_payloadLen
# return len(self._data)
@property
def output_cnt(self):
viewLen = min(10, self.cnt_payloadLen)
return (f"{self.cnt_idName}", [f"Cnt id: {self.cnt_id}",
f"Cnt offset: {self._offsetInPck}",
f"Cnt len: {self.cnt_fullLength}",
f"Payload len: {self.cnt_payloadLen}",
f"Payload[0:{viewLen}]: {self.cnt_payload[0:viewLen]} [...] "
])
@property
def cnt_fullBinary(self):
cntId = self.cnt_id
payloadLen = self.cnt_payloadLen
payload = struct.unpack_from(f"{self.cnt_payloadLen}s",
buffer=self._data,
offset=BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN)[0]
return struct.pack(b"=hI", cntId, payloadLen) + payload

View file

@ -1,9 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_NULL_BLOCK = 22
class CntNullBlock(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_NULL_BLOCK

View file

@ -1,19 +0,0 @@
from npkpy.common import sha1sumFromBinary
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_SQUASH_FS_IMAGE = 21
class CntSquashFsImage(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_SQUASH_FS_IMAGE
@property
def cnt_payload_hash(self):
return sha1sumFromBinary(self.cnt_payload)
@property
def output_cnt(self):
idName, options = super().output_cnt
return idName, options + [f"calc Sha1Hash: {self.cnt_payload_hash}"]

View file

@ -1,14 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_SQUASHFS_HASH_SIGNATURE = 9
class CntSquashFsHashSignature(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_SQUASHFS_HASH_SIGNATURE
@property
def output_cnt(self):
idName, options = super().output_cnt
return idName, options + [f"Payload[-10:]: {self.cnt_payload[-10:]}"]

View file

@ -1,9 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_ZLIB_COMPRESSED_DATA = 4
class CntZlibCompressedData(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_ZLIB_COMPRESSED_DATA

View file

@ -0,0 +1,9 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_ARCHITECTURE_TAG = 16
class CntArchitectureTag(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_ARCHITECTURE_TAG

91
npkpy/npk/cnt_basic.py Normal file
View file

@ -0,0 +1,91 @@
import logging
import struct
from npkpy.common import NPKError
BYTES_LEN_CNT_ID = 2
BYTES_LEN_CNT_PAYLOAD_LEN = 4
NPK_CNT_BASIC = -1
class CntBasic:
"""
0____4____8____b____f
| | | | |
x0_|AABB|BBCC|C ..... C|
x1_|....|....|....|....|
A = Container Identifier
B = Payload length
C = Payload
"""
def __init__(self, data, offset_in_pck):
self._data = bytearray(data)
self._offset_in_pck = offset_in_pck
self.modified = False
@property
def _regular_cnt_id(self):
return NPK_CNT_BASIC
@property
def cnt_id(self):
cnt_id = struct.unpack_from(b"h", self._data, 0)[0]
if cnt_id != self._regular_cnt_id:
raise NPKError(f"Cnt object does not represent given container typ {self._regular_cnt_id}/{cnt_id}")
return cnt_id
@property
def cnt_id_name(self):
return str(self.__class__.__name__)
@property
def cnt_payload_len(self):
return (struct.unpack_from(b"I", self._data, 2))[0]
@cnt_payload_len.setter
def cnt_payload_len(self, payload_len):
logging.warning("[MODIFICATION] Please be aware that modifications can break the npk structure")
self.modified = True
struct.pack_into(b"I", self._data, 2, payload_len)
@property
def cnt_payload(self):
return struct.unpack_from(f"{self.cnt_payload_len}s", self._data, 6)[0]
@cnt_payload.setter
def cnt_payload(self, payload):
tmp_len = len(payload)
tmp_head = self._data[:2 + 4]
tmp_head += struct.pack(f"{tmp_len}s", payload)
self._data = tmp_head
self.cnt_payload_len = tmp_len
@property
def cnt_full_length(self):
return BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + self.cnt_payload_len
# return len(self._data)
@property
def output_cnt(self):
view_len = min(10, self.cnt_payload_len)
return (f"{self.cnt_id_name}", [f"Cnt id: {self.cnt_id}",
f"Cnt offset: {self._offset_in_pck}",
f"Cnt len: {self.cnt_full_length}",
f"Payload len: {self.cnt_payload_len}",
f"Payload[0:{view_len}]: {self.cnt_payload[0:view_len]} [...] "
])
@property
def cnt_full_binary(self):
cnt_id = self.cnt_id
payload_len = self.cnt_payload_len
payload = struct.unpack_from(f"{self.cnt_payload_len}s",
buffer=self._data,
offset=BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN)[0]
return struct.pack(b"=hI", cnt_id, payload_len) + payload

9
npkpy/npk/cnt_flag_a.py Normal file
View file

@ -0,0 +1,9 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_FLAG_A = 7
class CntFlagA(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_FLAG_A

14
npkpy/npk/cnt_flag_b.py Normal file
View file

@ -0,0 +1,14 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_FLAG_B = 8
class CntFlagB(CntBasic):
"""
Flag typ found in gps-5.23-mipsbe.npk
Payload contains b'\n update-console\n '
"""
@property
def _regular_cnt_id(self):
return NPK_FLAG_B

13
npkpy/npk/cnt_flag_c.py Normal file
View file

@ -0,0 +1,13 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_FLAG_C = 17
class CntFlagC(CntBasic):
"""
Flag typ only found in multicast-3.30-mipsbe.npk
"""
@property
def _regular_cnt_id(self):
return NPK_FLAG_C

9
npkpy/npk/cnt_mpls.py Normal file
View file

@ -0,0 +1,9 @@
from npkpy.npk.pck_requirements_header import PckRequirementsHeader
NPK_MPLS = 19
class CntMpls(PckRequirementsHeader):
@property
def _regular_cnt_id(self):
return NPK_MPLS

View file

@ -0,0 +1,9 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_NULL_BLOCK = 22
class CntNullBlock(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_NULL_BLOCK

View file

@ -0,0 +1,19 @@
from npkpy.common import sha1_sum_from_binary
from npkpy.npk.cnt_basic import CntBasic
NPK_SQUASH_FS_IMAGE = 21
class CntSquashFsImage(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_SQUASH_FS_IMAGE
@property
def cnt_payload_hash(self):
return sha1_sum_from_binary(self.cnt_payload)
@property
def output_cnt(self):
id_name, options = super().output_cnt
return id_name, options + [f"calc Sha1Hash: {self.cnt_payload_hash}"]

View file

@ -0,0 +1,14 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_SQUASHFS_HASH_SIGNATURE = 9
class CntSquashFsHashSignature(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_SQUASHFS_HASH_SIGNATURE
@property
def output_cnt(self):
id_name, options = super().output_cnt
return id_name, options + [f"Payload[-10:]: {self.cnt_payload[-10:]}"]

View file

@ -0,0 +1,9 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_ZLIB_COMPRESSED_DATA = 4
class CntZlibDompressedData(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_ZLIB_COMPRESSED_DATA

View file

@ -1,101 +1,102 @@
import struct
from pathlib import Path
from npkpy.npk.npkConstants import CNT_HANDLER
from npkpy.npk.cntBasic import BYTES_LEN_CNT_ID, BYTES_LEN_CNT_PAYLOAD_LEN
from npkpy.npk.npkFileBasic import FileBasic
from npkpy.common import NPKError, NPKIdError, NPKMagicBytesError
from npkpy.npk.npk_constants import CNT_HANDLER
from npkpy.npk.cnt_basic import BYTES_LEN_CNT_ID, BYTES_LEN_CNT_PAYLOAD_LEN
from npkpy.npk.npk_file_basic import FileBasic
MAGICBYTES = b"\x1e\xf1\xd0\xba"
MAGIC_BYTES = b"\x1e\xf1\xd0\xba"
BYTES_LEN_MAGIC_HEADER = 4
BYTES_LEN_PCK_SIZE_LEN = 4
"""
0____4____8____b____f
| | | | |
0_|AAAA|BBBB| C ..... |
1_|....|....|....|....|
A = MAGIC BYTES (4)
B = PCK SIZE (4)
C = Begin of Container area
"""
class Npk(FileBasic):
"""
0____4____8____b____f
| | | | |
0_|AAAA|BBBB| C ..... |
1_|....|....|....|....|
__cnt_list = None
A = MAGIC BYTES (4)
B = PCK SIZE (4)
C = Begin of Container area
"""
__cntList = None
def __init__(self, filePath: Path):
super().__init__(filePath)
self.cntOffset = 8
self._data = self.readDataFromFile(0, self.cntOffset)
self._checkMagicBytes(errorMsg="MagicBytes not found in Npk file")
self.pck_header = self.pck_cntList[0]
def __init__(self, file_path: Path):
super().__init__(file_path)
self.cnt_offset = 8
self._data = self.read_data_from_file(offset=0, size=self.cnt_offset)
self._check_magic_bytes(error_msg="Magic bytes not found in Npk file")
self.pck_header = self.pck_cnt_list[0]
@property
def pck_magicBytes(self):
return struct.unpack_from(b"4s", self._data, 0)[0]
def pck_magic_bytes(self):
return struct.unpack_from("4s", self._data, 0)[0]
@property
def pck_payloadLen(self):
self.__pck_payloadUpdate()
payloadLen = struct.unpack_from(b"I", self._data, 4)[0]
return payloadLen
def pck_payload_len(self):
self.__pck_payload_size_update()
payload_len = struct.unpack_from("I", self._data, 4)[0]
return payload_len
def __pck_payloadUpdate(self):
if any(cnt.modified for cnt in self.pck_cntList):
currentSize = 0
for cnt in self.pck_cntList:
currentSize += cnt.cnt_fullLength
def __pck_payload_size_update(self):
if any(cnt.modified for cnt in self.pck_cnt_list):
current_size = 0
for cnt in self.pck_cnt_list:
current_size += cnt.cnt_full_length
cnt.modified = False
struct.pack_into(b"I", self._data, 4, currentSize)
struct.pack_into("I", self._data, 4, current_size)
@property
def pck_fullSize(self):
return BYTES_LEN_MAGIC_HEADER + BYTES_LEN_PCK_SIZE_LEN + self.pck_payloadLen
def pck_full_size(self):
return BYTES_LEN_MAGIC_HEADER + BYTES_LEN_PCK_SIZE_LEN + self.pck_payload_len
@property
def pck_fullBinary(self):
binary = MAGICBYTES + struct.pack("I", self.pck_payloadLen)
for c in self.pck_cntList:
binary += c.cnt_fullBinary
def pck_full_binary(self):
binary = MAGIC_BYTES + struct.pack("I", self.pck_payload_len)
for cnt in self.pck_cnt_list:
binary += cnt.cnt_full_binary
return binary
@property
def pck_enumerateCnt(self):
for pos, c in enumerate(self.pck_cntList):
yield pos, c
def pck_enumerate_cnt(self):
for pos, cnt in enumerate(self.pck_cnt_list):
yield pos, cnt
@property
def pck_cntList(self):
if not self.__cntList:
self.__cntList = self.__parseAllCnt()
return self.__cntList
def pck_cnt_list(self):
if not self.__cnt_list:
self.__cnt_list = self.__parse_all_cnt()
return self.__cnt_list
def __parseAllCnt(self):
def __parse_all_cnt(self):
lst = []
offset = self.cntOffset
offset = self.cnt_offset
while offset < self.file.stat().st_size - 1:
lst.append(self.__getCnt(offset))
offset += BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + lst[-1].cnt_payloadLen
lst.append(self.__get_cnt(offset))
offset += BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + lst[-1].cnt_payload_len
return lst
def __getCnt(self, offset):
cntId = struct.unpack_from("H", self.readDataFromFile(offset, 2))[0]
payloadLen = struct.unpack_from("I", self.readDataFromFile(offset + BYTES_LEN_CNT_ID, 4))[0]
pktLen = BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + payloadLen
def __get_cnt(self, offset):
cnt_id = struct.unpack_from("H", self.read_data_from_file(offset, 2))[0]
payload_len = struct.unpack_from("I", self.read_data_from_file(offset + BYTES_LEN_CNT_ID, 4))[0]
pkt_len = BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN + payload_len
data = self.readDataFromFile(offset, pktLen)
if len(data) != pktLen:
raise RuntimeError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}")
data = self.read_data_from_file(offset, pkt_len)
if len(data) != pkt_len:
raise NPKError(f"File maybe corrupted. Please download again. File: {self.file.absolute()}")
try:
return CNT_HANDLER[cntId](data, offset)
except KeyError:
raise RuntimeError(f"failed with id: {cntId}\n"
f"New cnt id discovered in file: {self.file.absolute()}")
# except TypeError:
# raise RuntimeError(f"failed with id: {cntId}\n{self.file.absolute()}")
return CNT_HANDLER[cnt_id](data, offset)
except KeyError as e:
raise NPKIdError(f"Failed with cnt id: {cnt_id}\n"
f"New cnt id discovered in file: {self.file.absolute()}") from e
def _checkMagicBytes(self, errorMsg):
if not self.pck_magicBytes == MAGICBYTES:
raise RuntimeError(errorMsg)
def _check_magic_bytes(self, error_msg):
if not self.pck_magic_bytes == MAGIC_BYTES:
raise NPKMagicBytesError(error_msg)

View file

@ -1,54 +0,0 @@
from npkpy.npk.XCntMultiContainerList import NPK_MULTICONTAINER_LIST, XCnt_MultiContainerList
from npkpy.npk.cntArchitectureTag import NPK_ARCHITECTURE_TAG, CntArchitectureTag
from npkpy.npk.cntNullBlock import NPK_NULL_BLOCK, CntNullBlock
from npkpy.npk.pckReleaseTyp import NPK_RELEASE_TYP, PckReleaseTyp
from npkpy.npk.cntSquasFsImage import NPK_SQUASH_FS_IMAGE, CntSquashFsImage
from npkpy.npk.cntSquashFsHashSignature import NPK_SQUASHFS_HASH_SIGNATURE, CntSquashFsHashSignature
from npkpy.npk.cntZlibCompressedData import NPK_ZLIB_COMPRESSED_DATA, CntZlibCompressedData
from npkpy.npk.cntBasic import NPK_CNT_BASIC, NpkContainerBasic
from npkpy.npk.pckDescription import NPK_PCK_DESCRIPTION, PckDescription
from npkpy.npk.pckEckcdsaHash import NPK_ECKCDSA_HASH, PckEckcdsaHash
from npkpy.npk.pckHeader import NPK_PCK_HEADER, PckHeader
from npkpy.npk.pckRequirementsHeader import NPK_REQUIREMENTS_HEADER, PckRequirementsHeader
from npkpy.npk.xCntFlagB import NPK_FLAG_B, XCnt_flagB
from npkpy.npk.xCntFlagC import NPK_FLAG_C, XCnt_flagC
from npkpy.npk.xCntFlagA import NPK_FLAG_A, XCnt_flagA
from npkpy.npk.xCntMultiContainerHeader import NPK_MULTICONTAINER_HEADER, XCnt_multiContainerHeader
from npkpy.npk.xCntMpls import NPK_MPLS, XCntMpls
CNT_HANDLER = {
NPK_CNT_BASIC: NpkContainerBasic,
0: "?",
NPK_PCK_HEADER: PckHeader,
NPK_PCK_DESCRIPTION: PckDescription,
NPK_REQUIREMENTS_HEADER: PckRequirementsHeader,
NPK_ZLIB_COMPRESSED_DATA: CntZlibCompressedData,
5: "?",
6: "?",
NPK_FLAG_A: XCnt_flagA,
NPK_FLAG_B: XCnt_flagB,
NPK_SQUASHFS_HASH_SIGNATURE: CntSquashFsHashSignature,
10: "?",
11: "?",
12: "?",
13: "?",
14: "?",
15: "?",
NPK_ARCHITECTURE_TAG: CntArchitectureTag,
NPK_FLAG_C: XCnt_flagC,
NPK_MULTICONTAINER_HEADER: XCnt_multiContainerHeader,
NPK_MPLS: XCntMpls,
NPK_MULTICONTAINER_LIST: XCnt_MultiContainerList,
NPK_SQUASH_FS_IMAGE: CntSquashFsImage,
NPK_NULL_BLOCK: CntNullBlock,
NPK_ECKCDSA_HASH: PckEckcdsaHash,
NPK_RELEASE_TYP: PckReleaseTyp,
25: "?",
26: "?",
27: "?",
28: "?",
29: "?",
30: "?",
}

View file

@ -1,47 +0,0 @@
import re
from pathlib import Path
from npkpy.common import sha1sumFromFile
ARCHITECTURES = ['arm', 'mipsbe', 'mipsle', 'mmips', 'ppc', 'smips', 'tile', 'x86']
RE_SUFFIX = '\.(npk$)'
RE_VERSION = '-(\d\.\d\d\.\d)'
RE_PROGRAM_NAME = '(^[\w-]*)-'
class FileBasic:
__data = None
def __init__(self, filePath: Path):
self.file = filePath
@property
def filename_suffix(self):
return re.search(RE_SUFFIX, self.file.name).group(1)
@property
def filename_version(self):
return re.search(RE_VERSION, self.file.name).group(1)
@property
def filename_architecture(self):
for a in ARCHITECTURES:
if f"_{a}_" in self.file.name:
return a
return "x86"
@property
def filename_program(self):
name = re.search(RE_PROGRAM_NAME, self.file.name).group(1)
name.replace(f"_{self.filename_architecture}_", "")
return name
@property
def file_hash(self):
return sha1sumFromFile(self.file)
def readDataFromFile(self, offset, size):
with self.file.open("rb") as f:
f.seek(offset)
return bytearray(f.read(size))

View file

@ -0,0 +1,53 @@
from npkpy.npk.pck_preheader import NPK_PCK_PREHEADER, PckPreHeader
from npkpy.npk.pck_multicontainer_list import NPK_MULTICONTAINER_LIST, PktMulticontainerList
from npkpy.npk.cnt_architecture_tag import NPK_ARCHITECTURE_TAG, CntArchitectureTag
from npkpy.npk.cnt_null_block import NPK_NULL_BLOCK, CntNullBlock
from npkpy.npk.pck_release_typ import NPK_RELEASE_TYP, PckReleaseTyp
from npkpy.npk.cnt_squasfs_image import NPK_SQUASH_FS_IMAGE, CntSquashFsImage
from npkpy.npk.cnt_squashfs_hash_signature import NPK_SQUASHFS_HASH_SIGNATURE, CntSquashFsHashSignature
from npkpy.npk.cnt_zlib_compressed_data import NPK_ZLIB_COMPRESSED_DATA, CntZlibDompressedData
from npkpy.npk.cnt_basic import NPK_CNT_BASIC, CntBasic
from npkpy.npk.pck_description import NPK_PCK_DESCRIPTION, PckDescription
from npkpy.npk.pck_eckcdsa_hash import NPK_ECKCDSA_HASH, PckEckcdsaHash
from npkpy.npk.pck_header import NPK_PCK_HEADER, PckHeader
from npkpy.npk.pck_requirements_header import NPK_REQUIREMENTS_HEADER, PckRequirementsHeader
from npkpy.npk.cnt_flag_b import NPK_FLAG_B, CntFlagB
from npkpy.npk.cnt_flag_c import NPK_FLAG_C, CntFlagC
from npkpy.npk.cnt_flag_a import NPK_FLAG_A, CntFlagA
from npkpy.npk.pck_multicontainer_header import NPK_MULTICONTAINER_HEADER, PktMulticontainerHeader
from npkpy.npk.cnt_mpls import NPK_MPLS, CntMpls
CNT_HANDLER = {
NPK_CNT_BASIC: CntBasic,
0: "?",
NPK_PCK_HEADER: PckHeader,
NPK_PCK_DESCRIPTION: PckDescription,
NPK_REQUIREMENTS_HEADER: PckRequirementsHeader,
NPK_ZLIB_COMPRESSED_DATA: CntZlibDompressedData,
5: "?",
6: "?",
NPK_FLAG_A: CntFlagA,
NPK_FLAG_B: CntFlagB,
NPK_SQUASHFS_HASH_SIGNATURE: CntSquashFsHashSignature,
10: "?",
11: "?",
12: "?",
13: "?",
14: "?",
15: "?",
NPK_ARCHITECTURE_TAG: CntArchitectureTag,
NPK_FLAG_C: CntFlagC,
NPK_MULTICONTAINER_HEADER: PktMulticontainerHeader,
NPK_MPLS: CntMpls,
NPK_MULTICONTAINER_LIST: PktMulticontainerList,
NPK_SQUASH_FS_IMAGE: CntSquashFsImage,
NPK_NULL_BLOCK: CntNullBlock,
NPK_ECKCDSA_HASH: PckEckcdsaHash,
NPK_RELEASE_TYP: PckReleaseTyp,
NPK_PCK_PREHEADER: PckPreHeader,
26: "?",
27: "?",
28: "?",
29: "?",
30: "?",
}

View file

@ -0,0 +1,51 @@
import re
from pathlib import Path
from npkpy.common import sha1_sum_from_file
ARCHITECTURES = ['arm', 'mipsbe', 'mipsle', 'mmips', 'ppc', 'smips', 'tile', 'x86']
RE_NPK_SUFFIX = '\\.(npk$)'
RE_VERSION = '([\\d]+[\\.\\d]*\\d)'
RE_PROGRAM_NAME = '(^[\\w-]*)-'
class FileBasic:
# pylint: disable=unused-private-member
__data = None
def __init__(self, file_path: Path):
self.file = file_path
@property
def filename_suffix(self):
suffix = re.search(RE_NPK_SUFFIX, self.file.name)
return suffix.group(1) if suffix else "<NoSuffixMatch>"
@property
def filename_version(self):
result = re.search(RE_VERSION, self.file.name)
return result.group(1) if result else "<NoVersionMatch>"
@property
def filename_architecture(self):
for arch in ARCHITECTURES:
if f"-{arch}.npk" in self.file.name:
return arch
return "x86"
@property
def filename_program_name(self):
name = re.search(RE_PROGRAM_NAME, self.file.name)
if name:
return name.group(1).replace(f"_{self.filename_architecture}_", "")
return "<NoProgramNameMatch>"
@property
def file_hash(self):
return sha1_sum_from_file(self.file)
def read_data_from_file(self, offset, size):
with self.file.open("rb") as _file:
_file.seek(offset)
return bytearray(_file.read(size))

View file

@ -1,7 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_PCK_DESCRIPTION = 2
class PckDescription(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_PCK_DESCRIPTION

View file

@ -1,9 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_ECKCDSA_HASH = 23
class PckEckcdsaHash(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_ECKCDSA_HASH

View file

@ -1,77 +0,0 @@
import datetime
import struct
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_PCK_HEADER = 1
class PckHeader(NpkContainerBasic):
"""
0____4____8____b____f
| | | | |
x0_|AABB|BBCC|CCCC|CCCC|
x1_|CCCC|CCDE|FGHH|HH..|
x2_|....|....|....|....|
A = Container Identifier (2)
B = Payload length (4)
C = Program Name (16)
D = Program version: revision
E = Program version: rc
F = Program version: minor
G = Program version: major
H = Build time
I = NULL BLock / Flags
"""
def __init__(self, data, offsetInPck):
super().__init__(data, offsetInPck)
self._offset = offsetInPck
self.flagOffset = 0
@property
def _regularCntId(self):
return NPK_PCK_HEADER
@property
def cnt_programName(self):
# TODO: b"" - b needs to be removed!
return bytes(struct.unpack_from(b"16B", self._data, 6)).decode().rstrip('\x00')
@property
def cnt_osVersion(self):
revision = (struct.unpack_from(b"B", self._data, 22))[0]
rc = (struct.unpack_from(b"B", self._data, 23))[0]
minor = (struct.unpack_from(b"B", self._data, 24))[0]
major = (struct.unpack_from(b"B", self._data, 25))[0]
return f"{major}.{minor}.{revision} - rc(?): {rc}"
@property
def cnt_built_time(self):
return datetime.datetime.utcfromtimestamp(struct.unpack_from(b"I", self._data, 26)[0])
@property
def cnt_nullBlock(self):
return struct.unpack_from(b"4B", self._data, 30)
@property
def cnt_flags(self):
try:
return struct.unpack_from(b"7B", self._data, 34)
except struct.error:
## pkt with version 5.23 seems to have only four flags.
return struct.unpack_from(b"4B", self._data, 34)
@property
def output_cnt(self):
idName, options = super().output_cnt
return (idName, options + [f"Program name: {self.cnt_programName}",
f"Os version: {self.cnt_osVersion}",
f"Created at: {self.cnt_built_time}",
f"NullBlock: {self.cnt_nullBlock}",
f"Flags: {self.cnt_flags}"
])

View file

@ -1,9 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_RELEASE_TYP = 24
class PckReleaseTyp(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_RELEASE_TYP

View file

@ -1,93 +0,0 @@
import struct
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_REQUIREMENTS_HEADER = 3
class PckRequirementsHeader(NpkContainerBasic):
def _versionOneTwo(foo):
def check(self):
if self.cnt_structure_id > 0:
return foo(self)
return "<not available for version 0>"
return check
def _versionTwo(foo):
def check(self):
if self.cnt_structure_id > 1:
return foo(self)
return "<not available for version 0,1>"
return check
def __init__(self, data, offsetInPck):
super().__init__(data, offsetInPck)
self._offset = offsetInPck
@property
def _regularCntId(self):
return NPK_REQUIREMENTS_HEADER
@property
def cnt_structure_id(self):
return struct.unpack_from(b"H", self._data, 6)[0]
@property
@_versionOneTwo
def cnt_programName(self):
return bytes(struct.unpack_from(b"16B", self._data, 8)).decode().rstrip('\x00')
@property
@_versionOneTwo
def cnt_osVersionFrom(self):
revision = (struct.unpack_from(b"B", self._data, 24))[0]
rc = (struct.unpack_from(b"B", self._data, 25))[0]
minor = (struct.unpack_from(b"B", self._data, 26))[0]
major = (struct.unpack_from(b"B", self._data, 27))[0]
return f"{major}.{minor}.{revision} - rc(?): {rc}"
@property
@_versionOneTwo
def cnt_nullBlock(self):
return struct.unpack_from(b"BBBB", self._data, 28)
@property
@_versionOneTwo
def cnt_osVersionTo(self):
revision = (struct.unpack_from(b"B", self._data, 32))[0]
rc = (struct.unpack_from(b"B", self._data, 33))[0]
minor = (struct.unpack_from(b"B", self._data, 34))[0]
major = (struct.unpack_from(b"B", self._data, 35))[0]
return f"{major}.{minor}.{revision} - rc(?): {rc}"
@property
@_versionTwo
def cnt_flags(self):
return struct.unpack_from(b"4B", self._data, 36)
@property
def cnt_fullBinary(self):
id = self.cnt_id
payload_len = self.cnt_payloadLen
payload = struct.unpack_from(f"{self.cnt_payloadLen}s", self._data, 2 + 4)[0]
# if self.cnt_structure_id > 2:
# print(self.cnt_flags)
# return struct.pack(f"HH", id, payload_len) + se + payload
return struct.pack(f"=HI", id, payload_len) + payload
@property
def output_cnt(self):
idName, opt = super().output_cnt
options = [f"Cnt id: {self.cnt_id}",
f"StructID: {self.cnt_structure_id}",
f"Offset: {self._offset}",
f"Program name: {self.cnt_programName}",
f"Null block: {self.cnt_nullBlock}",
f"Os versionFrom: {self.cnt_osVersionFrom}",
f"Os versionTo: {self.cnt_osVersionTo}",
f"Flags: {self.cnt_flags}"
]
return (f"{self.cnt_idName}", opt + options)

View file

@ -0,0 +1,9 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_PCK_DESCRIPTION = 2
class PckDescription(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_PCK_DESCRIPTION

View file

@ -0,0 +1,9 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_ECKCDSA_HASH = 23
class PckEckcdsaHash(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_ECKCDSA_HASH

74
npkpy/npk/pck_header.py Normal file
View file

@ -0,0 +1,74 @@
import datetime
import struct
from npkpy.npk.cnt_basic import CntBasic
NPK_PCK_HEADER = 1
"""
0____4____8____b____f
| | | | |
x0_|AABB|BBCC|CCCC|CCCC|
x1_|CCCC|CCDE|FGHH|HH..|
x2_|....|....|....|....|
A = Container Identifier (2)
B = Payload length (4)
C = Program Name (16)
D = Program version: revision
E = Program version: rc
F = Program version: minor
G = Program version: major
H = Build time
I = NULL BLock / Flags
"""
class PckHeader(CntBasic):
def __init__(self, data, offset_in_pck):
super().__init__(data, offset_in_pck)
self._offset = offset_in_pck
self.flag_offset = 0
@property
def _regular_cnt_id(self):
return NPK_PCK_HEADER
@property
def cnt_program_name(self):
return bytes(struct.unpack_from("16B", self._data, 6)).decode().rstrip('\x00')
@property
def cnt_os_version(self):
revision = (struct.unpack_from("B", self._data, 22))[0]
unknown_subrevision = (struct.unpack_from("B", self._data, 23))[0]
minor = (struct.unpack_from("B", self._data, 24))[0]
major = (struct.unpack_from("B", self._data, 25))[0]
return f"{major}.{minor}.{revision} - rc(?): {unknown_subrevision}"
@property
def cnt_built_time(self):
return datetime.datetime.utcfromtimestamp(struct.unpack_from("I", self._data, 26)[0])
@property
def cnt_null_block(self):
return struct.unpack_from("4B", self._data, 30)
@property
def cnt_flags(self):
try:
return struct.unpack_from("7B", self._data, 34)
except struct.error:
# INFO: pkt with version 5.23 seems to have only four flags.
return struct.unpack_from("4B", self._data, 34)
@property
def output_cnt(self):
id_name, options = super().output_cnt
return (id_name, options + [f"Program name: {self.cnt_program_name}",
f"Os version: {self.cnt_os_version}",
f"Created at: {self.cnt_built_time}",
f"NullBlock: {self.cnt_null_block}",
f"Flags: {self.cnt_flags}"
])

View file

@ -1,13 +1,13 @@
import struct
from npkpy.npk.pckHeader import PckHeader
from npkpy.npk.pck_header import PckHeader
NPK_MULTICONTAINER_HEADER: int = 18
class XCnt_multiContainerHeader(PckHeader):
class PktMulticontainerHeader(PckHeader):
@property
def _regularCntId(self):
def _regular_cnt_id(self):
return NPK_MULTICONTAINER_HEADER
@property

View file

@ -0,0 +1,9 @@
from npkpy.npk.pck_requirements_header import PckRequirementsHeader
NPK_MULTICONTAINER_LIST = 20
class PktMulticontainerList(PckRequirementsHeader):
@property
def _regular_cnt_id(self):
return NPK_MULTICONTAINER_LIST

View file

@ -0,0 +1,8 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_PCK_PREHEADER = 25
class PckPreHeader(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_PCK_PREHEADER

View file

@ -0,0 +1,9 @@
from npkpy.npk.cnt_basic import CntBasic
NPK_RELEASE_TYP = 24
class PckReleaseTyp(CntBasic):
@property
def _regular_cnt_id(self):
return NPK_RELEASE_TYP

View file

@ -0,0 +1,89 @@
import struct
from npkpy.npk.cnt_basic import CntBasic, BYTES_LEN_CNT_PAYLOAD_LEN, BYTES_LEN_CNT_ID
NPK_REQUIREMENTS_HEADER = 3
class PckRequirementsHeader(CntBasic):
def _version_one_and_two(self):
def check(obj):
if obj.cnt_structure_id > 0:
return self(obj)
return "<not available for version 0>"
return check
def _version_two_only(self):
def check(obj):
if obj.cnt_structure_id > 1:
return self(obj)
return "<not available for version 0,1>"
return check
def __init__(self, data, offset_in_pck):
super().__init__(data, offset_in_pck)
self._offset = offset_in_pck
@property
def _regular_cnt_id(self):
return NPK_REQUIREMENTS_HEADER
@property
def cnt_structure_id(self) -> object:
return struct.unpack_from(b"H", self._data, 6)[0]
@property
@_version_one_and_two
def cnt_program_name(self):
return bytes(struct.unpack_from(b"16B", self._data, 8)).decode().rstrip('\x00')
@property
@_version_one_and_two
def cnt_os_version_min(self):
revision = (struct.unpack_from(b"B", self._data, 24))[0]
unknown_subrevision = (struct.unpack_from(b"B", self._data, 25))[0]
minor = (struct.unpack_from(b"B", self._data, 26))[0]
major = (struct.unpack_from(b"B", self._data, 27))[0]
return f"{major}.{minor}.{revision} - rc(?): {unknown_subrevision}"
@property
@_version_one_and_two
def cnt_null_block(self):
return struct.unpack_from(b"BBBB", self._data, 28)
@property
@_version_one_and_two
def cnt_os_version_max(self):
revision = (struct.unpack_from(b"B", self._data, 32))[0]
unknown_subrevision = (struct.unpack_from(b"B", self._data, 33))[0]
minor = (struct.unpack_from(b"B", self._data, 34))[0]
major = (struct.unpack_from(b"B", self._data, 35))[0]
return f"{major}.{minor}.{revision} - rc(?): {unknown_subrevision}"
@property
@_version_two_only
def cnt_flags(self):
return struct.unpack_from(b"4B", self._data, 36)
@property
def cnt_full_binary(self):
cnt_id = self.cnt_id
payload_len = self.cnt_payload_len
payload = struct.unpack_from(f"{self.cnt_payload_len}s", self._data,
offset=BYTES_LEN_CNT_ID + BYTES_LEN_CNT_PAYLOAD_LEN)[0]
return struct.pack("=HI", cnt_id, payload_len) + payload
@property
def output_cnt(self):
_, opt = super().output_cnt
options = [f"StructID: {self.cnt_structure_id}",
f"Offset: {self._offset}",
f"Program name: {self.cnt_program_name}",
f"Null block: {self.cnt_null_block}",
f"Os versionFrom: {self.cnt_os_version_min}",
f"Os versionTo: {self.cnt_os_version_max}",
f"Flags: {self.cnt_flags}"
]
return f"{self.cnt_id_name}", opt + options

View file

@ -1,14 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_FLAG_A = 7
class XCnt_flagA(NpkContainerBasic):
@property
def _regularCntId(self):
return NPK_FLAG_A
# @property
# def cnt_payload(self):
# # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n '

View file

@ -1,14 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_FLAG_B = 8
class XCnt_flagB(NpkContainerBasic):
# TODO: found in gps-5.23-mipsbe.npk
@property
def _regularCntId(self):
return NPK_FLAG_B
# @property
# def cnt_payload(self):
# # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n '

View file

@ -1,14 +0,0 @@
from npkpy.npk.cntBasic import NpkContainerBasic
NPK_FLAG_C = 17
class XCnt_flagC(NpkContainerBasic):
##TODO: found in multicast-3.30-mipsbe.npk
@property
def _regularCntId(self):
return NPK_FLAG_C
# @property
# def cnt_payload(self):
# # TODO pkt gps-5.23-mipsbe.npk contains b'\n update-console\n '

View file

@ -1,9 +0,0 @@
from npkpy.npk.pckRequirementsHeader import PckRequirementsHeader
NPK_MPLS = 19
class XCntMpls(PckRequirementsHeader):
@property
def _regularCntId(self):
return NPK_MPLS

View file

@ -1,11 +1,4 @@
!/bin/bash
#!/bin/bash
if [ ! -z "$1" ]
then
clear; echo "test specific file: $1"
pytest --cov=./ $1 -v
else
clear; echo "test all project files"
pytest --cov=npkpy --cov=acceptance_test -v
fi
pytest --cov=npkpy --cov=tests_acceptance_test -v
pylint --rcfile=.pylintrc npkpy/** tests/** tests_acceptance_test/**

View file

@ -1,8 +1,9 @@
pytest
pylint
pytest_httpserver
urlpath
setuptools
wheel
twine
pytest
pytest-cov
more-itertools

View file

@ -23,7 +23,7 @@ setuptools.setup(
],
entry_points={
'console_scripts': [
"npkPy=npkpy.main:main",
"npkpy=npkpy.main:main",
# "npkDownloader=npkpy.download:main",
],
},

View file

@ -1,16 +0,0 @@
import struct
import unittest
from npkpy.npk.XCntMultiContainerList import XCnt_MultiContainerList
from tests.constants import DummyBasicCnt
class Test_xCnt_MultiContainerList(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 20)
self.cnt = XCnt_MultiContainerList(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(20, self.cnt.cnt_id)

View file

@ -1,118 +0,0 @@
import struct
import unittest
from npkpy.npk.cntBasic import NpkContainerBasic
from tests.constants import DummyBasicCnt
class Test_npkContainerBasic(unittest.TestCase):
def setUp(self) -> None:
self.cnt = NpkContainerBasic(DummyBasicCnt().binCnt, offsetInPck=0)
def test_extractCntId(self):
self.assertEqual(-1, self.cnt.cnt_id)
def test_failForWrongCntId(self):
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 999)
cnt = NpkContainerBasic(dummyCnt.binCnt, offsetInPck=0)
with self.assertRaises(RuntimeError) as e:
_ = cnt.cnt_id
self.assertEqual("Cnt object does not represent given container typ -1/999", e.exception.args[0])
def test_getNameOfContainerType(self):
self.assertEqual("NpkContainerBasic", self.cnt.cnt_idName)
def test_extractPayloadLen(self):
self.assertEqual(7, self.cnt.cnt_payloadLen)
def test_extractPayload(self):
self.assertEqual(b"Payload", self.cnt.cnt_payload)
def test_extractCntFromGivenOffset(self):
self.assertEqual(len(DummyBasicCnt().binCnt), self.cnt.cnt_fullLength)
def test_giveOverviewOfCnt(self):
expectedResult = ('NpkContainerBasic', [f'Cnt id: -1',
'Cnt offset: 0',
'Cnt len: 13',
'Payload len: 7',
"Payload[0:7]: b'Payload' [...] "])
self.assertEqual(expectedResult, self.cnt.output_cnt)
def test_getFullBinaryOfContainer(self):
self.assertEqual(DummyBasicCnt().binCnt, self.cnt.cnt_fullBinary)
class Test_modifyNpkContainerBasic(unittest.TestCase):
def setUp(self) -> None:
self.cnt = NpkContainerBasic(DummyBasicCnt().binCnt, offsetInPck=0)
def test_increaseCntSize(self):
origCntFullLength = self.cnt.cnt_fullLength
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen += 3
self.assertEqual(7, origPayloadLen)
self.assertEqual(10, self.cnt.cnt_payloadLen)
self.assertEqual(13, origCntFullLength)
self.assertEqual(16, self.cnt.cnt_fullLength)
def test_decreaseCntSize(self):
origCntFullLength = self.cnt.cnt_fullLength
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen -= 4
self.assertEqual(7, origPayloadLen)
self.assertEqual(13, origCntFullLength)
self.assertEqual(3, self.cnt.cnt_payloadLen)
self.assertEqual(9, self.cnt.cnt_fullLength)
def test_failAccessPayload_afterIncreasingPayloadLenField(self):
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen += 3
self.assertEqual(origPayloadLen + 3, self.cnt.cnt_payloadLen)
with self.assertRaises(struct.error):
_ = self.cnt.cnt_payload
def test_decreasingPayloadLenField_decreaseFullCntLenAndPayload(self):
origCntFullLength = self.cnt.cnt_fullLength
origPayloadLen = self.cnt.cnt_payloadLen
self.cnt.cnt_payloadLen -= 4
self.assertEqual(origPayloadLen - 4, self.cnt.cnt_payloadLen)
self.assertEqual(origCntFullLength - 4, self.cnt.cnt_fullLength)
self.assertEqual(b"Pay", self.cnt.cnt_payload)
def test_failDecreasingPayloadLenFieldBelowZero(self):
with self.assertRaises(struct.error) as e:
self.cnt.cnt_payloadLen -= 8
self.assertEqual("argument out of range", e.exception.args[0])
def test_increasePayload_updatePayloadLen(self):
replacingPayload = b"NewTestPayload"
self.cnt.cnt_payload = replacingPayload
self.assertEqual(replacingPayload, self.cnt.cnt_payload)
self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen)
def test_decreasePayload_updatePayloadLen(self):
replacingPayload = b"New"
self.cnt.cnt_payload = replacingPayload
self.assertEqual(replacingPayload, self.cnt.cnt_payload)
self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen)
def test_NullPayload_updatePayloadLenToZero(self):
replacingPayload = b""
self.cnt.cnt_payload = replacingPayload
self.assertEqual(replacingPayload, self.cnt.cnt_payload)
self.assertEqual(len(replacingPayload), self.cnt.cnt_payloadLen)

View file

@ -1,16 +0,0 @@
import struct
import unittest
from npkpy.npk.cntNullBlock import CntNullBlock
from tests.constants import DummyBasicCnt
class Test_cntNullBlock(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 22)
self.cnt = CntNullBlock(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(22, self.cnt.cnt_id)

View file

@ -1,17 +0,0 @@
import struct
import unittest
from npkpy.npk.cntSquashFsHashSignature import CntSquashFsHashSignature
from tests.constants import DummyBasicCnt
class Test_cntSquashFsHashSignature(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 9)
self.cnt = CntSquashFsHashSignature(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(9, self.cnt.cnt_id)

116
tests/cnt_basic_test.py Normal file
View file

@ -0,0 +1,116 @@
import struct
import unittest
from npkpy.common import NPKError
from npkpy.npk.cnt_basic import CntBasic
from tests.constants import get_dummy_basic_cnt
class Test_CntBasic(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntBasic(data=get_dummy_basic_cnt(), offset_in_pck=0)
def test_extractCntId(self):
self.assertEqual(-1, self.cnt.cnt_id)
def test_failForWrongCntId(self):
cnt = CntBasic(get_dummy_basic_cnt(cnt_id=999), offset_in_pck=0)
with self.assertRaises(NPKError) as _exception:
_ = cnt.cnt_id
self.assertEqual("Cnt object does not represent given container typ -1/999", _exception.exception.args[0])
def test_getNameOfContainerType(self):
self.assertEqual("CntBasic", self.cnt.cnt_id_name)
def test_extractPayloadLen(self):
self.assertEqual(7, self.cnt.cnt_payload_len)
def test_extractPayload(self):
self.assertEqual(b"Payload", self.cnt.cnt_payload)
def test_extractCntFromGivenOffset(self):
self.assertEqual(len(get_dummy_basic_cnt()), self.cnt.cnt_full_length)
def test_giveOverviewOfCnt(self):
expected_result = ('CntBasic', ['Cnt id: -1',
'Cnt offset: 0',
'Cnt len: 13',
'Payload len: 7',
"Payload[0:7]: b'Payload' [...] "])
self.assertEqual(expected_result, self.cnt.output_cnt)
def test_getFullBinaryOfContainer(self):
self.assertEqual(get_dummy_basic_cnt(), self.cnt.cnt_full_binary)
class Test_modifyNpkContainerBasic(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntBasic(get_dummy_basic_cnt(), offset_in_pck=0)
def test_increaseCntSize(self):
orig_cnt_full_length = self.cnt.cnt_full_length
orig_payload_len = self.cnt.cnt_payload_len
self.cnt.cnt_payload_len += 3
self.assertEqual(7, orig_payload_len)
self.assertEqual(10, self.cnt.cnt_payload_len)
self.assertEqual(13, orig_cnt_full_length)
self.assertEqual(16, self.cnt.cnt_full_length)
def test_decreaseCntSize(self):
orig_cnt_full_length = self.cnt.cnt_full_length
orig_payload_len = self.cnt.cnt_payload_len
self.cnt.cnt_payload_len -= 4
self.assertEqual(7, orig_payload_len)
self.assertEqual(13, orig_cnt_full_length)
self.assertEqual(3, self.cnt.cnt_payload_len)
self.assertEqual(9, self.cnt.cnt_full_length)
def test_failAccessPayloadAfterIncreasingPayloadLenField(self):
orig_payload_len = self.cnt.cnt_payload_len
self.cnt.cnt_payload_len += 3
self.assertEqual(orig_payload_len + 3, self.cnt.cnt_payload_len)
with self.assertRaises(struct.error):
_ = self.cnt.cnt_payload
def test_decreasingPayloadLen_fieldDecreaseFullCntLenAndPayload(self):
orig_cnt_full_length = self.cnt.cnt_full_length
orig_payload_len = self.cnt.cnt_payload_len
self.cnt.cnt_payload_len -= 4
self.assertEqual(orig_payload_len - 4, self.cnt.cnt_payload_len)
self.assertEqual(orig_cnt_full_length - 4, self.cnt.cnt_full_length)
self.assertEqual(b"Pay", self.cnt.cnt_payload)
def test_failDecreasingPayloadLen_fieldBelowZero(self):
with self.assertRaises(struct.error) as _exception:
self.cnt.cnt_payload_len -= 8
self.assertEqual("argument out of range", _exception.exception.args[0])
def test_increasePayload_updatePayloadLen(self):
replace_payload = b"NewTestPayload"
self.cnt.cnt_payload = replace_payload
self.assertEqual(replace_payload, self.cnt.cnt_payload)
self.assertEqual(len(replace_payload), self.cnt.cnt_payload_len)
def test_decreasePayload_updatePayloadLen(self):
replace_payload = b"New"
self.cnt.cnt_payload = replace_payload
self.assertEqual(replace_payload, self.cnt.cnt_payload)
self.assertEqual(len(replace_payload), self.cnt.cnt_payload_len)
def test_nullPayloadUpdate_payloadLenToZero(self):
replace_payload = b""
self.cnt.cnt_payload = replace_payload
self.assertEqual(replace_payload, self.cnt.cnt_payload)
self.assertEqual(len(replace_payload), self.cnt.cnt_payload_len)

12
tests/cnt_flag_a_test.py Normal file
View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.cnt_flag_a import CntFlagA
from tests.constants import get_dummy_basic_cnt
class Test_cntFlagA(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntFlagA(get_dummy_basic_cnt(cnt_id=7), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(7, self.cnt.cnt_id)

12
tests/cnt_flag_b_test.py Normal file
View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.cnt_flag_b import CntFlagB
from tests.constants import get_dummy_basic_cnt
class Test_cntFlagB(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntFlagB(get_dummy_basic_cnt(cnt_id=8), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(8, self.cnt.cnt_id)

12
tests/cnt_flag_c_test.py Normal file
View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.cnt_flag_c import CntFlagC
from tests.constants import get_dummy_basic_cnt
class Test_cntFlagC(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntFlagC(get_dummy_basic_cnt(cnt_id=17), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(17, self.cnt.cnt_id)

12
tests/cnt_mpls_test.py Normal file
View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.cnt_mpls import CntMpls
from tests.constants import get_dummy_basic_cnt
class Test_cntMpls(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntMpls(get_dummy_basic_cnt(cnt_id=19), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(19, self.cnt.cnt_id)

View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.cnt_null_block import CntNullBlock
from tests.constants import get_dummy_basic_cnt
class Test_cntNullBlock(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntNullBlock(get_dummy_basic_cnt(cnt_id=22), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(22, self.cnt.cnt_id)

View file

@ -1,22 +1,19 @@
import struct
import unittest
from npkpy.npk.cntSquasFsImage import CntSquashFsImage
from tests.constants import DummyBasicCnt
from npkpy.npk.cnt_squasfs_image import CntSquashFsImage
from tests.constants import get_dummy_basic_cnt
class Test_cntSquashFsImage(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 21)
self.cnt = CntSquashFsImage(dummyCnt.binCnt, offsetInPck=0)
self.cnt = CntSquashFsImage(get_dummy_basic_cnt(cnt_id=21), offset_in_pck=0)
self.expectedHash = b'\xc3\x04\x15\xea\xccjYDit\xb7\x16\xef\xf5l\xf2\x82\x19\x81]'
def test_validateCntId(self):
self.assertEqual(21, self.cnt.cnt_id)
def test_payload_hash(self):
def test_payloadHash(self):
self.assertEqual(self.expectedHash, self.cnt.cnt_payload_hash)
def test_giveOverviewOfCnt(self):

View file

@ -1,15 +1,12 @@
import struct
import unittest
from npkpy.npk.cntSquashFsHashSignature import CntSquashFsHashSignature
from tests.constants import DummyBasicCnt
from npkpy.npk.cnt_squashfs_hash_signature import CntSquashFsHashSignature
from tests.constants import get_dummy_basic_cnt
class Test_cntSquashFsHashSignature(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 9)
self.cnt = CntSquashFsHashSignature(dummyCnt.binCnt, offsetInPck=0)
self.cnt = CntSquashFsHashSignature(get_dummy_basic_cnt(cnt_id=9), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(9, self.cnt.cnt_id)

View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.cnt_zlib_compressed_data import CntZlibDompressedData
from tests.constants import get_dummy_basic_cnt
class Test_cntZlibCompressedData(unittest.TestCase):
def setUp(self) -> None:
self.cnt = CntZlibDompressedData(get_dummy_basic_cnt(cnt_id=4), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(4, self.cnt.cnt_id)

View file

@ -1,55 +1,14 @@
import tempfile
import unittest
from pathlib import Path
from unittest.mock import Mock
from npkpy.common import getPktInfo, getCntInfo, getAllNkpFiles
from npkpy.common import get_short_pkt_info, get_short_cnt_info, get_all_nkp_files, write_to_file, extract_container, \
get_full_cnt_info, get_full_pkt_info, sha1_sum_from_binary, sha1_sum_from_file
from npkpy.npk.cnt_basic import CntBasic
from npkpy.npk.npk import Npk
class Common_Test(unittest.TestCase):
def setUp(self) -> None:
self.npkFile = Path(tempfile.NamedTemporaryFile(suffix=".npk").name)
self.npkFile.write_bytes(Path("tests/testData/gps-6.45.6.npk").read_bytes())
self.npk = Npk(self.npkFile)
def tearDown(self) -> None:
if self.npkFile.exists():
self.npkFile.unlink()
def test_getBasicPktInfo(self):
self.assertEqual([(str(self.npkFile.name))], getPktInfo(self.npk))
def test_getBasicCntInfo(self):
self.assertEqual(['Cnt: 0:PckHeader',
'Cnt: 1:PckReleaseTyp',
'Cnt: 2:CntArchitectureTag',
'Cnt: 3:PckDescription',
'Cnt: 4:PckEckcdsaHash',
'Cnt: 5:PckRequirementsHeader',
'Cnt: 6:CntNullBlock',
'Cnt: 7:CntSquashFsImage',
'Cnt: 8:CntSquashFsHashSignature',
'Cnt: 9:CntArchitectureTag'], getCntInfo(self.npk), )
def test_getFullPktInfo(self):
pass
# result = getFullPktInfo(self.npk)
#
# self.assertEqual(['Cnt: 0:PckHeader',
# 'Cnt: 1:PckReleaseTyp',
# 'Cnt: 2:PckArchitectureTag',
# 'Cnt: 3:PckDescription',
# 'Cnt: 4:PckSha1Hash',
# 'Cnt: 5:PckRequirementsHeader',
# 'Cnt: 6:PckNullBlock',
# 'Cnt: 7:PckSquashFsImage',
# 'Cnt: 8:PckSquashFsHashSignature',
# 'Cnt: 9:PckArchitectureTag'], result)
def test_getFullCntInfo(self):
pass
from npkpy.npk.pck_header import NPK_PCK_HEADER
from tests.constants import get_dummy_npk_binary, DummyHeaderCnt, get_dummy_basic_cnt
class Test_findNpkFiles(unittest.TestCase):
@ -70,26 +29,26 @@ class Test_findNpkFiles(unittest.TestCase):
def test_findMultipleNpkFiles_inFolder(self):
self.addExistingFiles(["fileA.npk", "fileB.npk", "fileC.npk"])
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath)))
self.assertExistingFiles(sorted(get_all_nkp_files(self.tmpPath)))
def test_findMultipleNpkFilesRecursive(self):
self.addExistingFiles(["fileA.npk", "subB/fileB.npk", "subB/subC/fileC.npk"])
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath)))
self.assertExistingFiles(sorted(get_all_nkp_files(self.tmpPath)))
def test_selectOnlyNpkFiles(self):
self.addExistingFiles(["fileA.npk", "fileB.exe", "fileC.txt"])
self.expectedFiles = [self.tmpPath / "fileA.npk"]
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath)))
self.assertExistingFiles(sorted(get_all_nkp_files(self.tmpPath)))
def test_globOnlyNpkFilesFittingPattern(self):
self.addExistingFiles(["fi__pattern__leA.npk", "fileB.npk", "fi__pattern__leC.exe", "fileD.exe"])
self.expectedFiles = [self.tmpPath / "fi__pattern__leA.npk"]
self.assertExistingFiles(sorted(getAllNkpFiles(self.tmpPath, containStr="__pattern__")))
self.assertExistingFiles(sorted(get_all_nkp_files(self.tmpPath, contain_str="__pattern__")))
def assertExistingFiles(self, result):
self.assertEqual(self.expectedFiles, result)
@ -100,3 +59,106 @@ class Test_findNpkFiles(unittest.TestCase):
f.parent.mkdir(parents=True, exist_ok=True)
f.touch()
self.expectedFiles.append(f)
class Common_Test(unittest.TestCase):
def setUp(self) -> None:
self.payload = "THIS\nIS\nA\nDUMMY\nSTRING\n\n"
self.file = Path(tempfile.NamedTemporaryFile().name)
self.file.touch()
self.output_folder = Path(tempfile.TemporaryDirectory().name)
self.output_folder.mkdir()
def tearDown(self) -> None:
def delete_directory(folder):
for _file in folder.rglob("*"):
if _file.is_file():
_file.unlink()
else:
delete_directory(_file)
_file.rmdir()
self.file.unlink()
delete_directory(self.output_folder)
def test_writeDataToFile_storeOneDataElement(self):
write_to_file(self.file, self.payload.encode())
with self.file.open("r") as _file:
self.assertEqual(self.payload, _file.read())
def test_writeDataToFile_storeList(self):
payload_list = [self.payload.encode(), self.payload.encode()]
write_to_file(self.file, payload_list)
with self.file.open("r") as _file:
self.assertEqual(self.payload + self.payload, _file.read())
def test_getPktInfo_returnOnlyFileName(self):
npkFile = Mock()
npkFile.file = self.file
self.assertEqual([self.file.name], get_short_pkt_info(npkFile))
def test_getBasicCntInfo(self):
self.file.write_bytes(get_dummy_npk_binary())
self.assertEqual(['Cnt: 0:PckHeader'], get_short_cnt_info(Npk(self.file)))
def test_extractPayloadFromCnt_createFilesWithPayload(self):
self.file.write_bytes(get_dummy_npk_binary())
npkFile = Npk(self.file)
extract_container(npkFile, self.output_folder, [NPK_PCK_HEADER])
created_files = list(self.output_folder.rglob("*"))
self.assertEqual(1, len(created_files))
self.assertEqual([self.output_folder / '000_cnt_PckHeader.raw'], created_files)
self.assertEqual(DummyHeaderCnt()._02_payload, created_files[0].read_bytes())
def test_getFullCntInfo_asString(self):
result = get_full_cnt_info(CntBasic(get_dummy_basic_cnt(), offset_in_pck=0))
self.assertEqual(['CntBasic',
' Cnt id: -1',
' Cnt offset: 0',
' Cnt len: 13',
' Payload len: 7',
" Payload[0:7]: b'Payload' [...] "], result)
def test_getFullPktInfo_asString(self):
self.file.write_bytes(get_dummy_npk_binary())
npkFile = Npk(self.file)
result = get_full_pkt_info(npkFile)
self.assertEqual([f"{self.file.name}",
'Cnt: 0:PckHeader',
'PckHeader',
' Cnt id: 1',
' Cnt offset: 8',
' Cnt len: 41',
' Payload len: 35',
" Payload[0:10]: b'0123456789' [...] ",
' Program name: 01234567890abcde',
' Os version: 1.2.3 - rc(?): 4',
' Created at: 1970-01-01 00:00:01',
' NullBlock: (0, 0, 0, 0)',
' Flags: (0, 0, 0, 0, 0, 0, 0)'], result)
def test_generateSha1FromFile(self):
expectedHash = b'\xbb\xbc\xf2\xc5\x943\xf6\x8f"7l\xd2C\x9dl\xd3\t7\x8d\xf6'
self.file.write_bytes(b"TESTDATA")
self.assertEqual(expectedHash, sha1_sum_from_file(self.file))
def test_generateSha1FromHash(self):
expectedHash = b'\xbb\xbc\xf2\xc5\x943\xf6\x8f"7l\xd2C\x9dl\xd3\t7\x8d\xf6'
self.assertEqual(expectedHash, sha1_sum_from_binary(b"TESTDATA"))
def test_generateSha1FromHash_returnEmptyIfNoData(self):
self.assertEqual(b"<empty>", sha1_sum_from_binary(b""))

View file

@ -1,12 +1,11 @@
# HEADER
import struct
from npkpy.npk.pckHeader import NPK_PCK_HEADER
from npkpy.npk.pck_header import NPK_PCK_HEADER
MAGICBYTES = b"\x1e\xf1\xd0\xba"
MAGIC_BYTES = b"\x1e\xf1\xd0\xba"
PCKSIZE = struct.pack("I", 28)
MAGIC_AND_SIZE = MAGICBYTES + PCKSIZE
MAGIC_AND_SIZE = MAGIC_BYTES + PCKSIZE
# OPENING ARCHITECTURE_TAG
SET_HEADER_TAG_ID = struct.pack("H", NPK_PCK_HEADER) # b"\x01\x00"
@ -27,34 +26,20 @@ CNT_CLOSING_ARCHITECTURE_TAG = CLOSING_ARCHITECTURE_TAG_ID + \
CLOSING_ARCHITECTURE_TAG_PAYLOAD
# MINIMAL_NPK_PAKAGE
MINIMAL_NPK_PACKAGE = MAGIC_AND_SIZE + \
CNT_SET_ARCHITECTURE_TAG + \
CNT_CLOSING_ARCHITECTURE_TAG
def getDummyNpkBinary(payload=None):
if not payload:
payload = DummyHeaderCnt().binHeaderCntA
pckPayload = payload
def get_dummy_npk_binary(cnt=None):
if not cnt:
cnt = DummyHeaderCnt().get_binary
pckPayload = cnt
pckLen = struct.pack("I", len(pckPayload))
npkBinary = MAGICBYTES + pckLen + pckPayload
npkBinary = MAGIC_BYTES + pckLen + pckPayload
return npkBinary
class DummyBasicCnt:
_00_cnt_id = struct.pack("h", -1)
_01_cnt_payload_len = struct.pack("I", 7)
_02_cnt_payload = struct.pack("7s", b"Payload")
@property
def binCnt(self):
return self._00_cnt_id + \
self._01_cnt_payload_len + \
self._02_cnt_payload
class DummyHeaderCnt:
_00_cnt_id = struct.pack("H", 1)
_01_cnt_payload_len = struct.pack("I", 35)
@ -66,24 +51,81 @@ class DummyHeaderCnt:
_07_cnt_buildTime = struct.pack("I", 1)
_08_cnt_nullBock = struct.pack("I", 0)
_09a_cnt_flagsA = struct.pack("7B", 0, 0, 0, 0, 0, 0, 0)
# _09b_cnt_flagsB = struct.pack("4B", 0, 0, 0, 0)
_09b_cnt_flagsB = struct.pack("4B", 0, 0, 0, 0)
_02_payload = _02_cnt_programName + \
_03_cnt_versionRevision + \
_04_cnt_versionRc + \
_05_cnt_versionMinor + \
_06_cnt_versionMajor + \
_07_cnt_buildTime + \
_08_cnt_nullBock + \
_09a_cnt_flagsA
_02_payloadSpecialFlag = _02_cnt_programName + \
_03_cnt_versionRevision + \
_04_cnt_versionRc + \
_05_cnt_versionMinor + \
_06_cnt_versionMajor + \
_07_cnt_buildTime + \
_08_cnt_nullBock + \
_09b_cnt_flagsB
@property
def binHeaderCntA(self):
return self._binBasicHeaderCnt + self._09a_cnt_flagsA
# @property
# def binHeaderCntB(self):
# return self._binBasicHeaderCnt + self._09b_cnt_flagsB
@property
def _binBasicHeaderCnt(self):
def get_binary(self):
return self._00_cnt_id + \
self._01_cnt_payload_len + \
self._02_cnt_programName + \
self._03_cnt_versionRevision + \
self._04_cnt_versionRc + \
self._05_cnt_versionMinor + \
self._06_cnt_versionMajor + \
self._07_cnt_buildTime + \
self._08_cnt_nullBock
self._02_payload
@property
def get_binary_with_special_flags(self):
return self._00_cnt_id + \
self._01_cnt_payload_len + \
self._02_payloadSpecialFlag
# pylint: disable=too-many-locals
def get_dummy_requirements_header(structId):
_00_cnt_id = struct.pack("H", 3)
_01_cnt_payload_len = struct.pack("I", 35)
_02_cnt_struct_id = struct.pack("H", 0)
_03_cnt_program_name = struct.pack("16s", b"abcdefghijklmnop")
_04_cnt_min_versionRevision = struct.pack("B", 3)
_05_cnt_min_versionRc = struct.pack("B", 4)
_06_cnt_min_versionMinor = struct.pack("B", 2)
_07_cnt_min_versionMajor = struct.pack("B", 1)
_08_cnt_nullBock = struct.pack("I", 0)
_09_cnt_max_versionRevision = struct.pack("B", 7)
_10_cnt_max_versionRc = struct.pack("B", 8)
_11_cnt_max_versionMinor = struct.pack("B", 6)
_12_cnt_max_versionMajor = struct.pack("B", 5)
_13_cnt_flags = struct.pack("5B", 0, 0, 0, 0, 0)
def _build_payload():
return (_02_cnt_struct_id +
_03_cnt_program_name +
_04_cnt_min_versionRevision +
_05_cnt_min_versionRc +
_06_cnt_min_versionMinor +
_07_cnt_min_versionMajor +
_08_cnt_nullBock +
_09_cnt_max_versionRevision +
_10_cnt_max_versionRc +
_11_cnt_max_versionMinor +
_12_cnt_max_versionMajor +
_13_cnt_flags
)
_02_cnt_struct_id = struct.pack(b"H", structId)
return (_00_cnt_id + _01_cnt_payload_len + _build_payload())
def get_dummy_basic_cnt(cnt_id=-1):
_00_cnt_id = struct.pack("h", cnt_id)
_02_cnt_payload = struct.pack("7s", b"Payload")
_01_cnt_payload_len = struct.pack("I", len(_02_cnt_payload))
return _00_cnt_id + _01_cnt_payload_len + _02_cnt_payload

View file

@ -1,14 +0,0 @@
import unittest
from npkpy.npk.npkConstants import CNT_HANDLER
from tests.constants import DummyBasicCnt
class Test_npkConstants(unittest.TestCase):
def test_validateAssignment_DictIdIsContainerId(self):
for cnt_id, cnt_class in CNT_HANDLER.items():
if cnt_class != "?":
cnt = cnt_class(DummyBasicCnt().binCnt, 0)
self.assertEqual(cnt_id, cnt._regularCntId,
msg=f"{cnt_id}!={cnt._regularCntId}")

View file

@ -1,21 +0,0 @@
import unittest
from pathlib import Path
from npkpy.npk.npkFileBasic import FileBasic
class FileInfo_Test(unittest.TestCase):
def setUp(self) -> None:
self.file = FileBasic(Path("advanced-tools-6.41.3.npk"))
def test_file(self):
self.assertEqual(Path("advanced-tools-6.41.3.npk"), self.file.file)
def test_versionName(self):
self.assertEqual("6.41.3", self.file.filename_version)
def test_programName(self):
self.assertEqual("advanced-tools", self.file.filename_program)
def test_programSuffix(self):
self.assertEqual("npk", self.file.filename_suffix)

View file

@ -0,0 +1,14 @@
import unittest
from npkpy.npk.npk_constants import CNT_HANDLER
from tests.constants import get_dummy_basic_cnt
class Test_npkConstants(unittest.TestCase):
def test_validateAssignment_DictIdIsContainerId(self):
for cnt_id, cnt_class in CNT_HANDLER.items():
if cnt_class != "?":
cnt = cnt_class(get_dummy_basic_cnt(), 0)
self.assertEqual(cnt_id, cnt._regular_cnt_id,
msg=f"{cnt_id}!={cnt._regular_cnt_id}")

View file

@ -0,0 +1,38 @@
import unittest
from pathlib import Path
from npkpy.npk.npk_file_basic import FileBasic, ARCHITECTURES
class FileInfo_Test(unittest.TestCase):
def setUp(self) -> None:
self.file = FileBasic(Path("file-name-1.2.3.npk"))
self.illegal_file = FileBasic(Path("illegalFIle.abc"))
def test_file(self):
self.assertEqual(Path("file-name-1.2.3.npk"), self.file.file)
def test_versionName(self):
self.assertEqual("1.2.3", self.file.filename_version)
def test_versionName_filenameDoesntMatchFormat(self):
self.assertEqual("<NoVersionMatch>", self.illegal_file.filename_version)
def test_programName(self):
self.assertEqual("file-name", self.file.filename_program_name)
def test_programName_filenameDoesntMatchFormat(self):
self.assertEqual("<NoProgramNameMatch>", self.illegal_file.filename_program_name)
def test_programSuffix(self):
self.assertEqual("npk", self.file.filename_suffix)
def test_programSuffix_filenameDoesntMatchFormat(self):
self.assertEqual("<NoSuffixMatch>", self.illegal_file.filename_suffix)
def test_filenameArchitecture_returnDefaultIfNotMentionedInFilename(self):
self.assertEqual("x86", self.file.filename_architecture)
def test_filenameArchitecture_validateArchitectures(self):
for arch in ARCHITECTURES:
self.assertEqual(arch, FileBasic(Path(f"file-name-1.2.3-{arch}.npk")).filename_architecture)

View file

@ -2,148 +2,148 @@ import datetime
import unittest
from pathlib import Path
from npkpy.npk.npk import Npk, MAGICBYTES
from npkpy.npk.npk import Npk, MAGIC_BYTES
class GpsFile_Test(unittest.TestCase):
def setUp(self) -> None:
self.npkFile = Path("tests/testData/gps-6.45.6.npk")
self.npkFile = Path("tests/testData/6_45_6/gps-6.45.6.npk")
self.npk = Npk(self.npkFile)
self.cnt = self.npk.pck_cntList
self.cnt = self.npk.pck_cnt_list
class ParseGpsNpkFile_Test(GpsFile_Test):
def test_fileInfos(self):
self.assertEqual('gps', self.npk.filename_program)
self.assertEqual('gps', self.npk.filename_program_name)
self.assertEqual('6.45.6', self.npk.filename_version)
self.assertEqual('npk', self.npk.filename_suffix)
self.assertEqual('x86', self.npk.filename_architecture)
self.assertEqual(b'\xc6\x16\xf0\x9d~lS\xa7z\xba}.\xe5\xa6w=\xe9\xb4S\xe7', self.npk.file_hash)
def test_NpkHeader(self):
self.assertEqual(MAGICBYTES, self.npk.pck_magicBytes)
self.assertEqual(53321, self.npk.pck_payloadLen)
self.assertEqual(53329, self.npk.pck_fullSize)
self.assertEqual(MAGIC_BYTES, self.npk.pck_magic_bytes)
self.assertEqual(53321, self.npk.pck_payload_len)
self.assertEqual(53329, self.npk.pck_full_size)
def test_PckHeader(self):
self.assertEqual(1, self.npk.pck_header.cnt_id)
self.assertEqual("PckHeader", self.npk.pck_header.cnt_idName)
self.assertEqual(36, self.npk.pck_header.cnt_payloadLen)
self.assertEqual("PckHeader", self.npk.pck_header.cnt_id_name)
self.assertEqual(36, self.npk.pck_header.cnt_payload_len)
self.assertEqual(b'gps\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06f-\x06\x97gw]'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00', self.npk.pck_header.cnt_payload)
self.assertEqual("gps", self.npk.pck_header.cnt_programName)
self.assertEqual("6.45.6 - rc(?): 102", self.npk.pck_header.cnt_osVersion)
self.assertEqual("gps", self.npk.pck_header.cnt_program_name)
self.assertEqual("6.45.6 - rc(?): 102", self.npk.pck_header.cnt_os_version)
self.assertEqual(datetime.datetime(2019, 9, 10, 9, 6, 31), self.npk.pck_header.cnt_built_time)
self.assertEqual((0, 0, 0, 0), self.npk.pck_header.cnt_nullBlock)
self.assertEqual((0, 0, 0, 0), self.npk.pck_header.cnt_null_block)
self.assertEqual((0, 0, 0, 0, 2, 0, 0), self.npk.pck_header.cnt_flags)
def test_ReleaseTyp(self):
cnt = self.cnt[1]
self.assertEqual(24, cnt.cnt_id)
self.assertEqual(50, cnt._offsetInPck)
self.assertEqual("PckReleaseTyp", cnt.cnt_idName)
self.assertEqual(6, cnt.cnt_payloadLen)
self.assertEqual(50, cnt._offset_in_pck)
self.assertEqual("PckReleaseTyp", cnt.cnt_id_name)
self.assertEqual(6, cnt.cnt_payload_len)
self.assertEqual(b"stable", cnt.cnt_payload)
self.assertEqual(12, cnt.cnt_fullLength)
self.assertEqual(12, cnt.cnt_full_length)
def test_PckArchitectureTag(self):
cnt = self.cnt[2]
self.assertEqual(16, cnt.cnt_id)
self.assertEqual(62, cnt._offsetInPck)
self.assertEqual("CntArchitectureTag", cnt.cnt_idName)
self.assertEqual(4, cnt.cnt_payloadLen)
self.assertEqual(62, cnt._offset_in_pck)
self.assertEqual("CntArchitectureTag", cnt.cnt_id_name)
self.assertEqual(4, cnt.cnt_payload_len)
self.assertEqual(b"i386", cnt.cnt_payload)
self.assertEqual(10, cnt.cnt_fullLength)
self.assertEqual(10, cnt.cnt_full_length)
def test_PckDescription(self):
cnt = self.cnt[3]
self.assertEqual(2, cnt.cnt_id)
self.assertEqual(72, cnt._offsetInPck)
self.assertEqual("PckDescription", cnt.cnt_idName)
self.assertEqual(25, cnt.cnt_payloadLen)
self.assertEqual(72, cnt._offset_in_pck)
self.assertEqual("PckDescription", cnt.cnt_id_name)
self.assertEqual(25, cnt.cnt_payload_len)
self.assertEqual(b'Provides support for GPS.', cnt.cnt_payload)
self.assertEqual(31, cnt.cnt_fullLength)
self.assertEqual(31, cnt.cnt_full_length)
def test_PckHash(self):
cnt = self.cnt[4]
self.assertEqual(23, cnt.cnt_id)
self.assertEqual(103, cnt._offsetInPck)
self.assertEqual("PckEckcdsaHash", cnt.cnt_idName)
self.assertEqual(40, cnt.cnt_payloadLen)
self.assertEqual(103, cnt._offset_in_pck)
self.assertEqual("PckEckcdsaHash", cnt.cnt_id_name)
self.assertEqual(40, cnt.cnt_payload_len)
self.assertEqual(b'1a7d206bbfe626c55aa6d2d2caabb6a5a990f13d', cnt.cnt_payload)
self.assertEqual(46, cnt.cnt_fullLength)
self.assertEqual(46, cnt.cnt_full_length)
def test_PckRequirementsHeader(self):
cnt = self.cnt[5]
self.assertEqual(3, cnt.cnt_id)
self.assertEqual(149, cnt._offsetInPck)
self.assertEqual("PckRequirementsHeader", cnt.cnt_idName)
self.assertEqual(149, cnt._offset_in_pck)
self.assertEqual("PckRequirementsHeader", cnt.cnt_id_name)
self.assertEqual(1, cnt.cnt_structure_id)
self.assertEqual(34, cnt.cnt_payloadLen)
self.assertEqual('system', cnt.cnt_programName)
self.assertEqual((0, 0, 0, 0), cnt.cnt_nullBlock)
self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_osVersionFrom)
self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_osVersionTo)
self.assertEqual(34, cnt.cnt_payload_len)
self.assertEqual('system', cnt.cnt_program_name)
self.assertEqual((0, 0, 0, 0), cnt.cnt_null_block)
self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_os_version_min)
self.assertEqual("6.45.6 - rc(?): 102", cnt.cnt_os_version_max)
self.assertEqual("<not available for version 0,1>", cnt.cnt_flags)
self.assertEqual(40, cnt.cnt_fullLength)
self.assertEqual(40, cnt.cnt_full_length)
def test_PckNullBlock(self):
cnt = self.cnt[6]
self.assertEqual(22, cnt.cnt_id)
self.assertEqual(189, cnt._offsetInPck)
self.assertEqual("CntNullBlock", cnt.cnt_idName)
self.assertEqual(3895, cnt.cnt_payloadLen)
self.assertEqual(189, cnt._offset_in_pck)
self.assertEqual("CntNullBlock", cnt.cnt_id_name)
self.assertEqual(3895, cnt.cnt_payload_len)
self.assertEqual(b'\x00' * 3895, cnt.cnt_payload)
self.assertEqual(3901, cnt.cnt_fullLength)
self.assertEqual(3901, cnt.cnt_full_length)
def test_PckSquashFsImage(self):
cnt = self.cnt[7]
self.assertEqual(21, cnt.cnt_id)
self.assertEqual(4090, cnt._offsetInPck)
self.assertEqual("CntSquashFsImage", cnt.cnt_idName)
self.assertEqual(49152, cnt.cnt_payloadLen)
self.assertEqual(4090, cnt._offset_in_pck)
self.assertEqual("CntSquashFsImage", cnt.cnt_id_name)
self.assertEqual(49152, cnt.cnt_payload_len)
self.assertEqual(b'hsqs', cnt.cnt_payload[0:4])
self.assertEqual(49158, cnt.cnt_fullLength)
self.assertEqual(49158, cnt.cnt_full_length)
def test_PckSquashFsHashSignature(self):
cnt = self.cnt[8]
self.assertEqual(9, cnt.cnt_id)
self.assertEqual(53248, cnt._offsetInPck)
self.assertEqual("CntSquashFsHashSignature", cnt.cnt_idName)
self.assertEqual(68, cnt.cnt_payloadLen)
self.assertEqual(53248, cnt._offset_in_pck)
self.assertEqual("CntSquashFsHashSignature", cnt.cnt_id_name)
self.assertEqual(68, cnt.cnt_payload_len)
self.assertEqual(b'\x8e\xa2\xb1\x8e\xf7n\xef355', cnt.cnt_payload[0:10])
self.assertEqual(74, cnt.cnt_fullLength)
self.assertEqual(74, cnt.cnt_full_length)
def test_parseGpsFilxe_PckArchitectureTag_Closing(self):
cnt = self.cnt[9]
self.assertEqual(16, cnt.cnt_id)
self.assertEqual(53322, cnt._offsetInPck)
self.assertEqual("CntArchitectureTag", cnt.cnt_idName)
self.assertEqual(1, cnt.cnt_payloadLen)
self.assertEqual(53322, cnt._offset_in_pck)
self.assertEqual("CntArchitectureTag", cnt.cnt_id_name)
self.assertEqual(1, cnt.cnt_payload_len)
self.assertEqual(b'I', cnt.cnt_payload[0:10])
self.assertEqual(7, cnt.cnt_fullLength)
self.assertEqual(7, cnt.cnt_full_length)
def test_checkStructure(self):
self.assertEqual(10, len(self.npk.pck_cntList))
self.assertEqual([1, 24, 16, 2, 23, 3, 22, 21, 9, 16], list(cnt.cnt_id for cnt in self.npk.pck_cntList))
self.assertEqual(10, len(self.npk.pck_cnt_list))
self.assertEqual([1, 24, 16, 2, 23, 3, 22, 21, 9, 16], list(cnt.cnt_id for cnt in self.npk.pck_cnt_list))
class WriteModifiedGpsFile_Test(GpsFile_Test):
def test_modify_PckRequirementsHeader(self):
x = b"\x03\x00\x22\x00\x00\x00\x01\x00\x73\x79\x73\x74\x65\x6d\x00\x00" + \
b"\x00\x00\x00\x00\x00\x00\x00\x00\x06\x66\x2d\x06\x00\x00\x00\x00" + \
b"\x06\x66\x2d\x06\x00\x00\x00\x00"
expected_binary = b"\x03\x00\x22\x00\x00\x00\x01\x00\x73\x79\x73\x74\x65\x6d\x00\x00" + \
b"\x00\x00\x00\x00\x00\x00\x00\x00\x06\x66\x2d\x06\x00\x00\x00\x00" + \
b"\x06\x66\x2d\x06\x00\x00\x00\x00"
cnt = None
for c in self.cnt:
@ -151,7 +151,7 @@ class WriteModifiedGpsFile_Test(GpsFile_Test):
cnt = c
break
self.assertEqual(x, cnt.cnt_fullBinary)
self.assertEqual(expected_binary, cnt.cnt_full_binary)
def test_createFile_changePayloadTwice(self):
oldPayload = self.npk.pck_header.cnt_payload
@ -159,4 +159,4 @@ class WriteModifiedGpsFile_Test(GpsFile_Test):
self.npk.pck_header.cnt_payload = b"A"
self.npk.pck_header.cnt_payload = oldPayload
self.assertEqual(Npk(self.npkFile).file.read_bytes(), self.npk.pck_fullBinary)
self.assertEqual(Npk(self.npkFile).file.read_bytes(), self.npk.pck_full_binary)

View file

@ -3,48 +3,49 @@ import tempfile
import unittest
from pathlib import Path
from npkpy.common import NPKError, NPKIdError, NPKMagicBytesError
from npkpy.npk.npk import Npk
from npkpy.npk.pckHeader import PckHeader
from tests.constants import DummyHeaderCnt, MAGICBYTES, getDummyNpkBinary
from npkpy.npk.pck_header import PckHeader
from tests.constants import DummyHeaderCnt, MAGIC_BYTES, get_dummy_npk_binary
class Test_npkClass(unittest.TestCase):
def setUp(self) -> None:
self.npkFile = Path(tempfile.NamedTemporaryFile(suffix=".npk").name)
self.npkFile.write_bytes(getDummyNpkBinary())
self.npkFile.write_bytes(get_dummy_npk_binary())
def test_fileIsNoNpkFile(self):
self.npkFile.write_bytes(b"NoMagicBytesAtHeadOfFile")
with self.assertRaises(RuntimeError) as e:
_ = Npk(self.npkFile).pck_magicBytes
self.assertEqual(e.exception.args[0], "MagicBytes not found in Npk file")
with self.assertRaises(NPKMagicBytesError) as e:
_ = Npk(self.npkFile).pck_magic_bytes
self.assertEqual(e.exception.args[0], "Magic bytes not found in Npk file")
def test_npkFileIsCorrupt_fileCorruptException(self):
self.npkFile.write_bytes(MAGICBYTES + b"CorruptFile")
self.npkFile.write_bytes(MAGIC_BYTES + b"CorruptFile")
with self.assertRaises(RuntimeError) as e:
_ = Npk(self.npkFile).pck_cntList
with self.assertRaises(NPKError) as e:
_ = Npk(self.npkFile).pck_cnt_list
self.assertEqual(e.exception.args[0],
f"File maybe corrupted. Please download again. File: {self.npkFile.absolute()}")
def test_extractMagicBytes(self):
self.assertEqual(MAGICBYTES, Npk(self.npkFile).pck_magicBytes)
self.assertEqual(MAGIC_BYTES, Npk(self.npkFile).pck_magic_bytes)
def test_extractLenOfNpkPayload_propagatedSizeIsValid(self):
self.assertEqual(len(DummyHeaderCnt().binHeaderCntA), Npk(self.npkFile).pck_payloadLen)
self.assertEqual(len(DummyHeaderCnt().get_binary), Npk(self.npkFile).pck_payload_len)
def test_calculatePckFullSize_equalsFileSize(self):
self.assertEqual(self.npkFile.stat().st_size, Npk(self.npkFile).pck_fullSize)
self.assertEqual(self.npkFile.stat().st_size, Npk(self.npkFile).pck_full_size)
def test_getNpkBinary_equalsOriginalBinary(self):
npkBinary = self.npkFile.read_bytes()
self.assertEqual(npkBinary, Npk(self.npkFile).pck_fullBinary)
self.assertEqual(npkBinary, Npk(self.npkFile).pck_full_binary)
def test_getEnumeratedListOfCntInNpk(self):
cntList = list(Npk(self.npkFile).pck_enumerateCnt)
cntList = list(Npk(self.npkFile).pck_enumerate_cnt)
cntId, cnt = cntList[0]
self.assertEqual(1, len(cntList))
@ -52,7 +53,7 @@ class Test_npkClass(unittest.TestCase):
self.assertTrue(isinstance(cnt, PckHeader))
def test_getAllCnt_returnAsList(self):
cntList = Npk(self.npkFile).pck_cntList
cntList = Npk(self.npkFile).pck_cnt_list
self.assertEqual(1, len(cntList))
self.assertTrue(isinstance(cntList[0], PckHeader))
@ -60,11 +61,9 @@ class Test_npkClass(unittest.TestCase):
def test_getAllCnt_exceptionWithUnknownCntInNpk(self):
unknownCnt = DummyHeaderCnt()
unknownCnt._00_cnt_id = struct.pack("H", 999)
self.npkFile.write_bytes(getDummyNpkBinary(payload=unknownCnt.binHeaderCntA))
self.npkFile.write_bytes(get_dummy_npk_binary(cnt=unknownCnt.get_binary))
with self.assertRaises(RuntimeError) as e:
_ = Npk(self.npkFile).pck_cntList
self.assertEqual(e.exception.args[0], f"failed with id: 999\n"
with self.assertRaises(NPKIdError) as e:
_ = Npk(self.npkFile).pck_cnt_list
self.assertEqual(e.exception.args[0], f"Failed with cnt id: 999\n"
f"New cnt id discovered in file: {self.npkFile.absolute()}")

View file

@ -1,16 +0,0 @@
import struct
import unittest
from npkpy.npk.pckDescription import PckDescription
from tests.constants import DummyBasicCnt
class Test_pckDescription(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 2)
self.cnt = PckDescription(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(2, self.cnt.cnt_id)

View file

@ -1,16 +0,0 @@
import struct
import unittest
from npkpy.npk.pckEckcdsaHash import PckEckcdsaHash
from tests.constants import DummyBasicCnt
class Test_pckEckcdsaHash(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 23)
self.cnt = PckEckcdsaHash(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(23, self.cnt.cnt_id)

View file

@ -1,16 +0,0 @@
import struct
import unittest
from npkpy.npk.pckReleaseTyp import PckReleaseTyp
from tests.constants import DummyBasicCnt
class Test_pckReleaseTyp(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 24)
self.cnt = PckReleaseTyp(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(24, self.cnt.cnt_id)

View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.pck_description import PckDescription
from tests.constants import get_dummy_basic_cnt
class Test_pckDescription(unittest.TestCase):
def setUp(self) -> None:
self.cnt = PckDescription(get_dummy_basic_cnt(cnt_id=2), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(2, self.cnt.cnt_id)

View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.pck_eckcdsa_hash import PckEckcdsaHash
from tests.constants import get_dummy_basic_cnt
class Test_pckEckcdsaHash(unittest.TestCase):
def setUp(self) -> None:
self.cnt = PckEckcdsaHash(get_dummy_basic_cnt(cnt_id=23), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(23, self.cnt.cnt_id)

49
tests/pck_header_test.py Normal file
View file

@ -0,0 +1,49 @@
import datetime
import struct
import unittest
from npkpy.npk.pck_header import PckHeader
from tests.constants import DummyHeaderCnt
class Test_pckHeader(unittest.TestCase):
def setUp(self) -> None:
self.dummy_cnt = DummyHeaderCnt()
self.dummy_cnt._00_cnt_id = struct.pack("h", 1)
self.cnt = PckHeader(self.dummy_cnt.get_binary, offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(1, self.cnt.cnt_id)
def test_getCntProgramName(self):
self.assertEqual("01234567890abcde", self.cnt.cnt_program_name)
def test_getOsVersion(self):
self.assertEqual("1.2.3 - rc(?): 4", self.cnt.cnt_os_version)
def test_getNullBlock(self):
self.assertEqual((0, 0, 0, 0), self.cnt.cnt_null_block)
def test_getBuildTime(self):
self.assertEqual(datetime.datetime(1970, 1, 1, 0, 0, 1), self.cnt.cnt_built_time)
def test_getOutput(self):
self.assertEqual(('PckHeader',
['Cnt id: 1',
'Cnt offset: 0',
'Cnt len: 41',
'Payload len: 35',
"Payload[0:10]: b'0123456789' [...] ",
'Program name: 01234567890abcde',
'Os version: 1.2.3 - rc(?): 4',
'Created at: 1970-01-01 00:00:01',
'NullBlock: (0, 0, 0, 0)',
'Flags: (0, 0, 0, 0, 0, 0, 0)']), self.cnt.output_cnt)
def test_getCntFlags(self):
self.assertEqual((0, 0, 0, 0, 0, 0, 0), self.cnt.cnt_flags)
def test_flagsForSpecificVersion(self):
# INFO: pkt with version 5.23 seems to have only four flags.
cnt = PckHeader(self.dummy_cnt.get_binary_with_special_flags, offset_in_pck=0)
self.assertEqual((0, 0, 0, 0), cnt.cnt_flags)

View file

@ -0,0 +1,18 @@
import struct
import unittest
from npkpy.npk.pck_multicontainer_header import PktMulticontainerHeader
from tests.constants import DummyHeaderCnt
class Test_pktMultiContainerHeader(unittest.TestCase):
def setUp(self) -> None:
dummy_cnt = DummyHeaderCnt()
dummy_cnt._00_cnt_id = struct.pack("h", 18)
self.cnt = PktMulticontainerHeader(dummy_cnt.get_binary, offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(18, self.cnt.cnt_id)
def test_getCntFlags(self):
self.assertEqual((0, 0, 0, 0), self.cnt.cnt_flags)

View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.pck_multicontainer_list import PktMulticontainerList
from tests.constants import get_dummy_basic_cnt
class Test_cnt_MultiContainerList(unittest.TestCase):
def setUp(self) -> None:
self.cnt = PktMulticontainerList(get_dummy_basic_cnt(cnt_id=20), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(20, self.cnt.cnt_id)

View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.pck_preheader import PckPreHeader
from tests.constants import get_dummy_basic_cnt
class Test_pckPreHeader(unittest.TestCase):
def setUp(self) -> None:
self.cnt = PckPreHeader(get_dummy_basic_cnt(cnt_id=25), offset_in_pck=0)
def test_validateCntId(self):
self.assertEqual(25, self.cnt.cnt_id)

View file

@ -0,0 +1,12 @@
import unittest
from npkpy.npk.pck_release_typ import PckReleaseTyp
from tests.constants import get_dummy_basic_cnt
class Test_pckReleaseTyp(unittest.TestCase):
def setUp(self) -> None:
self.cnt = PckReleaseTyp(get_dummy_basic_cnt(cnt_id=24), offset_in_pck=0)
def test_validate_cnt_id(self):
self.assertEqual(24, self.cnt.cnt_id)

View file

@ -0,0 +1,138 @@
import unittest
from npkpy.npk.pck_requirements_header import PckRequirementsHeader
from tests.constants import get_dummy_requirements_header
class Test_pktRequirementsHeader(unittest.TestCase):
def setUp(self) -> None:
self.cnt = createRequirementsHeaderCnt(structId=0)
def test_validateCntId(self):
self.assertEqual(3, self.cnt.cnt_id)
def test_getCntStructId(self):
self.assertEqual(0, createRequirementsHeaderCnt(structId=0).cnt_structure_id)
self.assertEqual(1, createRequirementsHeaderCnt(structId=1).cnt_structure_id)
self.assertEqual(2, createRequirementsHeaderCnt(structId=2).cnt_structure_id)
class Test_pktRequirementsHeader_StructIdZero(unittest.TestCase):
def setUp(self) -> None:
self.cnt = createRequirementsHeaderCnt(structId=0)
def test_getCntFlags(self):
self.assertEqual("<not available for version 0,1>", self.cnt.cnt_flags)
def test_getProgramName_NotAvailableForVersionZero(self):
self.assertEqual("<not available for version 0>", self.cnt.cnt_program_name)
def test_getOsVersionMin_NotAvailableForVersionZero(self):
self.assertEqual("<not available for version 0>", self.cnt.cnt_os_version_min)
def test_getNullBlock_NotAvailableForVersionZero(self):
self.assertEqual("<not available for version 0>", self.cnt.cnt_null_block)
def test_getOsVersionMax_NotAvailableForVersionZero(self):
self.assertEqual("<not available for version 0>", self.cnt.cnt_os_version_max)
def test_getFlags_NotAvailableForVersionZero(self):
self.assertEqual("<not available for version 0,1>", self.cnt.cnt_flags)
def test_FullBinary(self):
self.assertEqual(get_dummy_requirements_header(structId=0), self.cnt.cnt_full_binary)
def test_getOutput(self):
self.assertEqual(('PckRequirementsHeader',
['Cnt id: 3',
'Cnt offset: 0',
'Cnt len: 41',
'Payload len: 35',
"Payload[0:10]: b'\\x00\\x00abcdefgh' [...] ",
'StructID: 0',
'Offset: 0',
'Program name: <not available for version 0>',
'Null block: <not available for version 0>',
'Os versionFrom: <not available for version 0>',
'Os versionTo: <not available for version 0>',
'Flags: <not available for version 0,1>']), self.cnt.output_cnt)
class Test_pktRequirementsHeader_StructIdOne(unittest.TestCase):
def setUp(self) -> None:
self.cnt = createRequirementsHeaderCnt(structId=1)
def test_getProgramName(self):
self.assertEqual("abcdefghijklmnop", self.cnt.cnt_program_name)
def test_getOsVersionMin(self):
self.assertEqual("1.2.3 - rc(?): 4", self.cnt.cnt_os_version_min)
def test_getNullBlock(self):
self.assertEqual((0, 0, 0, 0), self.cnt.cnt_null_block)
def test_getOsVersionMax(self):
self.assertEqual("5.6.7 - rc(?): 8", self.cnt.cnt_os_version_max)
def test_getFlags(self):
self.assertEqual("<not available for version 0,1>", self.cnt.cnt_flags)
def test_FullBinary(self):
self.assertEqual(get_dummy_requirements_header(structId=1), self.cnt.cnt_full_binary)
def test_getOutput(self):
self.assertEqual(('PckRequirementsHeader',
['Cnt id: 3',
'Cnt offset: 0',
'Cnt len: 41',
'Payload len: 35',
"Payload[0:10]: b'\\x01\\x00abcdefgh' [...] ",
'StructID: 1',
'Offset: 0',
'Program name: abcdefghijklmnop',
'Null block: (0, 0, 0, 0)',
'Os versionFrom: 1.2.3 - rc(?): 4',
'Os versionTo: 5.6.7 - rc(?): 8',
'Flags: <not available for version 0,1>']), self.cnt.output_cnt)
class Test_pktRequirementsHeader_StructIdTwo(unittest.TestCase):
def setUp(self) -> None:
self.cnt = createRequirementsHeaderCnt(structId=2)
def test_getProgramName(self):
self.assertEqual("abcdefghijklmnop", self.cnt.cnt_program_name)
def test_getOsVersionMin(self):
self.assertEqual("1.2.3 - rc(?): 4", self.cnt.cnt_os_version_min)
def test_getNullBlock(self):
self.assertEqual((0, 0, 0, 0), self.cnt.cnt_null_block)
def test_getOsVersionMax(self):
self.assertEqual("5.6.7 - rc(?): 8", self.cnt.cnt_os_version_max)
def test_getFlags(self):
self.assertEqual((0, 0, 0, 0), self.cnt.cnt_flags)
def test_FullBinary(self):
self.assertEqual(get_dummy_requirements_header(structId=2), self.cnt.cnt_full_binary)
def test_getOutput(self):
self.assertEqual(('PckRequirementsHeader',
['Cnt id: 3',
'Cnt offset: 0',
'Cnt len: 41',
'Payload len: 35',
"Payload[0:10]: b'\\x02\\x00abcdefgh' [...] ",
'StructID: 2',
'Offset: 0',
'Program name: abcdefghijklmnop',
'Null block: (0, 0, 0, 0)',
'Os versionFrom: 1.2.3 - rc(?): 4',
'Os versionTo: 5.6.7 - rc(?): 8',
'Flags: (0, 0, 0, 0)']), self.cnt.output_cnt)
def createRequirementsHeaderCnt(structId):
return PckRequirementsHeader(get_dummy_requirements_header(structId), offset_in_pck=0)

Binary file not shown.

View file

@ -1,86 +1,92 @@
gps-6.45.6.npk
Cnt: 0:PckHeader
Cnt: 1:PckReleaseTyp
Cnt: 2:CntArchitectureTag
Cnt: 3:PckDescription
Cnt: 4:PckEckcdsaHash
Cnt: 5:PckRequirementsHeader
Cnt: 6:CntNullBlock
Cnt: 7:CntSquashFsImage
Cnt: 8:CntSquashFsHashSignature
Cnt: 9:CntArchitectureTag
gps-6.48.4.npk
Cnt: 0:PckPreHeader
Cnt: 1:PckHeader
Cnt: 2:PckReleaseTyp
Cnt: 3:CntArchitectureTag
Cnt: 4:PckDescription
Cnt: 5:PckEckcdsaHash
Cnt: 6:PckRequirementsHeader
Cnt: 7:CntNullBlock
Cnt: 8:CntSquashFsImage
Cnt: 9:CntSquashFsHashSignature
Cnt: 10:CntArchitectureTag
PckPreHeader
Cnt id: 25
Cnt offset: 8
Cnt len: 6
Payload len: 0
Payload[0:0]: b'' [...]
PckHeader
Cnt id: 1
Cnt offset: 8
Cnt offset: 14
Cnt len: 42
Payload len: 36
Payload[0:10]: b'gps\x00\x00\x00\x00\x00\x00\x00' [...]
Program name: gps
Os version: 6.45.6 - rc(?): 102
Created at: 2019-09-10 09:06:31
Os version: 6.48.4 - rc(?): 102
Created at: 2021-08-18 06:43:27
NullBlock: (0, 0, 0, 0)
Flags: (0, 0, 0, 0, 2, 0, 0)
PckReleaseTyp
Cnt id: 24
Cnt offset: 50
Cnt offset: 56
Cnt len: 12
Payload len: 6
Payload[0:6]: b'stable' [...]
CntArchitectureTag
Cnt id: 16
Cnt offset: 62
Cnt offset: 68
Cnt len: 10
Payload len: 4
Payload[0:4]: b'i386' [...]
PckDescription
Cnt id: 2
Cnt offset: 72
Cnt offset: 78
Cnt len: 31
Payload len: 25
Payload[0:10]: b'Provides s' [...]
PckEckcdsaHash
Cnt id: 23
Cnt offset: 103
Cnt offset: 109
Cnt len: 46
Payload len: 40
Payload[0:10]: b'1a7d206bbf' [...]
Payload[0:10]: b'589981d829' [...]
PckRequirementsHeader
Cnt id: 3
Cnt offset: 149
Cnt offset: 155
Cnt len: 40
Payload len: 34
Payload[0:10]: b'\x01\x00system\x00\x00' [...]
Cnt id: 3
StructID: 1
Offset: 149
Offset: 155
Program name: system
Null block: (0, 0, 0, 0)
Os versionFrom: 6.45.6 - rc(?): 102
Os versionTo: 6.45.6 - rc(?): 102
Os versionFrom: 6.48.4 - rc(?): 102
Os versionTo: 6.48.4 - rc(?): 102
Flags: <not available for version 0,1>
CntNullBlock
Cnt id: 22
Cnt offset: 189
Cnt len: 3901
Payload len: 3895
Cnt offset: 195
Cnt len: 3895
Payload len: 3889
Payload[0:10]: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' [...]
CntSquashFsImage
Cnt id: 21
Cnt offset: 4090
Cnt len: 49158
Payload len: 49152
Payload[0:10]: b'hsqs\x15\x00\x00\x00\xa2m' [...]
calc Sha1Hash: b'c\x033\x82{\x9a$er\x92=o\x8a\xfeL-Q\x02hW'
Payload[0:10]: b'hsqs\x15\x00\x00\x00\xdb\xac' [...]
calc Sha1Hash: b'&1=\x9e\x05\xef\x19\xb218AVQ\xa2g&j5\x9c\x87'
CntSquashFsHashSignature
Cnt id: 9
Cnt offset: 53248
Cnt len: 74
Payload len: 68
Payload[0:10]: b'\x8e\xa2\xb1\x8e\xf7n\xef355' [...]
Payload[-10:]: b"Y\x9e'\xac\xccw\x91\x06]\t"
Cnt len: 138
Payload len: 132
Payload[0:10]: b'\xca\xdb\x02\xed,\xca\x13\xc2\x88J' [...]
Payload[-10:]: b'7\x9dF\xf4\x14\xd3,\x16\x18\x06'
CntArchitectureTag
Cnt id: 16
Cnt offset: 53322
Cnt offset: 53386
Cnt len: 7
Payload len: 1
Payload[0:1]: b'I' [...]

View file

@ -1,17 +0,0 @@
import struct
import unittest
from npkpy.npk.xCntFlagA import XCnt_flagA
from tests.constants import DummyBasicCnt
class Test_xCnt_flagB(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 7)
self.cnt = XCnt_flagA(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(7, self.cnt.cnt_id)

View file

@ -1,18 +0,0 @@
import struct
import unittest
from npkpy.npk.xCntFlagB import XCnt_flagB
from tests.constants import DummyBasicCnt
class Test_xCnt_flagB(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 8)
self.cnt = XCnt_flagB(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(8, self.cnt.cnt_id)

View file

@ -1,17 +0,0 @@
import struct
import unittest
from npkpy.npk.xCntFlagC import XCnt_flagC
from tests.constants import DummyBasicCnt
class Test_xCnt_flagC(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 17)
self.cnt = XCnt_flagC(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(17, self.cnt.cnt_id)

View file

@ -1,16 +0,0 @@
import struct
import unittest
from npkpy.npk.xCntMpls import XCntMpls
from tests.constants import DummyBasicCnt
class Test_xCntMpls(unittest.TestCase):
def setUp(self) -> None:
dummyCnt = DummyBasicCnt()
dummyCnt._00_cnt_id = struct.pack("h", 19)
self.cnt = XCntMpls(dummyCnt.binCnt, offsetInPck=0)
def test_validateCntId(self):
self.assertEqual(19, self.cnt.cnt_id)

View file

@ -4,63 +4,72 @@ import unittest
from pathlib import Path
class Test_npkPy(unittest.TestCase):
class Test_npkpy(unittest.TestCase):
def setUp(self) -> None:
# TODO: create DummyPkg and replace gps-6.45.6.npk
self.npkFile = Path("tests/testData/gps-6.45.6.npk")
self.pathToNpk = str(self.npkFile.absolute())
self.npkContainerList = Path("tests/testData/gps-6.45.6.result").read_text()
self.dstFolder = Path(tempfile.mkdtemp())
self.npk_file = Path("tests/testData/6_48_4/gps-6.48.4.npk")
self.path_to_npk = str(self.npk_file.absolute())
self.npk_container_list = Path("tests/testData/6_48_4/gps-6.48.4.result").read_text()
self.dst_folder = Path(tempfile.mkdtemp())
def tearDown(self) -> None:
[f.unlink() for f in self.dstFolder.rglob("*") if f.is_file()]
[f.rmdir() for f in self.dstFolder.rglob("*")]
self.dstFolder.rmdir()
for _file in self.dst_folder.rglob("*"):
if _file.is_file():
_file.unlink()
for _file in self.dst_folder.rglob("*"):
_file.rmdir()
def test_showAllContainersFromNpkPkg(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--showContainer"]
output = runCmdInTerminal(cmd)
self.assertEqual(self.npkContainerList, output)
self.dst_folder.rmdir()
def test_exportAllContainerFromNpk(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportAll"]
def test_list_all_containers_from_npk_pkg(self):
cmd = ["npkpy", "--file", self.path_to_npk, "--show-container"]
output = run_command_in_terminal(cmd)
self.assertEqual(self.npk_container_list, output)
runCmdInTerminal(cmd)
def test_list_in_folder(self):
cmd = ["npkpy", "--src-folder", str(self.npk_file.parent), "--show-container"]
output = run_command_in_terminal(cmd)
self.assertEqual(self.npk_container_list, output)
exportedContainer = sorted(str(f.relative_to(self.dstFolder)) for f in self.dstFolder.rglob('*'))
self.assertEqual(['npkPyExport_gps-6.45.6',
'npkPyExport_gps-6.45.6/000_cnt_PckHeader.raw',
'npkPyExport_gps-6.45.6/001_cnt_PckReleaseTyp.raw',
'npkPyExport_gps-6.45.6/002_cnt_CntArchitectureTag.raw',
'npkPyExport_gps-6.45.6/003_cnt_PckDescription.raw',
'npkPyExport_gps-6.45.6/004_cnt_PckEckcdsaHash.raw',
'npkPyExport_gps-6.45.6/005_cnt_PckRequirementsHeader.raw',
'npkPyExport_gps-6.45.6/006_cnt_CntNullBlock.raw',
'npkPyExport_gps-6.45.6/007_cnt_CntSquashFsImage.raw',
'npkPyExport_gps-6.45.6/008_cnt_CntSquashFsHashSignature.raw',
'npkPyExport_gps-6.45.6/009_cnt_CntArchitectureTag.raw'], exportedContainer)
def test_export_all_container_from_npk(self):
cmd = ["npkpy", "--file", self.path_to_npk, "--dst-folder", self.dst_folder.absolute(), "--export-all"]
def test_extractSquashFsContainerFromNpk(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportSquashFs"]
run_command_in_terminal(cmd)
runCmdInTerminal(cmd)
exported_container = sorted(str(_file.relative_to(self.dst_folder)) for _file in self.dst_folder.rglob('*'))
self.assertEqual(['npkPyExport_gps-6.48.4',
'npkPyExport_gps-6.48.4/000_cnt_PckPreHeader.raw',
'npkPyExport_gps-6.48.4/001_cnt_PckHeader.raw',
'npkPyExport_gps-6.48.4/002_cnt_PckReleaseTyp.raw',
'npkPyExport_gps-6.48.4/003_cnt_CntArchitectureTag.raw',
'npkPyExport_gps-6.48.4/004_cnt_PckDescription.raw',
'npkPyExport_gps-6.48.4/005_cnt_PckEckcdsaHash.raw',
'npkPyExport_gps-6.48.4/006_cnt_PckRequirementsHeader.raw',
'npkPyExport_gps-6.48.4/007_cnt_CntNullBlock.raw',
'npkPyExport_gps-6.48.4/008_cnt_CntSquashFsImage.raw',
'npkPyExport_gps-6.48.4/009_cnt_CntSquashFsHashSignature.raw',
'npkPyExport_gps-6.48.4/010_cnt_CntArchitectureTag.raw'], exported_container)
self.assertContainerExtracted(['npkPyExport_gps-6.45.6',
'npkPyExport_gps-6.45.6/007_cnt_CntSquashFsImage.raw'])
def test_extract_squashfs_container_from_npk(self):
cmd = ["npkpy", "--file", self.path_to_npk, "--dst-folder", self.dst_folder.absolute(), "--export-squashfs"]
#
def test_extractZlibContainerFromNpk_NonExisitngNotExtracted(self):
cmd = ["npkPy", "--file", self.pathToNpk, "--dstFolder", self.dstFolder.absolute(), "--exportZlib"]
run_command_in_terminal(cmd)
runCmdInTerminal(cmd)
self.assert_container_extracted(['npkPyExport_gps-6.48.4',
'npkPyExport_gps-6.48.4/008_cnt_CntSquashFsImage.raw'])
self.assertContainerExtracted([])
def test_extract_zlib_container_from_npk_nonexisting_not_extracted(self):
cmd = ["npkpy", "--file", self.path_to_npk, "--dst-folder", self.dst_folder.absolute(), "--export-zlib"]
def assertContainerExtracted(self, expectedFiles):
extractedContainer = sorted(str(f.relative_to(self.dstFolder)) for f in self.dstFolder.rglob('*'))
self.assertEqual(expectedFiles, extractedContainer)
run_command_in_terminal(cmd)
self.assert_container_extracted([])
def assert_container_extracted(self, expected_files):
extracted_container = sorted(str(_file.relative_to(self.dst_folder)) for _file in self.dst_folder.rglob('*'))
self.assertEqual(expected_files, extracted_container)
def runCmdInTerminal(cmd):
return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.decode("UTF-8")
def run_command_in_terminal(cmd):
return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True).stdout.decode("UTF-8")

View file

@ -0,0 +1,56 @@
from pathlib import Path
from pprint import pprint
from npkpy.npk.npk import Npk
## KEEP IN MIND: MODIFICATIONS WILL INVALIDATE THE NPK PACKAGE SIGNATURE!
## THE ROUTER WON'T INSTALL THE PACKAGE.
def modify_poc():
npk_file = Npk(Path("tests/testData/6_48_4/gps-6.48.4.npk"))
# print overview
print("----Overview--------------------")
pprint([f"pos: {pos:2} - Name: {cnt.cnt_id_name} (id:{cnt.cnt_id:2})" for pos, cnt in npk_file.pck_enumerate_cnt])
print("-------------------------------")
# The following code example will modify the payload section of PckDescription
CNT_ID = 4 # PckDescription
print("Payload original:")
print_overview(npk_file, cnt_id=CNT_ID)
print("overwrite payload - same size:")
npk_file.pck_cnt_list[CNT_ID].cnt_payload = b"a" * 25
print_overview(npk_file, cnt_id=CNT_ID)
# Modifying the size of the payload can affect the whole npk package and
# forces recalculations in other containers of this package
print("Payload new - small size:")
npk_file.pck_cnt_list[CNT_ID].cnt_payload = b"b" * 10
print_overview(npk_file, cnt_id=CNT_ID)
print("Payload new - increased:")
npk_file.pck_cnt_list[CNT_ID].cnt_payload = b"c" * 100
print_overview(npk_file, cnt_id=CNT_ID)
print("Write File: modified.npk")
Path("modified.npk").write_bytes(npk_file.pck_full_binary)
# Parse the new npk file as shown blow:
# $ npkpy --files modified.npk --show-container
def print_overview(npk_file, cnt_id):
cnt = npk_file.pck_cnt_list[cnt_id]
print("Cnt payload: ", cnt.cnt_payload)
print("Cnt payload len: ", cnt.cnt_payload_len)
print("Cnt len: ", cnt.cnt_full_length)
print("pkg len: ", npk_file.pck_payload_len)
print("-------------------------------")
if __name__ == '__main__':
modify_poc()

View file

@ -18,17 +18,17 @@ class BasicNpkTestRequirements(unittest.TestCase):
class ParseTestPackage_Test(BasicNpkTestRequirements):
def test_minimalNPK_parseHeader(self):
self.assertEqual(b"\x1e\xf1\xd0\xba", self.npk.pck_magicBytes)
self.assertEqual(28, self.npk.pck_payloadLen)
self.assertEqual(b"\x1e\xf1\xd0\xba", self.npk.pck_magic_bytes)
self.assertEqual(28, self.npk.pck_payload_len)
def test_minimalNPK_parseAllContainer(self):
listOfCnt = self.npk.pck_cntList
listOfCnt = self.npk.pck_cnt_list
self.assertEqual(1, listOfCnt[0].cnt_id)
self.assertEqual(15, listOfCnt[0].cnt_payloadLen)
self.assertEqual(15, listOfCnt[0].cnt_payload_len)
self.assertEqual(b"NAME OF PROGRAM", listOfCnt[0].cnt_payload)
self.assertEqual(1, listOfCnt[1].cnt_id)
self.assertEqual(1, listOfCnt[1].cnt_payloadLen)
self.assertEqual(1, listOfCnt[1].cnt_payload_len)
self.assertEqual(b"I", listOfCnt[1].cnt_payload)
@ -36,62 +36,62 @@ class ModifyPayload_Test(BasicNpkTestRequirements):
def setUp(self) -> None:
super().setUp()
self.cnt = self.npk.pck_cntList[0]
self.cnt = self.npk.pck_cnt_list[0]
def test_emptyPayload_emptyContainer(self):
self.cnt.cnt_payload = b""
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(0, self.cnt.cnt_payloadLen)
self.assertEqual(0, self.cnt.cnt_payload_len)
self.assertEqual(b"", self.cnt.cnt_payload)
self.assertEqual(6, self.cnt.cnt_fullLength)
self.assertEqual(6, self.cnt.cnt_full_length)
self.assertEqual(13, self.npk.pck_payloadLen)
self.assertEqual(21, self.npk.pck_fullSize)
self.assertEqual(13, self.npk.pck_payload_len)
self.assertEqual(21, self.npk.pck_full_size)
def test_dontchangePayloadSize_recalculateContainerKeepSize(self):
self.cnt.cnt_payload = b"PROGRAM OF NAME"
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(15, self.cnt.cnt_payloadLen)
self.assertEqual(15, self.cnt.cnt_payload_len)
self.assertEqual(b"PROGRAM OF NAME", self.cnt.cnt_payload)
self.assertEqual(21, self.cnt.cnt_fullLength)
self.assertEqual(21, self.cnt.cnt_full_length)
self.assertEqual(28, self.npk.pck_payloadLen)
self.assertEqual(36, self.npk.pck_fullSize)
self.assertEqual(28, self.npk.pck_payload_len)
self.assertEqual(36, self.npk.pck_full_size)
def test_increasePayloadLen_recalculateContainerSizeBigger(self):
self.cnt.cnt_payload = b"NEW NAME OF PROGRAM"
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(19, self.cnt.cnt_payloadLen)
self.assertEqual(19, self.cnt.cnt_payload_len)
self.assertEqual(b"NEW NAME OF PROGRAM", self.cnt.cnt_payload)
self.assertEqual(25, self.cnt.cnt_fullLength)
self.assertEqual(25, self.cnt.cnt_full_length)
self.assertEqual(32, self.npk.pck_payloadLen)
self.assertEqual(40, self.npk.pck_fullSize)
self.assertEqual(32, self.npk.pck_payload_len)
self.assertEqual(40, self.npk.pck_full_size)
def test_decreasePayloadLen_recalculateContainerSmaller(self):
self.cnt.cnt_payload = b"SHORT NAME"
self.assertEqual(1, self.cnt.cnt_id)
self.assertEqual(10, self.cnt.cnt_payloadLen)
self.assertEqual(10, self.cnt.cnt_payload_len)
self.assertEqual(b"SHORT NAME", self.cnt.cnt_payload)
self.assertEqual(16, self.cnt.cnt_fullLength)
self.assertEqual(16, self.cnt.cnt_full_length)
self.assertEqual(23, self.npk.pck_payloadLen)
self.assertEqual(31, self.npk.pck_fullSize)
self.assertEqual(23, self.npk.pck_payload_len)
self.assertEqual(31, self.npk.pck_full_size)
class WriteModifiedFile_Test(BasicNpkTestRequirements):
def setUp(self) -> None:
super().setUp()
self.cnt = self.npk.pck_cntList[0]
self.cnt = self.npk.pck_cnt_list[0]
def test_createFile_withoutModification(self):
self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_fullBinary)
self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_full_binary)
def test_createFile_changePayloadTwice(self):
self.cnt.cnt_payload = b"A"
self.cnt.cnt_payload = b"NAME OF PROGRAM"
self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_fullBinary)
self.assertEqual(MINIMAL_NPK_PACKAGE, self.npk.pck_full_binary)