From 528424664f2173fb123f2e69ea1f0274caa7588e Mon Sep 17 00:00:00 2001 From: btsimonh Date: Thu, 24 Feb 2022 09:18:22 +0000 Subject: [PATCH] more extensive updates of MQTT, including multi-subscribe and forced reconnect on subscriptions requirmeents change. What happens if we subscribe multiple times to the same topic? --- src/mqtt/new_mqtt.c | 233 +++++++++++++++++++++++++++++++++++--------- src/mqtt/new_mqtt.h | 8 +- src/user_main.c | 4 + 3 files changed, 197 insertions(+), 48 deletions(-) diff --git a/src/mqtt/new_mqtt.c b/src/mqtt/new_mqtt.c index 22a98c049..b39ec4860 100644 --- a/src/mqtt/new_mqtt.c +++ b/src/mqtt/new_mqtt.c @@ -25,12 +25,16 @@ #endif #endif +// from mqtt.c +extern void mqtt_disconnect_my2(mqtt_client_t *client); + static int g_my_reconnect_mqtt_after_time = -1; ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT; mqtt_client_t* mqtt_client; typedef struct mqtt_callback_tag { char *topic; + char *subscriptionTopic; int ID; mqtt_callback_fn callback; } mqtt_callback_t; @@ -38,7 +42,11 @@ typedef struct mqtt_callback_tag { #define MAX_MQTT_CALLBACKS 32 static mqtt_callback_t *callbacks[MAX_MQTT_CALLBACKS]; static int numCallbacks = 0; +// note: only one incomming can be processed at a time. +static mqtt_request_t g_mqtt_request; +int loopsWithDisconnected = 0; +int mqtt_reconnect = 0; static struct mqtt_connect_client_info_t mqtt_client_info = { @@ -56,12 +64,19 @@ static struct mqtt_connect_client_info_t mqtt_client_info = #endif }; +// channel set callback +int channelSet(mqtt_request_t* request); +static void MQTT_do_connect(mqtt_client_t *client); +static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status); + // this can REPLACE callbacks, since we MAY wish to change the root topic.... // in which case we would re-resigster all callbacks? -int MQTT_RegisterCallback( const char *topic, int ID, mqtt_callback_fn callback){ +int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback){ int index; - if (!topic || !callback){ + int i; + int subscribechange = 0; + if (!basetopic || !subscriptiontopic || !callback){ return -1; } @@ -92,20 +107,54 @@ int MQTT_RegisterCallback( const char *topic, int ID, mqtt_callback_fn callback) if (!callbacks[index]){ return -2; } - if (callbacks[index]->topic) { - os_free(callbacks[index]->topic); - callbacks[index]->topic = NULL; + if (!callbacks[index]->topic || strcmp(callbacks[index]->topic, basetopic)){ + if (callbacks[index]->topic){ + os_free(callbacks[index]->topic); + } + callbacks[index]->topic = (char *)os_malloc(strlen(basetopic)+1); + if (!callbacks[index]->topic){ + os_free(callbacks[index]); + return -3; + } + strcpy(callbacks[index]->topic, basetopic); } - callbacks[index]->topic = (char *)os_malloc(strlen(topic)+1); - if (!callbacks[index]->topic){ - os_free(callbacks[index]); - return -3; - } - strcpy(callbacks[index]->topic, topic); + + if (!callbacks[index]->subscriptionTopic || strcmp(callbacks[index]->subscriptionTopic, subscriptiontopic)){ + if (callbacks[index]->subscriptionTopic){ + os_free(callbacks[index]->subscriptionTopic); + } + callbacks[index]->subscriptionTopic = (char *)os_malloc(strlen(subscriptiontopic)+1); + callbacks[index]->subscriptionTopic[0] = '\0'; + if (!callbacks[index]->subscriptionTopic){ + os_free(callbacks[index]->topic); + os_free(callbacks[index]); + return -3; + } + + // find out if this subscription is new. + for (i = 0; i < numCallbacks; i++){ + if (callbacks[i]){ + if (callbacks[i]->subscriptionTopic && + !strcmp(callbacks[i]->subscriptionTopic, subscriptiontopic)){ + break; + } + } + } + strcpy(callbacks[index]->subscriptionTopic, subscriptiontopic); + // if this subscription is new, must reconnect + if (i == numCallbacks){ + subscribechange++; + } + } + callbacks[index]->callback = callback; if (index == numCallbacks){ numCallbacks++; } + + if (subscribechange){ + mqtt_reconnect = 8; + } // success return 0; } @@ -119,16 +168,21 @@ int MQTT_RemoveCallback(int ID){ if (callbacks[index]->topic) { os_free(callbacks[index]->topic); callbacks[index]->topic = NULL; + } + if (callbacks[index]->subscriptionTopic) { + os_free(callbacks[index]->subscriptionTopic); + callbacks[index]->subscriptionTopic = NULL; } os_free(callbacks[index]); callbacks[index] = NULL; - break; + mqtt_reconnect = 8; + return 1; } } } + return 0; } - // this accepts obkXXXXXX//set to receive data to set channels int channelSet(mqtt_request_t* request){ // we only need a few bytes to receive a decimal number 0-100 @@ -164,7 +218,7 @@ int channelSet(mqtt_request_t* request){ } // if not /set, then stop here - if (strcmp(p, 'set')){ + if (strcmp(p, "set")){ return 0; } @@ -188,6 +242,66 @@ int channelSet(mqtt_request_t* request){ } +// this accepts cmnd// to receive data to set channels +int tasCmnd(mqtt_request_t* request){ + // we only need a few bytes to receive a decimal number 0-100 + char copy[64]; + int len = request->receivedLen; + char *p = request->topic; + int channel = 0; + int iValue = 0; + + // assume a string input here, copy and terminate + if(len > sizeof(copy)-1) { + len = sizeof(copy)-1; + } + strncpy(copy, (char *)request->received, len); + // strncpy does not terminate??!!!! + copy[len] = '\0'; + + // TODO: better + // skip to after second forward slash + while(*p != '/') { if(*p == 0) return 0; p++; } + p++; + while(*p != '/') { if(*p == 0) return 0; p++; } + p++; + + do{ + if (!strncmp(p, "POWER", 5)){ + p += 5; + if ((*p - '0' >= 0) && (*p - '0' <= 9)){ + channel = atoi(p); + } else { + channel = -1; + } + // if channel out of range, stop here. + if ((channel < 0) || (channel > 32)) return 0; + + //PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n"); + PR_NOTICE("MQTT client in tasCmnd data is %s for ch %i\n", copy, channel); + iValue = atoi((char *)copy); + CHANNEL_Set(channel,iValue,0); + break; + } + + PR_NOTICE("MQTT client unprocessed in tasCmnd data is %s for topic \n", copy, request->topic); + break; + } while (0); + + // return 1 to stop processing callbacks here. + // return 0 to allow later callbacks to process this topic. + return 1; +} + + +// copied here because for some reason renames in sdk? +static void MQTT_disconnect(mqtt_client_t *client) +{ + if (!client) return; + // this is what it was renamed to. why? + mqtt_disconnect_my2(client); +} + /* Called when publish is complete either with sucess or failure */ static void mqtt_pub_request_cb(void *arg, err_t result) { @@ -195,8 +309,6 @@ static void mqtt_pub_request_cb(void *arg, err_t result) PR_NOTICE("Publish result: %d\n", result); } } -static void -mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status); void example_publish(mqtt_client_t *client, int channel, int iVal) { @@ -237,8 +349,6 @@ void example_publish(mqtt_client_t *client, int channel, int iVal) } } -// note: only one incomming can be processed at a time. -static mqtt_request_t g_mqtt_request; static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags) { @@ -267,7 +377,7 @@ static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t f static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len) { - const char *p; + //const char *p; int i; // unused - left here as example //const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; @@ -301,9 +411,10 @@ static void mqtt_sub_request_cb(void *arg, err_t result) notifying user, retry subscribe or disconnect from server */ PR_NOTICE("Subscribe result: %i\n", result); } -void example_do_connect(mqtt_client_t *client); + static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status) { + int i; char tmp[64]; const char *baseName; err_t err = ERR_OK; @@ -316,33 +427,32 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection if (status == MQTT_CONNECT_ACCEPTED) { PR_NOTICE("mqtt_connection_cb: Successfully connected\n"); - - mqtt_set_inpub_callback(mqtt_client, + mqtt_set_inpub_callback(mqtt_client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, LWIP_CONST_CAST(void*, &mqtt_client_info)); - /* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */ - - baseName = CFG_GetShortDeviceName(); - - // register the main set channel callback - sprintf(tmp,"%s/",baseName); - // note: this may REPLACE an existing entry with the same ID. - MQTT_RegisterCallback( tmp, 1, channelSet); - - // setup subscription - // + is a MQTT wildcard - sprintf(tmp,"%s/+/set",baseName); - err = mqtt_sub_unsub(client, - // "wb2s/+/set", 1, - tmp, 1, + // subscribe to all callback subscription topics + // this makes a BIG assumption that we can subscribe multiple times to the same one? + // TODO - check that subscribing multiple times to the same topic is not BAD + for (i = 0; i < numCallbacks; i++){ + if (callbacks[i]){ + if (callbacks[i]->subscriptionTopic && callbacks[i]->subscriptionTopic[0]){ + err = mqtt_sub_unsub(client, + callbacks[i]->subscriptionTopic, 1, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1); - if(err != ERR_OK) { - PR_NOTICE("mqtt_subscribe return: %d\n", err); + if(err != ERR_OK) { + PR_NOTICE("mqtt_subscribe to %s return: %d\n", callbacks[i]->subscriptionTopic, err); + } else { + PR_NOTICE("mqtt_subscribed to %s\n", callbacks[i]->subscriptionTopic); + } + } + } } + baseName = CFG_GetShortDeviceName(); + sprintf(tmp,"%s/connected",baseName); err = mqtt_publish(client, tmp, "online", strlen("online"), 2, true, mqtt_pub_request_cb, 0); if(err != ERR_OK) { @@ -362,12 +472,10 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection // 1); } else { PR_NOTICE("mqtt_connection_cb: Disconnected, reason: %d\n", status); - // example_do_connect(client); - } } -void example_do_connect(mqtt_client_t *client) +static void MQTT_do_connect(mqtt_client_t *client) { const char *mqtt_userName, *mqtt_host, *mqtt_pass, *mqtt_clientID; int mqtt_port; @@ -444,9 +552,44 @@ static void app_my_channel_toggle_callback(int channel, int iVal) example_publish(mqtt_client,channel,iVal); } -int loopsWithDisconnected = 0; +// initialise things MQTT +// called from user_main +void MQTT_init(){ + char cbtopicbase[64]; + char cbtopicsub[64]; + const char *baseName; + baseName = CFG_GetShortDeviceName(); + // register the main set channel callback + sprintf(cbtopicbase,"%s/",baseName); + sprintf(cbtopicsub,"%s/+/set",baseName); + // note: this may REPLACE an existing entry with the same ID. ID 1 !!! + MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 1, channelSet); + + // register the TAS cmnd callback + sprintf(cbtopicbase,"cmnd/%s/",baseName); + sprintf(cbtopicsub,"cmnd/%s/",baseName); + // note: this may REPLACE an existing entry with the same ID. ID 2 !!! + MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 2, tasCmnd); + +} + + +// called from user timer. void MQTT_RunEverySecondUpdate() { + + // if asked to reconnect (e.g. change of topic(s)) + if (mqtt_reconnect > 0){ + mqtt_reconnect --; + if (mqtt_reconnect == 0){ + // then if connected, disconnect, and then it will reconnect automatically in 2s + if (mqtt_client && mqtt_client_is_connected(mqtt_client)) { + MQTT_disconnect(mqtt_client); + loopsWithDisconnected = 8; + } + } + } + if(mqtt_client == 0 || mqtt_client_is_connected(mqtt_client) == 0) { ADDLOG_INFO(LOG_FEATURE_MAIN, "Timer discovers disconnected mqtt %i\n",loopsWithDisconnected); loopsWithDisconnected++; @@ -456,8 +599,8 @@ void MQTT_RunEverySecondUpdate() { { mqtt_client = mqtt_client_new(); CHANNEL_SetChangeCallback(app_my_channel_toggle_callback); - } - example_do_connect(mqtt_client); + } + MQTT_do_connect(mqtt_client); loopsWithDisconnected = 0; } } diff --git a/src/mqtt/new_mqtt.h b/src/mqtt/new_mqtt.h index 2c67ca38b..4f45d6816 100644 --- a/src/mqtt/new_mqtt.h +++ b/src/mqtt/new_mqtt.h @@ -12,9 +12,10 @@ extern ip_addr_t mqtt_ip; extern mqtt_client_t* mqtt_client; -void mqtt_example_init(void); -void example_do_connect(mqtt_client_t *client); +//void mqtt_example_init(void); +//void example_do_connect(mqtt_client_t *client); void example_publish(mqtt_client_t *client, int channel, int iVal); +void MQTT_init(); void MQTT_RunEverySecondUpdate(); @@ -33,5 +34,6 @@ typedef int (*mqtt_callback_fn)(mqtt_request_t *request); // topics must be unique (i.e. you can't have /about and /aboutme or /about/me) // ALL topics currently must start with main device topic root. // ID is unique and non-zero - so that callbacks can be replaced.... -int MQTT_RegisterCallback( const char *topic, int ID, mqtt_callback_fn callback); +int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback); +int MQTT_RemoveCallback(int ID); diff --git a/src/user_main.c b/src/user_main.c index 14f3719b6..bfc2a5e83 100644 --- a/src/user_main.c +++ b/src/user_main.c @@ -370,6 +370,10 @@ void user_main(void) // initialise rest interface init_rest(); + // initialise MQTT - just sets up variables. + // all MQTT happens in timer thread? + MQTT_init(); + err = rtos_init_timer(&led_timer, 1 * 1000, app_led_timer_handler,