more extensive updates of MQTT, including multi-subscribe and forced reconnect on subscriptions requirmeents change.

What happens if we subscribe multiple times to the same topic?
This commit is contained in:
btsimonh
2022-02-24 09:18:22 +00:00
parent 996708ef47
commit 528424664f
3 changed files with 197 additions and 48 deletions

View File

@ -25,12 +25,16 @@
#endif
#endif
// from mqtt.c
extern void mqtt_disconnect_my2(mqtt_client_t *client);
static int g_my_reconnect_mqtt_after_time = -1;
ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT;
mqtt_client_t* mqtt_client;
typedef struct mqtt_callback_tag {
char *topic;
char *subscriptionTopic;
int ID;
mqtt_callback_fn callback;
} mqtt_callback_t;
@ -38,7 +42,11 @@ typedef struct mqtt_callback_tag {
#define MAX_MQTT_CALLBACKS 32
static mqtt_callback_t *callbacks[MAX_MQTT_CALLBACKS];
static int numCallbacks = 0;
// note: only one incomming can be processed at a time.
static mqtt_request_t g_mqtt_request;
int loopsWithDisconnected = 0;
int mqtt_reconnect = 0;
static struct mqtt_connect_client_info_t mqtt_client_info =
{
@ -56,12 +64,19 @@ static struct mqtt_connect_client_info_t mqtt_client_info =
#endif
};
// channel set callback
int channelSet(mqtt_request_t* request);
static void MQTT_do_connect(mqtt_client_t *client);
static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status);
// this can REPLACE callbacks, since we MAY wish to change the root topic....
// in which case we would re-resigster all callbacks?
int MQTT_RegisterCallback( const char *topic, int ID, mqtt_callback_fn callback){
int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback){
int index;
if (!topic || !callback){
int i;
int subscribechange = 0;
if (!basetopic || !subscriptiontopic || !callback){
return -1;
}
@ -92,20 +107,54 @@ int MQTT_RegisterCallback( const char *topic, int ID, mqtt_callback_fn callback)
if (!callbacks[index]){
return -2;
}
if (callbacks[index]->topic) {
os_free(callbacks[index]->topic);
callbacks[index]->topic = NULL;
if (!callbacks[index]->topic || strcmp(callbacks[index]->topic, basetopic)){
if (callbacks[index]->topic){
os_free(callbacks[index]->topic);
}
callbacks[index]->topic = (char *)os_malloc(strlen(basetopic)+1);
if (!callbacks[index]->topic){
os_free(callbacks[index]);
return -3;
}
strcpy(callbacks[index]->topic, basetopic);
}
callbacks[index]->topic = (char *)os_malloc(strlen(topic)+1);
if (!callbacks[index]->topic){
os_free(callbacks[index]);
return -3;
}
strcpy(callbacks[index]->topic, topic);
if (!callbacks[index]->subscriptionTopic || strcmp(callbacks[index]->subscriptionTopic, subscriptiontopic)){
if (callbacks[index]->subscriptionTopic){
os_free(callbacks[index]->subscriptionTopic);
}
callbacks[index]->subscriptionTopic = (char *)os_malloc(strlen(subscriptiontopic)+1);
callbacks[index]->subscriptionTopic[0] = '\0';
if (!callbacks[index]->subscriptionTopic){
os_free(callbacks[index]->topic);
os_free(callbacks[index]);
return -3;
}
// find out if this subscription is new.
for (i = 0; i < numCallbacks; i++){
if (callbacks[i]){
if (callbacks[i]->subscriptionTopic &&
!strcmp(callbacks[i]->subscriptionTopic, subscriptiontopic)){
break;
}
}
}
strcpy(callbacks[index]->subscriptionTopic, subscriptiontopic);
// if this subscription is new, must reconnect
if (i == numCallbacks){
subscribechange++;
}
}
callbacks[index]->callback = callback;
if (index == numCallbacks){
numCallbacks++;
}
if (subscribechange){
mqtt_reconnect = 8;
}
// success
return 0;
}
@ -119,16 +168,21 @@ int MQTT_RemoveCallback(int ID){
if (callbacks[index]->topic) {
os_free(callbacks[index]->topic);
callbacks[index]->topic = NULL;
}
if (callbacks[index]->subscriptionTopic) {
os_free(callbacks[index]->subscriptionTopic);
callbacks[index]->subscriptionTopic = NULL;
}
os_free(callbacks[index]);
callbacks[index] = NULL;
break;
mqtt_reconnect = 8;
return 1;
}
}
}
return 0;
}
// this accepts obkXXXXXX/<chan>/set to receive data to set channels
int channelSet(mqtt_request_t* request){
// we only need a few bytes to receive a decimal number 0-100
@ -164,7 +218,7 @@ int channelSet(mqtt_request_t* request){
}
// if not /set, then stop here
if (strcmp(p, 'set')){
if (strcmp(p, "set")){
return 0;
}
@ -188,6 +242,66 @@ int channelSet(mqtt_request_t* request){
}
// this accepts cmnd/<basename>/<xxx> to receive data to set channels
int tasCmnd(mqtt_request_t* request){
// we only need a few bytes to receive a decimal number 0-100
char copy[64];
int len = request->receivedLen;
char *p = request->topic;
int channel = 0;
int iValue = 0;
// assume a string input here, copy and terminate
if(len > sizeof(copy)-1) {
len = sizeof(copy)-1;
}
strncpy(copy, (char *)request->received, len);
// strncpy does not terminate??!!!!
copy[len] = '\0';
// TODO: better
// skip to after second forward slash
while(*p != '/') { if(*p == 0) return 0; p++; }
p++;
while(*p != '/') { if(*p == 0) return 0; p++; }
p++;
do{
if (!strncmp(p, "POWER", 5)){
p += 5;
if ((*p - '0' >= 0) && (*p - '0' <= 9)){
channel = atoi(p);
} else {
channel = -1;
}
// if channel out of range, stop here.
if ((channel < 0) || (channel > 32)) return 0;
//PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n");
PR_NOTICE("MQTT client in tasCmnd data is %s for ch %i\n", copy, channel);
iValue = atoi((char *)copy);
CHANNEL_Set(channel,iValue,0);
break;
}
PR_NOTICE("MQTT client unprocessed in tasCmnd data is %s for topic \n", copy, request->topic);
break;
} while (0);
// return 1 to stop processing callbacks here.
// return 0 to allow later callbacks to process this topic.
return 1;
}
// copied here because for some reason renames in sdk?
static void MQTT_disconnect(mqtt_client_t *client)
{
if (!client) return;
// this is what it was renamed to. why?
mqtt_disconnect_my2(client);
}
/* Called when publish is complete either with sucess or failure */
static void mqtt_pub_request_cb(void *arg, err_t result)
{
@ -195,8 +309,6 @@ static void mqtt_pub_request_cb(void *arg, err_t result)
PR_NOTICE("Publish result: %d\n", result);
}
}
static void
mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status);
void example_publish(mqtt_client_t *client, int channel, int iVal)
{
@ -237,8 +349,6 @@ void example_publish(mqtt_client_t *client, int channel, int iVal)
}
}
// note: only one incomming can be processed at a time.
static mqtt_request_t g_mqtt_request;
static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
{
@ -267,7 +377,7 @@ static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t f
static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
{
const char *p;
//const char *p;
int i;
// unused - left here as example
//const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
@ -301,9 +411,10 @@ static void mqtt_sub_request_cb(void *arg, err_t result)
notifying user, retry subscribe or disconnect from server */
PR_NOTICE("Subscribe result: %i\n", result);
}
void example_do_connect(mqtt_client_t *client);
static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
{
int i;
char tmp[64];
const char *baseName;
err_t err = ERR_OK;
@ -316,33 +427,32 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection
if (status == MQTT_CONNECT_ACCEPTED) {
PR_NOTICE("mqtt_connection_cb: Successfully connected\n");
mqtt_set_inpub_callback(mqtt_client,
mqtt_set_inpub_callback(mqtt_client,
mqtt_incoming_publish_cb,
mqtt_incoming_data_cb,
LWIP_CONST_CAST(void*, &mqtt_client_info));
/* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */
baseName = CFG_GetShortDeviceName();
// register the main set channel callback
sprintf(tmp,"%s/",baseName);
// note: this may REPLACE an existing entry with the same ID.
MQTT_RegisterCallback( tmp, 1, channelSet);
// setup subscription
// + is a MQTT wildcard
sprintf(tmp,"%s/+/set",baseName);
err = mqtt_sub_unsub(client,
// "wb2s/+/set", 1,
tmp, 1,
// subscribe to all callback subscription topics
// this makes a BIG assumption that we can subscribe multiple times to the same one?
// TODO - check that subscribing multiple times to the same topic is not BAD
for (i = 0; i < numCallbacks; i++){
if (callbacks[i]){
if (callbacks[i]->subscriptionTopic && callbacks[i]->subscriptionTopic[0]){
err = mqtt_sub_unsub(client,
callbacks[i]->subscriptionTopic, 1,
mqtt_request_cb, LWIP_CONST_CAST(void*, client_info),
1);
if(err != ERR_OK) {
PR_NOTICE("mqtt_subscribe return: %d\n", err);
if(err != ERR_OK) {
PR_NOTICE("mqtt_subscribe to %s return: %d\n", callbacks[i]->subscriptionTopic, err);
} else {
PR_NOTICE("mqtt_subscribed to %s\n", callbacks[i]->subscriptionTopic);
}
}
}
}
baseName = CFG_GetShortDeviceName();
sprintf(tmp,"%s/connected",baseName);
err = mqtt_publish(client, tmp, "online", strlen("online"), 2, true, mqtt_pub_request_cb, 0);
if(err != ERR_OK) {
@ -362,12 +472,10 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection
// 1);
} else {
PR_NOTICE("mqtt_connection_cb: Disconnected, reason: %d\n", status);
// example_do_connect(client);
}
}
void example_do_connect(mqtt_client_t *client)
static void MQTT_do_connect(mqtt_client_t *client)
{
const char *mqtt_userName, *mqtt_host, *mqtt_pass, *mqtt_clientID;
int mqtt_port;
@ -444,9 +552,44 @@ static void app_my_channel_toggle_callback(int channel, int iVal)
example_publish(mqtt_client,channel,iVal);
}
int loopsWithDisconnected = 0;
// initialise things MQTT
// called from user_main
void MQTT_init(){
char cbtopicbase[64];
char cbtopicsub[64];
const char *baseName;
baseName = CFG_GetShortDeviceName();
// register the main set channel callback
sprintf(cbtopicbase,"%s/",baseName);
sprintf(cbtopicsub,"%s/+/set",baseName);
// note: this may REPLACE an existing entry with the same ID. ID 1 !!!
MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 1, channelSet);
// register the TAS cmnd callback
sprintf(cbtopicbase,"cmnd/%s/",baseName);
sprintf(cbtopicsub,"cmnd/%s/",baseName);
// note: this may REPLACE an existing entry with the same ID. ID 2 !!!
MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 2, tasCmnd);
}
// called from user timer.
void MQTT_RunEverySecondUpdate() {
// if asked to reconnect (e.g. change of topic(s))
if (mqtt_reconnect > 0){
mqtt_reconnect --;
if (mqtt_reconnect == 0){
// then if connected, disconnect, and then it will reconnect automatically in 2s
if (mqtt_client && mqtt_client_is_connected(mqtt_client)) {
MQTT_disconnect(mqtt_client);
loopsWithDisconnected = 8;
}
}
}
if(mqtt_client == 0 || mqtt_client_is_connected(mqtt_client) == 0) {
ADDLOG_INFO(LOG_FEATURE_MAIN, "Timer discovers disconnected mqtt %i\n",loopsWithDisconnected);
loopsWithDisconnected++;
@ -456,8 +599,8 @@ void MQTT_RunEverySecondUpdate() {
{
mqtt_client = mqtt_client_new();
CHANNEL_SetChangeCallback(app_my_channel_toggle_callback);
}
example_do_connect(mqtt_client);
}
MQTT_do_connect(mqtt_client);
loopsWithDisconnected = 0;
}
}

View File

@ -12,9 +12,10 @@
extern ip_addr_t mqtt_ip;
extern mqtt_client_t* mqtt_client;
void mqtt_example_init(void);
void example_do_connect(mqtt_client_t *client);
//void mqtt_example_init(void);
//void example_do_connect(mqtt_client_t *client);
void example_publish(mqtt_client_t *client, int channel, int iVal);
void MQTT_init();
void MQTT_RunEverySecondUpdate();
@ -33,5 +34,6 @@ typedef int (*mqtt_callback_fn)(mqtt_request_t *request);
// topics must be unique (i.e. you can't have /about and /aboutme or /about/me)
// ALL topics currently must start with main device topic root.
// ID is unique and non-zero - so that callbacks can be replaced....
int MQTT_RegisterCallback( const char *topic, int ID, mqtt_callback_fn callback);
int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback);
int MQTT_RemoveCallback(int ID);

View File

@ -370,6 +370,10 @@ void user_main(void)
// initialise rest interface
init_rest();
// initialise MQTT - just sets up variables.
// all MQTT happens in timer thread?
MQTT_init();
err = rtos_init_timer(&led_timer,
1 * 1000,
app_led_timer_handler,