diff --git a/src/httpserver/http_fns.c b/src/httpserver/http_fns.c index 825fd9802..abdea3f58 100644 --- a/src/httpserver/http_fns.c +++ b/src/httpserver/http_fns.c @@ -1309,6 +1309,10 @@ int http_fn_ha_discovery(http_request_t* request) { sprintf(topic, "homeassistant"); //default discovery topic is `homeassistant` } + //Note: PublishChannels should be done for the last MQTT publish except for power measurement which always + //sends out MQTT updates. + bool ledDriverChipRunning = isLedDriverChipRunning(); + struct cJSON_Hooks hooks; hooks.malloc_fn = os_malloc; hooks.free_fn = os_free; @@ -1327,12 +1331,18 @@ int http_fn_ha_discovery(http_request_t* request) { //Invoke publishChannles after the last topic if (dev_info != NULL) { - MQTT_QueuePublishWithCommand(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN, PublishChannels); + PostPublishCommands ppCommand = PublishChannels; + + if (ledDriverChipRunning || (pwmCount > 0)) { + ppCommand = None; + } + + MQTT_QueuePublishWithCommand(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN, ppCommand); hass_free_device_info(dev_info); } } - if (pwmCount == 5 || isLedDriverChipRunning()) { + if (pwmCount == 5 || ledDriverChipRunning) { // Enable + RGB control + CW control dev_info = hass_init_light_device_info(ENTITY_LIGHT_RGBCW); MQTT_QueuePublishWithCommand(topic, dev_info->channel, hass_build_discovery_json(dev_info), OBK_PUBLISH_FLAG_RETAIN, PublishChannels); diff --git a/src/mqtt/new_mqtt.c b/src/mqtt/new_mqtt.c index 7b1e30efc..c3aed85b8 100644 --- a/src/mqtt/new_mqtt.c +++ b/src/mqtt/new_mqtt.c @@ -20,25 +20,25 @@ #endif int wal_stricmp(const char* a, const char* b) { - int ca, cb; - do { - ca = (unsigned char)*a++; - cb = (unsigned char)*b++; - ca = tolower(toupper(ca)); - cb = tolower(toupper(cb)); - } while ((ca == cb) && (ca != '\0')); - return ca - cb; + int ca, cb; + do { + ca = (unsigned char)*a++; + cb = (unsigned char)*b++; + ca = tolower(toupper(ca)); + cb = tolower(toupper(cb)); + } while ((ca == cb) && (ca != '\0')); + return ca - cb; } int wal_strnicmp(const char* a, const char* b, int count) { - int ca, cb; - do { - ca = (unsigned char)*a++; - cb = (unsigned char)*b++; - ca = tolower(toupper(ca)); - cb = tolower(toupper(cb)); - count--; - } while ((ca == cb) && (ca != '\0') && (count > 0)); - return ca - cb; + int ca, cb; + do { + ca = (unsigned char)*a++; + cb = (unsigned char)*b++; + ca = tolower(toupper(ca)); + cb = tolower(toupper(cb)); + count--; + } while ((ca == cb) && (ca != '\0') && (count > 0)); + return ca - cb; } #define MQTT_QUEUE_ITEM_IS_REUSABLE(x) (x->topic[0] == 0) @@ -64,10 +64,10 @@ static int mqtt_publish_errors = 0; static int mqtt_received_events = 0; typedef struct mqtt_callback_tag { - char* topic; - char* subscriptionTopic; - int ID; - mqtt_callback_fn callback; + char* topic; + char* subscriptionTopic; + int ID; + mqtt_callback_fn callback; } mqtt_callback_t; #define MAX_MQTT_CALLBACKS 32 @@ -96,15 +96,16 @@ int g_bPublishAllStatesNow = 0; #define PUBLISHITEM_QUEUED_VALUES -10 //Publish queued items //These values are dynamic -#define PUBLISHITEM_SELF_DYNAMIC_LIGHTSTATE -9 -#define PUBLISHITEM_SELF_DYNAMIC_LIGHTMODE -8 -#define PUBLISHITEM_SELF_DYNAMIC_DIMMER -7 -#define PUBLISHITEM_SELF_DATETIME -6 //Current unix datetime -#define PUBLISHITEM_SELF_SOCKETS -5 //Active sockets -#define PUBLISHITEM_SELF_RSSI -4 //Link strength -#define PUBLISHITEM_SELF_UPTIME -3 //Uptime -#define PUBLISHITEM_SELF_FREEHEAP -2 //Free heap -#define PUBLISHITEM_SELF_IP -1 //ip address +#define PUBLISHITEM_SELF_DATETIME -9 //Current unix datetime +#define PUBLISHITEM_SELF_SOCKETS -8 //Active sockets +#define PUBLISHITEM_SELF_RSSI -7 //Link strength +#define PUBLISHITEM_SELF_UPTIME -6 //Uptime +#define PUBLISHITEM_SELF_FREEHEAP -5 //Free heap +#define PUBLISHITEM_SELF_IP -4 //ip address + +#define PUBLISHITEM_SELF_DYNAMIC_LIGHTSTATE -3 +#define PUBLISHITEM_SELF_DYNAMIC_LIGHTMODE -2 +#define PUBLISHITEM_SELF_DYNAMIC_DIMMER -1 int g_publishItemIndex = PUBLISHITEM_ALL_INDEX_FIRST; static bool g_firstFullBroadcast = true; //Flag indicating that we need to do a full broadcast @@ -113,48 +114,48 @@ int g_memoryErrorsThisSession = 0; static SemaphoreHandle_t g_mutex = 0; static bool MQTT_Mutex_Take(int del) { - int taken; + int taken; - if (g_mutex == 0) - { - g_mutex = xSemaphoreCreateMutex(); - } - taken = xSemaphoreTake(g_mutex, del); - if (taken == pdTRUE) { - return true; - } - return false; + if (g_mutex == 0) + { + g_mutex = xSemaphoreCreateMutex(); + } + taken = xSemaphoreTake(g_mutex, del); + if (taken == pdTRUE) { + return true; + } + return false; } static void MQTT_Mutex_Free() { - xSemaphoreGive(g_mutex); + xSemaphoreGive(g_mutex); } void MQTT_PublishWholeDeviceState_Internal(bool bAll) { - g_bPublishAllStatesNow = 1; - if (bAll) { - g_publishItemIndex = PUBLISHITEM_ALL_INDEX_FIRST; - } - else { - g_publishItemIndex = PUBLISHITEM_DYNAMIC_INDEX_FIRST; - } + g_bPublishAllStatesNow = 1; + if (bAll) { + g_publishItemIndex = PUBLISHITEM_ALL_INDEX_FIRST; + } + else { + g_publishItemIndex = PUBLISHITEM_DYNAMIC_INDEX_FIRST; + } } void MQTT_PublishWholeDeviceState() { - //Publish all status items once. Publish only dynamic items after that. - MQTT_PublishWholeDeviceState_Internal(g_firstFullBroadcast); + //Publish all status items once. Publish only dynamic items after that. + MQTT_PublishWholeDeviceState_Internal(g_firstFullBroadcast); } void MQTT_PublishOnlyDeviceChannelsIfPossible() { - if (g_bPublishAllStatesNow == 1) - return; - g_bPublishAllStatesNow = 1; - //Start with channels - g_publishItemIndex = 0; + if (g_bPublishAllStatesNow == 1) + return; + g_bPublishAllStatesNow = 1; + //Start with light channels + g_publishItemIndex = PUBLISHITEM_SELF_DYNAMIC_LIGHTSTATE; } static struct mqtt_connect_client_info_t mqtt_client_info = @@ -180,303 +181,303 @@ static void mqtt_connection_cb(mqtt_client_t* client, void* arg, mqtt_connection int MQTT_GetConnectEvents(void) { - return mqtt_connect_events; + return mqtt_connect_events; } int MQTT_GetPublishEventCounter(void) { - return mqtt_published_events; + return mqtt_published_events; } int MQTT_GetPublishErrorCounter(void) { - return mqtt_publish_errors; + return mqtt_publish_errors; } int MQTT_GetReceivedEventCounter(void) { - return mqtt_received_events; + return mqtt_received_events; } int MQTT_GetConnectResult(void) { - return mqtt_connect_result; + return mqtt_connect_result; } //Based on mqtt_connection_status_t and https://www.nongnu.org/lwip/2_1_x/group__mqtt.html const char* get_callback_error(int reason) { - switch (reason) - { - case MQTT_CONNECT_REFUSED_PROTOCOL_VERSION: return "Refused protocol version"; - case MQTT_CONNECT_REFUSED_IDENTIFIER: return "Refused identifier"; - case MQTT_CONNECT_REFUSED_SERVER: return "Refused server"; - case MQTT_CONNECT_REFUSED_USERNAME_PASS: return "Refused user credentials"; - case MQTT_CONNECT_REFUSED_NOT_AUTHORIZED_: return "Refused not authorized"; - case MQTT_CONNECT_DISCONNECTED: return "Disconnected"; - case MQTT_CONNECT_TIMEOUT: return "Timeout"; - } - return ""; + switch (reason) + { + case MQTT_CONNECT_REFUSED_PROTOCOL_VERSION: return "Refused protocol version"; + case MQTT_CONNECT_REFUSED_IDENTIFIER: return "Refused identifier"; + case MQTT_CONNECT_REFUSED_SERVER: return "Refused server"; + case MQTT_CONNECT_REFUSED_USERNAME_PASS: return "Refused user credentials"; + case MQTT_CONNECT_REFUSED_NOT_AUTHORIZED_: return "Refused not authorized"; + case MQTT_CONNECT_DISCONNECTED: return "Disconnected"; + case MQTT_CONNECT_TIMEOUT: return "Timeout"; + } + return ""; } const char* get_error_name(int err) { - switch (err) - { - case ERR_OK: return "ERR_OK"; - case ERR_MEM: return "ERR_MEM"; - /** Buffer error. */ - case ERR_BUF: return "ERR_BUF"; - /** Timeout. */ - case ERR_TIMEOUT: return "ERR_TIMEOUT"; - /** Routing problem. */ - case ERR_RTE: return "ERR_RTE"; - /** Operation in progress */ - case ERR_INPROGRESS: return "ERR_INPROGRESS"; - /** Illegal value. */ - case ERR_VAL: return "ERR_VAL"; - /** Operation would block. */ - case ERR_WOULDBLOCK: return "ERR_WOULDBLOCK"; - /** Address in use. */ - case ERR_USE: return "ERR_USE"; + switch (err) + { + case ERR_OK: return "ERR_OK"; + case ERR_MEM: return "ERR_MEM"; + /** Buffer error. */ + case ERR_BUF: return "ERR_BUF"; + /** Timeout. */ + case ERR_TIMEOUT: return "ERR_TIMEOUT"; + /** Routing problem. */ + case ERR_RTE: return "ERR_RTE"; + /** Operation in progress */ + case ERR_INPROGRESS: return "ERR_INPROGRESS"; + /** Illegal value. */ + case ERR_VAL: return "ERR_VAL"; + /** Operation would block. */ + case ERR_WOULDBLOCK: return "ERR_WOULDBLOCK"; + /** Address in use. */ + case ERR_USE: return "ERR_USE"; #if defined(ERR_ALREADY) - /** Already connecting. */ - case ERR_ALREADY: return "ERR_ALREADY"; + /** Already connecting. */ + case ERR_ALREADY: return "ERR_ALREADY"; #endif - /** Conn already established.*/ - case ERR_ISCONN: return "ERR_ISCONN"; - /** Not connected. */ - case ERR_CONN: return "ERR_CONN"; - /** Low-level netif error */ - case ERR_IF: return "ERR_IF"; - /** Connection aborted. */ - case ERR_ABRT: return "ERR_ABRT"; - /** Connection reset. */ - case ERR_RST: return "ERR_RST"; - /** Connection closed. */ - case ERR_CLSD: return "ERR_CLSD"; - /** Illegal argument. */ - case ERR_ARG: return "ERR_ARG"; - } - return ""; + /** Conn already established.*/ + case ERR_ISCONN: return "ERR_ISCONN"; + /** Not connected. */ + case ERR_CONN: return "ERR_CONN"; + /** Low-level netif error */ + case ERR_IF: return "ERR_IF"; + /** Connection aborted. */ + case ERR_ABRT: return "ERR_ABRT"; + /** Connection reset. */ + case ERR_RST: return "ERR_RST"; + /** Connection closed. */ + case ERR_CLSD: return "ERR_CLSD"; + /** Illegal argument. */ + case ERR_ARG: return "ERR_ARG"; + } + return ""; } char* MQTT_GetStatusMessage(void) { - return mqtt_status_message; + return mqtt_status_message; } // this can REPLACE callbacks, since we MAY wish to change the root topic.... // in which case we would re-resigster all callbacks? int MQTT_RegisterCallback(const char* basetopic, const char* subscriptiontopic, int ID, mqtt_callback_fn callback) { - int index; - int i; - int subscribechange = 0; - if (!basetopic || !subscriptiontopic || !callback) { - return -1; - } - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT_RegisterCallback called for bT %s subT %s", basetopic, subscriptiontopic); + int index; + int i; + int subscribechange = 0; + if (!basetopic || !subscriptiontopic || !callback) { + return -1; + } + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT_RegisterCallback called for bT %s subT %s", basetopic, subscriptiontopic); - // find existing to replace - for (index = 0; index < numCallbacks; index++) { - if (callbacks[index]) { - if (callbacks[index]->ID == ID) { - break; - } - } - } + // find existing to replace + for (index = 0; index < numCallbacks; index++) { + if (callbacks[index]) { + if (callbacks[index]->ID == ID) { + break; + } + } + } - // find empty if any (empty by MQTT_RemoveCallback) - if (index == numCallbacks) { - for (index = 0; index < numCallbacks; index++) { - if (!callbacks[index]) { - break; - } - } - } + // find empty if any (empty by MQTT_RemoveCallback) + if (index == numCallbacks) { + for (index = 0; index < numCallbacks; index++) { + if (!callbacks[index]) { + break; + } + } + } - if (index >= MAX_MQTT_CALLBACKS) { - return -4; - } - if (!callbacks[index]) { - callbacks[index] = (mqtt_callback_t*)os_malloc(sizeof(mqtt_callback_t)); - if (callbacks[index] != 0) { - memset(callbacks[index], 0, sizeof(mqtt_callback_t)); - } - } - if (!callbacks[index]) { - return -2; - } - if (!callbacks[index]->topic || strcmp(callbacks[index]->topic, basetopic)) { - if (callbacks[index]->topic) { - os_free(callbacks[index]->topic); - } - callbacks[index]->topic = (char*)os_malloc(strlen(basetopic) + 1); - if (!callbacks[index]->topic) { - os_free(callbacks[index]); - return -3; - } - strcpy(callbacks[index]->topic, basetopic); - } + if (index >= MAX_MQTT_CALLBACKS) { + return -4; + } + if (!callbacks[index]) { + callbacks[index] = (mqtt_callback_t*)os_malloc(sizeof(mqtt_callback_t)); + if (callbacks[index] != 0) { + memset(callbacks[index], 0, sizeof(mqtt_callback_t)); + } + } + if (!callbacks[index]) { + return -2; + } + if (!callbacks[index]->topic || strcmp(callbacks[index]->topic, basetopic)) { + if (callbacks[index]->topic) { + os_free(callbacks[index]->topic); + } + callbacks[index]->topic = (char*)os_malloc(strlen(basetopic) + 1); + if (!callbacks[index]->topic) { + os_free(callbacks[index]); + return -3; + } + strcpy(callbacks[index]->topic, basetopic); + } - if (!callbacks[index]->subscriptionTopic || strcmp(callbacks[index]->subscriptionTopic, subscriptiontopic)) { - if (callbacks[index]->subscriptionTopic) { - os_free(callbacks[index]->subscriptionTopic); - } - callbacks[index]->subscriptionTopic = (char*)os_malloc(strlen(subscriptiontopic) + 1); - callbacks[index]->subscriptionTopic[0] = '\0'; - if (!callbacks[index]->subscriptionTopic) { - os_free(callbacks[index]->topic); - os_free(callbacks[index]); - return -3; - } + if (!callbacks[index]->subscriptionTopic || strcmp(callbacks[index]->subscriptionTopic, subscriptiontopic)) { + if (callbacks[index]->subscriptionTopic) { + os_free(callbacks[index]->subscriptionTopic); + } + callbacks[index]->subscriptionTopic = (char*)os_malloc(strlen(subscriptiontopic) + 1); + callbacks[index]->subscriptionTopic[0] = '\0'; + if (!callbacks[index]->subscriptionTopic) { + os_free(callbacks[index]->topic); + os_free(callbacks[index]); + return -3; + } - // find out if this subscription is new. - for (i = 0; i < numCallbacks; i++) { - if (callbacks[i]) { - if (callbacks[i]->subscriptionTopic && - !strcmp(callbacks[i]->subscriptionTopic, subscriptiontopic)) { - break; - } - } - } - strcpy(callbacks[index]->subscriptionTopic, subscriptiontopic); - // if this subscription is new, must reconnect - if (i == numCallbacks) { - subscribechange++; - } - } + // find out if this subscription is new. + for (i = 0; i < numCallbacks; i++) { + if (callbacks[i]) { + if (callbacks[i]->subscriptionTopic && + !strcmp(callbacks[i]->subscriptionTopic, subscriptiontopic)) { + break; + } + } + } + strcpy(callbacks[index]->subscriptionTopic, subscriptiontopic); + // if this subscription is new, must reconnect + if (i == numCallbacks) { + subscribechange++; + } + } - callbacks[index]->callback = callback; - if (index == numCallbacks) { - numCallbacks++; - } + callbacks[index]->callback = callback; + if (index == numCallbacks) { + numCallbacks++; + } - if (subscribechange) { - mqtt_reconnect = 8; - } - // success - return 0; + if (subscribechange) { + mqtt_reconnect = 8; + } + // success + return 0; } int MQTT_RemoveCallback(int ID) { - int index; + int index; - for (index = 0; index < numCallbacks; index++) { - if (callbacks[index]) { - if (callbacks[index]->ID == ID) { - if (callbacks[index]->topic) { - os_free(callbacks[index]->topic); - callbacks[index]->topic = NULL; - } - if (callbacks[index]->subscriptionTopic) { - os_free(callbacks[index]->subscriptionTopic); - callbacks[index]->subscriptionTopic = NULL; - } - os_free(callbacks[index]); - callbacks[index] = NULL; - mqtt_reconnect = 8; - return 1; - } - } - } - return 0; + for (index = 0; index < numCallbacks; index++) { + if (callbacks[index]) { + if (callbacks[index]->ID == ID) { + if (callbacks[index]->topic) { + os_free(callbacks[index]->topic); + callbacks[index]->topic = NULL; + } + if (callbacks[index]->subscriptionTopic) { + os_free(callbacks[index]->subscriptionTopic); + callbacks[index]->subscriptionTopic = NULL; + } + os_free(callbacks[index]); + callbacks[index] = NULL; + mqtt_reconnect = 8; + return 1; + } + } + } + return 0; } // this accepts obkXXXXXX//set to receive data to set channels int channelSet(obk_mqtt_request_t* request) { - // we only need a few bytes to receive a decimal number 0-100 - char copy[12]; - int len = request->receivedLen; - char* p = request->topic; - int channel = 0; - int iValue = 0; + // we only need a few bytes to receive a decimal number 0-100 + char copy[12]; + int len = request->receivedLen; + char* p = request->topic; + int channel = 0; + int iValue = 0; - addLogAdv(LOG_DEBUG, LOG_FEATURE_MQTT, "channelSet topic %i with arg %s", request->topic, request->received); + addLogAdv(LOG_DEBUG, LOG_FEATURE_MQTT, "channelSet topic %i with arg %s", request->topic, request->received); - // TODO: better - while (*p != '/') { - if (*p == 0) - return 0; - p++; - } - p++; - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "channelSet part topic %s", p); + // TODO: better + while (*p != '/') { + if (*p == 0) + return 0; + p++; + } + p++; + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "channelSet part topic %s", p); - if ((*p - '0' >= 0) && (*p - '0' <= 9)) { - channel = atoi(p); - } - else { - channel = -1; - } + if ((*p - '0' >= 0) && (*p - '0' <= 9)) { + channel = atoi(p); + } + else { + channel = -1; + } - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "channelSet channel %i", channel); + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "channelSet channel %i", channel); - // if channel out of range, stop here. - if ((channel < 0) || (channel > 32)) { - return 0; - } + // if channel out of range, stop here. + if ((channel < 0) || (channel > 32)) { + return 0; + } - // find something after channel - should be //set - while (*p != '/') { - if (*p == 0) - return 0; - p++; - } - p++; + // find something after channel - should be //set + while (*p != '/') { + if (*p == 0) + return 0; + p++; + } + p++; - // if not /set, then stop here - if (strcmp(p, "set")) { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "channelSet NOT 'set'"); - return 0; - } + // if not /set, then stop here + if (strcmp(p, "set")) { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "channelSet NOT 'set'"); + return 0; + } - if (len > sizeof(copy) - 1) { - len = sizeof(copy) - 1; - } + if (len > sizeof(copy) - 1) { + len = sizeof(copy) - 1; + } - strncpy(copy, (char*)request->received, len); - // strncpy does not terminate??!!!! - copy[len] = '\0'; + strncpy(copy, (char*)request->received, len); + // strncpy does not terminate??!!!! + copy[len] = '\0'; - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT client in mqtt_incoming_data_cb data is %s for ch %i\n", copy, channel); + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT client in mqtt_incoming_data_cb data is %s for ch %i\n", copy, channel); - iValue = atoi((char*)copy); - CHANNEL_Set(channel, iValue, 0); + iValue = atoi((char*)copy); + CHANNEL_Set(channel, iValue, 0); - // return 1 to stop processing callbacks here. - // return 0 to allow later callbacks to process this topic. - return 1; + // return 1 to stop processing callbacks here. + // return 0 to allow later callbacks to process this topic. + return 1; } // this accepts cmnd// to receive data to set channels int tasCmnd(obk_mqtt_request_t* request) { - // we only need a few bytes to receive a decimal number 0-100 - char copy[64]; - int len = request->receivedLen; - const char* p = request->topic; + // we only need a few bytes to receive a decimal number 0-100 + char copy[64]; + int len = request->receivedLen; + const char* p = request->topic; - // assume a string input here, copy and terminate - if (len > sizeof(copy) - 1) { - len = sizeof(copy) - 1; - } - strncpy(copy, (char*)request->received, len); - // strncpy does not terminate??!!!! - copy[len] = '\0'; + // assume a string input here, copy and terminate + if (len > sizeof(copy) - 1) { + len = sizeof(copy) - 1; + } + strncpy(copy, (char*)request->received, len); + // strncpy does not terminate??!!!! + copy[len] = '\0'; - // TODO: better - // skip to after second forward slash - while (*p != '/') { if (*p == 0) return 0; p++; } - p++; - while (*p != '/') { if (*p == 0) return 0; p++; } - p++; + // TODO: better + // skip to after second forward slash + while (*p != '/') { if (*p == 0) return 0; p++; } + p++; + while (*p != '/') { if (*p == 0) return 0; p++; } + p++; - // use command executor.... - CMD_ExecuteCommandArgs(p, copy, COMMAND_FLAG_SOURCE_MQTT); + // use command executor.... + CMD_ExecuteCommandArgs(p, copy, COMMAND_FLAG_SOURCE_MQTT); - // return 1 to stop processing callbacks here. - // return 0 to allow later callbacks to process this topic. - return 1; + // return 1 to stop processing callbacks here. + // return 0 to allow later callbacks to process this topic. + return 1; } //void MQTT_GetStats(int *outUsed, int *outMax, int *outFreeMem) { @@ -486,106 +487,107 @@ int tasCmnd(obk_mqtt_request_t* request) { // copied here because for some reason renames in sdk? static void MQTT_disconnect(mqtt_client_t* client) { - if (!client) - return; - // this is what it was renamed to. why? - mqtt_disconnect(client); + if (!client) + return; + // this is what it was renamed to. why? + mqtt_disconnect(client); } /* Called when publish is complete either with sucess or failure */ static void mqtt_pub_request_cb(void* arg, err_t result) { - if (result != ERR_OK) - { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish result: %d(%s)\n", result, get_error_name(result)); - mqtt_publish_errors++; - } + if (result != ERR_OK) + { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish result: %d(%s)\n", result, get_error_name(result)); + mqtt_publish_errors++; + } } // This publishes value to the specified topic/channel. static OBK_Publish_Result MQTT_PublishTopicToClient(mqtt_client_t* client, const char* sTopic, const char* sChannel, const char* sVal, int flags, bool appendGet) { - err_t err; - u8_t qos = 2; /* 0 1 or 2, see MQTT specification */ - u8_t retain = 0; /* No don't retain such crappy payload... */ - size_t sVal_len; + err_t err; + u8_t qos = 2; /* 0 1 or 2, see MQTT specification */ + u8_t retain = 0; /* No don't retain such crappy payload... */ + size_t sVal_len; - if (client == 0) - return OBK_PUBLISH_WAS_DISCONNECTED; + if (client == 0) + return OBK_PUBLISH_WAS_DISCONNECTED; - if (flags & OBK_PUBLISH_FLAG_MUTEX_SILENT) - { - if (MQTT_Mutex_Take(100) == 0) - { - return OBK_PUBLISH_MUTEX_FAIL; - } - } - else { - if (MQTT_Mutex_Take(500) == 0) - { - addLogAdv(LOG_ERROR, LOG_FEATURE_MQTT, "MQTT_PublishTopicToClient: mutex failed for %s=%s\r\n", sChannel, sVal); - return OBK_PUBLISH_MUTEX_FAIL; - } - } - if (flags & OBK_PUBLISH_FLAG_RETAIN) - { - retain = 1; - } - // global tool - if (CFG_HasFlag(OBK_FLAG_MQTT_ALWAYSSETRETAIN)) - { - retain = 1; - } + if (flags & OBK_PUBLISH_FLAG_MUTEX_SILENT) + { + if (MQTT_Mutex_Take(100) == 0) + { + return OBK_PUBLISH_MUTEX_FAIL; + } + } + else { + if (MQTT_Mutex_Take(500) == 0) + { + addLogAdv(LOG_ERROR, LOG_FEATURE_MQTT, "MQTT_PublishTopicToClient: mutex failed for %s=%s\r\n", sChannel, sVal); + return OBK_PUBLISH_MUTEX_FAIL; + } + } + if (flags & OBK_PUBLISH_FLAG_RETAIN) + { + retain = 1; + } + // global tool + if (CFG_HasFlag(OBK_FLAG_MQTT_ALWAYSSETRETAIN)) + { + retain = 1; + } - if (mqtt_client_is_connected(client) == 0) - { - g_my_reconnect_mqtt_after_time = 5; - MQTT_Mutex_Free(); - return OBK_PUBLISH_WAS_DISCONNECTED; - } + if (mqtt_client_is_connected(client) == 0) + { + g_my_reconnect_mqtt_after_time = 5; + MQTT_Mutex_Free(); + return OBK_PUBLISH_WAS_DISCONNECTED; + } - g_timeSinceLastMQTTPublish = 0; + g_timeSinceLastMQTTPublish = 0; - char* pub_topic = (char*)os_malloc(strlen(sTopic) + 1 + strlen(sChannel) + 5 + 1); //5 for /get - if ((pub_topic != NULL) && (sVal != NULL)) - { - sVal_len = strlen(sVal); - sprintf(pub_topic, "%s/%s%s", sTopic, sChannel, (appendGet == true ? "/get" : "")); - if (sVal_len < 128) - { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publishing val %s to %s retain=%i\n", sVal, pub_topic, retain); - } else { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publishing val (%d bytes) to %s retain=%i\n", sVal_len, pub_topic, retain); - } + char* pub_topic = (char*)os_malloc(strlen(sTopic) + 1 + strlen(sChannel) + 5 + 1); //5 for /get + if ((pub_topic != NULL) && (sVal != NULL)) + { + sVal_len = strlen(sVal); + sprintf(pub_topic, "%s/%s%s", sTopic, sChannel, (appendGet == true ? "/get" : "")); + if (sVal_len < 128) + { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publishing val %s to %s retain=%i\n", sVal, pub_topic, retain); + } + else { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publishing val (%d bytes) to %s retain=%i\n", sVal_len, pub_topic, retain); + } - err = mqtt_publish(client, pub_topic, sVal, strlen(sVal), qos, retain, mqtt_pub_request_cb, 0); - os_free(pub_topic); + err = mqtt_publish(client, pub_topic, sVal, strlen(sVal), qos, retain, mqtt_pub_request_cb, 0); + os_free(pub_topic); - if (err != ERR_OK) - { - if (err == ERR_CONN) - { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish err: ERR_CONN aka %d\n", err); - } - else if (err == ERR_MEM) { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish err: ERR_MEM aka %d\n", err); - g_memoryErrorsThisSession++; - } - else { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish err: %d\n", err); - } - mqtt_publish_errors++; - MQTT_Mutex_Free(); - return OBK_PUBLISH_MEM_FAIL; - } - mqtt_published_events++; - MQTT_Mutex_Free(); - return OBK_PUBLISH_OK; - } - else { - MQTT_Mutex_Free(); - return OBK_PUBLISH_MEM_FAIL; - } + if (err != ERR_OK) + { + if (err == ERR_CONN) + { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish err: ERR_CONN aka %d\n", err); + } + else if (err == ERR_MEM) { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish err: ERR_MEM aka %d\n", err); + g_memoryErrorsThisSession++; + } + else { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish err: %d\n", err); + } + mqtt_publish_errors++; + MQTT_Mutex_Free(); + return OBK_PUBLISH_MEM_FAIL; + } + mqtt_published_events++; + MQTT_Mutex_Free(); + return OBK_PUBLISH_OK; + } + else { + MQTT_Mutex_Free(); + return OBK_PUBLISH_MEM_FAIL; + } } // This is used to publish channel values in "obk0696FB33/1/get" format with numerical value, @@ -593,7 +595,7 @@ static OBK_Publish_Result MQTT_PublishTopicToClient(mqtt_client_t* client, const // for example, "obk0696FB33/voltage/get" is used to publish voltage from the sensor static OBK_Publish_Result MQTT_PublishMain(mqtt_client_t* client, const char* sChannel, const char* sVal, int flags, bool appendGet) { - return MQTT_PublishTopicToClient(mqtt_client, CFG_GetMQTTClientId(), sChannel, sVal, flags, appendGet); + return MQTT_PublishTopicToClient(mqtt_client, CFG_GetMQTTClientId(), sChannel, sVal, flags, appendGet); } /// @brief Publish a MQTT message immediately. @@ -604,81 +606,81 @@ static OBK_Publish_Result MQTT_PublishMain(mqtt_client_t* client, const char* sC /// @return OBK_Publish_Result MQTT_Publish(char* sTopic, char* sChannel, char* sVal, int flags) { - return MQTT_PublishTopicToClient(mqtt_client, sTopic, sChannel, sVal, flags, false); + return MQTT_PublishTopicToClient(mqtt_client, sTopic, sChannel, sVal, flags, false); } void MQTT_OBK_Printf(char* s) { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, s); + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, s); } static void mqtt_incoming_data_cb(void* arg, const u8_t* data, u16_t len, u8_t flags) { - int i; - // unused - left here as example - //const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; + int i; + // unused - left here as example + //const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; - // if we stored a topic in g_mqtt_request, then we found a matching callback, so use it. - if (g_mqtt_request.topic[0]) - { - // note: data is NOT terminated (it may be binary...). - g_mqtt_request.received = data; - g_mqtt_request.receivedLen = len; + // if we stored a topic in g_mqtt_request, then we found a matching callback, so use it. + if (g_mqtt_request.topic[0]) + { + // note: data is NOT terminated (it may be binary...). + g_mqtt_request.received = data; + g_mqtt_request.receivedLen = len; - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT in topic %s", g_mqtt_request.topic); - mqtt_received_events++; + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT in topic %s", g_mqtt_request.topic); + mqtt_received_events++; - for (i = 0; i < numCallbacks; i++) - { - char* cbtopic = callbacks[i]->topic; - if (!strncmp(g_mqtt_request.topic, cbtopic, strlen(cbtopic))) - { - // note - callback must return 1 to say it ate the mqtt, else further processing can be performed. - // i.e. multiple people can get each topic if required. - if (callbacks[i]->callback(&g_mqtt_request)) - { - return; - } - } - } - } + for (i = 0; i < numCallbacks; i++) + { + char* cbtopic = callbacks[i]->topic; + if (!strncmp(g_mqtt_request.topic, cbtopic, strlen(cbtopic))) + { + // note - callback must return 1 to say it ate the mqtt, else further processing can be performed. + // i.e. multiple people can get each topic if required. + if (callbacks[i]->callback(&g_mqtt_request)) + { + return; + } + } + } + } } static void mqtt_incoming_publish_cb(void* arg, const char* topic, u32_t tot_len) { - //const char *p; - int i; - // unused - left here as example - //const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; + //const char *p; + int i; + // unused - left here as example + //const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; - // look for a callback with this URL and method, or HTTP_ANY - g_mqtt_request.topic[0] = '\0'; - for (i = 0; i < numCallbacks; i++) - { - char* cbtopic = callbacks[i]->topic; - if (strncmp(topic, cbtopic, strlen(cbtopic))) - { - strncpy(g_mqtt_request.topic, topic, sizeof(g_mqtt_request.topic) - 1); - g_mqtt_request.topic[sizeof(g_mqtt_request.topic) - 1] = 0; - break; - } - } - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT client in mqtt_incoming_publish_cb topic %s\n", topic); + // look for a callback with this URL and method, or HTTP_ANY + g_mqtt_request.topic[0] = '\0'; + for (i = 0; i < numCallbacks; i++) + { + char* cbtopic = callbacks[i]->topic; + if (strncmp(topic, cbtopic, strlen(cbtopic))) + { + strncpy(g_mqtt_request.topic, topic, sizeof(g_mqtt_request.topic) - 1); + g_mqtt_request.topic[sizeof(g_mqtt_request.topic) - 1] = 0; + break; + } + } + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT client in mqtt_incoming_publish_cb topic %s\n", topic); } static void mqtt_request_cb(void* arg, err_t err) { - const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; + const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err); + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT client \"%s\" request cb: err %d\n", client_info->client_id, (int)err); } static void mqtt_sub_request_cb(void* arg, err_t result) { - /* Just print the result code here for simplicity, - normal behaviour would be to take some action if subscribe fails like - notifying user, retry subscribe or disconnect from server */ - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Subscribe result: %i\n", result); + /* Just print the result code here for simplicity, + normal behaviour would be to take some action if subscribe fails like + notifying user, retry subscribe or disconnect from server */ + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Subscribe result: %i\n", result); } static void mqtt_connection_cb(mqtt_client_t* client, void* arg, mqtt_connection_status_t status) @@ -793,11 +795,11 @@ static void MQTT_do_connect(mqtt_client_t* client) mqtt_client_info.will_topic = will_topic; mqtt_client_info.will_msg = "offline"; mqtt_client_info.will_retain = true, - mqtt_client_info.will_qos = 2, + mqtt_client_info.will_qos = 2, - hostEntry = gethostbyname(mqtt_host); - if (NULL != hostEntry) - { + hostEntry = gethostbyname(mqtt_host); + if (NULL != hostEntry) + { if (hostEntry->h_addr_list && hostEntry->h_addr_list[0]) { int len = hostEntry->h_length; if (len > 4) { @@ -846,54 +848,54 @@ static void MQTT_do_connect(mqtt_client_t* client) OBK_Publish_Result MQTT_PublishMain_StringInt(const char* sChannel, int iv) { - char valueStr[16]; + char valueStr[16]; - sprintf(valueStr, "%i", iv); + sprintf(valueStr, "%i", iv); - return MQTT_PublishMain(mqtt_client, sChannel, valueStr, 0, true); + return MQTT_PublishMain(mqtt_client, sChannel, valueStr, 0, true); } OBK_Publish_Result MQTT_PublishMain_StringFloat(const char* sChannel, float f) { - char valueStr[16]; + char valueStr[16]; - sprintf(valueStr, "%f", f); + sprintf(valueStr, "%f", f); - return MQTT_PublishMain(mqtt_client, sChannel, valueStr, 0, true); + return MQTT_PublishMain(mqtt_client, sChannel, valueStr, 0, true); } OBK_Publish_Result MQTT_PublishMain_StringString(const char* sChannel, const char* valueStr, int flags) { - return MQTT_PublishMain(mqtt_client, sChannel, valueStr, flags, true); + return MQTT_PublishMain(mqtt_client, sChannel, valueStr, flags, true); } OBK_Publish_Result MQTT_ChannelChangeCallback(int channel, int iVal) { - char channelNameStr[8]; - char valueStr[16]; + char channelNameStr[8]; + char valueStr[16]; - addLogAdv(LOG_INFO, LOG_FEATURE_MAIN, "Channel has changed! Publishing change %i with %i \n", channel, iVal); + addLogAdv(LOG_INFO, LOG_FEATURE_MAIN, "Channel has changed! Publishing change %i with %i \n", channel, iVal); - sprintf(channelNameStr, "%i", channel); - sprintf(valueStr, "%i", iVal); + sprintf(channelNameStr, "%i", channel); + sprintf(valueStr, "%i", iVal); - return MQTT_PublishMain(mqtt_client, channelNameStr, valueStr, 0, true); + return MQTT_PublishMain(mqtt_client, channelNameStr, valueStr, 0, true); } OBK_Publish_Result MQTT_ChannelPublish(int channel, int flags) { - char channelNameStr[8]; - char valueStr[16]; - int iValue; + char channelNameStr[8]; + char valueStr[16]; + int iValue; - iValue = CHANNEL_Get(channel); + iValue = CHANNEL_Get(channel); - addLogAdv(LOG_INFO, LOG_FEATURE_MAIN, "Forced channel publish! Publishing val %i with %i \n", channel, iValue); + addLogAdv(LOG_INFO, LOG_FEATURE_MAIN, "Forced channel publish! Publishing val %i to %i", iValue, channel); - sprintf(channelNameStr, "%i", channel); - sprintf(valueStr, "%i", iValue); + sprintf(channelNameStr, "%i", channel); + sprintf(valueStr, "%i", iValue); - return MQTT_PublishMain(mqtt_client, channelNameStr, valueStr, flags, true); + return MQTT_PublishMain(mqtt_client, channelNameStr, valueStr, flags, true); } // This console command will trigger a publish of all used variables (channels and extra stuff) OBK_Publish_Result MQTT_PublishAll(const void* context, const char* cmd, const char* args, int cmdFlags) { @@ -906,21 +908,21 @@ OBK_Publish_Result MQTT_PublishChannels(const void* context, const char* cmd, co return 1;// TODO make return values consistent for all console commands } OBK_Publish_Result MQTT_PublishCommand(const void* context, const char* cmd, const char* args, int cmdFlags) { - const char* topic, * value; - OBK_Publish_Result ret; + const char* topic, * value; + OBK_Publish_Result ret; - Tokenizer_TokenizeString(args,0); + Tokenizer_TokenizeString(args, 0); - if (Tokenizer_GetArgsCount() < 2) { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish command requires two arguments (topic and value)"); - return 0; - } - topic = Tokenizer_GetArg(0); - value = Tokenizer_GetArg(1); + if (Tokenizer_GetArgsCount() < 2) { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Publish command requires two arguments (topic and value)"); + return 0; + } + topic = Tokenizer_GetArg(0); + value = Tokenizer_GetArg(1); - ret = MQTT_PublishMain_StringString(topic, value, 0); + ret = MQTT_PublishMain_StringString(topic, value, 0); - return ret; + return ret; } /**************************************************************************************************** @@ -930,100 +932,102 @@ OBK_Publish_Result MQTT_PublishCommand(const void* context, const char* cmd, con typedef struct BENCHMARK_TEST_INFO { - portTickType TestStartTick; - portTickType TestStopTick; - long msg_cnt; - long msg_num; - char topic[256]; - char value[256]; - float bench_time; - float bench_rate; - bool report_published; + portTickType TestStartTick; + portTickType TestStopTick; + long msg_cnt; + long msg_num; + char topic[256]; + char value[256]; + float bench_time; + float bench_rate; + bool report_published; } BENCHMARK_TEST_INFO; #ifndef portTICK_RATE_MS #define portTICK_RATE_MS ( ( portTickType ) 1000 / configTICK_RATE_HZ ) #endif -void MQTT_Test_Tick(void *param) +void MQTT_Test_Tick(void* param) { - BENCHMARK_TEST_INFO *info = (BENCHMARK_TEST_INFO*)param; - int block = 1; - err_t err; - int qos = 2; - int retain = 0; + BENCHMARK_TEST_INFO* info = (BENCHMARK_TEST_INFO*)param; + int block = 1; + err_t err; + int qos = 2; + int retain = 0; - if (info != NULL) - { - while (1) - { - if (mqtt_client_is_connected(mqtt_client) == 0) - break; - if (info->msg_cnt < info->msg_num) - { - sprintf(info->value, "TestMSG: %li/%li Time: %i s, Rate: %i msg/s", info->msg_cnt, info->msg_num, - (int)info->bench_time, (int)info->bench_rate); - err = mqtt_publish(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0); - if (err == ERR_OK) - { - /* MSG published */ - info->msg_cnt++; - info->TestStopTick = xTaskGetTickCount(); - /* calculate stats */ - info->bench_time = (float)(info->TestStopTick - info->TestStartTick); - info->bench_time /= (float)(1000 / portTICK_RATE_MS); - info->bench_rate = (float)info->msg_cnt; - if (info->bench_time != 0.0) - info->bench_rate /= info->bench_time; - block--; - if (block <= 0) - break; - } else { - /* MSG not published, error occured */ - break; - } - } else { - /* All messages publiched */ - if (info->report_published == false) - { - /* Publish report */ - sprintf(info->value, "Benchmark completed. %li msg published. Total Time: %i s MsgRate: %i msg/s", - info->msg_cnt, (int)info->bench_time, (int)info->bench_rate); - err = mqtt_publish(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0); - if (err == ERR_OK) - { - /* Report published */ - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, info->value); - info->report_published = true; - /* Stop timer */ - } - } - break; - } - } - } + if (info != NULL) + { + while (1) + { + if (mqtt_client_is_connected(mqtt_client) == 0) + break; + if (info->msg_cnt < info->msg_num) + { + sprintf(info->value, "TestMSG: %li/%li Time: %i s, Rate: %i msg/s", info->msg_cnt, info->msg_num, + (int)info->bench_time, (int)info->bench_rate); + err = mqtt_publish(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0); + if (err == ERR_OK) + { + /* MSG published */ + info->msg_cnt++; + info->TestStopTick = xTaskGetTickCount(); + /* calculate stats */ + info->bench_time = (float)(info->TestStopTick - info->TestStartTick); + info->bench_time /= (float)(1000 / portTICK_RATE_MS); + info->bench_rate = (float)info->msg_cnt; + if (info->bench_time != 0.0) + info->bench_rate /= info->bench_time; + block--; + if (block <= 0) + break; + } + else { + /* MSG not published, error occured */ + break; + } + } + else { + /* All messages publiched */ + if (info->report_published == false) + { + /* Publish report */ + sprintf(info->value, "Benchmark completed. %li msg published. Total Time: %i s MsgRate: %i msg/s", + info->msg_cnt, (int)info->bench_time, (int)info->bench_rate); + err = mqtt_publish(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0); + if (err == ERR_OK) + { + /* Report published */ + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, info->value); + info->report_published = true; + /* Stop timer */ + } + } + break; + } + } + } } -static BENCHMARK_TEST_INFO *info = NULL; +static BENCHMARK_TEST_INFO* info = NULL; #if WINDOWS #elif PLATFORM_BL602 -static void mqtt_timer_thread(void *param) +static void mqtt_timer_thread(void* param) { - while(1) - { - vTaskDelay(MQTT_TMR_DURATION); - MQTT_Test_Tick(param); - } + while (1) + { + vTaskDelay(MQTT_TMR_DURATION); + MQTT_Test_Tick(param); + } } #elif PLATFORM_W600 || PLATFORM_W800 -static void mqtt_timer_thread(void *param) +static void mqtt_timer_thread(void* param) { - while(1) { - vTaskDelay(MQTT_TMR_DURATION); - MQTT_Test_Tick(param); - } + while (1) { + vTaskDelay(MQTT_TMR_DURATION); + MQTT_Test_Tick(param); + } } #elif PLATFORM_XR809 static OS_Timer_t timer; @@ -1033,65 +1037,65 @@ static beken_timer_t g_mqtt_timer; int MQTT_StartMQTTTestThread(const void* context, const char* cmd, const char* args, int cmdFlags) { - if (info != NULL) - { - /* Benchmark test already started */ - /* try to restart */ - info->TestStartTick = xTaskGetTickCount(); - info->msg_cnt = 0; - info->report_published = false; - return 0; - } + if (info != NULL) + { + /* Benchmark test already started */ + /* try to restart */ + info->TestStartTick = xTaskGetTickCount(); + info->msg_cnt = 0; + info->report_published = false; + return 0; + } - info = (BENCHMARK_TEST_INFO*)os_malloc(sizeof(BENCHMARK_TEST_INFO)); - if (info == NULL) - { - return -1; - } + info = (BENCHMARK_TEST_INFO*)os_malloc(sizeof(BENCHMARK_TEST_INFO)); + if (info == NULL) + { + return -1; + } - memset(info, 0, sizeof(BENCHMARK_TEST_INFO)); - info->TestStartTick = xTaskGetTickCount(); - info->msg_num = 1000; - sprintf(info->topic, "%s/benchmark", CFG_GetMQTTClientId()); + memset(info, 0, sizeof(BENCHMARK_TEST_INFO)); + info->TestStartTick = xTaskGetTickCount(); + info->msg_num = 1000; + sprintf(info->topic, "%s/benchmark", CFG_GetMQTTClientId()); #if WINDOWS #elif PLATFORM_BL602 - xTaskCreate(mqtt_timer_thread, "mqtt", 1024, (void *)info, 15, NULL); + xTaskCreate(mqtt_timer_thread, "mqtt", 1024, (void*)info, 15, NULL); #elif PLATFORM_W600 || PLATFORM_W800 - xTaskCreate(mqtt_timer_thread, "mqtt", 1024, (void *)info, 15, NULL); + xTaskCreate(mqtt_timer_thread, "mqtt", 1024, (void*)info, 15, NULL); #elif PLATFORM_XR809 - OS_TimerSetInvalid(&timer); - if (OS_TimerCreate(&timer, OS_TIMER_PERIODIC, MQTT_Test_Tick, (void *)info, MQTT_TMR_DURATION) != OS_OK) - { - printf("PIN_AddCommands timer create failed\n"); - return -1; - } - OS_TimerStart(&timer); /* start OS timer to feed watchdog */ + OS_TimerSetInvalid(&timer); + if (OS_TimerCreate(&timer, OS_TIMER_PERIODIC, MQTT_Test_Tick, (void*)info, MQTT_TMR_DURATION) != OS_OK) + { + printf("PIN_AddCommands timer create failed\n"); + return -1; + } + OS_TimerStart(&timer); /* start OS timer to feed watchdog */ #else - OSStatus result; + OSStatus result; - result = rtos_init_timer(&g_mqtt_timer, MQTT_TMR_DURATION, MQTT_Test_Tick, (void *)info); - ASSERT(kNoErr == result); - result = rtos_start_timer(&g_mqtt_timer); - ASSERT(kNoErr == result); + result = rtos_init_timer(&g_mqtt_timer, MQTT_TMR_DURATION, MQTT_Test_Tick, (void*)info); + ASSERT(kNoErr == result); + result = rtos_start_timer(&g_mqtt_timer); + ASSERT(kNoErr == result); #endif - return 0; + return 0; } /**************************************************************************************************** * ****************************************************************************************************/ -// initialise things MQTT -// called from user_main + // initialise things MQTT + // called from user_main void MQTT_init() { - char cbtopicbase[CGF_MQTT_CLIENT_ID_SIZE + 16]; - char cbtopicsub[CGF_MQTT_CLIENT_ID_SIZE + 16]; - const char* clientId; + char cbtopicbase[CGF_MQTT_CLIENT_ID_SIZE + 16]; + char cbtopicsub[CGF_MQTT_CLIENT_ID_SIZE + 16]; + const char* clientId; - clientId = CFG_GetMQTTClientId(); + clientId = CFG_GetMQTTClientId(); // register the main set channel callback snprintf(cbtopicbase, sizeof(cbtopicbase), "%s/", clientId); @@ -1105,283 +1109,281 @@ void MQTT_init() // note: this may REPLACE an existing entry with the same ID. ID 2 !!! MQTT_RegisterCallback(cbtopicbase, cbtopicsub, 2, tasCmnd); - mqtt_initialised = 1; + mqtt_initialised = 1; CMD_RegisterCommand(MQTT_COMMAND_PUBLISH, "", MQTT_PublishCommand, "Sqqq", NULL); CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_ALL, "", MQTT_PublishAll, "Sqqq", NULL); CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_CHANNELS, "", MQTT_PublishChannels, "Sqqq", NULL); - CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_BENCHMARK, "", MQTT_StartMQTTTestThread, "", NULL); + CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_BENCHMARK, "", MQTT_StartMQTTTestThread, "", NULL); } OBK_Publish_Result MQTT_DoItemPublishString(const char* sChannel, const char* valueStr) { - return MQTT_PublishMain(mqtt_client, sChannel, valueStr, OBK_PUBLISH_FLAG_MUTEX_SILENT, false); + return MQTT_PublishMain(mqtt_client, sChannel, valueStr, OBK_PUBLISH_FLAG_MUTEX_SILENT, false); } OBK_Publish_Result MQTT_DoItemPublish(int idx) { - char dataStr[3 * 6 + 1]; //This is sufficient to hold mac value + char dataStr[3 * 6 + 1]; //This is sufficient to hold mac value - switch (idx) { - case PUBLISHITEM_SELF_STATIC_RESERVED_2: - case PUBLISHITEM_SELF_STATIC_RESERVED_1: - return OBK_PUBLISH_WAS_NOT_REQUIRED; + switch (idx) { + case PUBLISHITEM_SELF_STATIC_RESERVED_2: + case PUBLISHITEM_SELF_STATIC_RESERVED_1: + return OBK_PUBLISH_WAS_NOT_REQUIRED; - case PUBLISHITEM_QUEUED_VALUES: - return PublishQueuedItems(); + case PUBLISHITEM_QUEUED_VALUES: + return PublishQueuedItems(); - case PUBLISHITEM_SELF_DYNAMIC_LIGHTSTATE: - return LED_IsRunningDriver() ? LED_SendEnableAllState() : OBK_PUBLISH_WAS_NOT_REQUIRED; - case PUBLISHITEM_SELF_DYNAMIC_LIGHTMODE: - return LED_IsRunningDriver() ? LED_SendCurrentLightMode() : OBK_PUBLISH_WAS_NOT_REQUIRED; - case PUBLISHITEM_SELF_DYNAMIC_DIMMER: - return LED_IsRunningDriver() ? LED_SendDimmerChange() : OBK_PUBLISH_WAS_NOT_REQUIRED; + case PUBLISHITEM_SELF_DYNAMIC_LIGHTSTATE: + return LED_IsRunningDriver() ? LED_SendEnableAllState() : OBK_PUBLISH_WAS_NOT_REQUIRED; + case PUBLISHITEM_SELF_DYNAMIC_LIGHTMODE: + return LED_IsRunningDriver() ? LED_SendCurrentLightMode() : OBK_PUBLISH_WAS_NOT_REQUIRED; + case PUBLISHITEM_SELF_DYNAMIC_DIMMER: + return LED_IsRunningDriver() ? LED_SendDimmerChange() : OBK_PUBLISH_WAS_NOT_REQUIRED; - case PUBLISHITEM_SELF_HOSTNAME: - return MQTT_DoItemPublishString("host", CFG_GetShortDeviceName()); + case PUBLISHITEM_SELF_HOSTNAME: + return MQTT_DoItemPublishString("host", CFG_GetShortDeviceName()); - case PUBLISHITEM_SELF_BUILD: - return MQTT_DoItemPublishString("build", g_build_str); + case PUBLISHITEM_SELF_BUILD: + return MQTT_DoItemPublishString("build", g_build_str); - case PUBLISHITEM_SELF_MAC: - return MQTT_DoItemPublishString("mac", HAL_GetMACStr(dataStr)); + case PUBLISHITEM_SELF_MAC: + return MQTT_DoItemPublishString("mac", HAL_GetMACStr(dataStr)); - case PUBLISHITEM_SELF_DATETIME: - //Drivers are only built on BK7231 chips + case PUBLISHITEM_SELF_DATETIME: + //Drivers are only built on BK7231 chips #ifndef OBK_DISABLE_ALL_DRIVERS - if (DRV_IsRunning("NTP")) { - sprintf(dataStr, "%d", NTP_GetCurrentTime()); - return MQTT_DoItemPublishString("datetime", dataStr); - } - else { - return OBK_PUBLISH_WAS_NOT_REQUIRED; - } + if (DRV_IsRunning("NTP")) { + sprintf(dataStr, "%d", NTP_GetCurrentTime()); + return MQTT_DoItemPublishString("datetime", dataStr); + } + else { + return OBK_PUBLISH_WAS_NOT_REQUIRED; + } #else - return OBK_PUBLISH_WAS_NOT_REQUIRED; + return OBK_PUBLISH_WAS_NOT_REQUIRED; #endif - case PUBLISHITEM_SELF_SOCKETS: - sprintf(dataStr, "%d", LWIP_GetActiveSockets()); - return MQTT_DoItemPublishString("sockets", dataStr); + case PUBLISHITEM_SELF_SOCKETS: + sprintf(dataStr, "%d", LWIP_GetActiveSockets()); + return MQTT_DoItemPublishString("sockets", dataStr); - case PUBLISHITEM_SELF_RSSI: - sprintf(dataStr, "%d", HAL_GetWifiStrength()); - return MQTT_DoItemPublishString("rssi", dataStr); + case PUBLISHITEM_SELF_RSSI: + sprintf(dataStr, "%d", HAL_GetWifiStrength()); + return MQTT_DoItemPublishString("rssi", dataStr); - case PUBLISHITEM_SELF_UPTIME: - sprintf(dataStr, "%d", Time_getUpTimeSeconds()); - return MQTT_DoItemPublishString("uptime", dataStr); + case PUBLISHITEM_SELF_UPTIME: + sprintf(dataStr, "%d", Time_getUpTimeSeconds()); + return MQTT_DoItemPublishString("uptime", dataStr); - case PUBLISHITEM_SELF_FREEHEAP: - sprintf(dataStr, "%d", xPortGetFreeHeapSize()); - return MQTT_DoItemPublishString("freeheap", dataStr); + case PUBLISHITEM_SELF_FREEHEAP: + sprintf(dataStr, "%d", xPortGetFreeHeapSize()); + return MQTT_DoItemPublishString("freeheap", dataStr); - case PUBLISHITEM_SELF_IP: - g_firstFullBroadcast = false; //We published the last status item, disable full broadcast - return MQTT_DoItemPublishString("ip", HAL_GetMyIPString()); + case PUBLISHITEM_SELF_IP: + g_firstFullBroadcast = false; //We published the last status item, disable full broadcast + return MQTT_DoItemPublishString("ip", HAL_GetMyIPString()); - default: - break; - } + default: + break; + } - // if LED driver is active, do not publish raw channel values - if (LED_IsRunningDriver() == false && idx >= 0) { - // This is because raw channels are like PWM values, RGBCW has 5 raw channels - // (unless it has I2C LED driver) - // We do not need raw values for RGBCW lights (or RGB, etc) - // because we are using led_basecolor_rgb, led_dimmer, led_enableAll, etc - // NOTE: negative indexes are not channels - they are special values - if (CHANNEL_IsInUse(idx)) { - MQTT_ChannelPublish(g_publishItemIndex, OBK_PUBLISH_FLAG_MUTEX_SILENT); - } - } - return OBK_PUBLISH_WAS_NOT_REQUIRED; // didnt publish + // Do not publish raw channel value for channels like PWM values, RGBCW has 5 raw channels. + // We do not need raw values for RGBCW lights (or RGB, etc) + // because we are using led_basecolor_rgb, led_dimmer, led_enableAll, etc + // NOTE: negative indexes are not channels - they are special values + int role = CHANNEL_GetRoleForOutputChannel(idx); + if (role == IOR_Relay || role == IOR_Relay_n || role == IOR_LED || role == IOR_LED_n) { + MQTT_ChannelPublish(g_publishItemIndex, OBK_PUBLISH_FLAG_MUTEX_SILENT); + } + + return OBK_PUBLISH_WAS_NOT_REQUIRED; // didnt publish } static int g_secondsBeforeNextFullBroadcast = 30; // called from user timer. int MQTT_RunEverySecondUpdate() { - if (!mqtt_initialised) - return 0; + if (!mqtt_initialised) + return 0; - if (Main_HasWiFiConnected() == 0) - { - mqtt_reconnect = 0; - loopsWithDisconnected = LOOPS_WITH_DISCONNECTED - 2; - return 0; - } + if (Main_HasWiFiConnected() == 0) + { + mqtt_reconnect = 0; + loopsWithDisconnected = LOOPS_WITH_DISCONNECTED - 2; + return 0; + } - // take mutex for connect and disconnect operations - if (MQTT_Mutex_Take(100) == 0) - { - return 0; - } + // take mutex for connect and disconnect operations + if (MQTT_Mutex_Take(100) == 0) + { + return 0; + } - // reconnect if went into MQTT library ERR_MEM forever loop - if (g_memoryErrorsThisSession >= 5) - { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT will reconnect soon to fix ERR_MEM errors\n"); - g_memoryErrorsThisSession = 0; - mqtt_reconnect = 5; - } + // reconnect if went into MQTT library ERR_MEM forever loop + if (g_memoryErrorsThisSession >= 5) + { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "MQTT will reconnect soon to fix ERR_MEM errors\n"); + g_memoryErrorsThisSession = 0; + mqtt_reconnect = 5; + } - // if asked to reconnect (e.g. change of topic(s)) - if (mqtt_reconnect > 0) - { - mqtt_reconnect--; - if (mqtt_reconnect == 0) - { - // then if connected, disconnect, and then it will reconnect automatically in 2s - if (mqtt_client && mqtt_client_is_connected(mqtt_client)) - { - MQTT_disconnect(mqtt_client); - loopsWithDisconnected = LOOPS_WITH_DISCONNECTED - 2; - } - } - } + // if asked to reconnect (e.g. change of topic(s)) + if (mqtt_reconnect > 0) + { + mqtt_reconnect--; + if (mqtt_reconnect == 0) + { + // then if connected, disconnect, and then it will reconnect automatically in 2s + if (mqtt_client && mqtt_client_is_connected(mqtt_client)) + { + MQTT_disconnect(mqtt_client); + loopsWithDisconnected = LOOPS_WITH_DISCONNECTED - 2; + } + } + } - if (mqtt_client == 0 || mqtt_client_is_connected(mqtt_client) == 0) - { - //addLogAdv(LOG_INFO,LOG_FEATURE_MAIN, "Timer discovers disconnected mqtt %i\n",loopsWithDisconnected); + if (mqtt_client == 0 || mqtt_client_is_connected(mqtt_client) == 0) + { + //addLogAdv(LOG_INFO,LOG_FEATURE_MAIN, "Timer discovers disconnected mqtt %i\n",loopsWithDisconnected); #if WINDOWS #elif PLATFORM_BL602 #elif PLATFORM_W600 || PLATFORM_W800 #elif PLATFORM_XR809 #elif PLATFORM_BK7231N || PLATFORM_BK7231T - if (ota_progress() == -1) + if (ota_progress() == -1) #endif - { - loopsWithDisconnected++; - if (loopsWithDisconnected > LOOPS_WITH_DISCONNECTED) - { - if (mqtt_client == 0) - { - mqtt_client = mqtt_client_new(); - } - else - { - mqtt_disconnect(mqtt_client); + { + loopsWithDisconnected++; + if (loopsWithDisconnected > LOOPS_WITH_DISCONNECTED) + { + if (mqtt_client == 0) + { + mqtt_client = mqtt_client_new(); + } + else + { + mqtt_disconnect(mqtt_client); #if defined(MQTT_CLIENT_CLEANUP) - mqtt_client_cleanup(mqtt_client); + mqtt_client_cleanup(mqtt_client); #endif - } - MQTT_do_connect(mqtt_client); - mqtt_connect_events++; - loopsWithDisconnected = 0; - } - } - MQTT_Mutex_Free(); - return 0; - } - else { - MQTT_Mutex_Free(); - // below mutex is not required any more + } + MQTT_do_connect(mqtt_client); + mqtt_connect_events++; + loopsWithDisconnected = 0; + } + } + MQTT_Mutex_Free(); + return 0; + } + else { + MQTT_Mutex_Free(); + // below mutex is not required any more - // it is connected - g_timeSinceLastMQTTPublish++; + // it is connected + g_timeSinceLastMQTTPublish++; #if WINDOWS #elif PLATFORM_BL602 #elif PLATFORM_W600 || PLATFORM_W800 #elif PLATFORM_XR809 #elif PLATFORM_BK7231N || PLATFORM_BK7231T - if (ota_progress() != -1) - { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "OTA started MQTT will be closed\n"); - mqtt_disconnect(mqtt_client); - return 1; - } + if (ota_progress() != -1) + { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "OTA started MQTT will be closed\n"); + mqtt_disconnect(mqtt_client); + return 1; + } #endif - // do we want to broadcast full state? - // Do it slowly in order not to overload the buffers - // The item indexes start at negative values for special items - // and then covers Channel indexes up to CHANNEL_MAX - //Handle only queued items. Don't need to do this separately if entire state is being published. - if ((g_MqttPublishItemsQueued > 0) && !g_bPublishAllStatesNow) - { - PublishQueuedItems(); - return 1; - } - else if (g_bPublishAllStatesNow) - { - // Doing step by a step a full publish state - //if (g_timeSinceLastMQTTPublish > 2) - { - OBK_Publish_Result publishRes; - int g_sent_thisFrame = 0; + // do we want to broadcast full state? + // Do it slowly in order not to overload the buffers + // The item indexes start at negative values for special items + // and then covers Channel indexes up to CHANNEL_MAX + //Handle only queued items. Don't need to do this separately if entire state is being published. + if ((g_MqttPublishItemsQueued > 0) && !g_bPublishAllStatesNow) + { + PublishQueuedItems(); + return 1; + } + else if (g_bPublishAllStatesNow) + { + // Doing step by a step a full publish state + //if (g_timeSinceLastMQTTPublish > 2) + { + OBK_Publish_Result publishRes; + int g_sent_thisFrame = 0; - while (g_publishItemIndex < CHANNEL_MAX) - { - publishRes = MQTT_DoItemPublish(g_publishItemIndex); - if (publishRes != OBK_PUBLISH_WAS_NOT_REQUIRED) - { - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "[g_bPublishAllStatesNow] item %i result %i\n", g_publishItemIndex, publishRes); - } - // There are several things that can happen now - // OBK_PUBLISH_OK - it was required and was published - if (publishRes == OBK_PUBLISH_OK) - { - g_sent_thisFrame++; - if (g_sent_thisFrame >= 1) - { - g_publishItemIndex++; - break; - } - } - // OBK_PUBLISH_MUTEX_FAIL - MQTT is busy - if (publishRes == OBK_PUBLISH_MUTEX_FAIL - || publishRes == OBK_PUBLISH_WAS_DISCONNECTED) - { - // retry the same later - break; - } - // OBK_PUBLISH_WAS_NOT_REQUIRED - // The item is not used for this device - g_publishItemIndex++; - } + while (g_publishItemIndex < CHANNEL_MAX) + { + publishRes = MQTT_DoItemPublish(g_publishItemIndex); + if (publishRes != OBK_PUBLISH_WAS_NOT_REQUIRED) + { + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "[g_bPublishAllStatesNow] item %i result %i\n", g_publishItemIndex, publishRes); + } + // There are several things that can happen now + // OBK_PUBLISH_OK - it was required and was published + if (publishRes == OBK_PUBLISH_OK) + { + g_sent_thisFrame++; + if (g_sent_thisFrame >= 1) + { + g_publishItemIndex++; + break; + } + } + // OBK_PUBLISH_MUTEX_FAIL - MQTT is busy + if (publishRes == OBK_PUBLISH_MUTEX_FAIL + || publishRes == OBK_PUBLISH_WAS_DISCONNECTED) + { + // retry the same later + break; + } + // OBK_PUBLISH_WAS_NOT_REQUIRED + // The item is not used for this device + g_publishItemIndex++; + } - if (g_publishItemIndex >= CHANNEL_MAX) - { - // done - g_bPublishAllStatesNow = 0; - } - } - } - else { - // not doing anything - if (CFG_HasFlag(OBK_FLAG_MQTT_BROADCASTSELFSTATEPERMINUTE)) - { - // this is called every second - g_secondsBeforeNextFullBroadcast--; - if (g_secondsBeforeNextFullBroadcast <= 0) - { - g_secondsBeforeNextFullBroadcast = 60; - MQTT_PublishWholeDeviceState(); - } - } - } - } - return 1; + if (g_publishItemIndex >= CHANNEL_MAX) + { + // done + g_bPublishAllStatesNow = 0; + } + } + } + else { + // not doing anything + if (CFG_HasFlag(OBK_FLAG_MQTT_BROADCASTSELFSTATEPERMINUTE)) + { + // this is called every second + g_secondsBeforeNextFullBroadcast--; + if (g_secondsBeforeNextFullBroadcast <= 0) + { + g_secondsBeforeNextFullBroadcast = 60; + MQTT_PublishWholeDeviceState(); + } + } + } + } + return 1; } MqttPublishItem_t* get_queue_tail(MqttPublishItem_t* head) { - if (head == NULL) { return NULL; } + if (head == NULL) { return NULL; } - while (head->next != NULL) { - head = head->next; - } - return head; + while (head->next != NULL) { + head = head->next; + } + return head; } MqttPublishItem_t* find_queue_reusable_item(MqttPublishItem_t* head) { - while (head != NULL) { - if (MQTT_QUEUE_ITEM_IS_REUSABLE(head)) { - return head; - } - head = head->next; - } - return head; + while (head != NULL) { + if (MQTT_QUEUE_ITEM_IS_REUSABLE(head)) { + return head; + } + head = head->next; + } + return head; } /// @brief Queue an entry for publish and execute a command after the publish. @@ -1430,7 +1432,7 @@ void MQTT_QueuePublishWithCommand(char* topic, char* channel, char* value, int f newItem->flags = flags; g_MqttPublishItemsQueued++; - addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Queued topic=%s/%s %i, items queued", newItem->topic, newItem->channel, g_MqttPublishItemsQueued); + addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Queued topic=%s/%s, %i items in queue", newItem->topic, newItem->channel, g_MqttPublishItemsQueued); } /// @brief Queue an entry for publish. @@ -1489,6 +1491,6 @@ OBK_Publish_Result PublishQueuedItems() { /// @brief Is MQTT sub system ready and connected? /// @return bool MQTT_IsReady() { - return mqtt_client && mqtt_client_is_connected(mqtt_client); + return mqtt_client && mqtt_client_is_connected(mqtt_client); }