mqtt: probe every resolved broker address before connecting

Resolve the broker hostname with getaddrinfo(), probe each candidate address
(honouring the IPv4/IPv6 dual-stack preference) with a non-blocking TCP
connect, and hand the first reachable IP address to libmosquitto.

NOTE(review): the header names in the first hunk were lost when this patch
was extracted. <netdb.h> is assumed for the added include, because
getaddrinfo() needs it; restore the context includes and confirm before
applying. Indentation and blank context lines were reconstructed as well.
NOTE(review): libmosquitto is now given a numeric IP instead of
conn_params.host; confirm that TLS certificate hostname checks still pass.

Index: obuspa-10.0.7.0/src/core/mqtt.c
===================================================================
--- obuspa-10.0.7.0.orig/src/core/mqtt.c
+++ obuspa-10.0.7.0/src/core/mqtt.c
@@ -53,6 +53,7 @@
 #include
 #include
 #include
+#include <netdb.h>
 #include
 
 #include "mqtt.h"
@@ -206,8 +207,9 @@ int EnableMosquitto(mqtt_client_t *clien
 void SetupCallbacks(mqtt_client_t *client);
 void QueueUspConnectRecord_MQTT(mqtt_client_t *client, mtp_send_item_t *msi, char *controller_topic, time_t expiry_time);
 int SendQueueHead(mqtt_client_t *client);
+bool check_connectivity(struct addrinfo *list_addr, unsigned int port, sa_family_t family, char *dst_ip, int size);
 void Connect(mqtt_client_t *client);
-int PerformMqttClientConnect(mqtt_client_t *client);
+int PerformMqttClientConnect(mqtt_client_t *client, const char *host_ip);
 int ConnectSetEncryption(mqtt_client_t *client);
 void ConnectCallback(struct mosquitto *mosq, void *userdata, int result);
 void ConnectV5Callback(struct mosquitto *mosq, void *userdata, int result, int flags, const mosquitto_property *props);
@@ -250,7 +252,7 @@ void HandleMqttReconnect(mqtt_client_t *
 void HandleMqttReconnectAfterDisconnect(mqtt_client_t *client);
 void HandleMqttDisconnect(mqtt_client_t *client);
 void DisconnectIfAllSubscriptionsFailed(mqtt_client_t *client);
-bool IsMqttBrokerUp(mqtt_client_t *client);
+bool IsMqttBrokerUp(mqtt_client_t *client, char *dst_ip, int size);
 void RemoveMqttQueueItem(mqtt_client_t *client, mqtt_send_item_t *queued_msg);
 void RemoveExpiredMqttMessages(mqtt_client_t *client);
 void ParseSubscribeTopicsFromConnack(mqtt_client_t *client, mosquitto_property *prop);
@@ -2380,6 +2382,159 @@ int SendQueueHead(mqtt_client_t *client)
     return err;
 }
 
+/*********************************************************************//**
+**
+** check_connectivity
+**
+** Walks a getaddrinfo() result list and probes each entry of the requested address family
+** with a non-blocking TCP connect, waiting up to MQTT_CONNECT_TIMEOUT seconds per address
+**
+** \param   list_addr - head of the address list to probe
+** \param   port - TCP port to connect to
+** \param   family - address family of the entries to try (callers pass AF_INET or AF_INET6)
+** \param   dst_ip - buffer passed to nu_ipaddr_str() to receive the first address that accepted the connection
+** \param   size - size of the dst_ip buffer
+**
+** \return  true if one of the addresses accepted a TCP connection, false otherwise
+**
+**************************************************************************/
+bool check_connectivity(struct addrinfo *list_addr, unsigned int port, sa_family_t family, char *dst_ip, int size)
+{
+    int err;
+    struct sockaddr_storage saddr;
+    socklen_t saddr_len;
+    sa_family_t sock_family;
+    int socket_fd = INVALID;
+    fd_set writefds;
+    struct timeval timeout;
+    int num_sockets;
+    int so_err;
+    socklen_t so_len = sizeof(so_err);
+    struct addrinfo *iterator;
+    struct sockaddr_in *a;
+    struct sockaddr_in6 *a6;
+    nu_ipaddr_t dst;
+
+    if (list_addr == NULL || dst_ip == NULL)
+    {
+        return false;
+    }
+
+    for (iterator=list_addr; iterator!=NULL; iterator=iterator->ai_next)
+    {
+        if (iterator->ai_family != family)
+        {
+            continue;
+        }
+
+        if (family == AF_INET)
+        {
+            a = (struct sockaddr_in *) iterator->ai_addr;
+            err = nu_ipaddr_from_inaddr(&a->sin_addr, &dst);
+
+            if (err != USP_ERR_OK)
+            {
+                USP_LOG_Error("%s: nu_ipaddr_from_inaddr() failed (err=%d)", __FUNCTION__, err);
+                continue;
+            }
+        }
+        else
+        {
+            a6 = (struct sockaddr_in6 *) iterator->ai_addr;
+            err = nu_ipaddr_from_in6addr(&a6->sin6_addr, &dst);
+
+            if (err != USP_ERR_OK)
+            {
+                USP_LOG_Error("%s: nu_ipaddr_from_in6addr() failed (err=%d)", __FUNCTION__, err);
+                continue;
+            }
+        }
+
+        // Next IP if unable to make a socket address structure to probe the MQTT server
+        err = nu_ipaddr_to_sockaddr(&dst, port, &saddr, &saddr_len);
+        if (err != USP_ERR_OK)
+        {
+            continue;
+        }
+
+        // Next IP if unable to determine which address family to use to contact the MQTT broker
+        err = nu_ipaddr_get_family(&dst, &sock_family);
+        if (err != USP_ERR_OK)
+        {
+            continue;
+        }
+
+        // Return if unable to create the socket
+        socket_fd = socket(sock_family, SOCK_STREAM, 0);
+        if (socket_fd == -1)
+        {
+            USP_LOG_Error("%s: failed to open socket", __FUNCTION__);
+            return false;
+        }
+
+        // Return if unable to set the socket as non blocking
+        // We do this before connecting so that we can timeout on connect taking too long
+        err = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
+        if (err == -1)
+        {
+            USP_ERR_ERRNO("fcntl", errno);
+            close(socket_fd);
+            return false;
+        }
+
+        // Next IP if unable to connect to the MQTT Broker
+        // NOTE: The connect is performed in non-blocking mode
+        err = connect(socket_fd, (struct sockaddr *) &saddr, saddr_len);
+        if ((err == -1) && (errno != EINPROGRESS))
+        {
+            USP_ERR_ERRNO("connect", errno);
+            close(socket_fd);
+            continue;
+        }
+
+        // Set up arguments for the select() call
+        FD_ZERO(&writefds);
+        FD_SET(socket_fd, &writefds);
+        timeout.tv_sec = MQTT_CONNECT_TIMEOUT;
+        timeout.tv_usec = 0;
+
+        // Next IP if the connect timed out (0), or select() itself failed (-1), as SO_ERROR is not meaningful then
+        num_sockets = select(socket_fd + 1, NULL, &writefds, NULL, &timeout);
+        if (num_sockets <= 0)
+        {
+            USP_LOG_Error("%s: connect %s", __FUNCTION__, (num_sockets == 0) ? "timed out" : "failed in select()");
+            close(socket_fd);
+            continue;
+        }
+
+        // Next IP if unable to determine whether the connect was successful or not
+        err = getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, &so_err, &so_len);
+        if (err == -1)
+        {
+            USP_ERR_ERRNO("getsockopt", errno);
+            close(socket_fd);
+            continue;
+        }
+
+        // Next IP if connect was not successful
+        if (so_err != 0)
+        {
+            USP_LOG_Error("%s: async connect failed", __FUNCTION__);
+            close(socket_fd);
+            continue;
+        }
+
+        close(socket_fd);
+
+        // Connect was successful
+        nu_ipaddr_str(&dst, dst_ip, size);
+
+        return true;
+    }
+
+    return false;
+}
+
 /*********************************************************************//**
 **
 ** IsMqttBrokerUp
@@ -2394,109 +2549,94 @@ int SendQueueHead(mqtt_client_t *client)
 ** \return  true if the MQTT Broker is up, false otherwise
 **
 **************************************************************************/
