Added validate bbfdm.AddObj & bbfdm.DelObj events

This commit is contained in:
Amin Ben Romdhane 2024-02-16 16:23:16 +01:00
parent 89adb10a41
commit 0e5106a38b
3 changed files with 134 additions and 1 deletions

View file

@ -18,9 +18,12 @@ exec_cmd ubus wait_for bbfdm
# Test ubus bbfdm 'notify_event' method
./test/python-test-cases/python/validate_ubus_notify_event_method.py test/python-test-cases/json/ubus_notify_event_method.json
# Test ubus 'bbfdm.event' event
# Test 'bbfdm.event' event callback
./test/python-test-cases/python/validate_event_callback.py test/python-test-cases/json/event_callback.json
# Test 'bbfdm.AddObj' & 'bbfdm.DelObj' event
./test/python-test-cases/python/validate_add_del_event.py test/python-test-cases/json/add_del_event.json
supervisorctl stop all
supervisorctl status

View file

@ -0,0 +1,30 @@
{
"events": [
{
"description": "Validate <bbfdm.AddObj> & <bbfdm.DelObj> event using internal object defined in the core Data Model: Device.DeviceInfo.FirmwareImage.",
"config_path": "/tmp/fwbank.data",
"objects_to_look": [
"Device.DeviceInfo.FirmwareImage.1",
"Device.DeviceInfo.FirmwareImage.2"
]
},
{
"description": "Validate <bbfdm.AddObj> & <bbfdm.DelObj> event using external object JSON Plugin: Device.X_IOPSYS_EU_Dropbear.",
"config_path": "/etc/config/dropbear",
"objects_to_look": [
"Device.X_IOPSYS_EU_Dropbear.1",
"Device.SSH.Server.1"
]
},
{
"description": "Validate <bbfdm.AddObj> & <bbfdm.DelObj> event using external object JSON Plugin: Device.X_IOPSYS_EU_IGMP.",
"config_path": "/etc/config/mcast",
"objects_to_look": [
"Device.X_IOPSYS_EU_IGMP.Proxy.1",
"Device.X_IOPSYS_EU_IGMP.Proxy.1.Interface.1",
"Device.X_IOPSYS_EU_IGMP.Proxy.1.Interface.2",
"Device.X_IOPSYS_EU_IGMP.Proxy.1.Filter.1"
]
}
]
}

View file

@ -0,0 +1,100 @@
#!/usr/bin/python3
import os
import sys
import ubus
import json
# Global variable to track UBUS disconnection
UBUS_DISCONNECT = 0
class TestArguments:
"""
Class to hold test arguments.
"""
def __init__(self, description, config_path, objects_to_look):
self.description = description
self.config_path = config_path
self.objects_to_look = objects_to_look
def callback(event, data):
"""
Callback function to handle UBUS events.
"""
global UBUS_DISCONNECT
# Check if the objects to look for are in the instances list
if set(args.objects_to_look).issubset(set(data['instances'])):
# Mark UBUS as disconnected
UBUS_DISCONNECT = 1
# Disconnect from UBUS
ubus.disconnect()
def run_global_test(args, event_name, path_1, path_2):
"""
Run a global test.
"""
global UBUS_DISCONNECT
# Connect to UBUS
ubus.connect()
UBUS_DISCONNECT = 0
# Listen for UBUS events
ubus.listen((event_name, callback))
# Force change in schema by moving dependency file
os.rename(path_1, path_2)
# Run UBUS loop with a timeout
ubus.loop(timeout=35000)
# If UBUS is still connected, disconnect and mark test as failed
if UBUS_DISCONNECT == 0:
ubus.disconnect()
return -1
return 0
def run_test(args):
"""
Run a single test.
"""
print("Running: " + args.description)
# Test deletion of object
res = run_global_test(args, "bbfdm.DelObj", args.config_path, "/tmp/bbfdm_test_1")
if res != 0:
print("FAIL: " + args.description)
return
# Test addition of object
res = run_global_test(args, "bbfdm.AddObj", "/tmp/bbfdm_test_1", args.config_path)
if res != 0:
print("FAIL: " + args.description)
return
print("PASS: " + args.description)
if __name__ == "__main__":
# Check for correct command line arguments
if len(sys.argv) != 2:
print("Usage: {} <test_arguments_json>".format(sys.argv[0]))
sys.exit(1)
test_arguments_file = sys.argv[1]
try:
# Load test arguments from JSON file
with open(test_arguments_file, 'r') as f:
test_arguments_data = json.load(f)
args_list = [TestArguments(**event) for event in test_arguments_data["events"]]
except FileNotFoundError:
print("File not found:", test_arguments_file)
sys.exit(1)
except json.JSONDecodeError as e:
print("Error parsing JSON:", e)
sys.exit(1)
# Run tests for each set of arguments
for args in args_list:
run_test(args)