Index: obuspa-10.0.4.0/src/core/bdc_exec.c =================================================================== --- obuspa-10.0.4.0.orig/src/core/bdc_exec.c +++ obuspa-10.0.4.0/src/core/bdc_exec.c @@ -549,10 +549,25 @@ int StartSendingReport(bdc_connection_t // Set the list of headers bc->headers = NULL; - bc->headers = curl_slist_append(bc->headers, "Content-Type: application/json; charset=UTF-8"); - bc->headers = curl_slist_append(bc->headers, "BBF-Report-Format: NameValuePair"); + + + if (bc->flags & BDC_FLAG_HEADER_OBJ_HIER) { + bc->headers = curl_slist_append(bc->headers, "Content-Type: application/json; charset=UTF-8"); + bc->headers = curl_slist_append(bc->headers, "BBF-Report-Format: ObjectHierarchy"); + } else if (bc->flags & BDC_FLAG_HEADER_NAME_VAL) { + bc->headers = curl_slist_append(bc->headers, "Content-Type: application/json; charset=UTF-8"); + bc->headers = curl_slist_append(bc->headers, "BBF-Report-Format: NameValuePair"); + } else if (bc->flags & BDC_FLAG_HEADER_PER_COL) { + bc->headers = curl_slist_append(bc->headers, "Content-Type: text/csv; charset=UTF-8"); + bc->headers = curl_slist_append(bc->headers, "BBF-Report-Format: ParameterPerColumn"); + } else { + bc->headers = curl_slist_append(bc->headers, "Content-Type: text/csv; charset=UTF-8"); + bc->headers = curl_slist_append(bc->headers, "BBF-Report-Format: ParameterPerRow"); + } + if (bc->flags & BDC_FLAG_GZIP) { + curl_easy_setopt(curl_ctx, CURLOPT_ACCEPT_ENCODING, "gzip"); bc->headers = curl_slist_append(bc->headers, "Content-Encoding: gzip"); } Index: obuspa-10.0.4.0/src/core/bdc_exec.h =================================================================== --- obuspa-10.0.4.0.orig/src/core/bdc_exec.h +++ obuspa-10.0.4.0/src/core/bdc_exec.h @@ -54,6 +54,8 @@ void BDC_EXEC_ScheduleExit(void); #define BDC_FLAG_PUT 0x00000001 // If set, HTTP PUT should be used instead of HTTP POST when sending the report to the BDC server #define BDC_FLAG_GZIP 0x00000002 // If set, the reports contants are Gzipped #define BDC_FLAG_DATE_HEADER 0x00000004 // If set, the date header should be included in the HTTP post. - - +#define BDC_FLAG_HEADER_OBJ_HIER 0x00000008 // If set, report format in header would be json ObjectHierarchy +#define BDC_FLAG_HEADER_NAME_VAL 0x00000010 // If set, report format in header would be json NameValuePair +#define BDC_FLAG_HEADER_PER_ROW 0x00000020 // If set, report format in header would be csv ParameterPerRow +#define BDC_FLAG_HEADER_PER_COL 0x00000040 // If set, report format in header would be csv ParameterPerColumn #endif Index: obuspa-10.0.4.0/src/core/device_bulkdata.c =================================================================== --- obuspa-10.0.4.0.orig/src/core/device_bulkdata.c +++ obuspa-10.0.4.0/src/core/device_bulkdata.c @@ -71,8 +71,12 @@ //------------------------------------------------------------------------------ // Definitions for formats that we support -#define BULKDATA_ENCODING_TYPE "JSON" -#define BULKDATA_JSON_REPORT_FORMAT "NameValuePair" +#define BULKDATA_ENCODING_TYPE_JSON "JSON" +#define BULKDATA_ENCODING_TYPE_CSV "CSV" +#define BULKDATA_JSON_REPORT_FORMAT_NAME_VALUE "NameValuePair" +#define BULKDATA_JSON_REPORT_FORMAT_OBJ_HIER "ObjectHierarchy" +#define BULKDATA_CSV_REPORT_FORMAT_PER_COLUMN "ParameterPerColumn" +#define BULKDATA_CSV_REPORT_FORMAT_PER_ROW "ParameterPerRow" // Definitions for Device.BulkData.Profile.{i}.JSONEncoding.ReportTimestamp @@ -162,6 +166,7 @@ static char *profile_push_event_args[] = typedef struct { int num_retained_failed_reports; + char encoding_type[10]; #ifdef ENABLE_MQTT char mqtt_reference[254]; // relates to Device.BulkData.Profile.{i}.MQTT.Reference char mqtt_publish_topic[254]; // relates to Device.BulkData.Profile.{i}.MQTT.PublishTopic @@ -172,6 +177,12 @@ typedef struct char password[257]; char compression[9]; char method[9]; + char report_format[20]; + char field_separator[10]; + char row_separator[10]; + char escape_char[10]; + char csv_format[20]; + char row_timestamp[33]; bool use_date_header; } profile_ctrl_params_t; @@ -220,6 +231,7 @@ int Validate_BulkDataEncodingType(dm_req int Validate_BulkDataReportingInterval(dm_req_t *req, char *value); int Validate_BulkDataReference(dm_req_t *req, char *value); int Validate_BulkDataReportFormat(dm_req_t *req, char *value); +int Validate_BulkDataCSVReportFormat(dm_req_t *req, char *value); int Validate_BulkDataReportTimestamp(dm_req_t *req, char *value); int Validate_BulkDataCompression(dm_req_t *req, char *value); int Validate_BulkDataHTTPMethod(dm_req_t *req, char *value); @@ -247,7 +259,8 @@ bulkdata_profile_t *bulkdata_find_free_p bulkdata_profile_t *bulkdata_find_profile(int profile_id); int bulkdata_calc_report_map(bulkdata_profile_t *bp, kv_vector_t *report_map, combined_role_t *combined_role); int bulkdata_reduce_to_alt_name(char *spec, char *path, char *alt_name, char *out_buf, int buf_len); -char *bulkdata_generate_json_report(bulkdata_profile_t *bp, char *report_timestamp); +char *bulkdata_generate_json_report(bulkdata_profile_t *bp, char *report_timestamp, char *report_format); +char *bulkdata_generate_csv_report(bulkdata_profile_t *bp, char *field_separator, char *row_separator, char *escape_char, char *csv_format, char *row_timestamp); unsigned char *bulkdata_compress_report(profile_ctrl_params_t *ctrl, char *input_buf, int input_len, int *p_output_len); int bulkdata_schedule_sending_http_report(profile_ctrl_params_t *ctrl, bulkdata_profile_t *bp, unsigned char *json_report, int report_len); int bulkdata_start_profile(bulkdata_profile_t *bp); @@ -263,6 +276,8 @@ int bulkdata_platform_get_param_refs(int int bulkdata_platform_calc_combined_role(int instance, combined_role_t **bulkdata_role, combined_role_t *combined_role, int *cont_instance); void bulkdata_expand_param_ref(param_ref_entry_t *pr, group_get_vector_t *ggv, combined_role_t *combined_role); void bulkdata_append_to_result_map(param_ref_entry_t *pr, group_get_vector_t *ggv, kv_vector_t *report_map); +void append_string_to_target(char *str, char **output); +char *csv_encode(const char *str); int GetAuto_BulkDataController(dm_req_t *req, char *buf, int len); #ifdef ENABLE_MQTT int Validate_BulkDataMqttReference(dm_req_t *req, char *value); @@ -301,7 +316,7 @@ int DEVICE_BULKDATA_Init(void) err |= USP_REGISTER_VendorParam_ReadOnly("Device.BulkData.Status", Get_BulkDataGlobalStatus, DM_STRING); err |= USP_REGISTER_Param_Constant("Device.BulkData.MinReportingInterval", BULKDATA_MINIMUM_REPORTING_INTERVAL_STR, DM_UINT); err |= USP_REGISTER_Param_SupportedList("Device.BulkData.Protocols", bdc_protocols, NUM_ELEM(bdc_protocols)); - err |= USP_REGISTER_Param_Constant("Device.BulkData.EncodingTypes", BULKDATA_ENCODING_TYPE, DM_STRING); + err |= USP_REGISTER_Param_Constant("Device.BulkData.EncodingTypes", "CSV,JSON", DM_STRING); err |= USP_REGISTER_Param_Constant("Device.BulkData.ParameterWildCardSupported", "true", DM_BOOL); err |= USP_REGISTER_Param_Constant("Device.BulkData.MaxNumberOfProfiles", BULKDATA_MAX_PROFILES_STR, DM_INT); err |= USP_REGISTER_Param_Constant("Device.BulkData.MaxNumberOfParameterReferences", "-1", DM_INT); @@ -316,7 +331,7 @@ int DEVICE_BULKDATA_Init(void) err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.Name", "", NULL, NULL, DM_STRING); err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.NumberOfRetainedFailedReports", "0", Validate_NumberOfRetainedFailedReports, NULL, DM_INT); err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.Protocol", BULKDATA_PROTOCOL_HTTP, Validate_BulkDataProtocol, NULL, DM_STRING); - err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.EncodingType", BULKDATA_ENCODING_TYPE, Validate_BulkDataEncodingType, NULL, DM_STRING); + err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.EncodingType", BULKDATA_ENCODING_TYPE_JSON, Validate_BulkDataEncodingType, NULL, DM_STRING); err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.ReportingInterval", "86400", Validate_BulkDataReportingInterval, NotifyChange_BulkDataReportingInterval, DM_UINT); err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.TimeReference", UNKNOWN_TIME_STR, NULL, NotifyChange_BulkDataTimeReference, DM_DATETIME); err |= USP_REGISTER_DBParam_ReadOnlyAuto("Device.BulkData.Profile.{i}.Controller", GetAuto_BulkDataController, DM_STRING); @@ -329,9 +344,16 @@ int DEVICE_BULKDATA_Init(void) err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.Parameter.{i}.Reference", "", Validate_BulkDataReference, NULL, DM_STRING); // Device.BulkData.Profile.{i}.JSONEncoding - err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.JSONEncoding.ReportFormat", BULKDATA_JSON_REPORT_FORMAT, Validate_BulkDataReportFormat, NULL, DM_STRING); + err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.JSONEncoding.ReportFormat", BULKDATA_JSON_REPORT_FORMAT_NAME_VALUE, Validate_BulkDataReportFormat, NULL, DM_STRING); err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.JSONEncoding.ReportTimestamp", BULKDATA_JSON_TIMESTAMP_FORMAT_EPOCH, Validate_BulkDataReportTimestamp, NULL, DM_STRING); + // Device.BulkData.Profile.{i}.CSVEncoding + err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.CSVEncoding.FieldSeparator", ",", NULL, NULL, DM_STRING); + err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.CSVEncoding.RowSeparator", " ", NULL, NULL, DM_STRING); + err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.CSVEncoding.EscapeCharacter", """, NULL, NULL, DM_STRING); + err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.CSVEncoding.ReportFormat", BULKDATA_CSV_REPORT_FORMAT_PER_COLUMN, Validate_BulkDataCSVReportFormat, NULL, DM_STRING); + err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.CSVEncoding.RowTimestamp", BULKDATA_JSON_TIMESTAMP_FORMAT_EPOCH, Validate_BulkDataReportTimestamp, NULL, DM_STRING); + // Device.BulkData.Profile.{i}.HTTP err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.HTTP.URL", "", NULL, NotifyChange_BulkDataURL, DM_STRING); err |= USP_REGISTER_DBParam_ReadWrite("Device.BulkData.Profile.{i}.HTTP.Username", "", NULL, NULL, DM_STRING); @@ -687,9 +709,10 @@ int Validate_BulkDataProtocol(dm_req_t * int Validate_BulkDataEncodingType(dm_req_t *req, char *value) { // Exit if trying to set a value outside of the range we accept - if (strcmp(value, BULKDATA_ENCODING_TYPE) != 0) - { - USP_ERR_SetMessage("%s: Only EncodingType supported is '%s'", __FUNCTION__, BULKDATA_ENCODING_TYPE); + if (strcmp(value, BULKDATA_ENCODING_TYPE_JSON) != 0 && strcmp(value, BULKDATA_ENCODING_TYPE_CSV) != 0) + { + USP_ERR_SetMessage("%s: Only EncodingType supported are '%s,%s'", __FUNCTION__, + BULKDATA_ENCODING_TYPE_JSON, BULKDATA_ENCODING_TYPE_CSV); return USP_ERR_INVALID_VALUE; } @@ -793,9 +816,36 @@ int Validate_BulkDataReference(dm_req_t int Validate_BulkDataReportFormat(dm_req_t *req, char *value) { // Exit if trying to set a value outside of the range we accept - if (strcmp(value, BULKDATA_JSON_REPORT_FORMAT) != 0) + if (strcmp(value, BULKDATA_JSON_REPORT_FORMAT_NAME_VALUE) != 0 && + strcmp(value, BULKDATA_JSON_REPORT_FORMAT_OBJ_HIER) != 0) + { + USP_ERR_SetMessage("%s: Supported JSON Report Formats are '%s', '%s'", __FUNCTION__, BULKDATA_JSON_REPORT_FORMAT_NAME_VALUE, BULKDATA_JSON_REPORT_FORMAT_OBJ_HIER); + return USP_ERR_INVALID_VALUE; + } + + return USP_ERR_OK; +} + +/*********************************************************************//** +** +** Validate_BulkDataCSVReportFormat +** +** Validates Device.BulkData.Profile.{i}.CSVEncoding.ReportFormat +** +** \param req - pointer to structure identifying the parameter +** \param value - value that the controller would like to set the parameter to +** +** \return USP_ERR_OK if successful +** +**************************************************************************/ +int Validate_BulkDataCSVReportFormat(dm_req_t *req, char *value) +{ + // Exit if trying to set a value outside of the range we accept + if (strcmp(value, BULKDATA_CSV_REPORT_FORMAT_PER_COLUMN) != 0 && + strcmp(value, BULKDATA_CSV_REPORT_FORMAT_PER_ROW) != 0) { - USP_ERR_SetMessage("%s: Only JSON Report Format supported is '%s'", __FUNCTION__, BULKDATA_JSON_REPORT_FORMAT); + USP_ERR_SetMessage("%s: Only JSON Report Format supported are '%s', '%s'", __FUNCTION__, + BULKDATA_CSV_REPORT_FORMAT_PER_COLUMN, BULKDATA_CSV_REPORT_FORMAT_PER_ROW); return USP_ERR_INVALID_VALUE; } @@ -2151,6 +2201,14 @@ int bulkdata_platform_get_profile_contro return err; } + // Exit if unable to get EncodingType + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.EncodingType", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, ctrl_params->encoding_type, sizeof(ctrl_params->encoding_type), 0); + if (err != USP_ERR_OK) + { + return err; + } + // Exit if unable to get ReportTimestamp USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.JSONEncoding.ReportTimestamp", bp->profile_id); err = DATA_MODEL_GetParameterValue(path, ctrl_params->report_timestamp, sizeof(ctrl_params->report_timestamp), 0); @@ -2159,6 +2217,54 @@ int bulkdata_platform_get_profile_contro return err; } + // Exit if unable to get FieldSeparator + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.FieldSeparator", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, ctrl_params->field_separator, sizeof(ctrl_params->field_separator), 0); + if (err != USP_ERR_OK) + { + return err; + } + + // Exit if unable to get RowSeparator + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.RowSeparator", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, ctrl_params->row_separator, sizeof(ctrl_params->row_separator), 0); + if (err != USP_ERR_OK) + { + return err; + } + + // Exit if unable to get EscapeCharacter + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.EscapeCharacter", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, ctrl_params->escape_char, sizeof(ctrl_params->escape_char), 0); + if (err != USP_ERR_OK) + { + return err; + } + + // Exit if unable to get ReportFormat + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.ReportFormat", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, ctrl_params->csv_format, sizeof(ctrl_params->csv_format), 0); + if (err != USP_ERR_OK) + { + return err; + } + + // Exit if unable to get RowTimestamp + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.RowTimestamp", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, ctrl_params->row_timestamp, sizeof(ctrl_params->row_timestamp), 0); + if (err != USP_ERR_OK) + { + return err; + } + + // Exit if unable to get ReportFormat + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.JSONEncoding.ReportFormat", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, ctrl_params->report_format, sizeof(ctrl_params->report_format), 0); + if (err != USP_ERR_OK) + { + return err; + } + #ifdef ENABLE_MQTT { char protocol[32]; @@ -2492,7 +2598,7 @@ void bulkdata_process_profile_http(bulkd { int err; report_t *cur_report; - char *json_report; + char *report; profile_ctrl_params_t ctrl; unsigned char *compressed_report; int compressed_len; @@ -2541,26 +2647,39 @@ void bulkdata_process_profile_http(bulkd } // Exit if unable to generate the report - json_report = bulkdata_generate_json_report(bp, ctrl.report_timestamp); - if (json_report == NULL) - { - USP_ERR_SetMessage("%s: bulkdata_generate_json_report failed", __FUNCTION__); + if (strcmp(ctrl.encoding_type, BULKDATA_ENCODING_TYPE_JSON) == 0) { + report = bulkdata_generate_json_report(bp, ctrl.report_timestamp, ctrl.report_format); + if (report == NULL) + { + USP_ERR_SetMessage("%s: bulkdata_generate_json_report failed", __FUNCTION__); + return; + } + } else if (strcmp(ctrl.encoding_type, BULKDATA_ENCODING_TYPE_CSV) == 0) { + report = bulkdata_generate_csv_report(bp, ctrl.field_separator, ctrl.row_separator, ctrl.escape_char, + ctrl.csv_format, ctrl.row_timestamp); + if (report == NULL) + { + USP_ERR_SetMessage("%s: bulkdata_generate_csv_report failed", __FUNCTION__); + return; + } + } else { + USP_ERR_SetMessage("%s: bulkdata invalid report encoding type %s", __FUNCTION__, ctrl.encoding_type); return; } // Print out the JSON report, if debugging is enabled - USP_LOG_Info("\nBULK DATA: %sing at time %s, to url=%s", ctrl.method, iso8601_cur_time(buf, sizeof(buf)), ctrl.url); + USP_LOG_Info("BULK DATA: %sing at time %s, to url=%s", ctrl.method, iso8601_cur_time(buf, sizeof(buf)), ctrl.url); USP_LOG_Info("BULK DATA: using compression method=%s", ctrl.compression); if (enable_protocol_trace) { - USP_LOG_String(kLogLevel_Info, kLogType_Protocol, json_report); + USP_LOG_String(kLogLevel_Info, kLogType_Protocol, report); } // Compress the report, if enabled - compressed_report = bulkdata_compress_report(&ctrl, json_report, strlen(json_report), &compressed_len); - if (compressed_report != (unsigned char *)json_report) + compressed_report = bulkdata_compress_report(&ctrl, report, strlen(report), &compressed_len); + if (compressed_report != (unsigned char *)report) { - free(json_report); + free(report); } // NOTE: From this point on, only the compressed_report exists @@ -2590,8 +2709,15 @@ void bulkdata_process_profile_usp_event( kv_vector_t event_args; kv_pair_t kv; report_t *cur_report; - char *json_report; char report_timestamp[33]; + char report_format[20]; + char *report; + char encoding_type[10] = {0}; + char field_separator[10]; + char row_separator[10]; + char escape_char[10]; + char csv_format[20]; + char row_timestamp[33]; combined_role_t *bulkdata_role; combined_role_t combined_role; int cont_instance; @@ -2610,13 +2736,63 @@ void bulkdata_process_profile_usp_event( return; } - // Exit if unable to get ReportTimestamp - USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.JSONEncoding.ReportTimestamp", bp->profile_id); - err = DATA_MODEL_GetParameterValue(path, report_timestamp, sizeof(report_timestamp), 0); - if (err != USP_ERR_OK) - { - return; - } + // Exit if unable to get EncodingType + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.EncodingType", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, encoding_type, sizeof(encoding_type), 0); + if (err != USP_ERR_OK) { + return; + } + + if (strcmp(encoding_type, BULKDATA_ENCODING_TYPE_JSON) == 0) { + // Exit if unable to get ReportTimestamp + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.JSONEncoding.ReportTimestamp", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, report_timestamp, sizeof(report_timestamp), 0); + if (err != USP_ERR_OK) { + return; + } + + // Exit if unable to get ReportFormat + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.JSONEncoding.ReportFormat", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, report_format, sizeof(report_format), 0); + if (err != USP_ERR_OK) { + return; + } + } else { + // Exit if unable to get FieldSeparator + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.FieldSeparator", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, field_separator, sizeof(field_separator), 0); + if (err != USP_ERR_OK) { + return; + } + + // Exit if unable to get RowSeparator + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.RowSeparator", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, row_separator, sizeof(row_separator), 0); + if (err != USP_ERR_OK) { + return; + } + + // Exit if unable to get EscapeCharacter + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.EscapeCharacter", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, escape_char, sizeof(escape_char), 0); + if (err != USP_ERR_OK) { + return; + } + + // Exit if unable to get ReportFormat + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.ReportFormat", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, csv_format, sizeof(csv_format), 0); + if (err != USP_ERR_OK) { + return; + } + + // Exit if unable to get RowTimestamp + USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.CSVEncoding.RowTimestamp", bp->profile_id); + err = DATA_MODEL_GetParameterValue(path, row_timestamp, sizeof(row_timestamp), 0); + if (err != USP_ERR_OK) { + return; + } + } // When sending via USP events, only one report is ever sent in each USP event // So ensure all retained reports are removed. NOTE: Clearing the reports here is only necessary when switching protocol from HTTP to USP event, and where HTTP had some unsent reports @@ -2634,11 +2810,17 @@ void bulkdata_process_profile_usp_event( } bp->num_retained_reports = 1; + if (strcmp(encoding_type, BULKDATA_ENCODING_TYPE_JSON) == 0) { + report = bulkdata_generate_json_report(bp, report_timestamp, report_format); + } else { + report = bulkdata_generate_csv_report(bp, field_separator, row_separator, escape_char, + csv_format, row_timestamp); + } + // Exit if unable to generate the report - json_report = bulkdata_generate_json_report(bp, report_timestamp); - if (json_report == NULL) + if (report == NULL) { - USP_ERR_SetMessage("%s: bulkdata_generate_json_report failed", __FUNCTION__); + USP_ERR_SetMessage("%s: bulkdata_generate_report failed", __FUNCTION__); return; } @@ -2646,15 +2828,15 @@ void bulkdata_process_profile_usp_event( // Construct event_args manually to avoid the overhead of a malloc and copy of the report in KV_VECTOR_Add() kv.key = "Data"; - kv.value = json_report; + kv.value = report; event_args.vector = &kv; event_args.num_entries = 1; USP_SNPRINTF(path, sizeof(path), "Device.BulkData.Profile.%d.Push!", bp->profile_id); DEVICE_SUBSCRIPTION_ProcessAllEventCompleteSubscriptions(path, &event_args, cont_instance); - // Free the report. No need to free the event_args as json_report is the only thing dynamically allocated in it - free(json_report); // The report is not allocated via USP_MALLOC + // Free the report. No need to free the event_args as report is the only thing dynamically allocated in it + free(report); // The report is not allocated via USP_MALLOC // From the point of view of this code, the report(s) have been successfully sent, so don't retain them // NOTE: Sending of the reports successfully is delegated to the USP notification retry mechanism @@ -2736,15 +2918,28 @@ void bulkdata_process_profile_mqtt(bulkd } // Exit if unable to generate the report - report = bulkdata_generate_json_report(bp, ctrl.report_timestamp); - if (report == NULL) - { - USP_ERR_SetMessage("%s: bulkdata_generate_json_report failed", __FUNCTION__); - return; + if (strcmp(ctrl.encoding_type, BULKDATA_ENCODING_TYPE_JSON) == 0) { + report = bulkdata_generate_json_report(bp, ctrl.report_timestamp, ctrl.report_format); + if (report == NULL) + { + USP_ERR_SetMessage("%s: bulkdata_generate_json_report failed", __FUNCTION__); + return; + } + } else if (strcmp(ctrl.encoding_type, BULKDATA_ENCODING_TYPE_CSV) == 0) { + report = bulkdata_generate_csv_report(bp, ctrl.field_separator, ctrl.row_separator, ctrl.escape_char, + ctrl.csv_format, ctrl.row_timestamp); + if (report == NULL) + { + USP_ERR_SetMessage("%s: bulkdata_generate_csv_report failed", __FUNCTION__); + return; + } + } else { + USP_ERR_SetMessage("%s: bulkdata invalid report encoding type %s", __FUNCTION__, ctrl.encoding_type); + return; } // Print out the JSON report, if debugging is enabled - USP_LOG_Debug("\nBULK DATA: Sending at time %s to MQTT topic %s", iso8601_cur_time(buf, sizeof(buf)), ctrl.mqtt_publish_topic); + USP_LOG_Debug("BULK DATA: Sending at time %s to MQTT topic %s", iso8601_cur_time(buf, sizeof(buf)), ctrl.mqtt_publish_topic); if (enable_protocol_trace) { USP_LOG_String(kLogLevel_Info, kLogType_Protocol, report); @@ -2939,7 +3134,7 @@ int bulkdata_reduce_to_alt_name(char *sp /*********************************************************************//** ** -** bulkdata_generate_json_report +** bulkdata_generate_json_name_value_pair_report ** ** Generates a JSON name-value pair format report ** NOTE: The report contains all retained failed reports, as well as the current report @@ -2951,7 +3146,7 @@ int bulkdata_reduce_to_alt_name(char *sp ** \return pointer to NULL terminated dynamically allocated buffer containing the serialized report to send ** **************************************************************************/ -char *bulkdata_generate_json_report(bulkdata_profile_t *bp, char *report_timestamp) +char *bulkdata_generate_json_name_value_pair_report(bulkdata_profile_t *bp, char *report_timestamp) { JsonNode *top; // top of report JsonNode *array; // array of reports (retained + current) @@ -3056,6 +3251,483 @@ char *bulkdata_generate_json_report(bulk return result; } +static char *create_json_obj_hier_report(bulkdata_profile_t *bp, char *report_timestamp) +{ + JsonNode *top; // top of report + JsonNode *array; // array of reports (retained + current) + JsonNode *element; // element of json array, containing an individual report + JsonNode *temp; + char *param_path; + char *param_type_value; + char param_type; + char *param_value; + kv_vector_t *report_map; + report_t *report; + double value_as_number; + long long value_as_ll; + unsigned long long value_as_ull; + bool value_as_bool; + int i, j; + char buf[32]; + kv_pair_t *kv; + int err; + char *result; + + top = json_mkobject(); + array = json_mkarray(); + + // Iterate over all reports adding them to the JSON array + for (i=0; i < bp->num_retained_reports; i++) + { + report = &bp->reports[i]; + report_map = &report->report_map; + + // Add Collection time to each json report element (only if specified and not 'None') + element = json_mkobject(); + if (strcmp(report_timestamp, "Unix-Epoch")==0) + { + json_append_member(element, "CollectionTime", json_mknumber(report->collection_time)); + } + else if (strcmp(report_timestamp, "ISO-8601")==0) + { + result = iso8601_from_unix_time(report->collection_time, buf, sizeof(buf)); + if (result != NULL) + { + json_append_member(element, "CollectionTime", json_mkstring(buf)); + } + } + + temp = element; + // Iterate over each parameter, adding it to the json element. Take account of the parameter's type + for (j=0; j < report_map->num_entries; j++) + { + char buff[2056] = {0}; + char *pch = NULL, *pchr = NULL, *argv[128] = {0}; + int n = 0; + + kv = &report_map->vector[j]; + param_path = kv->key; + param_type_value = kv->value; + param_type = param_type_value[0]; // First character denotes the type of the parameter + param_value = ¶m_type_value[1]; // Subsequent characters contain the parameter's value + + strncpy(buff, param_path, sizeof(buff)); + for (pch = strtok_r(buff, ".", &pchr); pch != NULL; pch = strtok_r(NULL, ".", &pchr)) { + int idx; + JsonNode *obj = element; + argv[n] = pch; + + for (idx = 0; idx <= n; idx++) { + if (obj == NULL) + break; + obj = json_find_member(obj, argv[idx]); + } + + if (obj) + temp = obj; + else { + if (pchr != NULL && *pchr != '\0') { + // It is a DMOBJ + JsonNode *new = json_mkobject(); + json_append_member(temp, pch, new); + temp = new; + } else { + // It is a DMPARAM + switch (param_type) + { + case 'S': + json_append_member(temp, pch, json_mkstring(param_value) ); + break; + + case 'U': + value_as_ull = strtoull(param_value, NULL, 10); + json_append_member(temp, pch, json_mkulonglong(value_as_ull) ); + break; + + case 'L': + value_as_ll = strtoll(param_value, NULL, 10); + json_append_member(temp, pch, json_mklonglong(value_as_ll) ); + break; + + case 'N': + value_as_number = atof(param_value); + json_append_member(temp, pch, json_mknumber(value_as_number) ); + break; + + case 'B': + err = TEXT_UTILS_StringToBool(param_value, &value_as_bool); + if (err == USP_ERR_OK) + { + json_append_member(temp, pch, json_mkbool(value_as_bool) ); + } + break; + + default: + USP_ERR_SetMessage("%s: Invalid JSON parameter type ('%c') in report map for %s", __FUNCTION__, param_type_value[0], param_path); + break; + } + } + } + n++; + } + } + + // Add the json element to the json array + json_append_element(array, element); + } + + // Finally add the array to the report top level + json_append_member(top, "Report", array); + + // Serialize the JSON tree + result = json_stringify(top, " "); + + // Clean up the JSON tree + json_delete(top); // Other JsonNodes which are children of this top level tree will be deleted + + return result; +} + +/*********************************************************************//** +** +** bulkdata_generate_json_report +** +** Generates a JSON name-value pair or object-hierarchy format report +** NOTE: The report contains all retained failed reports, as well as the current report +** See TR-157 section A.4.2 (end) for an example, and section A.3.5.2 for layout of content containing failed report transmissions +** +** \param bp - pointer to bulk data profile containing all reports (current and retained) +** \param report_timestamp - value of Device.BulkData.Profile.{i}.JSONEncoding.ReportTimestamp +** +** \return pointer to NULL terminated dynamically allocated buffer containing the serialized report to send +** +**************************************************************************/ +char *bulkdata_generate_json_report(bulkdata_profile_t *bp, char *report_timestamp, char *report_format) +{ + char *result = NULL; + + if (strcmp(report_format, BULKDATA_JSON_REPORT_FORMAT_NAME_VALUE) == 0) { + result = bulkdata_generate_json_name_value_pair_report(bp, report_timestamp); + } else if (strcmp(report_format, BULKDATA_JSON_REPORT_FORMAT_OBJ_HIER) == 0) { + result = create_json_obj_hier_report(bp, report_timestamp); + } + + return result; +} + +/*********************************************************************//** +** +** safe_asprintf +** +** Wrapper around asprintf that calls terminate in case of error +** +** \param strp -- pointer to the output string +** \param fmt -- printing format +** +** \return None +** +**************************************************************************/ +static void safe_asprintf(char **strp, const char *fmt, ...) +{ + int ret; + va_list argp; + + va_start(argp, fmt); + ret = vasprintf(strp, fmt, argp); + va_end(argp); + + if (ret == -1) { + USP_ERR_Terminate("%s (%d): asprintf(%s) failed", + __FUNCTION__, __LINE__, fmt); + } +} + +/*********************************************************************//** +** +** append_string_to_target +** +** concatenates the src string with target string in newly allocated memory +** and assign back the new pointer. +** +** \param str - pointer to the src string +** \param output - address of the pointer that points to the target string +** +** \return None +** +**************************************************************************/ +void append_string_to_target(char *str, char **output) +{ + char *tmp = NULL; + + if (str == NULL || strlen(str) == 0) + return; + + if (*output == NULL || strlen(*output) == 0) { + *output = USP_STRDUP(str); + return; + } else { + tmp = USP_STRDUP(*output); + free(*output); + } + + assert(tmp != NULL); + safe_asprintf(output, "%s%s", tmp, str); + free(tmp); +} + +/************************************************************************* +** +** csv_encode +** +** encodes the src string to CSV specification RFC4180 compliance string +** and assign back the new pointer. +** +** \param str - pointer to the src string +** \return address of the pointer that points to the target string or NULL +** +**************************************************************************/ +char *csv_encode(const char *str) +{ + if (str == NULL) + return NULL; + + int len = strlen(str); + if (len == 0) + return strdup(str); + + char *temp = NULL; + // Get the number of '\"' present in the string + int quote_count = 0; + + temp = strchr(str, '\"'); + while (temp) { + quote_count++; + temp = strchr(temp+1, '\"'); + } + + int encode_size = len + quote_count + 3; // added 3 for initial quote, end quote & null at end + temp = (char *)malloc(sizeof(char) * encode_size); + + if (!temp) + return NULL; + + memset(temp, 0, sizeof(char) * encode_size); + + int i = 0, j = 0; + + temp[j++] = '\"'; + for (i = 0; i < len; i++) { + if (str[i] == '\"') { + if (j > (encode_size - 3)) + break; + + temp[j++] = '\"'; + } + + if (j > (encode_size - 3)) + break; + + temp[j++] = str[i]; + } + + temp[j] = '\"'; + + return temp; +} + +/*********************************************************************//** +** +** bulkdata_generate_csv_report +** +** Generates a CSV ParameterPerRow or ParameterPerColumn format report +** NOTE: The report contains all retained failed reports, as well as the current report +** See TR-157 section A.4.2 (end) for an example, and section A.3.5.2 for layout of content containing failed report transmissions +** +** \param bp - pointer to bulk data profile containing all reports (current and retained) +** \param field_separator - value of Device.Bulkdata.Profile.{i}.CSVEncoding.FieldSeparator +** \param row_separator - value of Device.Bulkdata.Profile.{i}.CSVEncoding.RowSeparator +** \param escape_char - value of Device.Bulkdata.Profile.{i}.CSVEncoding.EscapeCharacter +** \param csv_format - value of Device.BulkData.Profile.{i}.CSVEncoding.ReportFormat +** \param row_timestamp - value of Device.Bulkdata.Profile.{i}.CSVEncoding.RowTimestamp +** +** \return pointer to NULL terminated dynamically allocated buffer containing the serialized report to send +** +**************************************************************************/ +char *bulkdata_generate_csv_report(bulkdata_profile_t *bp, char *field_separator, char *row_separator, + char *escape_char, char *csv_format, char *row_timestamp) +{ + char *param_path; + char *param_type_value; + char param_type; + char *param_value; + kv_vector_t *report_map; + report_t *report; + int i, j; + bool value_as_bool; + char buf[32]; + kv_pair_t *kv; + int err; + char *output = NULL, *str = NULL, *str1 = NULL, *str2 = NULL, rowseparator = '\0', separator = '\0'; + + if (strcmp(row_separator, " ") == 0) + rowseparator = '\n'; + else if (strcmp(row_separator, " ") == 0) + rowseparator = '\r'; + + if (field_separator) + separator = field_separator[0]; + + if (strcasecmp(csv_format, "ParameterPerRow") == 0) { + if (strcmp(row_timestamp, "None") == 0) + safe_asprintf(&str, "ParameterName%cParameterValue%cParameterType%c", separator, separator, rowseparator); + else + safe_asprintf(&str, "ReportTimestamp%cParameterName%cParameterValue%cParameterType%c", separator, separator, separator, rowseparator); + + assert(str != NULL); + append_string_to_target(str, &output); + free(str); + str = NULL; + } + + for (i=0; i < bp->num_retained_reports; i++) + { + char *timestamp = NULL; + report = &bp->reports[i]; + report_map = &report->report_map; + + // Add Collection time to each csv report element (only if specified and not 'None') + if (strcmp(row_timestamp, "Unix-Epoch")==0) + { + safe_asprintf(×tamp, "%lld", (long long int)report->collection_time); + } + else if (strcmp(row_timestamp, "ISO-8601")==0) + { + char *result = iso8601_from_unix_time(report->collection_time, buf, sizeof(buf)); + if (result != NULL) + { + char *val = csv_encode(buf); + if (val) { + safe_asprintf(×tamp, "%s", buf); + free(val); + } + } + } + + if (strcasecmp(csv_format, "ParameterPerColumn") == 0 && timestamp) { + append_string_to_target("ReportTimestamp", &str1); + append_string_to_target(timestamp, &str2); + } + + // Iterate over each parameter, adding it to the json element. Take account of the parameter's type + for (j=0; j < report_map->num_entries; j++) + { + kv = &report_map->vector[j]; + param_path = kv->key; + param_type_value = kv->value; + param_type = param_type_value[0]; // First character denotes the type of the parameter + param_value = ¶m_type_value[1]; // Subsequent characters contain the parameter's valu + char *type = NULL; + switch (param_type) + { + case 'S': + type = "string"; + break; + + case 'U': + type = "unsignedInt"; + break; + + case 'L': + type = "long"; + break; + + case 'N': + type = "decimal"; + break; + + case 'B': + err = TEXT_UTILS_StringToBool(param_value, &value_as_bool); + if (err == USP_ERR_OK) + { + type = "boolean"; + param_value = value_as_bool ? "True" : "False"; + } + break; + + default: + USP_ERR_SetMessage("%s: Invalid JSON parameter type ('%c') in report map for %s", __FUNCTION__, param_type_value[0], param_path); + break; + } + + if (type) { + char *p_path = csv_encode(param_path); + char *p_value = csv_encode(param_value); + + if (strcasecmp(csv_format, "ParameterPerRow") == 0) { + if (timestamp == NULL) + safe_asprintf(&str, "%s%c%s%c%s%c", p_path, separator, p_value, separator, type, rowseparator); + else + safe_asprintf(&str, "%s%c%s%c%s%c%s%c", timestamp, separator, p_path, separator, p_value, separator, type, rowseparator); + + assert(str != NULL); + append_string_to_target(str, &output); + free(str); + str = NULL; + } else { + if (str1 == NULL || strlen(str1) == 0) + safe_asprintf(&str, "%s", p_path); + else + safe_asprintf(&str, "%c%s", separator, p_path); + + assert(str != NULL); + append_string_to_target(str, &str1); + free(str); + str = NULL; + + if (str2 == NULL || strlen(str2) == 0) + safe_asprintf(&str, "%s", p_value); + else + safe_asprintf(&str, "%c%s", separator, p_value); + + assert(str != NULL); + append_string_to_target(str, &str2); + free(str); + str = NULL; + } + + if (p_path) + free(p_path); + + if (p_value) + free(p_value); + } + } + + if (timestamp) { + free(timestamp); + timestamp = NULL; + } + } + + if (strcasecmp(csv_format, "ParameterPerColumn") == 0) { + safe_asprintf(&str, "%c", rowseparator); + assert(str != NULL); + append_string_to_target(str, &str1); + append_string_to_target(str, &str2); + append_string_to_target(str1, &output); + append_string_to_target(str2, &output); + } + + if (str) + free(str); + if (str1) + free(str1); + if (str2) + free(str2); + + return output; +} + /*********************************************************************//** ** ** bulkdata_compress_report @@ -3259,6 +3931,20 @@ int bulkdata_schedule_sending_http_repor flags |= BDC_FLAG_DATE_HEADER; } + if (strcmp(ctrl->encoding_type, BULKDATA_ENCODING_TYPE_JSON) == 0) { + if (strcmp(ctrl->report_format, BULKDATA_JSON_REPORT_FORMAT_OBJ_HIER) == 0) { + flags |= BDC_FLAG_HEADER_OBJ_HIER; + } else { + flags |= BDC_FLAG_HEADER_NAME_VAL; + } + } else { + if (strcmp(ctrl->csv_format, BULKDATA_CSV_REPORT_FORMAT_PER_COLUMN) == 0) { + flags |= BDC_FLAG_HEADER_PER_COL; + } else { + flags |= BDC_FLAG_HEADER_PER_ROW; + } + } + // Exit if failed to post a message to BDC thread // NOTE: Ownership of full_url, query_string, report, username and password passes to BDC_EXEC err = BDC_EXEC_PostReportToSend(bp->profile_id, full_url, query_string, username, password, report, report_len, flags);