Skip to content

Commit a3cee13

Browse files
committed
Add proper MQTT QoS
1 parent 803c805 commit a3cee13

2 files changed

Lines changed: 224 additions & 13 deletions

File tree

src/output_mqtt.c

Lines changed: 223 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,122 @@
2323

2424
#include "mongoose.h"
2525

26+
/* MQTT transmission list */
27+
28+
typedef struct mqtt_msg {
29+
char *topic; //!< NULL after QoS 2 release
30+
char *msg; //!< NULL after QoS 2 release
31+
double timeout;
32+
int retries;
33+
uint16_t mid;
34+
} mqtt_msg_t;
35+
36+
/// Dynamically growing list, call list_ensure_size() to alloc elems.
37+
typedef struct inflight {
38+
mqtt_msg_t *elems;
39+
size_t size;
40+
size_t len;
41+
} inflight_t;
42+
43+
static void inflight_ensure_size(inflight_t *list, size_t min_size)
44+
{
45+
if (!list->elems || list->size < min_size) {
46+
// the input pointer is still valid if reallocation fails
47+
void *elems_realloc = realloc(list->elems, min_size * sizeof(*list->elems));
48+
if (!elems_realloc) {
49+
FATAL_REALLOC("list_ensure_size()");
50+
}
51+
list->elems = elems_realloc;
52+
list->size = min_size;
53+
}
54+
}
55+
56+
static void inflight_add(inflight_t *list, char const *topic, uint16_t mid, char const *msg)
57+
{
58+
if (list->len >= list->size) {
59+
inflight_ensure_size(list, list->size < 8 ? 8 : list->size + list->size / 2);
60+
}
61+
62+
char *topic_dup = strdup(topic);
63+
if (!topic_dup) {
64+
WARN_STRDUP("inflight_add()");
65+
return; // this just ignores the error
66+
}
67+
char *msg_dup = strdup(msg);
68+
if (!msg_dup) {
69+
WARN_STRDUP("inflight_add()");
70+
free(topic_dup);
71+
return; // this just ignores the error
72+
}
73+
74+
list->elems[list->len++] = (mqtt_msg_t) {
75+
.topic = topic_dup,
76+
.msg = msg_dup,
77+
.timeout = mg_time() + 1.2,
78+
.retries = 0,
79+
.mid = mid,
80+
};
81+
}
82+
83+
static void inflight_remove_at(inflight_t *list, size_t idx)
84+
{
85+
if (idx >= list->len) {
86+
return; // report error?
87+
}
88+
free(list->elems[idx].topic);
89+
free(list->elems[idx].msg);
90+
list->len--;
91+
if (list->len > 0) {
92+
list->elems[idx] = list->elems[list->len];
93+
}
94+
}
95+
96+
static int inflight_remove(inflight_t *list, uint16_t mid)
97+
{
98+
for (size_t i = 0; i < list->len; ++i) {
99+
if (list->elems[i].mid == mid) {
100+
inflight_remove_at(list, i);
101+
return i;
102+
}
103+
}
104+
105+
return -1;
106+
}
107+
108+
static int inflight_release(inflight_t *list, uint16_t mid)
109+
{
110+
for (size_t i = 0; i < list->len; ++i) {
111+
if (list->elems[i].mid == mid) {
112+
free(list->elems[i].topic);
113+
list->elems[i].topic = NULL;
114+
115+
free(list->elems[i].msg);
116+
list->elems[i].msg = NULL;
117+
118+
return i;
119+
}
120+
}
121+
122+
return -1;
123+
}
124+
125+
static void inflight_clear(inflight_t *list)
126+
{
127+
for (size_t i = 0; i < list->len; ++i) {
128+
free(list->elems[i].topic);
129+
free(list->elems[i].msg);
130+
}
131+
list->len = 0;
132+
}
133+
134+
static void inflight_free(inflight_t *list)
135+
{
136+
inflight_clear(list);
137+
free(list->elems);
138+
list->elems = NULL;
139+
list->size = 0;
140+
}
141+
26142
/* MQTT client abstraction */
27143

