Merge branch 'mx/rm19832-obuspa-mqtt-retry' into 'devel'

obuspa: keep MQTT replies alive across PPP reconnects

See merge request feed/iopsys!2132
This commit is contained in:
Xiaofeng Meng 2026-03-12 20:07:38 +00:00 committed by IOPSYS Dev
commit e28f518fb8
No known key found for this signature in database

View file

@ -0,0 +1,473 @@
diff --git a/src/core/mqtt.c b/src/core/mqtt.c
index 9523e1e958..62c4cb954f 100644
--- a/src/core/mqtt.c
+++ b/src/core/mqtt.c
@@ -49,6 +49,7 @@
#include <errno.h>
#include <math.h>
#include <fcntl.h>
+#include <net/if.h>
#include <openssl/ssl.h>
#include <openssl/bio.h>
#include <openssl/err.h>
@@ -66,6 +67,7 @@
#include "text_utils.h"
#include "msg_handler.h"
#include "iso8601.h"
+#include "nu_ipaddr.h"
// Defines for MQTT Property Values
#define PUBLISH 0x30
@@ -79,6 +81,7 @@
//------------------------------------------------------------------------------
// Maximum length of a topic in characters (including NULL terminator)
#define MAX_TOPIC_LEN 128
+#define MQTT_MGMT_IP_ADDR_POLL_PERIOD 5
//------------------------------------------------------------------------------
// Cause of failure of an MQTT Client Connection
@@ -148,6 +151,8 @@ typedef struct
bool is_disconnected; // Set if the disconnect callback has been called, to cause ensuing actions to be taken
bool is_subscribed; // Set if the agent is subscribed to at least one topic. This flag is used to prevent the agent
// from sending notification messages until it knows that it can receive a notification response from the Controller
+ char mgmt_ip_addr[NU_IPADDRSTRLEN]; // Source IP address used by the current MQTT session
+ char mgmt_if_name[IFNAMSIZ]; // Source interface used by the current MQTT session
str_vector_t frame_trace; // string vector in which to build up debug trace of the options for the next MQTT frame to be transmitted
} mqtt_client_t;
@@ -180,11 +185,13 @@ typedef struct
char *topic; // Name of the MQTT Topic to send to
mqtt_qos_t qos; // QOS to request when building PUBLISH message (obtained from mqtt_conn_params_t.publish_qos)
int mid; // MQTT message ID. This is filled in by libmosquitto, when we tell libmosquitto to send this message
+ bool is_in_flight; // Set once a publish has been initiated for this queue entry and cleared on completion or disconnect
time_t expiry_time; // Time at which this USP record should be removed from the queue
} mqtt_send_item_t;
mqtt_client_t mqtt_clients[MAX_MQTT_CLIENTS];
static pthread_mutex_t mqtt_access_mutex;
+static time_t next_mqtt_mgmt_if_poll_time = 0;
//------------------------------------------------------------------------------------
// Macros to build up a log of the contents of the MQTT protocol aspects of a frame being sent (eg user properties)
@@ -238,6 +245,7 @@ void HandleMqttError(mqtt_client_t *client, mqtt_failure_t failure_code, const c
void MoveState_Private(mqtt_state_t *state, mqtt_state_t to, const char *event, const char* func);
void ParamReplace(mqtt_conn_params_t *dest, mqtt_conn_params_t *src);
bool IsUspRecordInMqttQueue(mqtt_client_t *client, unsigned char *pbuf, int pbuf_len);
+mqtt_send_item_t *FindInflightMqttQueueItem(mqtt_client_t *client, int mid);
mqtt_client_t *FindUnusedMqttClient_Local();
mqtt_client_t *FindMqttClientByInstance(int instance);
mqtt_subscription_t *FindMqttSubscriptionByInstance(mqtt_client_t *client, int subinstance);
@@ -254,7 +262,12 @@ void HandleMqttDisconnect(mqtt_client_t *client);
void DisconnectIfAllSubscriptionsFailed(mqtt_client_t *client);
bool IsMqttBrokerUp(mqtt_client_t *client, char *dst_ip, int size);
void RemoveMqttQueueItem(mqtt_client_t *client, mqtt_send_item_t *queued_msg);
+void ResetMqttQueueItemState(mqtt_client_t *client);
void RemoveExpiredMqttMessages(mqtt_client_t *client);
+void UpdateMqttSourceInterface(mqtt_client_t *client);
+int UpdateMqttMgmtInterface(void);
+void HandleMqttSourceIPAddrChanges(void);
+void ForceMqttReconnect(mqtt_client_t *client);
void ParseSubscribeTopicsFromConnack(mqtt_client_t *client, mosquitto_property *prop);
void AddConnackSubscription(mqtt_client_t *client, char *topic);
void QueueUspMqttConnectRecords(mqtt_client_t *client);
@@ -386,6 +399,7 @@ int MQTT_Start(void)
mqtt_client_t *client;
OS_UTILS_LockMutex(&mqtt_access_mutex);
+ next_mqtt_mgmt_if_poll_time = 0;
// Initialise the SSL contexts for all of the clients
// This cannot be done in MQTT_Init() because at that time in the initialisation the trust store certs haven't been locally cached
@@ -1244,6 +1258,7 @@ void MQTT_UpdateAllSockSet(socket_set_t *set)
int i;
int sock;
int err;
+ int timeout;
mqtt_send_item_t *q_msg;
mqtt_client_t *client;
@@ -1259,6 +1274,9 @@ void MQTT_UpdateAllSockSet(socket_set_t *set)
// Set a default timeout of 1 second
SOCKET_SET_UpdateTimeout(1*SECONDS, set);
+ timeout = UpdateMqttMgmtInterface();
+ SOCKET_SET_UpdateTimeout(timeout*SECONDS, set);
+
// Iterate over all mqtt clients currently enabled
for (i = 0; i < MAX_MQTT_CLIENTS; i++)
{
@@ -1271,6 +1289,13 @@ void MQTT_UpdateAllSockSet(socket_set_t *set)
RemoveExpiredMqttMessages(client);
if ((client->usp_record_send_queue.head) && (client->is_subscribed))
{
+ q_msg = (mqtt_send_item_t *) client->usp_record_send_queue.head;
+
+ if (q_msg->is_in_flight)
+ {
+ break;
+ }
+
// Initiate send. Note: if an error occured, it will be logged in mqtt_publish_err_msg[]
err = SendQueueHead(client);
if (err == USP_ERR_OK)
@@ -1279,17 +1304,12 @@ void MQTT_UpdateAllSockSet(socket_set_t *set)
// If this is a USP disconnect record, then save the (libmosquitto allocated) message id, so that
// when we get the publish callback, we can then shutdown the connection
// NOTE: USP Disconnect records terminating an E2E session (kMtpContentType_E2E_SessTermination) do not shutdown the connection
- q_msg = (mqtt_send_item_t *) client->usp_record_send_queue.head;
client->disconnect_mid = (q_msg->item.content_type == kMtpContentType_DisconnectRecord) ? q_msg->mid : INVALID_MOSQUITTO_MID;
-
- // Remove item from send queue
- RemoveMqttQueueItem(client, q_msg);
}
else
{
// Failed to initiate send
// Replace the item to send with a USP ERROR or drop the item, to prevent send queue deadlock
- q_msg = (mqtt_send_item_t *) client->usp_record_send_queue.head;
if (q_msg != NULL)
{
if (MSG_HANDLER_ShouldDropFailedMessage(&q_msg->item, mqtt_publish_err_msg) == true)
@@ -1811,6 +1831,8 @@ void InitClient(mqtt_client_t *client, int index)
client->disconnect_mid = INVALID_MOSQUITTO_MID;
client->is_reconnect = false;
client->is_subscribed = false;
+ client->mgmt_ip_addr[0] = '\0';
+ client->mgmt_if_name[0] = '\0';
KV_VECTOR_Init(&client->controller_topics);
STR_VECTOR_Init(&client->frame_trace);
@@ -1968,6 +1990,8 @@ void CleanMqttClient(mqtt_client_t *client, bool is_reconnect)
USP_SAFE_FREE(client->agent_topic_from_connack);
client->retry_time = 0;
client->is_subscribed = false;
+ client->mgmt_ip_addr[0] = '\0';
+ client->mgmt_if_name[0] = '\0';
// Free all subscriptions whose lifetime is set by the connection (rather than being set by configuration in Device.MQTT.Client.{i}.Subscription table)
MqttSubscriptionDestroy(&client->response_subscription);
@@ -2323,6 +2347,7 @@ void QueueUspRecord_MQTT(mqtt_client_t *client, mtp_send_item_t *msi, char *cont
send_item->item = *msi; // NOTE: Ownership of the payload buffer passes to the MQTT client
send_item->topic = USP_STRDUP(controller_topic);
send_item->mid = INVALID;
+ send_item->is_in_flight = false;
send_item->qos = client->conn_params.publish_qos;
send_item->expiry_time = expiry_time;
@@ -2335,6 +2360,7 @@ void QueueUspRecord_MQTT(mqtt_client_t *client, mtp_send_item_t *msi, char *cont
{
DLLIST_LinkToTail(&client->usp_record_send_queue, send_item);
}
+
}
/*********************************************************************//**
@@ -2352,6 +2378,7 @@ void QueueUspRecord_MQTT(mqtt_client_t *client, mtp_send_item_t *msi, char *cont
int SendQueueHead(mqtt_client_t *client)
{
int err = USP_ERR_OK;
+ mqtt_send_item_t * q_msg;
// Can't be passed a NULL client
USP_ASSERT(client != NULL);
@@ -2362,7 +2389,7 @@ int SendQueueHead(mqtt_client_t *client)
mqtt_state_t state = client->state;
if (state == kMqttState_Running)
{
- mqtt_send_item_t * q_msg = (mqtt_send_item_t *) client->usp_record_send_queue.head;
+ q_msg = (mqtt_send_item_t *) client->usp_record_send_queue.head;
// Check the queue head is ok
if (q_msg == NULL)
@@ -2372,6 +2399,10 @@ int SendQueueHead(mqtt_client_t *client)
}
err = Publish(client, q_msg);
+ if (err == USP_ERR_OK)
+ {
+ q_msg->is_in_flight = true;
+ }
}
else
{
@@ -2924,6 +2955,7 @@ void ConnectCallback(struct mosquitto *mosq, void *userdata, int result)
ResetRetryCount(client);
+ UpdateMqttSourceInterface(client);
MoveState(&client->state, kMqttState_Running, "Connect Callback Received");
SubscribeToAll(client);
@@ -3031,6 +3063,7 @@ void ConnectV5Callback(struct mosquitto *mosq, void *userdata, int result, int f
FRAME_TRACE_DUMP(client);
ResetRetryCount(client);
+ UpdateMqttSourceInterface(client);
MoveState(&client->state, kMqttState_Running, "Connect Callback Received");
SubscribeToAll(client);
@@ -3222,6 +3255,7 @@ void DisconnectCallback(struct mosquitto *mosq, void *userdata, int rc)
// Mark the MQTT client as disconnected. The actions to perform after this must be performed in the MQTT MTP thread
// because they might free the mosquitto context, and that must not be done from this callback (libmosquitto is still using it in the functions which called this)
USP_LOG_Debug("%s: Disconnected from (host=%s, port=%d, cause rc=%d, is_reconnect=%d)\n", __FUNCTION__, client->conn_params.host, client->conn_params.port, rc, client->is_reconnect);
+ ResetMqttQueueItemState(client);
client->is_disconnected = true;
exit:
@@ -3292,6 +3326,141 @@ void HandleMqttReconnectAfterDisconnect(mqtt_client_t *client)
}
}
+/*********************************************************************//**
+**
+** UpdateMqttSourceInterface
+**
+** Saves the source IP address and source interface used by the current MQTT
+** socket. This is used to detect WAN address changes whilst the broker
+** session is otherwise idle.
+**
+** \param client - pointer to MQTT client
+**
+** \return None
+**
+**************************************************************************/
+void UpdateMqttSourceInterface(mqtt_client_t *client)
+{
+ int err;
+ int sock;
+
+ client->mgmt_ip_addr[0] = '\0';
+ client->mgmt_if_name[0] = '\0';
+
+ sock = mosquitto_socket(client->mosq);
+ if (sock == -1)
+ {
+ return;
+ }
+
+ err = nu_ipaddr_get_interface_addr_from_sock_fd(sock, client->mgmt_ip_addr, sizeof(client->mgmt_ip_addr));
+ if (err != USP_ERR_OK)
+ {
+ client->mgmt_ip_addr[0] = '\0';
+ return;
+ }
+
+ err = nu_ipaddr_get_interface_name_from_src_addr(client->mgmt_ip_addr, client->mgmt_if_name, sizeof(client->mgmt_if_name));
+ if (err != USP_ERR_OK)
+ {
+ client->mgmt_ip_addr[0] = '\0';
+ client->mgmt_if_name[0] = '\0';
+ return;
+ }
+}
+
+/*********************************************************************//**
+**
+** UpdateMqttMgmtInterface
+**
+** Polls the source interface used by MQTT connections for address changes.
+**
+** \param None
+**
+** \return Number of seconds until the next poll should run
+**
+**************************************************************************/
+int UpdateMqttMgmtInterface(void)
+{
+ time_t cur_time;
+ int timeout;
+
+ cur_time = time(NULL);
+ timeout = next_mqtt_mgmt_if_poll_time - cur_time;
+ if ((next_mqtt_mgmt_if_poll_time != 0) && (timeout > 0))
+ {
+ return timeout;
+ }
+
+ HandleMqttSourceIPAddrChanges();
+
+ timeout = MQTT_MGMT_IP_ADDR_POLL_PERIOD;
+ next_mqtt_mgmt_if_poll_time = cur_time + timeout;
+
+ return timeout;
+}
+
+/*********************************************************************//**
+**
+** HandleMqttSourceIPAddrChanges
+**
+** Forces an MQTT reconnect if the source interface address used by an active
+** MQTT session has changed or disappeared.
+**
+** \param None
+**
+** \return None
+**
+**************************************************************************/
+void HandleMqttSourceIPAddrChanges(void)
+{
+ int i;
+ bool has_addr;
+ bool has_changed;
+ mqtt_client_t *client;
+
+ for (i = 0; i < MAX_MQTT_CLIENTS; i++)
+ {
+ client = &mqtt_clients[i];
+ if ((client->conn_params.instance == INVALID) ||
+ (client->mgmt_if_name[0] == '\0') ||
+ (client->mgmt_ip_addr[0] == '\0'))
+ {
+ continue;
+ }
+
+ has_addr = false;
+ has_changed = nu_ipaddr_has_interface_addr_changed(client->mgmt_if_name, client->mgmt_ip_addr, &has_addr);
+ if (has_changed == false)
+ {
+ continue;
+ }
+
+ USP_LOG_Warning("%s: Source interface changed. Restarting MQTT client %d.", __FUNCTION__, client->conn_params.instance);
+ ForceMqttReconnect(client);
+ }
+}
+
+/*********************************************************************//**
+**
+** ForceMqttReconnect
+**
+** Immediately restarts an MQTT session without waiting for the broker-side
+** keepalive timeout. This is used when the source WAN address used by the
+** current socket changes underneath the connection.
+**
+** \param client - pointer to MQTT client
+** \return None
+**
+**************************************************************************/
+void ForceMqttReconnect(mqtt_client_t *client)
+{
+ ResetMqttQueueItemState(client);
+ client->is_disconnected = false;
+ client->is_reconnect = true;
+ HandleMqttReconnectAfterDisconnect(client);
+}
+
/*********************************************************************//**
**
** Subscribe
@@ -4057,7 +4226,9 @@ error:
void PublishCallback(struct mosquitto* mosq, void *userdata, int mid /*message id*/)
{
mqtt_client_t *client = NULL;
+ mqtt_send_item_t *q_msg = NULL;
int instance = *(int*) userdata;
+ bool should_wakeup = false;
OS_UTILS_LockMutex(&mqtt_access_mutex);
@@ -4068,17 +4239,33 @@ void PublishCallback(struct mosquitto* mosq, void *userdata, int mid /*message i
goto exit;
}
- USP_LOG_Debug("Received PUBACK frame from (host=%s, port=%d, mid=%d)", client->conn_params.host, client->conn_params.port, mid);
+ USP_LOG_Debug("Publish completed for (host=%s, port=%d, mid=%d)", client->conn_params.host, client->conn_params.port, mid);
+
+ q_msg = FindInflightMqttQueueItem(client, mid);
+ if (q_msg == NULL)
+ {
+ USP_LOG_Warning("%s: Received publish completion with unknown mid=%d. Ignoring.", __FUNCTION__, mid);
+ goto exit;
+ }
+
+ RemoveMqttQueueItem(client, q_msg);
+ should_wakeup = true;
// If this publish was for a USP disconnect record, then force an actual disconnect
if (mid == client->disconnect_mid)
{
+ client->disconnect_mid = INVALID_MOSQUITTO_MID;
HandleMqttError(client, kMqttFailure_OtherError, "Disconnecting after USP Disconnect Record sent");
}
exit:
OS_UTILS_UnlockMutex(&mqtt_access_mutex);
+ if (should_wakeup)
+ {
+ MTP_EXEC_MqttWakeup();
+ }
+
}
/*********************************************************************//**
@@ -4514,6 +4701,36 @@ bool IsUspRecordInMqttQueue(mqtt_client_t *client, unsigned char *pbuf, int pbuf
return false;
}
+/*********************************************************************//**
+**
+** FindInflightMqttQueueItem
+**
+** Finds the MQTT queue entry currently waiting for publish completion
+**
+** \param client - pointer to MQTT client
+** \param mid - libmosquitto message id identifying the message that completed
+**
+** \return matching queue entry or NULL if not found
+**
+**************************************************************************/
+mqtt_send_item_t *FindInflightMqttQueueItem(mqtt_client_t *client, int mid)
+{
+ mqtt_send_item_t *q_msg;
+
+ q_msg = (mqtt_send_item_t *) client->usp_record_send_queue.head;
+ while (q_msg != NULL)
+ {
+ if ((q_msg->is_in_flight) && ((q_msg->mid == mid) || (mid == INVALID_MOSQUITTO_MID)))
+ {
+ return q_msg;
+ }
+
+ q_msg = (mqtt_send_item_t *) q_msg->link.next;
+ }
+
+ return NULL;
+}
+
/*********************************************************************//**
**
** FindUnusedMqttClient_Local
@@ -4812,6 +5029,32 @@ void RemoveMqttQueueItem(mqtt_client_t *client, mqtt_send_item_t *queued_msg)
USP_SAFE_FREE(queued_msg);
}
+/*********************************************************************//**
+**
+** ResetMqttQueueItemState
+**
+** Clears any publish-in-progress state for queued messages after a disconnect
+**
+** \param client - pointer to MQTT client
+**
+** \return None
+**
+**************************************************************************/
+void ResetMqttQueueItemState(mqtt_client_t *client)
+{
+ mqtt_send_item_t *queued_msg;
+
+ client->disconnect_mid = INVALID_MOSQUITTO_MID;
+
+ queued_msg = (mqtt_send_item_t *) client->usp_record_send_queue.head;
+ while (queued_msg != NULL)
+ {
+ queued_msg->mid = INVALID_MOSQUITTO_MID;
+ queued_msg->is_in_flight = false;
+ queued_msg = (mqtt_send_item_t *) queued_msg->link.next;
+ }
+}
+
/*********************************************************************//**
**
** RemoveExpiredMqttMessages