obuspa: mqtt fallback IP family in dual-stack mode

This commit is contained in:
Suvendhu Hansa 2025-06-20 14:54:05 +05:30 committed by Vivek Kumar Dutta
parent f8d7788d6f
commit bb603cb492
No known key found for this signature in database
GPG key ID: 4E09F5AD8265FD4C
4 changed files with 405 additions and 4 deletions

View file

@ -5,13 +5,13 @@
include $(TOPDIR)/rules.mk include $(TOPDIR)/rules.mk
PKG_NAME:=obuspa PKG_NAME:=obuspa
PKG_VERSION:=10.0.0.11 PKG_VERSION:=10.0.0.12
LOCAL_DEV:=0 LOCAL_DEV:=0
ifneq ($(LOCAL_DEV),1) ifneq ($(LOCAL_DEV),1)
PKG_SOURCE_PROTO:=git PKG_SOURCE_PROTO:=git
PKG_SOURCE_URL:=https://dev.iopsys.eu/bbf/obuspa.git PKG_SOURCE_URL:=https://dev.iopsys.eu/bbf/obuspa.git
PKG_SOURCE_VERSION:=e258e1de0f3f679b56f729bd23c435986a5944ad PKG_SOURCE_VERSION:=00af30a16f3db3efba128c5aa32c2b8c940a3855
PKG_MAINTAINER:=Vivek Dutta <vivek.dutta@iopsys.eu> PKG_MAINTAINER:=Vivek Dutta <vivek.dutta@iopsys.eu>
PKG_SOURCE:=$(PKG_NAME)-$(PKG_VERSION)-$(PKG_SOURCE_VERSION).tar.gz PKG_SOURCE:=$(PKG_NAME)-$(PKG_VERSION)-$(PKG_SOURCE_VERSION).tar.gz
PKG_MIRROR_HASH:=skip PKG_MIRROR_HASH:=skip

View file

@ -10,4 +10,4 @@ config obuspa 'global'
#option client_cert '/etc/obuspa/client.pem' #option client_cert '/etc/obuspa/client.pem'
option log_dest 'syslog' option log_dest 'syslog'
option dm_caching_exclude '/etc/obuspa/transient_dm.json' option dm_caching_exclude '/etc/obuspa/transient_dm.json'
option dualstack_pref 'IPv6'

View file

@ -994,7 +994,7 @@ db_init()
#log "Create reset file ...." #log "Create reset file ...."
config_load $CONFIGURATION config_load $CONFIGURATION
config_get dualstack_pref global dualstack_pref "IPv4" config_get dualstack_pref global dualstack_pref "IPv6"
global_init global_init
config_foreach configure_localagent localagent config_foreach configure_localagent localagent

View file

