#!/usr/bin/python3 # Copyright (C) 2021 iopsys Software Solutions AB # Author: Amin Ben Ramdhane import os import subprocess import shutil import json from collections import OrderedDict CURRENT_PATH = os.getcwd() ROOT = None BBF_ERROR_CODE = 0 BBF_TR181_ROOT_FILE = "device.c" BBF_TR104_ROOT_FILE = "servicesvoiceservice.c" BBF_VENDOR_ROOT_FILE = "vendor.c" BBF_VENDOR_PREFIX = "X_IOPSYS_EU_" BBF_DMTREE_PATH = CURRENT_PATH + "/../dmtree" BBF_DMTREE_PATH_TR181 = BBF_DMTREE_PATH + "/tr181" BBF_DMTREE_PATH_TR104 = BBF_DMTREE_PATH + "/tr104" BBF_DMTREE_PATH_TR143 = BBF_DMTREE_PATH + "/tr143" BBF_DMTREE_PATH_TR181_JSON = BBF_DMTREE_PATH + "/json/tr181.json" BBF_DMTREE_PATH_TR104_JSON = BBF_DMTREE_PATH + "/json/tr104.json" DATA_MODEL_FILE = ".data_model.txt" ARRAY_JSON_FILES = {"tr181": BBF_DMTREE_PATH_TR181_JSON, "tr104": BBF_DMTREE_PATH_TR104_JSON} LIST_DM_DIR = [BBF_DMTREE_PATH_TR181, BBF_DMTREE_PATH_TR104, BBF_DMTREE_PATH_TR143] LIST_IGNORED_LINE = ['/*', '//', '#'] LIST_OBJ = [] LIST_PARAM = [] LIST_SUPPORTED_DM = [] Array_Types = {"string": "DMT_STRING", "unsignedInt": "DMT_UNINT", "unsignedLong": "DMT_UNLONG", "int": "DMT_INT", "long": "DMT_LONG", "boolean": "DMT_BOOL", "dateTime": "DMT_TIME", "hexBinary": "DMT_HEXBIN", "base64": "DMT_BASE64", "command": "DMT_COMMAND", "event": "DMT_EVENT"} def get_root_node(): return ROOT def set_root_node(rootdm = "Device."): global ROOT ROOT = rootdm def rename_file(old_file_name, new_file_name): try: os.rename(old_file_name, new_file_name) except OSError: pass def remove_file(file_name): try: os.remove(file_name) except OSError: pass def create_folder(folder_name): try: os.makedirs(folder_name, exist_ok = True) except OSError: pass # rmtree exception handler def rmtree_handler(_func, path, _exc_info): print(f'Failed to remove {path}') def remove_folder(folder_name): if os.path.isdir(folder_name): shutil.rmtree(folder_name, onerror = rmtree_handler) def cd_dir(path): try: os.chdir(path) except OSError: pass def obj_has_child(value): if isinstance(value, dict): for _obj, val in value.items(): if isinstance(val, dict): for obj1, val1 in val.items(): if obj1 == "type" and val1 == "object": return 1 return 0 def obj_has_param(value): if isinstance(value, dict): for _obj, val in value.items(): if isinstance(val, dict): for obj1, val1 in val.items(): if obj1 == "type" and val1 != "object": return 1 return 0 def get_option_value(value, option, default = None): if isinstance(value, dict): for obj, val in value.items(): if obj == option: return val return default def get_param_type(value): paramtype = get_option_value(value, "type") return Array_Types.get(paramtype, None) def clean_supported_dm_list(): LIST_SUPPORTED_DM.clear() def fill_list_supported_dm(): fp = open(DATA_MODEL_FILE, 'r', encoding='utf-8') Lines = fp.readlines() for line in Lines: LIST_SUPPORTED_DM.append(line) def fill_data_model_file(): fp = open(DATA_MODEL_FILE, 'a', encoding='utf-8') for value in LIST_SUPPORTED_DM: if (ROOT): if (value.startswith(ROOT)): print(f'{value}', file=fp) else: print(f'{value}', file=fp) fp.close() def generate_datamodel_tree(filename): if filename.endswith('.c') is False: return obj_found = 0 param_found = 0 obj_found_in_list = 0 table_name = "" parent_obj = "" fp = open(filename, 'r', encoding='utf-8') for line in fp: line = line.lstrip() if line.startswith(tuple(LIST_IGNORED_LINE)) is True: continue if "DMOBJ" in line: table_name = line[:line.index('[]')].rstrip( '\n').replace("DMOBJ ", "") obj_found = 1 continue if "DMLEAF" in line: table_name = line[:line.index('[]')].rstrip( '\n').replace("DMLEAF ", "") param_found = 1 continue if obj_found == 0 and param_found == 0: continue if "{0}" in line.replace(" ", ""): obj_found = 0 param_found = 0 obj_found_in_list = 0 table_name = "" parent_obj = "" continue # Object Table if obj_found == 1: if obj_found_in_list == 0: obj_list = LIST_OBJ for value in obj_list: val = value.split(":") if val[1] == table_name: parent_obj = val[0] obj_found_in_list = 1 LIST_OBJ.remove(value) obj = line.rstrip('\n').split(", ") obj_name = parent_obj + obj[0].replace("{", "").replace("\"", "").replace( "BBF_VENDOR_PREFIX", BBF_VENDOR_PREFIX).replace(" ", "") obj_permission = obj[1].replace("&", "").replace(" ", "") obj_mulinst = obj[5].replace("&", "").replace(" ", "") if obj_mulinst == "NULL": full_obj_name = obj_name + "." else: full_obj_name = obj_name + ".{i}." LIST_SUPPORTED_DM.append( full_obj_name + "," + obj_permission + ",DMT_OBJ") if obj[8] != "NULL": LIST_OBJ.append(full_obj_name + ":" + obj[8]) if obj[9] != "NULL": LIST_PARAM.append(full_obj_name + ":" + obj[9]) # Parameter Table if param_found == 1: if obj_found_in_list == 0: param_list = LIST_PARAM for value in param_list: val = value.split(":") if val[1] == table_name: parent_obj = val[0] obj_found_in_list = 1 LIST_PARAM.remove(value) param = line.rstrip('\n').split(",") param_name = parent_obj + param[0].replace("{", "").replace( "\"", "").replace("BBF_VENDOR_PREFIX", BBF_VENDOR_PREFIX).replace(" ", "") param_permission = param[1].replace("&", "").replace(" ", "") param_type = param[2].replace(" ", "") LIST_SUPPORTED_DM.append( param_name + "," + param_permission + "," + param_type) fp.close() def generate_dynamic_datamodel_tree(filename): if filename.endswith('.c') is False: return obj_found = 0 fp = open(filename, 'r', encoding='utf-8') for line in fp: line = line.lstrip() if line.startswith(tuple(LIST_IGNORED_LINE)) is True: continue if "DM_MAP_OBJ" in line: obj_found = 1 continue if obj_found == 0: continue if "{0}" in line.replace(" ", ""): obj_found = 0 continue # Object Table if obj_found == 1: obj = line.rstrip('\n').split(", ") obj_name = obj[0][1:].replace("\"", "").replace(" ", "").replace("BBF_VENDOR_PREFIX", BBF_VENDOR_PREFIX) if obj[1] != "NULL": LIST_OBJ.append(obj_name + ":" + obj[1]) if obj[2] != "NULL": LIST_PARAM.append(obj_name + ":" + obj[2].replace("},", "").replace(" ", "")) fp.close() def parse_dynamic_json_datamodel_tree(obj, value): obj_permission = "DMWRITE" if get_option_value( value, "array") is True else "DMREAD" LIST_SUPPORTED_DM.append(obj + "," + obj_permission + ",DMT_OBJ") hasobj = obj_has_child(value) hasparam = obj_has_param(value) if hasparam and isinstance(value, dict): for k, v in value.items(): if k != "mapping" and isinstance(v, dict): for k1, v1 in v.items(): if k1 == "type" and v1 != "object": param_name = obj + k param_type = get_param_type(v) param_permission = "DMWRITE" if get_option_value( v, "write") is True else "DMREAD" LIST_SUPPORTED_DM.append( param_name + "," + param_permission + "," + param_type) break if hasobj and isinstance(value, dict): for k, v in value.items(): if isinstance(v, dict): for k1, v1 in v.items(): if k1 == "type" and v1 == "object": parse_dynamic_json_datamodel_tree(k, v) def generate_dynamic_json_datamodel_tree(filename): if filename.endswith('.json') is False: return json_file = open(filename, "r", encoding='utf-8') data = json.loads(json_file.read(), object_pairs_hook=OrderedDict) for obj, value in data.items(): if obj is None or obj.startswith('Device.') is False: continue parse_dynamic_json_datamodel_tree(obj, value) def generate_supported_dm(vendor_prefix=None, vendor_list=None, plugins=None): ''' 1/ Download Remote Data Model if needed 2/ Parse all Standard Data Model 3/ Parse all Vendor Data Model if needed 4/ Generate the list of Supported Data Model 'LIST_SUPPORTED_DM' 5/ Copy the supported data model in file 'DATA_MODEL_FILE' ''' ############## SET BBF VENDOR PREFIX ############## if vendor_prefix is not None: global BBF_VENDOR_PREFIX BBF_VENDOR_PREFIX = vendor_prefix ############## GEN Local BBF Data Models TREE ############## print("Generating the local data models...") cd_dir(BBF_DMTREE_PATH_TR181) generate_datamodel_tree(BBF_TR181_ROOT_FILE) cd_dir(BBF_DMTREE_PATH_TR104) generate_datamodel_tree(BBF_TR104_ROOT_FILE) for DIR in LIST_DM_DIR: cd_dir(DIR) for _root, _dirs, files in os.walk("."): for filename in files: if filename.endswith('.c') is False or filename == BBF_TR181_ROOT_FILE or filename == BBF_TR104_ROOT_FILE: continue generate_datamodel_tree(filename) ############## GEN Vendors BBF Data Models TREE ############## if vendor_list is not None and isinstance(vendor_list, list) and vendor_list: cd_dir(BBF_DMTREE_PATH) for vendor in vendor_list: vendor_dir = f'vendor/{vendor}/tr181' if os.path.isdir(vendor_dir): cd_dir(vendor_dir) generate_dynamic_datamodel_tree(BBF_VENDOR_ROOT_FILE) if os.path.isfile(BBF_TR181_ROOT_FILE): generate_datamodel_tree(BBF_TR181_ROOT_FILE) for _root, _dirs, files in os.walk("."): for filename in files: if filename.endswith('.c') is False or filename == BBF_VENDOR_ROOT_FILE or filename == BBF_TR181_ROOT_FILE: continue generate_datamodel_tree(filename) cd_dir(BBF_DMTREE_PATH) vendor_dir = f'vendor/{vendor}/tr104' if os.path.isdir(vendor_dir): cd_dir(vendor_dir) for _root, _dirs, files in os.walk("."): for filename in files: if filename.endswith('.c') is False: continue generate_datamodel_tree(filename) cd_dir(BBF_DMTREE_PATH) ############## Download && Generate Plugins Data Models ############## global BBF_ERROR_CODE if plugins is not None and isinstance(plugins, list) and plugins: print("Generating datamodels from defined plugins...") cd_dir(CURRENT_PATH) if isinstance(plugins, list): for plugin in plugins: proto = get_option_value(plugin, "proto") repo = get_option_value(plugin, "repo") if repo is None: BBF_ERROR_CODE += 1 continue if proto is not None and proto == "local": print(f' - Processing plugin: {plugin} at {repo}') if os.path.isdir(f"{repo}"): print(f' Processing {repo}') dm_files = get_option_value(plugin, "dm_files") if dm_files is not None and isinstance(dm_files, list): for dm_file in dm_files: if os.path.isfile(f"{repo}/{dm_file}"): generate_dynamic_datamodel_tree(f"{repo}/{dm_file}") generate_datamodel_tree(f"{repo}/{dm_file}") generate_dynamic_json_datamodel_tree(f"{repo}/{dm_file}") else: BBF_ERROR_CODE += 1 else: files = os.popen(f'find {repo}/ -name datamodel.c').read() for file in files.split('\n'): if os.path.isfile(file): generate_dynamic_datamodel_tree(file) generate_datamodel_tree(file) files = os.popen(f'find {repo}/ -name "*.json"').read() for file in files.split('\n'): if os.path.isfile(file): generate_dynamic_json_datamodel_tree(file) else: print(f' {repo} is not a directory !!!!!') BBF_ERROR_CODE += 1 else: print(f' - Processing plugin: {plugin}') version = get_option_value(plugin, "version") remove_folder(".repo") try: subprocess.run(["git", "clone", repo, ".repo"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check = True) except (OSError, subprocess.SubprocessError) as _e: print(f' Failed to clone {repo} !!!!!') BBF_ERROR_CODE += 1 if version is not None: try: subprocess.run(["git", "-C", ".repo", "checkout", version], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except (OSError, subprocess.SubprocessError) as _e: print(f' Failed to checkout git version {version} !!!!!') BBF_ERROR_CODE += 1 if os.path.isdir(".repo"): if version is None: print(f' Processing {repo}') else: print(f' Processing {repo}^{version}') dm_files = get_option_value(plugin, "dm_files") if dm_files is not None and isinstance(dm_files, list): for dm_file in dm_files: if os.path.isfile(".repo/" + dm_file): generate_dynamic_datamodel_tree(".repo/" + dm_file) generate_datamodel_tree(".repo/" + dm_file) generate_dynamic_json_datamodel_tree(".repo/" + dm_file) else: BBF_ERROR_CODE += 1 else: files = os.popen('find .repo/ -name datamodel.c').read() for file in files.split('\n'): if os.path.isfile(file): generate_dynamic_datamodel_tree(file) generate_datamodel_tree(file) files = os.popen('find .repo/ -name "*.json"').read() for file in files.split('\n'): if os.path.isfile(file): generate_dynamic_json_datamodel_tree(file) remove_folder(".repo") else: BBF_ERROR_CODE += 1 print('Generating of plugins done') ############## Remove Duplicated Element from List ############## global LIST_SUPPORTED_DM LIST_SUPPORTED_DM = list(set(LIST_SUPPORTED_DM)) ############## Sort all elements in List ############## LIST_SUPPORTED_DM.sort(reverse=False) ############## Back to the current directory ############## cd_dir(CURRENT_PATH) ############### COPY SUPPORTED DATA MODEL TO FILE ############### remove_file(DATA_MODEL_FILE) fill_data_model_file()