-bool IsMqttBrokerUp(mqtt_client_t *client)
+bool IsMqttBrokerUp(mqtt_client_t *client, char *dst_ip, int size)
 {
     int err;
     bool prefer_ipv6;
-    nu_ipaddr_t dst;
-    struct sockaddr_storage saddr;
-    socklen_t saddr_len;
-    sa_family_t family;
-    int socket_fd = INVALID;
-    fd_set writefds;
-    struct timeval timeout;
-    int num_sockets;
-    int so_err;
-    socklen_t so_len = sizeof(so_err);
+    bool ipv4_supported = false;
+    bool ipv6_supported = false;
     bool result = false;
+    struct addrinfo *addr_list = NULL;
+    struct addrinfo hints;
 
     // Get the preference for IPv4 or IPv6, if dual stack
     prefer_ipv6 = DEVICE_LOCAL_AGENT_GetDualStackPreference();
 
-    // Exit if unable to determine the IP address of the MQTT Broker
-    err = tw_ulib_diags_lookup_host(client->conn_params.host, AF_UNSPEC, prefer_ipv6, NULL, &dst);
-    if (err != USP_ERR_OK)
-    {
-        goto exit;
-    }
+    USP_LOG_Debug("%s: dual-stack preference is: %s", __FUNCTION__, prefer_ipv6 ? "IPv6" : "IPv4");
 
-    // Exit if unable to make a socket address structure to probe the MQTT server
-    err = nu_ipaddr_to_sockaddr(&dst, client->conn_params.port, &saddr, &saddr_len);
-    if (err != USP_ERR_OK)
-    {
-        goto exit;
-    }
+    // Initialise the hints to use, when obtaining the IP address of the specified host using getaddrinfo()
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;    // Only TCP is probed, so avoid one duplicate entry per socket type
+    hints.ai_flags |= AI_ADDRCONFIG;
 
-    // Exit if unable to determine which address family to use to contact the MQTT broker
-    // NOTE: This shouldn't fail if tw_ulib_diags_lookup_host() is correct
-    err = nu_ipaddr_get_family(&dst, &family);
-    if (err != USP_ERR_OK)
+    err = getaddrinfo(client->conn_params.host, NULL, &hints, &addr_list);
+    if (err != 0 || addr_list == NULL)
     {
-        goto exit;
+        USP_ERR_SetMessage("%s: getaddrinfo() failed to resolve: %s", __FUNCTION__, client->conn_params.host);
+        goto exit;
     }
 
-    // Exit if unable to create the socket
-    socket_fd = socket(family, SOCK_STREAM, 0);
-    if (socket_fd == -1)
+    // Exit if unable to determine which address families are supported by the device
+    // NOTE: In theory, setting getaddrinfo hints to AI_ADDRCONFIG, should filter by supported address family
+    // However, unfortunately that flag does not take into account whether the address is globally routable (for IPv6) as well
+    err = nu_ipaddr_get_ip_supported_families(&ipv4_supported, &ipv6_supported);
+    if (err != USP_ERR_OK)
     {
-        goto exit;
+        USP_LOG_Error("%s: Failed to determine device IP families", __FUNCTION__);
+        goto exit;
     }
 
-    // Exit if unable to set the socket as non blocking
-    // We do this before connecting so that we can timeout on connect taking too long
-    err = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
-    if (err == -1)
+    if (ipv4_supported == false && ipv6_supported == false)
     {
-        USP_ERR_ERRNO("fcntl", errno);
-        goto exit;
+        // couldn't determine device ip mode
+        USP_LOG_Error("%s: Could not determine device IP families", __FUNCTION__);
+        goto exit;
     }
 
-    // Exit if unable to connect to the MQTT Broker
-    // NOTE: The connect is performed in non-blocking mode
-    err = connect(socket_fd, (struct sockaddr *) &saddr, saddr_len);
-    if ((err == -1) && (errno != EINPROGRESS))
+    if (ipv6_supported == false)
     {
-        USP_ERR_ERRNO("connect", errno);
-        goto exit;
+        // device has only ipv4 address so only try with ipv4 addresses
+        USP_LOG_Debug("%s: Check connectivity with IPv4 only", __FUNCTION__);
+        result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
     }
-
-    // Set up arguments for the select() call
-    FD_ZERO(&writefds);
-    FD_SET(socket_fd, &writefds);
-    timeout.tv_sec = MQTT_CONNECT_TIMEOUT;
-    timeout.tv_usec = 0;
-
-    // Exit if the connect timed out
-    num_sockets = select(socket_fd + 1, NULL, &writefds, NULL, &timeout);
-    if (num_sockets == 0)
+    else if (ipv4_supported == false)
     {
-        USP_LOG_Error("%s: connect timed out", __FUNCTION__);
-        goto exit;
+        // device has only ipv6 address so only try with ipv6 addresses
+        USP_LOG_Debug("%s: Check connectivity with IPv6 only", __FUNCTION__);
+        result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
     }
-
-    // Exit if unable to determine whether the connect was successful or not
-    err = getsockopt(socket_fd, SOL_SOCKET, SO_ERROR, &so_err, &so_len);
-    if (err == -1)
+    else
     {
-        USP_ERR_ERRNO("getsockopt", errno);
-        goto exit;
-    }
+        // device has both ipv6 and ipv4 address, so we need to search as per preference
+        if (prefer_ipv6 == true)
+        {
 
-    // Exit if connect was not successful
-    if (so_err != 0)
-    {
-        USP_LOG_Error("%s: async connect failed", __FUNCTION__);
-        goto exit;
+            // First try with v6 then fallback to v4
+            USP_LOG_Debug("%s: Check connectivity with IPv6 first", __FUNCTION__);
+            result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
+            if (result == false)
+            {
+                USP_LOG_Debug("%s: Check connectivity fallback with IPv4", __FUNCTION__);
+                result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
+            }
+        }
+        else
+        {
+            // First try with v4 then fallback to v6
+            USP_LOG_Debug("%s: Check connectivity with IPv4 first", __FUNCTION__);
+            result = check_connectivity(addr_list, client->conn_params.port, AF_INET, dst_ip, size);
+            if (result == false)
+            {
+                USP_LOG_Debug("%s: Check connectivity fallback with IPv6", __FUNCTION__);
+                result = check_connectivity(addr_list, client->conn_params.port, AF_INET6, dst_ip, size);
+            }
+        }
     }
 
-    // Connect was successful
-    result = true;
-
 exit:
-    // Tidy up
-    if (socket_fd != INVALID)
-    {
-        close(socket_fd);
+    if (addr_list != NULL) {
+        (void) freeaddrinfo(addr_list);
     }
 
     return result;
@@ -2517,18 +2657,20 @@ void Connect(mqtt_client_t *client)
 {
     int err = USP_ERR_OK;
     bool is_up;
+    char dst_ip[INET6_ADDRSTRLEN] = {0};
 
     // Exit if after probing the MQTT Broker, it is not up
     // In this case, we do not attempt to call libmosquitto's connect function, as that function will end up blocking for 2 minutes (since the MQTT broker is down)
-    is_up = IsMqttBrokerUp(client);
-    if (is_up == false)
+    is_up = IsMqttBrokerUp(client, dst_ip, sizeof(dst_ip));
+    if (is_up == false || strlen(dst_ip) == 0)
     {
+        USP_LOG_Debug("%s: MQTT broker(%s) is not UP", __FUNCTION__, client->conn_params.host);
         err = USP_ERR_INTERNAL_ERROR;
         goto exit;
     }
 
     // Start the MQTT Connect
-    err = PerformMqttClientConnect(client);
+    err = PerformMqttClientConnect(client, dst_ip);
 
     // Exit if failed to connect
     if (err != USP_ERR_OK)
@@ -2561,7 +2703,7 @@ exit:
 ** \return  USP_ERR_INTERNAL_ERROR if failed to connect (and should retry)
 **
 **************************************************************************/
-int PerformMqttClientConnect(mqtt_client_t *client)
+int PerformMqttClientConnect(mqtt_client_t *client, const char *host_ip)
 {
     int version;
     mosquitto_property *proplist = NULL;
@@ -2631,19 +2773,19 @@ int PerformMqttClientConnect(mqtt_client
     // We do this to prevent the data model thread from potentially being blocked, whilst the connect call is taking place
     OS_UTILS_UnlockMutex(&mqtt_access_mutex);
 
-    USP_LOG_Info("Sending CONNECT frame to (host=%s, port=%d)", client->conn_params.host, client->conn_params.port);
+    USP_LOG_Info("Sending CONNECT frame to (host=%s, ip=%s, port=%d)", client->conn_params.host, host_ip, client->conn_params.port);
     FRAME_TRACE_DUMP(client);
 
     // Perform the TCP connect
     // NOTE: TCP connect can block for around 2 minutes if the broker does not respond to the TCP handshake
     if (version == kMqttProtocol_5_0)
     {
-        mosq_err = mosquitto_connect_bind_v5(client->mosq, client->conn_params.host, client->conn_params.port,
+        mosq_err = mosquitto_connect_bind_v5(client->mosq, host_ip, client->conn_params.port,
                                              keep_alive, NULL, proplist);
     }
     else
     {
-        mosq_err = mosquitto_connect(client->mosq, client->conn_params.host, client->conn_params.port,
+        mosq_err = mosquitto_connect(client->mosq, host_ip, client->conn_params.port,
                                      keep_alive);
     }
 