28144
typedef struct mqtt_client {
@@ -36,6 +152,8 @@ typedef struct mqtt_client {
36152
char client_id[256];
37153
uint16_t message_id;
38154
int publish_flags; // MG_MQTT_RETAIN | MG_MQTT_QOS(0)
155+
unsigned qos;
156+
inflight_t inflight;
39157
} mqtt_client_t;
40158

41159
char const *mqtt_availability_online = "online";
@@ -61,6 +179,9 @@ static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
61179
if (ctx) {
62180
ctx->reconnect_delay = 0;
63181
mg_send_mqtt_handshake_opt(nc, ctx->client_id, ctx->mqtt_opts);
182+
183+
// Send us MG_EV_TIMER event after 500 milliseconds
184+
mg_set_timer(ctx->timer, mg_time() + 0.5);
64185
}
65186
}
66187
else {
@@ -86,17 +207,56 @@ static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
86207
}
87208
}
88209
break;
89-
case MG_EV_MQTT_PUBACK:
90-
print_logf(LOG_NOTICE, "MQTT", "MQTT Message publishing acknowledged (msg_id: %u)", msg->message_id);
91-
break;
210+
92211
case MG_EV_MQTT_SUBACK:
93212
print_log(LOG_NOTICE, "MQTT", "MQTT Subscription acknowledged.");
94213
break;
214+
95215
case MG_EV_MQTT_PUBLISH: {
96-
print_logf(LOG_NOTICE, "MQTT", "MQTT Incoming message %.*s: %.*s", (int)msg->topic.len,
216+
// This is not expected to happen
217+
print_logf(LOG_WARNING, "MQTT", "MQTT Incoming message %.*s: %.*s", (int)msg->topic.len,
97218
msg->topic.p, (int)msg->payload.len, msg->payload.p);
98219
break;
99220
}
221+
222+
// QoS 1
223+
// > Publish message (id)
224+
// < Publish acknowledged (id)
225+
case MG_EV_MQTT_PUBACK:
226+
if (inflight_remove(&ctx->inflight, msg->message_id) >= 0) {
227+
print_logf(LOG_DEBUG, "MQTT", "MQTT Message publishing acknowledged (msg_id %u)", msg->message_id);
228+
} else {
229+
print_logf(LOG_NOTICE, "MQTT", "MQTT Message unknown publishing acknowledged (msg_id %u)", msg->message_id);
230+
}
231+
break;
232+
233+
// QoS 2
234+
// > Publish message (id)
235+
// < Publish received (id)
236+
// > Publish release (id)
237+
// < Publish complete (id)
238+
case MG_EV_MQTT_PUBREC:
239+
if (inflight_release(&ctx->inflight, msg->message_id) >= 0) {
240+
print_logf(LOG_DEBUG, "MQTT", "MQTT Message publishing received (msg_id %u)", msg->message_id);
241+
mg_mqtt_pubrel(ctx->conn, msg->message_id);
242+
}
243+
else {
244+
print_logf(LOG_NOTICE, "MQTT", "MQTT Message unknown publishing received (msg_id %u)", msg->message_id);
245+
}
246+
break;
247+
case MG_EV_MQTT_PUBREL:
248+
// This is not expected to happen
249+
print_logf(LOG_WARNING, "MQTT", "MQTT Incoming release (msg_id %u)", msg->message_id);
250+
break;
251+
case MG_EV_MQTT_PUBCOMP:
252+
if (inflight_remove(&ctx->inflight, msg->message_id) >= 0) {
253+
print_logf(LOG_DEBUG, "MQTT", "MQTT Message publishing complete (msg_id %u)", msg->message_id);
254+
}
255+
else {
256+
print_logf(LOG_NOTICE, "MQTT", "MQTT Message unknown publishing complete (msg_id %u)", msg->message_id);
257+
}
258+
break;
259+
100260
case MG_EV_CLOSE:
101261
if (!ctx) {
102262
break; // shutting down
@@ -129,14 +289,52 @@ static void mqtt_client_timer(struct mg_connection *nc, int ev, void *ev_data)
129289

130290
switch (ev) {
131291
case MG_EV_TIMER: {
132-
// Try to reconnect
133-
char const *error_string = NULL;
134-
ctx->connect_opts.error_string = &error_string;
135-
ctx->conn = mg_connect_opt(nc->mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
136-
ctx->connect_opts.error_string = NULL;
137292
if (!ctx->conn) {
138-
print_logf(LOG_WARNING, "MQTT", "MQTT connect (%s) failed%s%s", ctx->address,
139-
error_string ? ": " : "", error_string ? error_string : "");
293+
// Try to reconnect
294+
char const *error_string = NULL;
295+
ctx->connect_opts.error_string = &error_string;
296+
ctx->conn = mg_connect_opt(nc->mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
297+
ctx->connect_opts.error_string = NULL;
298+
if (!ctx->conn) {
299+
print_logf(LOG_WARNING, "MQTT", "MQTT connect (%s) failed%s%s", ctx->address,
300+
error_string ? ": " : "", error_string ? error_string : "");
301+
}
302+
}
303+
else {
304+
// Process the QoS timer
305+
double now = *(double *)ev_data;
306+
double next = mg_time() + 0.5;
307+
// fprintf(stderr, "timer event, current time: %.2lf, next timer: %.2lf\n", now, next);
308+
mg_set_timer(nc, next); // Send us timer event again after 500 milliseconds
309+
310+
if (!ctx->conn || !ctx->conn->proto_handler) {
311+
break;
312+
}
313+
314+
// check inflight...
315+
for (size_t i = 0; i < ctx->inflight.len; ++i) {
316+
mqtt_msg_t *elem = &ctx->inflight.elems[i];
317+
if (elem->timeout < now) {
318+
if (ctx->qos == 1) {
319+
print_logf(LOG_NOTICE, "MQTT", "MQTT resending (msg_id %u, retry %d)", elem->mid, elem->retries + 1);
320+
mg_mqtt_publish(ctx->conn, elem->topic, elem->mid, ctx->publish_flags | MG_MQTT_DUP, elem->msg, strlen(elem->msg));
321+
elem->timeout = now + 1.2;
322+
elem->retries += 1;
323+
}
324+
else if (elem->topic) { // qos 2, first half
325+
print_logf(LOG_NOTICE, "MQTT", "MQTT resending (msg_id %u, retry %d)", elem->mid, elem->retries + 1);
326+
mg_mqtt_publish(ctx->conn, elem->topic, elem->mid, ctx->publish_flags | MG_MQTT_DUP, elem->msg, strlen(elem->msg));
327+
elem->timeout = now + 1.2;
328+
elem->retries += 1;
329+
}
330+
else { // qos 2, second half
331+
print_logf(LOG_NOTICE, "MQTT", "MQTT reconfirming (msg_id %u, retry %d)", elem->mid, elem->retries + 1);
332+
mg_mqtt_pubrel(ctx->conn, elem->mid);
333+
elem->timeout = now + 1.2;
334+
elem->retries += 1;
335+
}
336+
}
337+
}
140338
}
141339
break;
142340
}
@@ -149,6 +347,7 @@ static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts,
149347
if (!ctx)
150348
FATAL_CALLOC("mqtt_client_init()");
151349

350+
ctx->qos = qos;
152351
ctx->mqtt_opts.user_name = user;
153352
ctx->mqtt_opts.password = pass;
154353
ctx->mqtt_opts.will_topic = availability;
@@ -217,19 +416,31 @@ static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts,
217416

218417
static void mqtt_client_publish(mqtt_client_t *ctx, char const *topic, char const *str)
219418
{
419+
ctx->message_id++;
420+
if (ctx->qos > 0) {
421+
inflight_add(&ctx->inflight, topic, ctx->message_id, str);
422+
print_logf(LOG_DEBUG, "MQTT", "MQTT publishing: %d (%zu inflight)", ctx->message_id, ctx->inflight.len);
423+
}
424+
220425
if (!ctx->conn || !ctx->conn->proto_handler)
221426
return;
222427

223-
ctx->message_id++;
224428
mg_mqtt_publish(ctx->conn, topic, ctx->message_id, ctx->publish_flags, str, strlen(str));
225429
}
226430

227431
static void mqtt_client_free(mqtt_client_t *ctx)
228432
{
433+
if (ctx && ctx->timer) {
434+
mg_set_timer(ctx->timer, 0); // Clear retry timer
435+
}
229436
if (ctx && ctx->conn) {
230437
ctx->conn->user_data = NULL;
231438
ctx->conn->flags |= MG_F_CLOSE_IMMEDIATELY;
232439
}
440+
if (ctx) {
441+
inflight_free(&ctx->inflight);
442+
}
443+
233444
free(ctx);
234445
}
235446

src/rtl_433.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2104,7 +2104,7 @@ int main(int argc, char **argv) {
21042104

21052105
time(&cfg->hop_start_time);
21062106

2107-
// add dummy socket to receive broadcasts
2107+
// add dummy socket to receive timer broadcasts
21082108
struct mg_add_sock_opts opts = {.user_data = cfg};
21092109
struct mg_connection *nc = mg_add_sock_opt(get_mgr(cfg), INVALID_SOCKET, timer_handler, opts);
21102110
// Send us MG_EV_TIMER event after 2.5 seconds

0 commit comments

Comments
 (0)