iopsys-feed/obuspa/patches/2004-mqtt-dualstack-fallback.patch
2025-08-23 10:08:37 +05:30

401 lines
15 KiB
Diff

Index: obuspa-10.0.7.0/src/core/mqtt.c
===================================================================
--- obuspa-10.0.7.0.orig/src/core/mqtt.c
+++ obuspa-10.0.7.0/src/core/mqtt.c
@@ -53,6 +53,7 @@
#include <openssl/bio.h>
#include <openssl/err.h>
#include <openssl/x509v3.h>
+#include <netdb.h>
#include <mosquitto.h>
#include "mqtt.h"
@@ -206,8 +207,9 @@ int EnableMosquitto(mqtt_client_t *clien
void SetupCallbacks(mqtt_client_t *client);
void QueueUspConnectRecord_MQTT(mqtt_client_t *client, mtp_send_item_t *msi, char *controller_topic, time_t expiry_time);
int SendQueueHead(mqtt_client_t *client);
+bool check_connectivity(struct addrinfo *list_addr, unsigned int port, sa_family_t family, char *dst_ip, int size);
void Connect(mqtt_client_t *client);
-int PerformMqttClientConnect(mqtt_client_t *client);
+int PerformMqttClientConnect(mqtt_client_t *client, const char *host_ip);
int ConnectSetEncryption(mqtt_client_t *client);
void ConnectCallback(struct mosquitto *mosq, void *userdata, int result);
void ConnectV5Callback(struct mosquitto *mosq, void *userdata, int result, int flags, const mosquitto_property *props);
@@ -250,7 +252,7 @@ void HandleMqttReconnect(mqtt_client_t *
void HandleMqttReconnectAfterDisconnect(mqtt_client_t *client);
void HandleMqttDisconnect(mqtt_client_t *client);
void DisconnectIfAllSubscriptionsFailed(mqtt_client_t *client);
-bool IsMqttBrokerUp(mqtt_client_t *client);
+bool IsMqttBrokerUp(mqtt_client_t *client, char *dst_ip, int size);
void RemoveMqttQueueItem(mqtt_client_t *client, mqtt_send_item_t *queued_msg);
void RemoveExpiredMqttMessages(mqtt_client_t *client);
void ParseSubscribeTopicsFromConnack(mqtt_client_t *client, mosquitto_property *prop);
@@ -2380,6 +2382,143 @@ int SendQueueHead(mqtt_client_t *client)
return err;
}
+bool check_connectivity(struct addrinfo *list_addr, unsigned int port, sa_family_t family, char *dst_ip, int size)
+{
+ int err;
+ struct sockaddr_storage saddr;
+ socklen_t saddr_len;
+ sa_family_t sock_family;
+ int socket_fd = INVALID;
+ fd_set writefds;
+ struct timeval timeout;
+ int num_sockets;
+ int so_err;
+ socklen_t so_len = sizeof(so_err);
+ struct addrinfo *iterator;
+ struct sockaddr_in *a;
+ struct sockaddr_in6 *a6;
+ nu_ipaddr_t dst;
+
+ if (list_addr == NULL || dst_ip == NULL)
+ {
+ return false;
+ }
+
+ for (iterator=list_addr; iterator!=NULL; iterator=iterator->ai_next)
+ {
+ if (iterator->ai_family != family)
+ {
+ continue;
+ }
+
+ if (family == AF_INET)
+ {
+ a = (struct sockaddr_in *) iterator->ai_addr;
+ err = nu_ipaddr_from_inaddr(&a->sin_addr, &dst);
+
+ if (err != USP_ERR_OK)
+ {
+ USP_LOG_Error("%s: nu_ipaddr_from_inaddr() failed: %s", __FUNCTION__, strerror(err));
+ continue;
+ }
+ }
+ else
+ {
+ a6 = (struct sockaddr_in6 *) iterator->ai_addr;
+ err = nu_ipaddr_from_in6addr(&a6->sin6_addr, &dst);
+
+ if (err != USP_ERR_OK)
+ {
+ USP_LOG_Error("%s: nu_ipaddr_from_in6addr() failed: %s", __FUNCTION__, strerror(err));
+ continue;
+ }
+ }
+
+ // Next IP if unable to make a socket address structure to probe the MQTT server
+ err = nu_ipaddr_to_sockaddr(&dst, port, &saddr, &saddr_len);
+ if (err != USP_ERR_OK)
+ {
+ continue;
+ }
+
+ // Next IP if unable to determine which address family to use to contact the MQTT broker
+ err = nu_ipaddr_get_family(&dst, &sock_family);
+ if (err != USP_ERR_OK)
+ {
+ continue;
+ }
+
+ // Return if unable to create the socket
+ socket_fd = socket(sock_family, SOCK_STREAM, 0);
+ if (socket_fd == -1)
+ {
+ USP_LOG_Error("%s: failed to open socket", __FUNCTION__);
+ return false;
+ }
+
+ // Return if unable to set the socket as non blocking
+ // We do this before connecting so that we can timeout on connect taking too long
+ err = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
+ if (err == -1)
+ {
+ USP_ERR_ERRNO("fcntl", errno);
+ close(socket_fd);
+ return false;
+ }
+
+ // Next IP if unable to connect to the MQTT Broker
+ // NOTE: The connect is performed in non-blocking mode
+ err = connect(socket_fd, (struct sockaddr *) &saddr, saddr_len);
+ if ((err == -1) && (errno != EINPROGRESS))
+ {
+ USP_ERR_ERRNO("connect", errno);
+ close(socket_fd);
+ continue;
+ }
+
+ // Set up arguments for the select() call
+ FD_ZERO(&writefds);
+ FD_SET(socket_fd, &writefds);
+ timeout.tv_sec = MQTT_CONNECT_TIMEOUT;
+ timeout.tv_usec = 0;
+
+ // Next IP if the connect timed out
+ num_sockets = select(socket_fd + 1, NULL, &writefds, NULL, &timeout);
+ if (num_sockets == 0)
+ {
+ USP_LOG_Error("%s: connect timed out", __FUNCTION__);
+ close(socket_fd);
+ continue;
+ }
+
+ // Next IP if unable to determine whether the connect was successful or not
+ err = getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, &so_err, &so_len);
+ if (err == -1)
+ {
+ USP_ERR_ERRNO("getsockopt", errno);
+ close(socket_fd);
+ continue;
+ }
+
+ // Next IP if connect was not successful
+ if (so_err != 0)
+ {
+ USP_LOG_Error("%s: async connect failed", __FUNCTION__);
+ close(socket_fd);
+ continue;
+ }
+
+ close(socket_fd);
+
+ // Connect was successful
+ nu_ipaddr_str(&dst, dst_ip, size);
+
+ return true;
+ }
+
+ return false;
+}
+
/*********************************************************************//**
**
** IsMqttBrokerUp
@@ -2394,109 +2533,92 @@ int SendQueueHead(mqtt_client_t *client)
** \return true if the MQTT Broker is up, false otherwise
**
**************************************************************************/
-bool IsMqttBrokerUp(mqtt_client_t *client)
+bool IsMqttBrokerUp(mqtt_client_t *client, char *dst_ip, int size)
{
int err;
bool prefer_ipv6;
- nu_ipaddr_t dst;
- struct sockaddr_storage saddr;
- socklen_t saddr_len;
- sa_family_t family;
- int socket_fd = INVALID;
- fd_set writefds;
- struct timeval timeout;
- int num_sockets;
- int so_err;
- socklen_t so_len = sizeof(so_err);
+ bool ipv4_supported = false;
+ bool ipv6_supported = false;
bool result = false;
+ struct addrinfo *addr_list = NULL;
+ struct addrinfo hints;
// Get the preference for IPv4 or IPv6, if dual stack
prefer_ipv6 = DEVICE_LOCAL_AGENT_GetDualStackPreference();
- // Exit if unable to determine the IP address of the MQTT Broker
- err = tw_ulib_diags_lookup_host(client->conn_params.host, AF_UNSPEC, prefer_ipv6, NULL, &dst);
- if (err != USP_ERR_OK)
- {
- goto exit;
- }
+ USP_LOG_Debug("%s: dual-stack preference is: %s", __FUNCTION__, prefer_ipv6 ? "IPv6" : "IPv4");
- // Exit if unable to make a socket address structure to probe the MQTT server
- err = nu_ipaddr_to_sockaddr(&dst, client->conn_params.port, &saddr, &saddr_len);
- if (err != USP_ERR_OK)
- {
- goto exit;
- }
+ // Initialise the hints to use, when obtaining the IP address of the specified host using getaddrinfo()
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_flags |= AI_ADDRCONFIG;
- // Exit if unable to determine which address family to use to contact the MQTT broker
- // NOTE: This shouldn't fail if tw_ulib_diags_lookup_host() is correct
- err = nu_ipaddr_get_family(&dst, &family);
- if (err != USP_ERR_OK)
+ err = getaddrinfo(client->conn_params.host, NULL, &hints, &addr_list);
+ if (err != USP_ERR_OK || addr_list == NULL)
{
- goto exit;
+ USP_ERR_SetMessage("%s: getaddrinfo() failed to resolve: %s", __FUNCTION__, client->conn_params.host);
+ return result;
}
- // Exit if unable to create the socket
- socket_fd = socket(family, SOCK_STREAM, 0);
- if (socket_fd == -1)
+ // Exit if unable to determine which address families are supported by the device
+ // NOTE: In theory, setting getaddrinfo hints to AI_ADDRCONFIG, should filter by supported address family
+ // However, unfortunately that flag does not take into account whether the address is globally routable (for IPv6) as well
+ err = nu_ipaddr_get_ip_supported_families(&ipv4_supported, &ipv6_supported);
+ if (err != USP_ERR_OK)
{
- goto exit;
+ USP_LOG_Error("%s: Failed to determine device IP families", __FUNCTION__);
+ return result;
}
- // Exit if unable to set the socket as non blocking
- // We do this before connecting so that we can timeout on connect taking too long
- err = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
- if (err == -1)
+ if (ipv4_supported == false && ipv6_supported == false)
{
- USP_ERR_ERRNO("fcntl", errno);
- goto exit;
+ // couldn't determine device ip mode
+ USP_LOG_Error("%s: Could not determine device IP families", __FUNCTION__);
+ return result;
}
- // Exit if unable to connect to the MQTT Broker
- // NOTE: The connect is performed in non-blocking mode
- err = connect(socket_fd, (struct sockaddr *) &saddr, saddr_len);
- if ((err == -1) && (errno != EINPROGRESS))
+ if (ipv6_supported == false)
{
- USP_ERR_ERRNO("connect", errno);
- goto exit;
+ // device has only ipv4 address so only try with ipv4 addresses
+ USP_LOG_Debug("%s: Check connectivity with IPv4 only", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
}
-
- // Set up arguments for the select() call
- FD_ZERO(&writefds);
- FD_SET(socket_fd, &writefds);
- timeout.tv_sec = MQTT_CONNECT_TIMEOUT;
- timeout.tv_usec = 0;
-
- // Exit if the connect timed out
- num_sockets = select(socket_fd + 1, NULL, &writefds, NULL, &timeout);
- if (num_sockets == 0)
+ else if (ipv4_supported == false)
{
- USP_LOG_Error("%s: connect timed out", __FUNCTION__);
- goto exit;
+ // device has only ipv6 address so only try with ipv6 addresses
+ USP_LOG_Debug("%s: Check connectivity with IPv6 only", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
}
-
- // Exit if unable to determine whether the connect was successful or not
- err = getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, &so_err, &so_len);
- if (err == -1)
+ else
{
- USP_ERR_ERRNO("getsockopt", errno);
- goto exit;
- }
+ // device has both ipv6 and ipv4 address, so we need to search as per preference
+ if (prefer_ipv6 == true)
+ {
- // Exit if connect was not successful
- if (so_err != 0)
- {
- USP_LOG_Error("%s: async connect failed", __FUNCTION__);
- goto exit;
+ // First try with v6 then fallback to v4
+ USP_LOG_Debug("%s: Check connectivity with IPv6 first", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
+ if (result == false)
+ {
+ USP_LOG_Debug("%s: Check connectivity fallback with IPv4", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
+ }
+ }
+ else
+ {
+ // First try with v4 then fallback to v6
+ USP_LOG_Debug("%s: Check connectivity with IPv4 first", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
+ if (result == false)
+ {
+ USP_LOG_Debug("%s: Check connectivity fallback with IPv6", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
+ }
+ }
}
- // Connect was successful
- result = true;
-
-exit:
- // Tidy up
- if (socket_fd != INVALID)
- {
- close(socket_fd);
+ if (addr_list != NULL) {
+ (void) freeaddrinfo(addr_list);
}
return result;
@@ -2517,18 +2639,20 @@ void Connect(mqtt_client_t *client)
{
int err = USP_ERR_OK;
bool is_up;
+ char dst_ip[INET6_ADDRSTRLEN] = {0};
// Exit if after probing the MQTT Broker, it is not up
// In this case, we do not attempt to call libmosquitto's connect function, as that function will end up blocking for 2 minutes (since the MQTT broker is down)
- is_up = IsMqttBrokerUp(client);
- if (is_up == false)
+ is_up = IsMqttBrokerUp(client, dst_ip, sizeof(dst_ip));
+ if (is_up == false || strlen(dst_ip) == 0)
{
+ USP_LOG_Debug("%s: MQTT broker(%s) is not UP", __FUNCTION__, client->conn_params.host);
err = USP_ERR_INTERNAL_ERROR;
goto exit;
}
// Start the MQTT Connect
- err = PerformMqttClientConnect(client);
+ err = PerformMqttClientConnect(client, dst_ip);
// Exit if failed to connect
if (err != USP_ERR_OK)
@@ -2561,7 +2685,7 @@ exit:
** \return USP_ERR_INTERNAL_ERROR if failed to connect (and should retry)
**
**************************************************************************/
-int PerformMqttClientConnect(mqtt_client_t *client)
+int PerformMqttClientConnect(mqtt_client_t *client, const char *host_ip)
{
int version;
mosquitto_property *proplist = NULL;
@@ -2631,19 +2755,19 @@ int PerformMqttClientConnect(mqtt_client
// We do this to prevent the data model thread from potentially being blocked, whilst the connect call is taking place
OS_UTILS_UnlockMutex(&mqtt_access_mutex);
- USP_LOG_Info("Sending CONNECT frame to (host=%s, port=%d)", client->conn_params.host, client->conn_params.port);
+ USP_LOG_Info("Sending CONNECT frame to (host=%s, ip=%s, port=%d)", client->conn_params.host, host_ip, client->conn_params.port);
FRAME_TRACE_DUMP(client);
// Perform the TCP connect
// NOTE: TCP connect can block for around 2 minutes if the broker does not respond to the TCP handshake
if (version == kMqttProtocol_5_0)
{
- mosq_err = mosquitto_connect_bind_v5(client->mosq, client->conn_params.host, client->conn_params.port,
+ mosq_err = mosquitto_connect_bind_v5(client->mosq, host_ip, client->conn_params.port,
keep_alive, NULL, proplist);
}
else
{
- mosq_err = mosquitto_connect(client->mosq, client->conn_params.host, client->conn_params.port,
+ mosq_err = mosquitto_connect(client->mosq, host_ip, client->conn_params.port,
keep_alive);
}