diff --git a/src/httpserver/new_http.c b/src/httpserver/new_http.c index 6c05dff7c..4a3694a4f 100644 --- a/src/httpserver/new_http.c +++ b/src/httpserver/new_http.c @@ -67,6 +67,7 @@ Connection: keep-alive #elif defined(PLATFORM_BK7231T) #define USER_SW_VER "BK7231T_Test" #else +#define USER_SW_VER "unknown" #endif #endif @@ -354,7 +355,7 @@ const char *g_header = "

"); + poststr(request,"Unknown platform
"); #endif } poststr(request,"
\ diff --git a/src/mqtt/new_mqtt.c b/src/mqtt/new_mqtt.c index 5bfd5bea5..2b638ecdc 100644 --- a/src/mqtt/new_mqtt.c +++ b/src/mqtt/new_mqtt.c @@ -25,10 +25,29 @@ #endif #endif +// from mqtt.c +extern void mqtt_disconnect_my2(mqtt_client_t *client); + static int g_my_reconnect_mqtt_after_time = -1; ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT; mqtt_client_t* mqtt_client; +typedef struct mqtt_callback_tag { + char *topic; + char *subscriptionTopic; + int ID; + mqtt_callback_fn callback; +} mqtt_callback_t; + +#define MAX_MQTT_CALLBACKS 32 +static mqtt_callback_t *callbacks[MAX_MQTT_CALLBACKS]; +static int numCallbacks = 0; +// note: only one incomming can be processed at a time. +static mqtt_request_t g_mqtt_request; + +int loopsWithDisconnected = 0; +int mqtt_reconnect = 0; + static struct mqtt_connect_client_info_t mqtt_client_info = { "test", @@ -45,8 +64,245 @@ static struct mqtt_connect_client_info_t mqtt_client_info = #endif }; +// channel set callback +int channelSet(mqtt_request_t* request); +static void MQTT_do_connect(mqtt_client_t *client); +static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status); +// this can REPLACE callbacks, since we MAY wish to change the root topic.... +// in which case we would re-resigster all callbacks? +int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback){ + int index; + int i; + int subscribechange = 0; + if (!basetopic || !subscriptiontopic || !callback){ + return -1; + } + + // find existing to replace + for (index = 0; index < numCallbacks; index++){ + if (callbacks[index]){ + if (callbacks[index]->ID == ID){ + break; + } + } + } + + // find empty if any (empty by MQTT_RemoveCallback) + if (index == numCallbacks){ + for (index = 0; index < numCallbacks; index++){ + if (!callbacks[index]){ + break; + } + } + } + + if (index >= MAX_MQTT_CALLBACKS){ + return -4; + } + if (!callbacks[index]){ + callbacks[index] = (mqtt_callback_t*)os_malloc(sizeof(mqtt_callback_t)); + } + if (!callbacks[index]){ + return -2; + } + if (!callbacks[index]->topic || strcmp(callbacks[index]->topic, basetopic)){ + if (callbacks[index]->topic){ + os_free(callbacks[index]->topic); + } + callbacks[index]->topic = (char *)os_malloc(strlen(basetopic)+1); + if (!callbacks[index]->topic){ + os_free(callbacks[index]); + return -3; + } + strcpy(callbacks[index]->topic, basetopic); + } + + if (!callbacks[index]->subscriptionTopic || strcmp(callbacks[index]->subscriptionTopic, subscriptiontopic)){ + if (callbacks[index]->subscriptionTopic){ + os_free(callbacks[index]->subscriptionTopic); + } + callbacks[index]->subscriptionTopic = (char *)os_malloc(strlen(subscriptiontopic)+1); + callbacks[index]->subscriptionTopic[0] = '\0'; + if (!callbacks[index]->subscriptionTopic){ + os_free(callbacks[index]->topic); + os_free(callbacks[index]); + return -3; + } + + // find out if this subscription is new. + for (i = 0; i < numCallbacks; i++){ + if (callbacks[i]){ + if (callbacks[i]->subscriptionTopic && + !strcmp(callbacks[i]->subscriptionTopic, subscriptiontopic)){ + break; + } + } + } + strcpy(callbacks[index]->subscriptionTopic, subscriptiontopic); + // if this subscription is new, must reconnect + if (i == numCallbacks){ + subscribechange++; + } + } + + callbacks[index]->callback = callback; + if (index == numCallbacks){ + numCallbacks++; + } + + if (subscribechange){ + mqtt_reconnect = 8; + } + // success + return 0; +} + +int MQTT_RemoveCallback(int ID){ + int index; + + for (index = 0; index < numCallbacks; index++){ + if (callbacks[index]){ + if (callbacks[index]->ID == ID){ + if (callbacks[index]->topic) { + os_free(callbacks[index]->topic); + callbacks[index]->topic = NULL; + } + if (callbacks[index]->subscriptionTopic) { + os_free(callbacks[index]->subscriptionTopic); + callbacks[index]->subscriptionTopic = NULL; + } + os_free(callbacks[index]); + callbacks[index] = NULL; + mqtt_reconnect = 8; + return 1; + } + } + } + return 0; +} + +// this accepts obkXXXXXX//set to receive data to set channels +int channelSet(mqtt_request_t* request){ + // we only need a few bytes to receive a decimal number 0-100 + char copy[12]; + int len = request->receivedLen; + char *p = request->topic; + int channel = 0; + int iValue = 0; + + // TODO: better + while(*p != '/') { + if(*p == 0) + return 0; + p++; + } + p++; + if ((*p - '0' >= 0) && (*p - '0' <= 9)){ + channel = atoi(p); + } else { + channel = -1; + } + + // if channel out of range, stop here. + if ((channel < 0) || (channel > 32)){ + return 0; + } + + // find something after channel - should be //set + while(*p != '/') { + if(*p == 0) + return 0; + p++; + } + + // if not /set, then stop here + if (strcmp(p, "set")){ + return 0; + } + + if(len > sizeof(copy)-1) { + len = sizeof(copy)-1; + } + + strncpy(copy, (char *)request->received, len); + // strncpy does not terminate??!!!! + copy[len] = '\0'; + + //PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n"); + PR_NOTICE("MQTT client in mqtt_incoming_data_cb data is %s for ch %i\n", copy, channel); + + iValue = atoi((char *)copy); + CHANNEL_Set(channel,iValue,0); + + // return 1 to stop processing callbacks here. + // return 0 to allow later callbacks to process this topic. + return 1; +} + + +// this accepts cmnd// to receive data to set channels +int tasCmnd(mqtt_request_t* request){ + // we only need a few bytes to receive a decimal number 0-100 + char copy[64]; + int len = request->receivedLen; + char *p = request->topic; + int channel = 0; + int iValue = 0; + + // assume a string input here, copy and terminate + if(len > sizeof(copy)-1) { + len = sizeof(copy)-1; + } + strncpy(copy, (char *)request->received, len); + // strncpy does not terminate??!!!! + copy[len] = '\0'; + + PR_NOTICE("tas? data is %s for ch %s\n", copy, request->topic); + + // TODO: better + // skip to after second forward slash + while(*p != '/') { if(*p == 0) return 0; p++; } + p++; + while(*p != '/') { if(*p == 0) return 0; p++; } + p++; + + do{ + if (!strncmp(p, "POWER", 5)){ + p += 5; + if ((*p - '0' >= 0) && (*p - '0' <= 9)){ + channel = atoi(p); + } else { + channel = -1; + } + // if channel out of range, stop here. + if ((channel < 0) || (channel > 32)) return 0; + + //PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n"); + PR_NOTICE("MQTT client in tasCmnd data is %s for ch %i\n", copy, channel); + iValue = atoi((char *)copy); + CHANNEL_Set(channel,iValue,0); + break; + } + + PR_NOTICE("MQTT client unprocessed in tasCmnd data is %s for topic \n", copy, request->topic); + break; + } while (0); + + // return 1 to stop processing callbacks here. + // return 0 to allow later callbacks to process this topic. + return 1; +} + + +// copied here because for some reason renames in sdk? +static void MQTT_disconnect(mqtt_client_t *client) +{ + if (!client) return; + // this is what it was renamed to. why? + mqtt_disconnect_my2(client); +} /* Called when publish is complete either with sucess or failure */ static void mqtt_pub_request_cb(void *arg, err_t result) @@ -55,8 +311,6 @@ static void mqtt_pub_request_cb(void *arg, err_t result) PR_NOTICE("Publish result: %d\n", result); } } -static void -mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status); void example_publish(mqtt_client_t *client, int channel, int iVal) { @@ -97,51 +351,54 @@ void example_publish(mqtt_client_t *client, int channel, int iVal) } } -int g_incoming_channel_mqtt = 0; + static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags) { - int iValue; - char copy[128]; + int i; // unused - left here as example //const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; - if(len > sizeof(copy)-1) { - len = sizeof(copy)-1; + // if we stored a topic in g_mqtt_request, then we found a matching callback, so use it. + if (g_mqtt_request.topic[0]) { + // note: data is NOT terminated (it may be binary...). + g_mqtt_request.received = data; + g_mqtt_request.receivedLen = len; + + PR_NOTICE("MQTT in topic %s", g_mqtt_request.topic); + + for (i = 0; i < numCallbacks; i++){ + char *cbtopic = callbacks[i]->topic; + if (!strncmp(g_mqtt_request.topic, cbtopic, strlen(cbtopic))){ + // note - callback must return 1 to say it ate the mqtt, else further processing can be performed. + // i.e. multiple people can get each topic if required. + if (callbacks[i]->callback(&g_mqtt_request)){ + return; + } + } + } } - - strncpy(copy, (char *)data, len); - // strncpy does not terminate??!!!! - copy[len] = '\0'; - - //PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n"); - PR_NOTICE("MQTT client in mqtt_incoming_data_cb data is %s for ch %i\n", copy, g_incoming_channel_mqtt); - - iValue = atoi((char *)copy); - CHANNEL_Set(g_incoming_channel_mqtt,iValue,0); - - // PR_NOTICE(("MQTT client \"%s\" data cb: len %d, flags %d\n", client_info->client_id, (int)len, (int)flags)); } static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len) { - const char *p; + //const char *p; + int i; // unused - left here as example //const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg; + // look for a callback with this URL and method, or HTTP_ANY + g_mqtt_request.topic[0] = '\0'; + for (i = 0; i < numCallbacks; i++){ + char *cbtopic = callbacks[i]->topic; + if (strncmp(topic, cbtopic, strlen(cbtopic))){ + strncpy(g_mqtt_request.topic, topic, sizeof(g_mqtt_request.topic) - 1); + g_mqtt_request.topic[sizeof(g_mqtt_request.topic) - 1] = 0; + break; + } + } + //PR_NOTICE("MQTT client in mqtt_incoming_publish_cb\n"); PR_NOTICE("MQTT client in mqtt_incoming_publish_cb topic %s\n",topic); -// TODO: better -// g_incoming_channel_mqtt = topic[5] - '0'; - p = topic; - while(*p != '/') { - if(*p == 0) - return; - p++; - } - p++; - g_incoming_channel_mqtt = *p - '0'; - - // PR_NOTICE(("MQTT client \"%s\" publish cb: topic %s, len %d\n", client_info->client_id, topic, (int)tot_len)); } static void @@ -158,9 +415,10 @@ static void mqtt_sub_request_cb(void *arg, err_t result) notifying user, retry subscribe or disconnect from server */ PR_NOTICE("Subscribe result: %i\n", result); } -void example_do_connect(mqtt_client_t *client); + static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status) { + int i; char tmp[64]; const char *baseName; err_t err = ERR_OK; @@ -173,26 +431,32 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection if (status == MQTT_CONNECT_ACCEPTED) { PR_NOTICE("mqtt_connection_cb: Successfully connected\n"); - - mqtt_set_inpub_callback(mqtt_client, + mqtt_set_inpub_callback(mqtt_client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, LWIP_CONST_CAST(void*, &mqtt_client_info)); - /* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */ - - baseName = CFG_GetShortDeviceName(); - // + is a MQTT wildcard - sprintf(tmp,"%s/+/set",baseName); - err = mqtt_sub_unsub(client, - // "wb2s/+/set", 1, - tmp, 1, + // subscribe to all callback subscription topics + // this makes a BIG assumption that we can subscribe multiple times to the same one? + // TODO - check that subscribing multiple times to the same topic is not BAD + for (i = 0; i < numCallbacks; i++){ + if (callbacks[i]){ + if (callbacks[i]->subscriptionTopic && callbacks[i]->subscriptionTopic[0]){ + err = mqtt_sub_unsub(client, + callbacks[i]->subscriptionTopic, 1, mqtt_request_cb, LWIP_CONST_CAST(void*, client_info), 1); - if(err != ERR_OK) { - PR_NOTICE("mqtt_subscribe return: %d\n", err); + if(err != ERR_OK) { + PR_NOTICE("mqtt_subscribe to %s return: %d\n", callbacks[i]->subscriptionTopic, err); + } else { + PR_NOTICE("mqtt_subscribed to %s\n", callbacks[i]->subscriptionTopic); + } + } + } } + baseName = CFG_GetShortDeviceName(); + sprintf(tmp,"%s/connected",baseName); err = mqtt_publish(client, tmp, "online", strlen("online"), 2, true, mqtt_pub_request_cb, 0); if(err != ERR_OK) { @@ -212,12 +476,10 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection // 1); } else { PR_NOTICE("mqtt_connection_cb: Disconnected, reason: %d\n", status); - // example_do_connect(client); - } } -void example_do_connect(mqtt_client_t *client) +static void MQTT_do_connect(mqtt_client_t *client) { const char *mqtt_userName, *mqtt_host, *mqtt_pass, *mqtt_clientID; int mqtt_port; @@ -294,9 +556,44 @@ static void app_my_channel_toggle_callback(int channel, int iVal) example_publish(mqtt_client,channel,iVal); } -int loopsWithDisconnected = 0; +// initialise things MQTT +// called from user_main +void MQTT_init(){ + char cbtopicbase[64]; + char cbtopicsub[64]; + const char *baseName; + baseName = CFG_GetShortDeviceName(); + // register the main set channel callback + sprintf(cbtopicbase,"%s/",baseName); + sprintf(cbtopicsub,"%s/+/set",baseName); + // note: this may REPLACE an existing entry with the same ID. ID 1 !!! + MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 1, channelSet); + + // register the TAS cmnd callback + sprintf(cbtopicbase,"cmnd/%s/",baseName); + sprintf(cbtopicsub,"cmnd/%s/+",baseName); + // note: this may REPLACE an existing entry with the same ID. ID 2 !!! + MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 2, tasCmnd); + +} + + +// called from user timer. void MQTT_RunEverySecondUpdate() { + + // if asked to reconnect (e.g. change of topic(s)) + if (mqtt_reconnect > 0){ + mqtt_reconnect --; + if (mqtt_reconnect == 0){ + // then if connected, disconnect, and then it will reconnect automatically in 2s + if (mqtt_client && mqtt_client_is_connected(mqtt_client)) { + MQTT_disconnect(mqtt_client); + loopsWithDisconnected = 8; + } + } + } + if(mqtt_client == 0 || mqtt_client_is_connected(mqtt_client) == 0) { ADDLOG_INFO(LOG_FEATURE_MAIN, "Timer discovers disconnected mqtt %i\n",loopsWithDisconnected); loopsWithDisconnected++; @@ -306,8 +603,8 @@ void MQTT_RunEverySecondUpdate() { { mqtt_client = mqtt_client_new(); CHANNEL_SetChangeCallback(app_my_channel_toggle_callback); - } - example_do_connect(mqtt_client); + } + MQTT_do_connect(mqtt_client); loopsWithDisconnected = 0; } } diff --git a/src/mqtt/new_mqtt.h b/src/mqtt/new_mqtt.h index e517a5c9e..4f45d6816 100644 --- a/src/mqtt/new_mqtt.h +++ b/src/mqtt/new_mqtt.h @@ -12,10 +12,28 @@ extern ip_addr_t mqtt_ip; extern mqtt_client_t* mqtt_client; -void mqtt_example_init(void); -void example_do_connect(mqtt_client_t *client); +//void mqtt_example_init(void); +//void example_do_connect(mqtt_client_t *client); void example_publish(mqtt_client_t *client, int channel, int iVal); +void MQTT_init(); void MQTT_RunEverySecondUpdate(); +// ability to register callbacks for MQTT data +typedef struct mqtt_request_tag { + const unsigned char *received; // note: NOT terminated, may be binary + int receivedLen; + char topic[128]; +} mqtt_request_t; + +// callback function for mqtt. +// return 0 to allow the incoming topic/data to be processed by others/channel set. +// return 1 to 'eat the packet and terminate further processing. +typedef int (*mqtt_callback_fn)(mqtt_request_t *request); + +// topics must be unique (i.e. you can't have /about and /aboutme or /about/me) +// ALL topics currently must start with main device topic root. +// ID is unique and non-zero - so that callbacks can be replaced.... +int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback); +int MQTT_RemoveCallback(int ID); diff --git a/src/user_main.c b/src/user_main.c index 14f3719b6..bfc2a5e83 100644 --- a/src/user_main.c +++ b/src/user_main.c @@ -370,6 +370,10 @@ void user_main(void) // initialise rest interface init_rest(); + // initialise MQTT - just sets up variables. + // all MQTT happens in timer thread? + MQTT_init(); + err = rtos_init_timer(&led_timer, 1 * 1000, app_led_timer_handler,