bbfdm/tools/generate_dm_xml.py
2025-05-08 17:34:23 +00:00

479 lines
17 KiB
Python
Executable file

#!/usr/bin/python3
# Copyright (C) 2024 iopsys Software Solutions AB
# Author: Amin Ben Romdhane <amin.benromdhane@iopsys.eu>
import os
import sys
import argparse
import xml.etree.ElementTree as ET
import xml.dom.minidom as MD
import bbf_common as bbf
import json
DM_OBJ_COUNT = 0
DM_PARAM_COUNT = 0
DEVICE_PROTOCOL = "DEVICE_PROTOCOL_DSLFTR069v1"
MANUFACTURER = "iopsys"
MANUFACTURER_OUI = "002207"
PRODUCT_CLASS = "DG400PRIME"
MODEL_NAME = "DG400PRIME-A"
SOFTWARE_VERSION = "1.2.3.4"
ARRAY_TYPES = [ "string",
"unsignedInt",
"unsignedLong",
"int",
"long",
"boolean",
"dateTime",
"hexBinary",
"base64"]
LIST_SUPPORTED_DM = []
def pretty_format(elem):
elem_string = ET.tostring(elem, 'UTF-8')
reparsed = MD.parseString(elem_string)
return reparsed.toprettyxml(indent=" ")
def organize_parent_child(dm_list):
organized_dm = []
for parent_item in dm_list:
parent_type = parent_item.get("type")
if parent_type != "object":
continue
parent_name = parent_item.get("param")
organized_dm.append(parent_item)
for child_item in dm_list:
child_type = child_item.get("type")
if child_type is None or child_type == "object":
continue
child_name = child_item.get("param")
if child_name.find(parent_name) != -1:
parent_dot_count = parent_name.count('.')
child_dot_count = child_name.count('.')
if parent_dot_count == child_dot_count:
organized_dm.append(child_item)
return organized_dm
def get_info_from_json(data, dm_json_files=None):
entry = {}
list_data = []
arr = data.split(".")
if len(arr) == 0:
return None
for i in range(0, len(arr)):
string = ""
if i == 0:
string=arr[i] + "."
elif i == (len(arr) - 1):
string=arr[i]
else:
for j in range(0, i + 1):
string=string + arr[j]
string=string + "."
if len(string) != 0:
list_data.append(string)
if len(list_data) == 0:
return entry
found = False
if dm_json_files is not None and isinstance(dm_json_files, list) and dm_json_files:
for fl in dm_json_files:
if os.path.exists(fl):
fo = open(fl, 'r', encoding='utf-8')
try:
ob = json.load(fo)
except json.decoder.JSONDecodeError:
continue
index = -1
for key in ob.keys():
if key in list_data:
index = list_data.index(key)
break
if index == -1:
continue
for i in range(index, len(list_data)):
if i != (len(list_data) - 1) and list_data[i + 1] == list_data[i] + "{i}.":
continue
try:
if str(list_data[i]).find("X_IOWRT_EU_") != -1 or str(list_data[i]).find("X_GENEXIS_EU_") != -1:
param = str(list_data[i]).replace("X_IOWRT_EU_", "{BBF_VENDOR_PREFIX}").replace("X_GENEXIS_EU_", "{BBF_VENDOR_PREFIX}")
else:
param = str(list_data[i])
ob = ob[param]
found = True
except KeyError:
found = False
break
if found is True:
entry["description"] = ob['description'] if "description" in ob else None
entry["enumerations"] = ob['enumerations'] if "enumerations" in ob else None
entry["range"] = ob['range'] if "range" in ob else None
entry["list"] = ob["list"] if "list" in ob else None
break
return entry
def generate_bbf_xml_file(output_file, dm_json_files=None):
global DM_OBJ_COUNT
global DM_PARAM_COUNT
bbf.remove_file(output_file)
root = ET.Element("dm:document")
root.set("xmlns:dm", "urn:broadband-forum-org:cwmp:datamodel-1-8")
root.set("xmlns:dmr", "urn:broadband-forum-org:cwmp:datamodel-report-0-1")
root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
root.set("xsi:schemaLocation", "urn:broadband-forum-org:cwmp:datamodel-1-14 https://www.broadband-forum.org/cwmp/cwmp-datamodel-1-14.xsd urn:broadband-forum-org:cwmp:datamodel-report-1-0 https://www.broadband-forum.org/cwmp/cwmp-datamodel-report-1-0.xsd")
root.set("spec", "urn:broadband-forum-org:tr-181-2-18-0-cwmp")
root.set("file", "tr-181-2-18-0-cwmp.xml")
model = ET.SubElement(root, "model")
model.set("name", "Device:2.18")
for entry in LIST_SUPPORTED_DM:
name = entry.get("param")
p_type = entry.get("type")
access = entry.get("permission")
info = get_info_from_json(name, dm_json_files)
desc = info.get("description")
list_ob = info.get("list")
enum = info.get("enumerations")
rang = info.get("range")
if p_type == "object":
# Object
objec = ET.SubElement(model, "object")
objec.set("name", name)
objec.set("access", access)
objec.set("minEntries", "0")
objec.set("maxEntries", "20")
ob_description = ET.SubElement(objec, "description")
ob_description.text = desc.replace("<", "{").replace(">", "}") if desc is not None else ""
if desc is None:
print(f'#### Description should be added for {name} object ####')
DM_OBJ_COUNT += 1
else:
# Parameter
subtype = None
list_datatype = None
parameter = ET.SubElement(objec, "parameter")
parameter.set("name", name[name.rindex('.')+1:])
parameter.set("access", access)
p_description = ET.SubElement(parameter, "description")
p_description.text = desc.replace("<", "{").replace(">", "}") if desc is not None else ""
if desc is None:
print(f'#### Description should be added for {name} parameter ####')
syntax = ET.SubElement(parameter, "syntax")
if list_ob is not None and len(list_ob) != 0:
listtag = ET.SubElement(syntax, "list")
# Handle items in list
item_ob = list_ob["item"] if "item" in list_ob else None
if item_ob is not None:
minval = item_ob["min"] if "min" in item_ob else None
maxval = item_ob["max"]if "max" in item_ob else None
if minval is not None:
listtag.set("minItems", str(minval))
if maxval is not None:
listtag.set("maxItems", str(maxval))
# Handle maxsize in list
maxsize = list_ob["maxsize"] if "maxsize" in list_ob else None
if maxsize is not None:
sizetag = ET.SubElement(listtag, "size")
sizetag.set("maxLength", str(maxsize))
if enum is None or len(enum) == 0:
enum = list_ob["enumerations"] if "enumerations" in list_ob else None
list_datatype = list_ob["datatype"] if "datatype" in list_ob else None
if list_datatype is not None and list_datatype in ARRAY_TYPES:
subtype = ET.SubElement(syntax, list_datatype)
else:
subtype = ET.SubElement(syntax, p_type)
else:
subtype = ET.SubElement(syntax, p_type)
if enum is not None:
for val in enum:
enumeration = ET.SubElement(subtype, "enumeration")
enumeration.set("value", str(val))
# handle range
if rang is None and list_ob is not None:
rang = list_ob["range"] if "range" in list_ob else None
if rang is not None and len(rang) != 0:
for i in range(len(rang)):
range_min = rang[i]["min"] if "min" in rang[i] else None
range_max = rang[i]["max"] if "max" in rang[i] else None
val_type = list_datatype if list_datatype is not None else p_type
if val_type == "string" or val_type == "hexBinary" or val_type == "base64":
size_tag = ET.SubElement(subtype, "size")
if range_min is not None:
size_tag.set("minLength", str(range_min))
if range_max is not None:
size_tag.set("maxLength", str(range_max))
if val_type == "unsignedInt" or val_type == "unsignedLong" or val_type == "int" or val_type == "long":
range_tag = ET.SubElement(subtype, "range")
if range_min is not None:
range_tag.set("minInclusive", str(range_min))
if range_max is not None:
range_tag.set("maxInclusive", str(range_max))
DM_PARAM_COUNT += 1
xml_file = open(output_file, "w", encoding='utf-8')
xml_file.write(pretty_format(root))
xml_file.close()
def generate_hdm_xml_file(output_file):
global DM_OBJ_COUNT
global DM_PARAM_COUNT
bbf.remove_file(output_file)
root = ET.Element("deviceType")
root.set("xmlns", "urn:dslforum-org:hdm-0-0")
root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
root.set("xsi:schemaLocation", "urn:dslforum-org:hdm-0-0 deviceType.xsd")
protocol = ET.SubElement(root, "protocol")
protocol.text = str(DEVICE_PROTOCOL)
manufacturer = ET.SubElement(root, "manufacturer")
manufacturer.text = str(MANUFACTURER)
manufacturerOUI = ET.SubElement(root, "manufacturerOUI")
manufacturerOUI.text = str(MANUFACTURER_OUI)
productClass = ET.SubElement(root, "productClass")
productClass.text = str(PRODUCT_CLASS)
modelName = ET.SubElement(root, "modelName")
modelName.text = str(MODEL_NAME)
softwareVersion = ET.SubElement(root, "softwareVersion")
softwareVersion.text = str(SOFTWARE_VERSION)
dm_type = ET.SubElement(root, "type")
dm_type.text = str("Device:2")
dataModel = ET.SubElement(root, "dataModel")
attributes = ET.SubElement(dataModel, "attributes")
parameters = ET.SubElement(dataModel, "parameters")
attribute_notification = ET.SubElement(attributes, "attribute")
attributeName = ET.SubElement(attribute_notification, "attributeName")
attributeName.text = str("notification")
attributeType = ET.SubElement(attribute_notification, "attributeType")
attributeType.text = str("int")
minValue = ET.SubElement(attribute_notification, "minValue")
minValue.text = str("0")
maxValue = ET.SubElement(attribute_notification, "maxValue")
maxValue.text = str("2")
attribute_access_list = ET.SubElement(attributes, "attribute")
attributeName = ET.SubElement(attribute_access_list, "attributeName")
attributeName.text = str("accessList")
attributeType = ET.SubElement(attribute_access_list, "attributeType")
attributeType.text = str("string")
array = ET.SubElement(attribute_access_list, "array")
array.text = str("true")
attributeLength = ET.SubElement(attribute_access_list, "attributeLength")
attributeLength.text = str("64")
attribute_visibility = ET.SubElement(attributes, "attribute")
attributeName = ET.SubElement(attribute_visibility, "attributeName")
attributeName.text = str("visibility")
attributeType = ET.SubElement(attribute_visibility, "attributeType")
attributeType.text = str("string")
array = ET.SubElement(attribute_visibility, "array")
array.text = str("true")
attributeLength = ET.SubElement(attribute_visibility, "attributeLength")
attributeLength.text = str("64")
param_array = [ET.Element] * 15
param_array[0] = parameters
root_dot_count = 0
for entry in LIST_SUPPORTED_DM:
name = entry.get("param")
p_type = entry.get("type")
if p_type == "object":
# Object
obj_tag = ET.SubElement(param_array[name.replace(".{i}", "").count('.') - root_dot_count -1], "parameter")
obj_name = ET.SubElement(obj_tag, "parameterName")
obj_name.text = str(name.replace(".{i}", "").split('.')[-2])
obj_type = ET.SubElement(obj_tag, "parameterType")
obj_type.text = str("object")
obj_array = ET.SubElement(obj_tag, "array")
obj_array.text = str("true" if name.endswith(".{i}.") else "false")
parameters = ET.SubElement(obj_tag, "parameters")
param_array[name.replace(".{i}", "").count('.') - root_dot_count] = parameters
DM_OBJ_COUNT += 1
else:
# Parameter
param_tag = ET.SubElement(param_array[name.replace(".{i}", "").count('.') - root_dot_count], "parameter")
param_name = ET.SubElement(param_tag, "parameterName")
param_name.text = str(name[name.rindex('.')+1:])
param_type = ET.SubElement(param_tag, "parameterType")
param_type.text = str(p_type)
DM_PARAM_COUNT += 1
xml_file = open(output_file, "w", encoding='utf-8')
xml_file.write(pretty_format(root))
xml_file.close()
def generate_xml(acs = 'default', dm_json_files=None, output_file="datamodel.xml"):
global LIST_SUPPORTED_DM
global DM_OBJ_COUNT
global DM_PARAM_COUNT
DM_OBJ_COUNT = 0
DM_PARAM_COUNT = 0
LIST_SUPPORTED_DM = organize_parent_child(bbf.LIST_SUPPORTED_CWMP_DM)
print(f'Generating BBF Data Models in xml format for {acs} acs...')
if acs == "HDM":
generate_hdm_xml_file(output_file)
else:
generate_bbf_xml_file(output_file, dm_json_files)
if os.path.isfile(output_file):
print(f' - XML file generated: {output_file}')
else:
print(' - Error in generating xml file')
print(f' - Number of BBF Data Models objects is {DM_OBJ_COUNT}')
print(f' - Number of BBF Data Models parameters is {DM_PARAM_COUNT}')
### main ###
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Script to generate list of supported and non-supported parameter in xml format',
epilog='Part of BBF-tools, refer Readme for more examples'
)
parser.add_argument(
'-r', '--remote-dm',
action='append',
metavar = 'git^https://dev.iopsys.eu/bbf/stunc.git^devel',
help= 'Includes OBJ/PARAM defined under remote repositories defined as bbf plugin'
)
parser.add_argument(
'-d', '--device-protocol',
default = 'DEVICE_PROTOCOL_DSLFTR069v1',
metavar = 'DEVICE_PROTOCOL_DSLFTR069v1',
help = 'Generate data model tree using this device protocol.'
)
parser.add_argument(
"-m", "--manufacturer",
default = 'IOPSYS',
metavar = 'IOPSYS',
help = 'Generate data model tree using this manufacturer.'
)
parser.add_argument(
"-u", "--manufacturer-oui",
default = '002207',
metavar = '002207',
help = 'Generate data model tree using this manufacturer oui.'
)
parser.add_argument(
"-c", "--product-class",
default = 'DG400PRIME',
metavar = 'DG400PRIME',
help = 'Generate data model tree using this product class.'
)
parser.add_argument(
"-n", "--model-name",
default = 'DG400PRIME-A',
metavar = 'DG400PRIME-A',
help = 'Generate data model tree using this model name.'
)
parser.add_argument(
"-s", "--software-version",
default = '1.2.3.4',
metavar = '1.2.3.4',
help = 'Generate data model tree using this software version.'
)
parser.add_argument(
"-f", "--format",
metavar = 'BBF',
default = 'BBF',
choices=['HDM', 'BBF', 'default'],
help = 'Generate data model tree with HDM format.'
)
parser.add_argument(
'-o', '--output',
default = "datamodel.xml",
metavar = "datamodel.xml",
help = 'Generate the output file with given name'
)
args = parser.parse_args()
MANUFACTURER = args.manufacturer
DEVICE_PROTOCOL = args.device_protocol
MANUFACTURER_OUI = args.manufacturer_oui
PRODUCT_CLASS = args.product_class
MODEL_NAME = args.model_name
SOFTWARE_VERSION = args.software_version
plugins = []
if isinstance(args.remote_dm, list):
for f in args.remote_dm:
x = f.split('^')
r = {}
r["proto"] = x[0]
if len(x) > 1:
r["repo"] = x[1]
if len(x) == 3:
r["version"] = x[2]
plugins.append(r)
bbf.generate_supported_dm(plugins)
generate_xml(args.format, args.dm_json_files, args.output)
print(f'Datamodel generation completed, aritifacts available in {args.output}')
sys.exit(bbf.BBF_ERROR_CODE)