Merge pull request #36 from btsimonh/mods

MQTT updates
This commit is contained in:
openshwprojects
2022-02-24 23:25:16 +01:00
committed by GitHub
4 changed files with 375 additions and 55 deletions

View File

@ -67,6 +67,7 @@ Connection: keep-alive
#elif defined(PLATFORM_BK7231T)
#define USER_SW_VER "BK7231T_Test"
#else
#define USER_SW_VER "unknown"
#endif
#endif
@ -354,7 +355,7 @@ const char *g_header = "<h1><a href=\"https://github.com/openshwprojects/OpenBK7
const char *g_header = "<h1>error</h1>";
#error "Platform not supported"
Platform not supported
//Platform not supported
#endif
@ -832,7 +833,7 @@ int HTTP_ProcessPacket(http_request_t *request) {
#else
#error "Unknown platform"
poststr(request,Unknown platform<br>");
poststr(request,"Unknown platform<br>");
#endif
}
poststr(request,"<form action=\"/cfg_wifi\">\

View File

@ -25,10 +25,29 @@
#endif
#endif
// from mqtt.c
extern void mqtt_disconnect_my2(mqtt_client_t *client);
static int g_my_reconnect_mqtt_after_time = -1;
ip_addr_t mqtt_ip LWIP_MQTT_EXAMPLE_IPADDR_INIT;
mqtt_client_t* mqtt_client;
typedef struct mqtt_callback_tag {
char *topic;
char *subscriptionTopic;
int ID;
mqtt_callback_fn callback;
} mqtt_callback_t;
#define MAX_MQTT_CALLBACKS 32
static mqtt_callback_t *callbacks[MAX_MQTT_CALLBACKS];
static int numCallbacks = 0;
// note: only one incomming can be processed at a time.
static mqtt_request_t g_mqtt_request;
int loopsWithDisconnected = 0;
int mqtt_reconnect = 0;
static struct mqtt_connect_client_info_t mqtt_client_info =
{
"test",
@ -45,8 +64,245 @@ static struct mqtt_connect_client_info_t mqtt_client_info =
#endif
};
// channel set callback
int channelSet(mqtt_request_t* request);
static void MQTT_do_connect(mqtt_client_t *client);
static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status);
// this can REPLACE callbacks, since we MAY wish to change the root topic....
// in which case we would re-resigster all callbacks?
int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback){
int index;
int i;
int subscribechange = 0;
if (!basetopic || !subscriptiontopic || !callback){
return -1;
}
// find existing to replace
for (index = 0; index < numCallbacks; index++){
if (callbacks[index]){
if (callbacks[index]->ID == ID){
break;
}
}
}
// find empty if any (empty by MQTT_RemoveCallback)
if (index == numCallbacks){
for (index = 0; index < numCallbacks; index++){
if (!callbacks[index]){
break;
}
}
}
if (index >= MAX_MQTT_CALLBACKS){
return -4;
}
if (!callbacks[index]){
callbacks[index] = (mqtt_callback_t*)os_malloc(sizeof(mqtt_callback_t));
}
if (!callbacks[index]){
return -2;
}
if (!callbacks[index]->topic || strcmp(callbacks[index]->topic, basetopic)){
if (callbacks[index]->topic){
os_free(callbacks[index]->topic);
}
callbacks[index]->topic = (char *)os_malloc(strlen(basetopic)+1);
if (!callbacks[index]->topic){
os_free(callbacks[index]);
return -3;
}
strcpy(callbacks[index]->topic, basetopic);
}
if (!callbacks[index]->subscriptionTopic || strcmp(callbacks[index]->subscriptionTopic, subscriptiontopic)){
if (callbacks[index]->subscriptionTopic){
os_free(callbacks[index]->subscriptionTopic);
}
callbacks[index]->subscriptionTopic = (char *)os_malloc(strlen(subscriptiontopic)+1);
callbacks[index]->subscriptionTopic[0] = '\0';
if (!callbacks[index]->subscriptionTopic){
os_free(callbacks[index]->topic);
os_free(callbacks[index]);
return -3;
}
// find out if this subscription is new.
for (i = 0; i < numCallbacks; i++){
if (callbacks[i]){
if (callbacks[i]->subscriptionTopic &&
!strcmp(callbacks[i]->subscriptionTopic, subscriptiontopic)){
break;
}
}
}
strcpy(callbacks[index]->subscriptionTopic, subscriptiontopic);
// if this subscription is new, must reconnect
if (i == numCallbacks){
subscribechange++;
}
}
callbacks[index]->callback = callback;
if (index == numCallbacks){
numCallbacks++;
}
if (subscribechange){
mqtt_reconnect = 8;
}
// success
return 0;
}
int MQTT_RemoveCallback(int ID){
int index;
for (index = 0; index < numCallbacks; index++){
if (callbacks[index]){
if (callbacks[index]->ID == ID){
if (callbacks[index]->topic) {
os_free(callbacks[index]->topic);
callbacks[index]->topic = NULL;
}
if (callbacks[index]->subscriptionTopic) {
os_free(callbacks[index]->subscriptionTopic);
callbacks[index]->subscriptionTopic = NULL;
}
os_free(callbacks[index]);
callbacks[index] = NULL;
mqtt_reconnect = 8;
return 1;
}
}
}
return 0;
}
// this accepts obkXXXXXX/<chan>/set to receive data to set channels
int channelSet(mqtt_request_t* request){
// we only need a few bytes to receive a decimal number 0-100
char copy[12];
int len = request->receivedLen;
char *p = request->topic;
int channel = 0;
int iValue = 0;
// TODO: better
while(*p != '/') {
if(*p == 0)
return 0;
p++;
}
p++;
if ((*p - '0' >= 0) && (*p - '0' <= 9)){
channel = atoi(p);
} else {
channel = -1;
}
// if channel out of range, stop here.
if ((channel < 0) || (channel > 32)){
return 0;
}
// find something after channel - should be <base>/<chan>/set
while(*p != '/') {
if(*p == 0)
return 0;
p++;
}
// if not /set, then stop here
if (strcmp(p, "set")){
return 0;
}
if(len > sizeof(copy)-1) {
len = sizeof(copy)-1;
}
strncpy(copy, (char *)request->received, len);
// strncpy does not terminate??!!!!
copy[len] = '\0';
//PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n");
PR_NOTICE("MQTT client in mqtt_incoming_data_cb data is %s for ch %i\n", copy, channel);
iValue = atoi((char *)copy);
CHANNEL_Set(channel,iValue,0);
// return 1 to stop processing callbacks here.
// return 0 to allow later callbacks to process this topic.
return 1;
}
// this accepts cmnd/<basename>/<xxx> to receive data to set channels
int tasCmnd(mqtt_request_t* request){
// we only need a few bytes to receive a decimal number 0-100
char copy[64];
int len = request->receivedLen;
char *p = request->topic;
int channel = 0;
int iValue = 0;
// assume a string input here, copy and terminate
if(len > sizeof(copy)-1) {
len = sizeof(copy)-1;
}
strncpy(copy, (char *)request->received, len);
// strncpy does not terminate??!!!!
copy[len] = '\0';
PR_NOTICE("tas? data is %s for ch %s\n", copy, request->topic);
// TODO: better
// skip to after second forward slash
while(*p != '/') { if(*p == 0) return 0; p++; }
p++;
while(*p != '/') { if(*p == 0) return 0; p++; }
p++;
do{
if (!strncmp(p, "POWER", 5)){
p += 5;
if ((*p - '0' >= 0) && (*p - '0' <= 9)){
channel = atoi(p);
} else {
channel = -1;
}
// if channel out of range, stop here.
if ((channel < 0) || (channel > 32)) return 0;
//PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n");
PR_NOTICE("MQTT client in tasCmnd data is %s for ch %i\n", copy, channel);
iValue = atoi((char *)copy);
CHANNEL_Set(channel,iValue,0);
break;
}
PR_NOTICE("MQTT client unprocessed in tasCmnd data is %s for topic \n", copy, request->topic);
break;
} while (0);
// return 1 to stop processing callbacks here.
// return 0 to allow later callbacks to process this topic.
return 1;
}
// copied here because for some reason renames in sdk?
static void MQTT_disconnect(mqtt_client_t *client)
{
if (!client) return;
// this is what it was renamed to. why?
mqtt_disconnect_my2(client);
}
/* Called when publish is complete either with sucess or failure */
static void mqtt_pub_request_cb(void *arg, err_t result)
@ -55,8 +311,6 @@ static void mqtt_pub_request_cb(void *arg, err_t result)
PR_NOTICE("Publish result: %d\n", result);
}
}
static void
mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status);
void example_publish(mqtt_client_t *client, int channel, int iVal)
{
@ -97,51 +351,54 @@ void example_publish(mqtt_client_t *client, int channel, int iVal)
}
}
int g_incoming_channel_mqtt = 0;
static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
{
int iValue;
char copy[128];
int i;
// unused - left here as example
//const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
if(len > sizeof(copy)-1) {
len = sizeof(copy)-1;
// if we stored a topic in g_mqtt_request, then we found a matching callback, so use it.
if (g_mqtt_request.topic[0]) {
// note: data is NOT terminated (it may be binary...).
g_mqtt_request.received = data;
g_mqtt_request.receivedLen = len;
PR_NOTICE("MQTT in topic %s", g_mqtt_request.topic);
for (i = 0; i < numCallbacks; i++){
char *cbtopic = callbacks[i]->topic;
if (!strncmp(g_mqtt_request.topic, cbtopic, strlen(cbtopic))){
// note - callback must return 1 to say it ate the mqtt, else further processing can be performed.
// i.e. multiple people can get each topic if required.
if (callbacks[i]->callback(&g_mqtt_request)){
return;
}
}
}
}
strncpy(copy, (char *)data, len);
// strncpy does not terminate??!!!!
copy[len] = '\0';
//PR_NOTICE("MQTT client in mqtt_incoming_data_cb\n");
PR_NOTICE("MQTT client in mqtt_incoming_data_cb data is %s for ch %i\n", copy, g_incoming_channel_mqtt);
iValue = atoi((char *)copy);
CHANNEL_Set(g_incoming_channel_mqtt,iValue,0);
// PR_NOTICE(("MQTT client \"%s\" data cb: len %d, flags %d\n", client_info->client_id, (int)len, (int)flags));
}
static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
{
const char *p;
//const char *p;
int i;
// unused - left here as example
//const struct mqtt_connect_client_info_t* client_info = (const struct mqtt_connect_client_info_t*)arg;
// look for a callback with this URL and method, or HTTP_ANY
g_mqtt_request.topic[0] = '\0';
for (i = 0; i < numCallbacks; i++){
char *cbtopic = callbacks[i]->topic;
if (strncmp(topic, cbtopic, strlen(cbtopic))){
strncpy(g_mqtt_request.topic, topic, sizeof(g_mqtt_request.topic) - 1);
g_mqtt_request.topic[sizeof(g_mqtt_request.topic) - 1] = 0;
break;
}
}
//PR_NOTICE("MQTT client in mqtt_incoming_publish_cb\n");
PR_NOTICE("MQTT client in mqtt_incoming_publish_cb topic %s\n",topic);
// TODO: better
// g_incoming_channel_mqtt = topic[5] - '0';
p = topic;
while(*p != '/') {
if(*p == 0)
return;
p++;
}
p++;
g_incoming_channel_mqtt = *p - '0';
// PR_NOTICE(("MQTT client \"%s\" publish cb: topic %s, len %d\n", client_info->client_id, topic, (int)tot_len));
}
static void
@ -158,9 +415,10 @@ static void mqtt_sub_request_cb(void *arg, err_t result)
notifying user, retry subscribe or disconnect from server */
PR_NOTICE("Subscribe result: %i\n", result);
}
void example_do_connect(mqtt_client_t *client);
static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
{
int i;
char tmp[64];
const char *baseName;
err_t err = ERR_OK;
@ -173,26 +431,32 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection
if (status == MQTT_CONNECT_ACCEPTED) {
PR_NOTICE("mqtt_connection_cb: Successfully connected\n");
mqtt_set_inpub_callback(mqtt_client,
mqtt_set_inpub_callback(mqtt_client,
mqtt_incoming_publish_cb,
mqtt_incoming_data_cb,
LWIP_CONST_CAST(void*, &mqtt_client_info));
/* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */
baseName = CFG_GetShortDeviceName();
// + is a MQTT wildcard
sprintf(tmp,"%s/+/set",baseName);
err = mqtt_sub_unsub(client,
// "wb2s/+/set", 1,
tmp, 1,
// subscribe to all callback subscription topics
// this makes a BIG assumption that we can subscribe multiple times to the same one?
// TODO - check that subscribing multiple times to the same topic is not BAD
for (i = 0; i < numCallbacks; i++){
if (callbacks[i]){
if (callbacks[i]->subscriptionTopic && callbacks[i]->subscriptionTopic[0]){
err = mqtt_sub_unsub(client,
callbacks[i]->subscriptionTopic, 1,
mqtt_request_cb, LWIP_CONST_CAST(void*, client_info),
1);
if(err != ERR_OK) {
PR_NOTICE("mqtt_subscribe return: %d\n", err);
if(err != ERR_OK) {
PR_NOTICE("mqtt_subscribe to %s return: %d\n", callbacks[i]->subscriptionTopic, err);
} else {
PR_NOTICE("mqtt_subscribed to %s\n", callbacks[i]->subscriptionTopic);
}
}
}
}
baseName = CFG_GetShortDeviceName();
sprintf(tmp,"%s/connected",baseName);
err = mqtt_publish(client, tmp, "online", strlen("online"), 2, true, mqtt_pub_request_cb, 0);
if(err != ERR_OK) {
@ -212,12 +476,10 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection
// 1);
} else {
PR_NOTICE("mqtt_connection_cb: Disconnected, reason: %d\n", status);
// example_do_connect(client);
}
}
void example_do_connect(mqtt_client_t *client)
static void MQTT_do_connect(mqtt_client_t *client)
{
const char *mqtt_userName, *mqtt_host, *mqtt_pass, *mqtt_clientID;
int mqtt_port;
@ -294,9 +556,44 @@ static void app_my_channel_toggle_callback(int channel, int iVal)
example_publish(mqtt_client,channel,iVal);
}
int loopsWithDisconnected = 0;
// initialise things MQTT
// called from user_main
void MQTT_init(){
char cbtopicbase[64];
char cbtopicsub[64];
const char *baseName;
baseName = CFG_GetShortDeviceName();
// register the main set channel callback
sprintf(cbtopicbase,"%s/",baseName);
sprintf(cbtopicsub,"%s/+/set",baseName);
// note: this may REPLACE an existing entry with the same ID. ID 1 !!!
MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 1, channelSet);
// register the TAS cmnd callback
sprintf(cbtopicbase,"cmnd/%s/",baseName);
sprintf(cbtopicsub,"cmnd/%s/+",baseName);
// note: this may REPLACE an existing entry with the same ID. ID 2 !!!
MQTT_RegisterCallback( cbtopicbase, cbtopicsub, 2, tasCmnd);
}
// called from user timer.
void MQTT_RunEverySecondUpdate() {
// if asked to reconnect (e.g. change of topic(s))
if (mqtt_reconnect > 0){
mqtt_reconnect --;
if (mqtt_reconnect == 0){
// then if connected, disconnect, and then it will reconnect automatically in 2s
if (mqtt_client && mqtt_client_is_connected(mqtt_client)) {
MQTT_disconnect(mqtt_client);
loopsWithDisconnected = 8;
}
}
}
if(mqtt_client == 0 || mqtt_client_is_connected(mqtt_client) == 0) {
ADDLOG_INFO(LOG_FEATURE_MAIN, "Timer discovers disconnected mqtt %i\n",loopsWithDisconnected);
loopsWithDisconnected++;
@ -306,8 +603,8 @@ void MQTT_RunEverySecondUpdate() {
{
mqtt_client = mqtt_client_new();
CHANNEL_SetChangeCallback(app_my_channel_toggle_callback);
}
example_do_connect(mqtt_client);
}
MQTT_do_connect(mqtt_client);
loopsWithDisconnected = 0;
}
}

View File

@ -12,10 +12,28 @@
extern ip_addr_t mqtt_ip;
extern mqtt_client_t* mqtt_client;
void mqtt_example_init(void);
void example_do_connect(mqtt_client_t *client);
//void mqtt_example_init(void);
//void example_do_connect(mqtt_client_t *client);
void example_publish(mqtt_client_t *client, int channel, int iVal);
void MQTT_init();
void MQTT_RunEverySecondUpdate();
// ability to register callbacks for MQTT data
typedef struct mqtt_request_tag {
const unsigned char *received; // note: NOT terminated, may be binary
int receivedLen;
char topic[128];
} mqtt_request_t;
// callback function for mqtt.
// return 0 to allow the incoming topic/data to be processed by others/channel set.
// return 1 to 'eat the packet and terminate further processing.
typedef int (*mqtt_callback_fn)(mqtt_request_t *request);
// topics must be unique (i.e. you can't have /about and /aboutme or /about/me)
// ALL topics currently must start with main device topic root.
// ID is unique and non-zero - so that callbacks can be replaced....
int MQTT_RegisterCallback( const char *basetopic, const char *subscriptiontopic, int ID, mqtt_callback_fn callback);
int MQTT_RemoveCallback(int ID);

View File

@ -370,6 +370,10 @@ void user_main(void)
// initialise rest interface
init_rest();
// initialise MQTT - just sets up variables.
// all MQTT happens in timer thread?
MQTT_init();
err = rtos_init_timer(&led_timer,
1 * 1000,
app_led_timer_handler,