@ -0,0 +1,401 @@
diff --git a/src/core/mqtt.c b/src/core/mqtt.c
index 70978501b1..96119fe080 100644
--- a/src/core/mqtt.c
+++ b/src/core/mqtt.c
@@ -53,6 +53,7 @@
#include <openssl/bio.h>
#include <openssl/err.h>
#include <openssl/x509v3.h>
+#include <netdb.h>
#include <mosquitto.h>
#include "mqtt.h"
@@ -201,8 +202,9 @@ int EnableMosquitto(mqtt_client_t *client);
void SetupCallbacks(mqtt_client_t *client);
void QueueUspConnectRecord_MQTT(mqtt_client_t *client, mtp_send_item_t *msi, char *controller_topic, time_t expiry_time);
int SendQueueHead(mqtt_client_t *client);
+bool check_connectivity(struct addrinfo *list_addr, unsigned int port, sa_family_t family, char *dst_ip, int size);
void Connect(mqtt_client_t *client);
-int PerformMqttClientConnect(mqtt_client_t *client);
+int PerformMqttClientConnect(mqtt_client_t *client, const char *host_ip);
int ConnectSetEncryption(mqtt_client_t *client);
void ConnectCallback(struct mosquitto *mosq, void *userdata, int result);
void ConnectV5Callback(struct mosquitto *mosq, void *userdata, int result, int flags, const mosquitto_property *props);
@@ -245,7 +247,7 @@ void HandleMqttReconnect(mqtt_client_t *client);
void HandleMqttReconnectAfterDisconnect(mqtt_client_t *client);
void HandleMqttDisconnect(mqtt_client_t *client);
void DisconnectIfAllSubscriptionsFailed(mqtt_client_t *client);
-bool IsMqttBrokerUp(mqtt_client_t *client);
+bool IsMqttBrokerUp(mqtt_client_t *client, char *dst_ip, int size);
void RemoveMqttQueueItem(mqtt_client_t *client, mqtt_send_item_t *queued_msg);
void RemoveExpiredMqttMessages(mqtt_client_t *client);
void ParseSubscribeTopicsFromConnack(mqtt_client_t *client, mosquitto_property *prop);
@@ -2350,6 +2352,143 @@ int SendQueueHead(mqtt_client_t *client)
return err;
}
+bool check_connectivity(struct addrinfo *list_addr, unsigned int port, sa_family_t family, char *dst_ip, int size)
+{
+ int err;
+ struct sockaddr_storage saddr;
+ socklen_t saddr_len;
+ sa_family_t sock_family;
+ int socket_fd = INVALID;
+ fd_set writefds;
+ struct timeval timeout;
+ int num_sockets;
+ int so_err;
+ socklen_t so_len = sizeof(so_err);
+ struct addrinfo *iterator;
+ struct sockaddr_in *a;
+ struct sockaddr_in6 *a6;
+ nu_ipaddr_t dst;
+
+ if (list_addr == NULL || dst_ip == NULL)
+ {
+ return false;
+ }
+
+ for (iterator=list_addr; iterator!=NULL; iterator=iterator->ai_next)
+ {
+ if (iterator->ai_family != family)
+ {
+ continue;
+ }
+
+ if (family == AF_INET)
+ {
+ a = (struct sockaddr_in *) iterator->ai_addr;
+ err = nu_ipaddr_from_inaddr(&a->sin_addr, &dst);
+
+ if (err != USP_ERR_OK)
+ {
+ USP_LOG_Error("%s: nu_ipaddr_from_inaddr() failed: %s", __FUNCTION__, strerror(err));
+ continue;
+ }
+ }
+ else
+ {
+ a6 = (struct sockaddr_in6 *) iterator->ai_addr;
+ err = nu_ipaddr_from_in6addr(&a6->sin6_addr, &dst);
+
+ if (err != USP_ERR_OK)
+ {
+ USP_LOG_Error("%s: nu_ipaddr_from_in6addr() failed: %s", __FUNCTION__, strerror(err));
+ continue;
+ }
+ }
+
+ // Next IP if unable to make a socket address structure to probe the MQTT server
+ err = nu_ipaddr_to_sockaddr(&dst, port, &saddr, &saddr_len);
+ if (err != USP_ERR_OK)
+ {
+ continue;
+ }
+
+ // Next IP if unable to determine which address family to use to contact the MQTT broker
+ err = nu_ipaddr_get_family(&dst, &sock_family);
+ if (err != USP_ERR_OK)
+ {
+ continue;
+ }
+
+ // Return if unable to create the socket
+ socket_fd = socket(sock_family, SOCK_STREAM, 0);
+ if (socket_fd == -1)
+ {
+ USP_LOG_Error("%s: failed to open socket", __FUNCTION__);
+ return false;
+ }
+
+ // Return if unable to set the socket as non blocking
+ // We do this before connecting so that we can timeout on connect taking too long
+ err = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
+ if (err == -1)
+ {
+ USP_ERR_ERRNO("fcntl", errno);
+ close(socket_fd);
+ return false;
+ }
+
+ // Next IP if unable to connect to the MQTT Broker
+ // NOTE: The connect is performed in non-blocking mode
+ err = connect(socket_fd, (struct sockaddr *) &saddr, saddr_len);
+ if ((err == -1) && (errno != EINPROGRESS))
+ {
+ USP_ERR_ERRNO("connect", errno);
+ close(socket_fd);
+ continue;
+ }
+
+ // Set up arguments for the select() call
+ FD_ZERO(&writefds);
+ FD_SET(socket_fd, &writefds);
+ timeout.tv_sec = MQTT_CONNECT_TIMEOUT;
+ timeout.tv_usec = 0;
+
+ // Next IP if the connect timed out
+ num_sockets = select(socket_fd + 1, NULL, &writefds, NULL, &timeout);
+ if (num_sockets == 0)
+ {
+ USP_LOG_Error("%s: connect timed out", __FUNCTION__);
+ close(socket_fd);
+ continue;
+ }
+
+ // Next IP if unable to determine whether the connect was successful or not
+ err = getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, &so_err, &so_len);
+ if (err == -1)
+ {
+ USP_ERR_ERRNO("getsockopt", errno);
+ close(socket_fd);
+ continue;
+ }
+
+ // Next IP if connect was not successful
+ if (so_err != 0)
+ {
+ USP_LOG_Error("%s: async connect failed", __FUNCTION__);
+ close(socket_fd);
+ continue;
+ }
+
+ close(socket_fd);
+
+ // Connect was successful
+ nu_ipaddr_str(&dst, dst_ip, size);
+
+ return true;
+ }
+
+ return false;
+}
+
/*********************************************************************//**
**
** IsMqttBrokerUp
@@ -2364,109 +2503,92 @@ int SendQueueHead(mqtt_client_t *client)
** \return true if the MQTT Broker is up, false otherwise
**
**************************************************************************/
-bool IsMqttBrokerUp(mqtt_client_t *client)
+bool IsMqttBrokerUp(mqtt_client_t *client, char *dst_ip, int size)
{
int err;
bool prefer_ipv6;
- nu_ipaddr_t dst;
- struct sockaddr_storage saddr;
- socklen_t saddr_len;
- sa_family_t family;
- int socket_fd = INVALID;
- fd_set writefds;
- struct timeval timeout;
- int num_sockets;
- int so_err;
- socklen_t so_len = sizeof(so_err);
+ bool ipv4_supported = false;
+ bool ipv6_supported = false;
bool result = false;
+ struct addrinfo *addr_list = NULL;
+ struct addrinfo hints;
// Get the preference for IPv4 or IPv6, if dual stack
prefer_ipv6 = DEVICE_LOCAL_AGENT_GetDualStackPreference();
- // Exit if unable to determine the IP address of the MQTT Broker
- err = tw_ulib_diags_lookup_host(client->conn_params.host, AF_UNSPEC, prefer_ipv6, NULL, &dst);
- if (err != USP_ERR_OK)
- {
- goto exit;
- }
+ USP_LOG_Debug("%s: dual-stack preference is: %s", __FUNCTION__, prefer_ipv6 ? "IPv6" : "IPv4");
- // Exit if unable to make a socket address structure to probe the MQTT server
- err = nu_ipaddr_to_sockaddr(&dst, client->conn_params.port, &saddr, &saddr_len);
- if (err != USP_ERR_OK)
- {
- goto exit;
- }
+ // Initialise the hints to use, when obtaining the IP address of the specified host using getaddrinfo()
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_flags |= AI_ADDRCONFIG;
- // Exit if unable to determine which address family to use to contact the MQTT broker
- // NOTE: This shouldn't fail if tw_ulib_diags_lookup_host() is correct
- err = nu_ipaddr_get_family(&dst, &family);
- if (err != USP_ERR_OK)
+ err = getaddrinfo(client->conn_params.host, NULL, &hints, &addr_list);
+ if (err != USP_ERR_OK || addr_list == NULL)
{
- goto exit;
+ USP_ERR_SetMessage("%s: getaddrinfo() failed to resolve: %s", __FUNCTION__, client->conn_params.host);
+ return result;
}
- // Exit if unable to create the socket
- socket_fd = socket(family, SOCK_STREAM, 0);
- if (socket_fd == -1)
+ // Exit if unable to determine which address families are supported by the device
+ // NOTE: In theory, setting getaddrinfo hints to AI_ADDRCONFIG, should filter by supported address family
+ // However, unfortunately that flag does not take into account whether the address is globally routable (for IPv6) as well
+ err = nu_ipaddr_get_ip_supported_families(&ipv4_supported, &ipv6_supported);
+ if (err != USP_ERR_OK)
{
- goto exit;
+ USP_LOG_Error("%s: Failed to determine device IP families", __FUNCTION__);
+ return result;
}
- // Exit if unable to set the socket as non blocking
- // We do this before connecting so that we can timeout on connect taking too long
- err = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
- if (err == -1)
+ if (ipv4_supported == false && ipv6_supported == false)
{
- USP_ERR_ERRNO("fcntl", errno);
- goto exit;
+ // couldn't determine device ip mode
+ USP_LOG_Error("%s: Could not determine device IP families", __FUNCTION__);
+ return result;
}
- // Exit if unable to connect to the MQTT Broker
- // NOTE: The connect is performed in non-blocking mode
- err = connect(socket_fd, (struct sockaddr *) &saddr, saddr_len);
- if ((err == -1) && (errno != EINPROGRESS))
+ if (ipv6_supported == false)
{
- USP_ERR_ERRNO("connect", errno);
- goto exit;
+ // device has only ipv4 address so only try with ipv4 addresses
+ USP_LOG_Debug("%s: Check connectivity with IPv4 only", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
}
-
- // Set up arguments for the select() call
- FD_ZERO(&writefds);
- FD_SET(socket_fd, &writefds);
- timeout.tv_sec = MQTT_CONNECT_TIMEOUT;
- timeout.tv_usec = 0;
-
- // Exit if the connect timed out
- num_sockets = select(socket_fd + 1, NULL, &writefds, NULL, &timeout);
- if (num_sockets == 0)
+ else if (ipv4_supported == false)
{
- USP_LOG_Error("%s: connect timed out", __FUNCTION__);
- goto exit;
+ // device has only ipv6 address so only try with ipv6 addresses
+ USP_LOG_Debug("%s: Check connectivity with IPv6 only", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
}
-
- // Exit if unable to determine whether the connect was successful or not
- err = getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, &so_err, &so_len);
- if (err == -1)
+ else
{
- USP_ERR_ERRNO("getsockopt", errno);
- goto exit;
- }
+ // device has both ipv6 and ipv4 address, so we need to search as per preference
+ if (prefer_ipv6 == true)
+ {
- // Exit if connect was not successful
- if (so_err != 0)
- {
- USP_LOG_Error("%s: async connect failed", __FUNCTION__);
- goto exit;
+ // First try with v6 then fallback to v4
+ USP_LOG_Debug("%s: Check connectivity with IPv6 first", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
+ if (result == false)
+ {
+ USP_LOG_Debug("%s: Check connectivity fallback with IPv4", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
+ }
+ }
+ else
+ {
+ // First try with v4 then fallback to v6
+ USP_LOG_Debug("%s: Check connectivity with IPv4 first", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
+ if (result == false)
+ {
+ USP_LOG_Debug("%s: Check connectivity fallback with IPv6", __FUNCTION__);
+ result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
+ }
+ }
}
- // Connect was successful
- result = true;
-
-exit:
- // Tidy up
- if (socket_fd != INVALID)
- {
- close(socket_fd);
+ if (addr_list != NULL) {
+ (void) freeaddrinfo(addr_list);
}
return result;
@@ -2487,18 +2609,20 @@ void Connect(mqtt_client_t *client)
{
int err = USP_ERR_OK;
bool is_up;
+ char dst_ip[INET6_ADDRSTRLEN] = {0};
// Exit if after probing the MQTT Broker, it is not up
// In this case, we do not attempt to call libmosquitto's connect function, as that function will end up blocking for 2 minutes (since the MQTT broker is down)
- is_up = IsMqttBrokerUp(client);
- if (is_up == false)
+ is_up = IsMqttBrokerUp(client, dst_ip, sizeof(dst_ip));
+ if (is_up == false || strlen(dst_ip) == 0)
{
+ USP_LOG_Debug("%s: MQTT broker(%s) is not UP", __FUNCTION__, client->conn_params.host);
err = USP_ERR_INTERNAL_ERROR;
goto exit;
}
// Start the MQTT Connect
- err = PerformMqttClientConnect(client);
+ err = PerformMqttClientConnect(client, dst_ip);
// Exit if failed to connect
if (err != USP_ERR_OK)
@@ -2531,7 +2655,7 @@ exit:
** \return USP_ERR_INTERNAL_ERROR if failed to connect (and should retry)
**
**************************************************************************/
-int PerformMqttClientConnect(mqtt_client_t *client)
+int PerformMqttClientConnect(mqtt_client_t *client, const char *host_ip)
{
int version;
mosquitto_property *proplist = NULL;
@@ -2601,19 +2725,19 @@ int PerformMqttClientConnect(mqtt_client_t *client)
// We do this to prevent the data model thread from potentially being blocked, whilst the connect call is taking place
OS_UTILS_UnlockMutex(&mqtt_access_mutex);
- USP_LOG_Info("Sending CONNECT frame to (host=%s, port=%d)", client->conn_params.host, client->conn_params.port);
+ USP_LOG_Info("Sending CONNECT frame to (host=%s, ip=%s, port=%d)", client->conn_params.host, host_ip, client->conn_params.port);
FRAME_TRACE_DUMP(client);
// Perform the TCP connect
// NOTE: TCP connect can block for around 2 minutes if the broker does not respond to the TCP handshake
if (version == kMqttProtocol_5_0)
{
- mosq_err = mosquitto_connect_bind_v5(client->mosq, client->conn_params.host, client->conn_params.port,
+ mosq_err = mosquitto_connect_bind_v5(client->mosq, host_ip, client->conn_params.port,
keep_alive, NULL, proplist);
}
else
{
- mosq_err = mosquitto_connect(client->mosq, client->conn_params.host, client->conn_params.port,
+ mosq_err = mosquitto_connect(client->mosq, host_ip, client->conn_params.port,
keep_alive);
}