mirror of
https://dev.iopsys.eu/bbf/bbfdm.git
synced 2025-12-10 07:44:39 +01:00
* Added handilng to add description from BBF's DM xml file to converted json file. * Change DM aggregation in temp txt file from CSV format to JSON * Added "dm_json_files" in tools_input.json file to define the path of Json files having more details about DM parameters * Added support to read DM param description from json plugin and update it in "datamodel_default.xml" * Added support to read the description from defined "dm_json_files" in case description of param is not present in json plugin * Added support to read the description from defined "dm_json_files" in case datamodel param defined in 'C' code * Added enum and list type DM parameter in generated "datamodel_default.xml"
469 lines
17 KiB
Python
Executable file
469 lines
17 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
# Copyright (C) 2021 iopsys Software Solutions AB
|
|
# Author: Amin Ben Ramdhane <amin.benramdhane@pivasoftware.com>
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import xml.etree.ElementTree as ET
|
|
import xml.dom.minidom as MD
|
|
import bbf_common as bbf
|
|
import json
|
|
|
|
DM_OBJ_COUNT = 0
|
|
DM_PARAM_COUNT = 0
|
|
DEVICE_PROTOCOL = "DEVICE_PROTOCOL_DSLFTR069v1"
|
|
MANUFACTURER = "iopsys"
|
|
MANUFACTURER_OUI = "002207"
|
|
PRODUCT_CLASS = "DG400PRIME"
|
|
MODEL_NAME = "DG400PRIME-A"
|
|
SOFTWARE_VERSION = "1.2.3.4"
|
|
|
|
ARRAY_TYPES = {"DMT_STRING": "string",
|
|
"DMT_UNINT": "unsignedInt",
|
|
"DMT_UNLONG": "unsignedLong",
|
|
"DMT_INT": "int",
|
|
"DMT_LONG": "long",
|
|
"DMT_BOOL": "boolean",
|
|
"DMT_TIME": "dateTime",
|
|
"DMT_HEXBIN": "hexBinary",
|
|
"DMT_BASE64": "base64"}
|
|
|
|
|
|
def pretty_format(elem):
|
|
elem_string = ET.tostring(elem, 'UTF-8')
|
|
reparsed = MD.parseString(elem_string)
|
|
return reparsed.toprettyxml(indent=" ")
|
|
|
|
|
|
def generate_bbf_xml_file(output_file, dm_json_files=None):
|
|
global DM_OBJ_COUNT
|
|
global DM_PARAM_COUNT
|
|
|
|
bbf.remove_file(output_file)
|
|
root = ET.Element("dm:document")
|
|
root.set("xmlns:dm", "urn:broadband-forum-org:cwmp:datamodel-1-8")
|
|
root.set("xmlns:dmr", "urn:broadband-forum-org:cwmp:datamodel-report-0-1")
|
|
root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
|
|
root.set("xsi:schemaLocation", "urn:broadband-forum-org:cwmp:datamodel-1-8 https://www.broadband-forum.org/cwmp/cwmp-datamodel-1-8.xsd urn:broadband-forum-org:cwmp:datamodel-report-0-1 https://www.broadband-forum.org/cwmp/cwmp-datamodel-report.xsd")
|
|
root.set("spec", "urn:broadband-forum-org:tr-181-2-14-1-cwmp")
|
|
root.set("file", "tr-181-2-15-0-cwmp-full.xml")
|
|
|
|
model = ET.SubElement(root, "model")
|
|
model.set("name", "Device:2.14")
|
|
|
|
for value in bbf.LIST_SUPPORTED_DM:
|
|
|
|
obj = json.loads(value)
|
|
protocol = bbf.get_option_value(obj, "protocol", None)
|
|
if protocol is None or protocol == "BBFDM_USP":
|
|
continue
|
|
|
|
p_type = bbf.get_option_value(obj, "type", None)
|
|
if p_type is None:
|
|
continue
|
|
|
|
name = bbf.get_option_value(obj, "param", None)
|
|
permission = bbf.get_option_value(obj, "permission", None)
|
|
list_ob = bbf.get_option_value(obj, "list", None)
|
|
enum = bbf.get_option_value(obj, "enum", None)
|
|
desc = bbf.get_option_value(obj, "description", None)
|
|
rang = bbf.get_option_value(obj, "range", None)
|
|
|
|
if name is None or permission is None:
|
|
continue
|
|
|
|
access = "readOnly" if permission == "DMREAD" else "readWrite"
|
|
|
|
if p_type == "DMT_OBJ":
|
|
# Object
|
|
objec = ET.SubElement(model, "object")
|
|
objec.set("name", name)
|
|
objec.set("access", access)
|
|
objec.set("minEntries", "0")
|
|
objec.set("maxEntries", "20")
|
|
|
|
ob_description = ET.SubElement(objec, "description")
|
|
if desc is None or len(desc) == 0:
|
|
desc = bbf.get_param_info_from_json(name, dm_json_files, "description")
|
|
ob_description.text = desc.replace("<", "{").replace(">", "}") if desc is not None else ""
|
|
|
|
DM_OBJ_COUNT += 1
|
|
else:
|
|
# Parameter
|
|
subtype = None
|
|
list_datatype = None
|
|
parameter = ET.SubElement(objec, "parameter")
|
|
parameter.set("name", name[name.rindex('.')+1:])
|
|
parameter.set("access", access)
|
|
p_description = ET.SubElement(parameter, "description")
|
|
if desc is None or len(desc) == 0:
|
|
desc = bbf.get_param_info_from_json(name, dm_json_files, "description")
|
|
p_description.text = desc.replace("<", "{").replace(">", "}") if desc is not None else ""
|
|
syntax = ET.SubElement(parameter, "syntax")
|
|
|
|
if list_ob is None:
|
|
list_ob = bbf.get_param_info_from_json(name, dm_json_files, "list")
|
|
|
|
if list_ob is not None and len(list_ob) != 0:
|
|
listtag = ET.SubElement(syntax, "list")
|
|
|
|
item_ob = None
|
|
maxsize = None
|
|
|
|
# Handle items in list
|
|
try:
|
|
item_ob = list_ob["item"]
|
|
except KeyError:
|
|
item_ob = None
|
|
|
|
if item_ob is not None:
|
|
minval = None
|
|
maxval = None
|
|
try:
|
|
minval = item_ob["min"]
|
|
except KeyError:
|
|
minval = None
|
|
|
|
try:
|
|
maxval = item_ob["max"]
|
|
except KeyError:
|
|
maxval = None
|
|
|
|
if minval is not None:
|
|
listtag.set("minItems", str(minval))
|
|
|
|
if maxval is not None:
|
|
listtag.set("maxItems", str(maxval))
|
|
|
|
# Handle maxsize in list
|
|
try:
|
|
maxsize = list_ob["maxsize"]
|
|
except KeyError:
|
|
maxsize = None
|
|
|
|
if maxsize is not None:
|
|
sizetag = ET.SubElement(listtag, "size")
|
|
sizetag.set("maxLength", str(maxsize))
|
|
|
|
if enum is None or len(enum) == 0:
|
|
try:
|
|
enum = list_ob["enumerations"]
|
|
except KeyError:
|
|
enum = None
|
|
|
|
try:
|
|
list_datatype = list_ob["datatype"]
|
|
except KeyError:
|
|
list_datatype = None
|
|
|
|
if list_datatype is not None and list_datatype in ARRAY_TYPES.values():
|
|
subtype = ET.SubElement(syntax, list_datatype)
|
|
else:
|
|
subtype = ET.SubElement(syntax, ARRAY_TYPES.get(p_type, None))
|
|
else:
|
|
subtype = ET.SubElement(syntax, ARRAY_TYPES.get(p_type, None))
|
|
|
|
if enum is None:
|
|
enum = bbf.get_param_info_from_json(name, dm_json_files, "enumerations")
|
|
|
|
if enum is not None:
|
|
for val in enum:
|
|
enumeration = ET.SubElement(subtype, "enumeration")
|
|
enumeration.set("value", str(val))
|
|
|
|
# handle range
|
|
range_min = None
|
|
range_max = None
|
|
|
|
if rang is None:
|
|
try:
|
|
if list_ob is not None:
|
|
rang = list_ob["range"]
|
|
except KeyError:
|
|
rang = None
|
|
|
|
if rang is None:
|
|
rang = bbf.get_param_info_from_json(name, dm_json_files, "range")
|
|
|
|
if rang is not None and len(rang) != 0:
|
|
rang_len = len(rang)
|
|
for i in range(rang_len):
|
|
try:
|
|
range_min = rang[i]["min"]
|
|
except KeyError:
|
|
range_min = None
|
|
|
|
try:
|
|
range_max = rang[i]["max"]
|
|
except KeyError:
|
|
range_max = None
|
|
|
|
if list_datatype is not None:
|
|
val_type = list_datatype
|
|
else:
|
|
val_type = ARRAY_TYPES.get(p_type, None)
|
|
if val_type == "string" or val_type == "hexBinary" or val_type == "base64":
|
|
size_tag = ET.SubElement(subtype, "size")
|
|
if range_min is not None:
|
|
size_tag.set("minLength", str(range_min))
|
|
if range_max is not None:
|
|
size_tag.set("maxLength", str(range_max))
|
|
if val_type == "unsignedInt" or val_type == "unsignedLong" or val_type == "int" or val_type == "long":
|
|
range_tag = ET.SubElement(subtype, "range")
|
|
if range_min is not None:
|
|
range_tag.set("minInclusive", str(range_min))
|
|
if range_max is not None:
|
|
range_tag.set("maxInclusive", str(range_max))
|
|
|
|
DM_PARAM_COUNT += 1
|
|
|
|
xml_file = open(output_file, "w", encoding='utf-8')
|
|
xml_file.write(pretty_format(root))
|
|
xml_file.close()
|
|
|
|
|
|
def generate_hdm_xml_file(output_file):
|
|
global DM_OBJ_COUNT
|
|
global DM_PARAM_COUNT
|
|
|
|
bbf.remove_file(output_file)
|
|
root = ET.Element("deviceType")
|
|
root.set("xmlns", "urn:dslforum-org:hdm-0-0")
|
|
root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
|
|
root.set("xsi:schemaLocation", "urn:dslforum-org:hdm-0-0 deviceType.xsd")
|
|
|
|
protocol = ET.SubElement(root, "protocol")
|
|
protocol.text = str(DEVICE_PROTOCOL)
|
|
manufacturer = ET.SubElement(root, "manufacturer")
|
|
manufacturer.text = str(MANUFACTURER)
|
|
manufacturerOUI = ET.SubElement(root, "manufacturerOUI")
|
|
manufacturerOUI.text = str(MANUFACTURER_OUI)
|
|
productClass = ET.SubElement(root, "productClass")
|
|
productClass.text = str(PRODUCT_CLASS)
|
|
modelName = ET.SubElement(root, "modelName")
|
|
modelName.text = str(MODEL_NAME)
|
|
softwareVersion = ET.SubElement(root, "softwareVersion")
|
|
softwareVersion.text = str(SOFTWARE_VERSION)
|
|
dm_type = ET.SubElement(root, "type")
|
|
dm_type.text = str("Device:2")
|
|
|
|
dataModel = ET.SubElement(root, "dataModel")
|
|
attributes = ET.SubElement(dataModel, "attributes")
|
|
parameters = ET.SubElement(dataModel, "parameters")
|
|
|
|
attribute_notification = ET.SubElement(attributes, "attribute")
|
|
attributeName = ET.SubElement(attribute_notification, "attributeName")
|
|
attributeName.text = str("notification")
|
|
attributeType = ET.SubElement(attribute_notification, "attributeType")
|
|
attributeType.text = str("int")
|
|
minValue = ET.SubElement(attribute_notification, "minValue")
|
|
minValue.text = str("0")
|
|
maxValue = ET.SubElement(attribute_notification, "maxValue")
|
|
maxValue.text = str("2")
|
|
|
|
attribute_access_list = ET.SubElement(attributes, "attribute")
|
|
attributeName = ET.SubElement(attribute_access_list, "attributeName")
|
|
attributeName.text = str("accessList")
|
|
attributeType = ET.SubElement(attribute_access_list, "attributeType")
|
|
attributeType.text = str("string")
|
|
array = ET.SubElement(attribute_access_list, "array")
|
|
array.text = str("true")
|
|
attributeLength = ET.SubElement(attribute_access_list, "attributeLength")
|
|
attributeLength.text = str("64")
|
|
|
|
attribute_visibility = ET.SubElement(attributes, "attribute")
|
|
attributeName = ET.SubElement(attribute_visibility, "attributeName")
|
|
attributeName.text = str("visibility")
|
|
attributeType = ET.SubElement(attribute_visibility, "attributeType")
|
|
attributeType.text = str("string")
|
|
array = ET.SubElement(attribute_visibility, "array")
|
|
array.text = str("true")
|
|
attributeLength = ET.SubElement(attribute_visibility, "attributeLength")
|
|
attributeLength.text = str("64")
|
|
|
|
#param_array = np.empty(15, dtype=ET.Element)
|
|
param_array = [ET.Element] * 15
|
|
param_array[0] = parameters
|
|
root_dot_count = bbf.get_root_node().count('.') - 1 if (bbf.get_root_node()) else 0
|
|
|
|
for value in bbf.LIST_SUPPORTED_DM:
|
|
|
|
obj = json.loads(value)
|
|
protocol = bbf.get_option_value(obj, "protocol", None)
|
|
if protocol is None or protocol == "BBFDM_USP":
|
|
continue
|
|
|
|
p_type = bbf.get_option_value(obj, "type", None)
|
|
if p_type is None:
|
|
continue
|
|
|
|
name = bbf.get_option_value(obj, "param", None)
|
|
permission = bbf.get_option_value(obj, "permission", None)
|
|
if name is None or permission is None:
|
|
continue
|
|
|
|
if p_type == "DMT_OBJ":
|
|
# Object
|
|
obj_tag = ET.SubElement(
|
|
param_array[name.replace(".{i}", "").count('.') - root_dot_count -1], "parameter")
|
|
obj_name = ET.SubElement(obj_tag, "parameterName")
|
|
obj_name.text = str(name.replace(".{i}", "").split('.')[-2])
|
|
obj_type = ET.SubElement(obj_tag, "parameterType")
|
|
obj_type.text = str("object")
|
|
obj_array = ET.SubElement(obj_tag, "array")
|
|
obj_array.text = str(
|
|
"true" if name.endswith(".{i}.") else "false")
|
|
parameters = ET.SubElement(obj_tag, "parameters")
|
|
param_array[name.replace(".{i}", "").count('.') - root_dot_count] = parameters
|
|
DM_OBJ_COUNT += 1
|
|
else:
|
|
# Parameter
|
|
param_tag = ET.SubElement(
|
|
param_array[name.replace(".{i}", "").count('.') - root_dot_count], "parameter")
|
|
param_name = ET.SubElement(param_tag, "parameterName")
|
|
param_name.text = str(name[name.rindex('.')+1:])
|
|
param_type = ET.SubElement(param_tag, "parameterType")
|
|
param_type.text = str(ARRAY_TYPES.get(p_type, None))
|
|
DM_PARAM_COUNT += 1
|
|
|
|
xml_file = open(output_file, "w", encoding='utf-8')
|
|
xml_file.write(pretty_format(root))
|
|
xml_file.close()
|
|
|
|
def generate_xml(acs = 'default', dm_json_files=None, output_file="datamodel.xml"):
|
|
global DM_OBJ_COUNT
|
|
global DM_PARAM_COUNT
|
|
|
|
DM_OBJ_COUNT = 0
|
|
DM_PARAM_COUNT = 0
|
|
|
|
print(f'Generating BBF Data Models in xml format for {acs} acs...')
|
|
bbf.fill_list_supported_dm()
|
|
|
|
if acs == "HDM":
|
|
generate_hdm_xml_file(output_file)
|
|
else:
|
|
generate_bbf_xml_file(output_file, dm_json_files)
|
|
|
|
if os.path.isfile(output_file):
|
|
print(f' - XML file generated: {output_file}')
|
|
else:
|
|
print(' - Error in generating xml file')
|
|
|
|
print(f' - Number of BBF Data Models objects is {DM_OBJ_COUNT}')
|
|
print(f' - Number of BBF Data Models parameters is {DM_PARAM_COUNT}')
|
|
|
|
### main ###
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(
|
|
description='Script to generate list of supported and non-supported parameter in xml format',
|
|
epilog='Part of BBF-tools, refer Readme for more examples'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-r', '--remote-dm',
|
|
action='append',
|
|
metavar = 'git^https://dev.iopsys.eu/iopsys/stunc.git^devel',
|
|
help= 'Includes OBJ/PARAM defined under remote repositories defined as bbf plugin'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-v', '--vendor-list',
|
|
metavar='iopsys',
|
|
action = 'append',
|
|
help='Generate data model tree with vendor extension OBJ/PARAM.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-p', '--vendor-prefix',
|
|
default = 'X_IOPSYS_EU_',
|
|
metavar = 'X_IOPSYS_EU_',
|
|
help = 'Generate data model tree using provided vendor prefix for vendor defined objects.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-d', '--device-protocol',
|
|
default = 'DEVICE_PROTOCOL_DSLFTR069v1',
|
|
metavar = 'DEVICE_PROTOCOL_DSLFTR069v1',
|
|
help = 'Generate data model tree using this device protocol.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-m", "--manufacturer",
|
|
default = 'IOPSYS',
|
|
metavar = 'IOPSYS',
|
|
help = 'Generate data model tree using this manufacturer.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-u", "--manufacturer-oui",
|
|
default = '002207',
|
|
metavar = '002207',
|
|
help = 'Generate data model tree using this manufacturer oui.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-c", "--product-class",
|
|
default = 'DG400PRIME',
|
|
metavar = 'DG400PRIME',
|
|
help = 'Generate data model tree using this product class.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-n", "--model-name",
|
|
default = 'DG400PRIME-A',
|
|
metavar = 'DG400PRIME-A',
|
|
help = 'Generate data model tree using this model name.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-s", "--software-version",
|
|
default = '1.2.3.4',
|
|
metavar = '1.2.3.4',
|
|
help = 'Generate data model tree using this software version.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-f", "--format",
|
|
metavar = 'BBF',
|
|
default = 'BBF',
|
|
choices=['HDM', 'BBF', 'default'],
|
|
help = 'Generate data model tree with HDM format.'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-o', '--output',
|
|
default = "datamodel.xml",
|
|
metavar = "datamodel.xml",
|
|
help = 'Generate the output file with given name'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
MANUFACTURER = args.manufacturer
|
|
DEVICE_PROTOCOL = args.device_protocol
|
|
MANUFACTURER_OUI = args.manufacturer_oui
|
|
PRODUCT_CLASS = args.product_class
|
|
MODEL_NAME = args.model_name
|
|
SOFTWARE_VERSION = args.software_version
|
|
|
|
plugins = []
|
|
|
|
if isinstance(args.remote_dm, list):
|
|
for f in args.remote_dm:
|
|
x = f.split('^')
|
|
r = {}
|
|
r["proto"] = x[0]
|
|
if len(x) > 1:
|
|
r["repo"] = x[1]
|
|
if len(x) == 3:
|
|
r["version"] = x[2]
|
|
|
|
plugins.append(r)
|
|
|
|
bbf.generate_supported_dm(args.vendor_prefix, args.vendor_list, plugins)
|
|
bbf.clean_supported_dm_list()
|
|
generate_xml(args.format, args.dm_json_files, args.output)
|
|
print(f'Datamodel generation completed, aritifacts available in {args.output}')
|
|
sys.exit(bbf.BBF_ERROR_CODE)
|