--- a/src/core/mqtt.c +++ b/src/core/mqtt.c @@ -63,6 +63,8 @@ #include +#include + // Defines for MQTT Property Values #define PUBLISH 0x30 #define CONTENT_TYPE 3 @@ -2192,6 +2194,75 @@ exit: } } +static int _check_host_rechability(CURL *handle, curl_infotype type, char *data, size_t size, void *userp) +{ + bool *palive = (bool *)userp; + + USP_ASSERT(palive != NULL); + switch(type) { + case CURLINFO_HEADER_OUT: + case CURLINFO_HEADER_IN: + *palive = true; + break; + case CURLINFO_TEXT: + { + USP_LOG_Debug("CURL DATA:: [%s]", data); + if (strstr(data, "Connected to ") != NULL) { + *palive = true; + } + break; + } + default: + break; + } + + return 0; +} + +int check_mqtt_host_reachability(mqtt_client_t *client) +{ + CURL *curl; + mqtt_conn_params_t *cparam = &client->conn_params; + char buffer[128] = {0}; + int ret = USP_ERR_INTERNAL_ERROR; + bool is_alive = false; + + curl = curl_easy_init(); + if(curl) { + USP_SNPRINTF(buffer, 128, "mqtt://%s:%d", cparam->host, cparam->port); + curl_easy_setopt(curl, CURLOPT_URL, buffer); + + if (strlen(cparam->username) > 0) { + curl_easy_setopt(curl, CURLOPT_USERNAME, cparam->username); + } + + if (strlen(cparam->password) > 0) { + curl_easy_setopt(curl, CURLOPT_PASSWORD, cparam->password); + } + + curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + curl_easy_setopt(curl, CURLOPT_DEBUGDATA, &is_alive); + curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, _check_host_rechability); + + /* complete within 2 seconds */ + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2L); + + ret = curl_easy_perform(curl); + /* Check for errors */ + if(ret == CURLE_OK || ret == CURLE_URL_MALFORMAT || is_alive == true) { + USP_LOG_Debug("CURL MQTT host %s, ret %d, alive %d ...", buffer, ret, is_alive); + ret = USP_ERR_OK; + } else { + USP_LOG_Info("# CURL MQTT host %s unreachable: %d=>%s ...", buffer, ret, curl_easy_strerror(ret)); + } + + /* always cleanup */ + curl_easy_cleanup(curl); + } + + return ret; +} + /*********************************************************************//** ** ** PerformMqttClientConnect @@ -2261,6 +2332,14 @@ int PerformMqttClientConnect(mqtt_client keep_alive = 5; } + // Below function is a workaround to check the host reachability with a timeout + // mosquitto_connect_* API block the thread for 2 mins if host is not reachable, + // which halts other clients connectivity + err = check_mqtt_host_reachability(client); + if (err != USP_ERR_OK) { + err = USP_ERR_INTERNAL_ERROR; + goto exit; + } // Release the access mutex temporarily whilst performing the connect call // We do this to prevent the data model thread from potentially being blocked, whilst the connect call is taking place OS_UTILS_UnlockMutex(&mqtt_access_mutex);