2323
2424#include "mongoose.h"
2525
26+ /* MQTT transmission list */
27+
28+ typedef struct mqtt_msg {
29+ char * topic ; //!< NULL after QoS 2 release
30+ char * msg ; //!< NULL after QoS 2 release
31+ double timeout ;
32+ int retries ;
33+ uint16_t mid ;
34+ } mqtt_msg_t ;
35+
36+ /// Dynamically growing list, call list_ensure_size() to alloc elems.
37+ typedef struct inflight {
38+ mqtt_msg_t * elems ;
39+ size_t size ;
40+ size_t len ;
41+ } inflight_t ;
42+
43+ static void inflight_ensure_size (inflight_t * list , size_t min_size )
44+ {
45+ if (!list -> elems || list -> size < min_size ) {
46+ // the input pointer is still valid if reallocation fails
47+ void * elems_realloc = realloc (list -> elems , min_size * sizeof (* list -> elems ));
48+ if (!elems_realloc ) {
49+ FATAL_REALLOC ("list_ensure_size()" );
50+ }
51+ list -> elems = elems_realloc ;
52+ list -> size = min_size ;
53+ }
54+ }
55+
56+ static void inflight_add (inflight_t * list , char const * topic , uint16_t mid , char const * msg )
57+ {
58+ if (list -> len >= list -> size ) {
59+ inflight_ensure_size (list , list -> size < 8 ? 8 : list -> size + list -> size / 2 );
60+ }
61+
62+ char * topic_dup = strdup (topic );
63+ if (!topic_dup ) {
64+ WARN_STRDUP ("inflight_add()" );
65+ return ; // this just ignores the error
66+ }
67+ char * msg_dup = strdup (msg );
68+ if (!msg_dup ) {
69+ WARN_STRDUP ("inflight_add()" );
70+ free (topic_dup );
71+ return ; // this just ignores the error
72+ }
73+
74+ list -> elems [list -> len ++ ] = (mqtt_msg_t ) {
75+ .topic = topic_dup ,
76+ .msg = msg_dup ,
77+ .timeout = mg_time () + 1.2 ,
78+ .retries = 0 ,
79+ .mid = mid ,
80+ };
81+ }
82+
83+ static void inflight_remove_at (inflight_t * list , size_t idx )
84+ {
85+ if (idx >= list -> len ) {
86+ return ; // report error?
87+ }
88+ free (list -> elems [idx ].topic );
89+ free (list -> elems [idx ].msg );
90+ list -> len -- ;
91+ if (list -> len > 0 ) {
92+ list -> elems [idx ] = list -> elems [list -> len ];
93+ }
94+ }
95+
96+ static int inflight_remove (inflight_t * list , uint16_t mid )
97+ {
98+ for (size_t i = 0 ; i < list -> len ; ++ i ) {
99+ if (list -> elems [i ].mid == mid ) {
100+ inflight_remove_at (list , i );
101+ return i ;
102+ }
103+ }
104+
105+ return -1 ;
106+ }
107+
108+ static int inflight_release (inflight_t * list , uint16_t mid )
109+ {
110+ for (size_t i = 0 ; i < list -> len ; ++ i ) {
111+ if (list -> elems [i ].mid == mid ) {
112+ free (list -> elems [i ].topic );
113+ list -> elems [i ].topic = NULL ;
114+
115+ free (list -> elems [i ].msg );
116+ list -> elems [i ].msg = NULL ;
117+
118+ return i ;
119+ }
120+ }
121+
122+ return -1 ;
123+ }
124+
125+ static void inflight_clear (inflight_t * list )
126+ {
127+ for (size_t i = 0 ; i < list -> len ; ++ i ) {
128+ free (list -> elems [i ].topic );
129+ free (list -> elems [i ].msg );
130+ }
131+ list -> len = 0 ;
132+ }
133+
134+ static void inflight_free (inflight_t * list )
135+ {
136+ inflight_clear (list );
137+ free (list -> elems );
138+ list -> elems = NULL ;
139+ list -> size = 0 ;
140+ }
141+
26142/* MQTT client abstraction */
27143
28144typedef struct mqtt_client {
@@ -36,6 +152,8 @@ typedef struct mqtt_client {
36152 char client_id [256 ];
37153 uint16_t message_id ;
38154 int publish_flags ; // MG_MQTT_RETAIN | MG_MQTT_QOS(0)
155+ unsigned qos ;
156+ inflight_t inflight ;
39157} mqtt_client_t ;
40158
41159char const * mqtt_availability_online = "online" ;
@@ -61,6 +179,9 @@ static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
61179 if (ctx ) {
62180 ctx -> reconnect_delay = 0 ;
63181 mg_send_mqtt_handshake_opt (nc , ctx -> client_id , ctx -> mqtt_opts );
182+
183+ // Send us MG_EV_TIMER event after 500 milliseconds
184+ mg_set_timer (ctx -> timer , mg_time () + 0.5 );
64185 }
65186 }
66187 else {
@@ -86,17 +207,56 @@ static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
86207 }
87208 }
88209 break ;
89- case MG_EV_MQTT_PUBACK :
90- print_logf (LOG_NOTICE , "MQTT" , "MQTT Message publishing acknowledged (msg_id: %u)" , msg -> message_id );
91- break ;
210+
92211 case MG_EV_MQTT_SUBACK :
93212 print_log (LOG_NOTICE , "MQTT" , "MQTT Subscription acknowledged." );
94213 break ;
214+
95215 case MG_EV_MQTT_PUBLISH : {
96- print_logf (LOG_NOTICE , "MQTT" , "MQTT Incoming message %.*s: %.*s" , (int )msg -> topic .len ,
216+ // This is not expected to happen
217+ print_logf (LOG_WARNING , "MQTT" , "MQTT Incoming message %.*s: %.*s" , (int )msg -> topic .len ,
97218 msg -> topic .p , (int )msg -> payload .len , msg -> payload .p );
98219 break ;
99220 }
221+
222+ // QoS 1
223+ // > Publish message (id)
224+ // < Publish acknowledged (id)
225+ case MG_EV_MQTT_PUBACK :
226+ if (inflight_remove (& ctx -> inflight , msg -> message_id ) >= 0 ) {
227+ print_logf (LOG_DEBUG , "MQTT" , "MQTT Message publishing acknowledged (msg_id %u)" , msg -> message_id );
228+ } else {
229+ print_logf (LOG_NOTICE , "MQTT" , "MQTT Message unknown publishing acknowledged (msg_id %u)" , msg -> message_id );
230+ }
231+ break ;
232+
233+ // QoS 2
234+ // > Publish message (id)
235+ // < Publish received (id)
236+ // > Publish release (id)
237+ // < Publish complete (id)
238+ case MG_EV_MQTT_PUBREC :
239+ if (inflight_release (& ctx -> inflight , msg -> message_id ) >= 0 ) {
240+ print_logf (LOG_DEBUG , "MQTT" , "MQTT Message publishing received (msg_id %u)" , msg -> message_id );
241+ mg_mqtt_pubrel (ctx -> conn , msg -> message_id );
242+ }
243+ else {
244+ print_logf (LOG_NOTICE , "MQTT" , "MQTT Message unknown publishing received (msg_id %u)" , msg -> message_id );
245+ }
246+ break ;
247+ case MG_EV_MQTT_PUBREL :
248+ // This is not expected to happen
249+ print_logf (LOG_WARNING , "MQTT" , "MQTT Incoming release (msg_id %u)" , msg -> message_id );
250+ break ;
251+ case MG_EV_MQTT_PUBCOMP :
252+ if (inflight_remove (& ctx -> inflight , msg -> message_id ) >= 0 ) {
253+ print_logf (LOG_DEBUG , "MQTT" , "MQTT Message publishing complete (msg_id %u)" , msg -> message_id );
254+ }
255+ else {
256+ print_logf (LOG_NOTICE , "MQTT" , "MQTT Message unknown publishing complete (msg_id %u)" , msg -> message_id );
257+ }
258+ break ;
259+
100260 case MG_EV_CLOSE :
101261 if (!ctx ) {
102262 break ; // shutting down
@@ -129,14 +289,52 @@ static void mqtt_client_timer(struct mg_connection *nc, int ev, void *ev_data)
129289
130290 switch (ev ) {
131291 case MG_EV_TIMER : {
132- // Try to reconnect
133- char const * error_string = NULL ;
134- ctx -> connect_opts .error_string = & error_string ;
135- ctx -> conn = mg_connect_opt (nc -> mgr , ctx -> address , mqtt_client_event , ctx -> connect_opts );
136- ctx -> connect_opts .error_string = NULL ;
137292 if (!ctx -> conn ) {
138- print_logf (LOG_WARNING , "MQTT" , "MQTT connect (%s) failed%s%s" , ctx -> address ,
139- error_string ? ": " : "" , error_string ? error_string : "" );
293+ // Try to reconnect
294+ char const * error_string = NULL ;
295+ ctx -> connect_opts .error_string = & error_string ;
296+ ctx -> conn = mg_connect_opt (nc -> mgr , ctx -> address , mqtt_client_event , ctx -> connect_opts );
297+ ctx -> connect_opts .error_string = NULL ;
298+ if (!ctx -> conn ) {
299+ print_logf (LOG_WARNING , "MQTT" , "MQTT connect (%s) failed%s%s" , ctx -> address ,
300+ error_string ? ": " : "" , error_string ? error_string : "" );
301+ }
302+ }
303+ else {
304+ // Process the QoS timer
305+ double now = * (double * )ev_data ;
306+ double next = mg_time () + 0.5 ;
307+ // fprintf(stderr, "timer event, current time: %.2lf, next timer: %.2lf\n", now, next);
308+ mg_set_timer (nc , next ); // Send us timer event again after 500 milliseconds
309+
310+ if (!ctx -> conn || !ctx -> conn -> proto_handler ) {
311+ break ;
312+ }
313+
314+ // check inflight...
315+ for (size_t i = 0 ; i < ctx -> inflight .len ; ++ i ) {
316+ mqtt_msg_t * elem = & ctx -> inflight .elems [i ];
317+ if (elem -> timeout < now ) {
318+ if (ctx -> qos == 1 ) {
319+ print_logf (LOG_NOTICE , "MQTT" , "MQTT resending (msg_id %u, retry %d)" , elem -> mid , elem -> retries + 1 );
320+ mg_mqtt_publish (ctx -> conn , elem -> topic , elem -> mid , ctx -> publish_flags | MG_MQTT_DUP , elem -> msg , strlen (elem -> msg ));
321+ elem -> timeout = now + 1.2 ;
322+ elem -> retries += 1 ;
323+ }
324+ else if (elem -> topic ) { // qos 2, first half
325+ print_logf (LOG_NOTICE , "MQTT" , "MQTT resending (msg_id %u, retry %d)" , elem -> mid , elem -> retries + 1 );
326+ mg_mqtt_publish (ctx -> conn , elem -> topic , elem -> mid , ctx -> publish_flags | MG_MQTT_DUP , elem -> msg , strlen (elem -> msg ));
327+ elem -> timeout = now + 1.2 ;
328+ elem -> retries += 1 ;
329+ }
330+ else { // qos 2, second half
331+ print_logf (LOG_NOTICE , "MQTT" , "MQTT reconfirming (msg_id %u, retry %d)" , elem -> mid , elem -> retries + 1 );
332+ mg_mqtt_pubrel (ctx -> conn , elem -> mid );
333+ elem -> timeout = now + 1.2 ;
334+ elem -> retries += 1 ;
335+ }
336+ }
337+ }
140338 }
141339 break ;
142340 }
@@ -149,6 +347,7 @@ static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts,
149347 if (!ctx )
150348 FATAL_CALLOC ("mqtt_client_init()" );
151349
350+ ctx -> qos = qos ;
152351 ctx -> mqtt_opts .user_name = user ;
153352 ctx -> mqtt_opts .password = pass ;
154353 ctx -> mqtt_opts .will_topic = availability ;
@@ -217,19 +416,31 @@ static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts,
217416
218417static void mqtt_client_publish (mqtt_client_t * ctx , char const * topic , char const * str )
219418{
419+ ctx -> message_id ++ ;
420+ if (ctx -> qos > 0 ) {
421+ inflight_add (& ctx -> inflight , topic , ctx -> message_id , str );
422+ print_logf (LOG_DEBUG , "MQTT" , "MQTT publishing: %d (%zu inflight)" , ctx -> message_id , ctx -> inflight .len );
423+ }
424+
220425 if (!ctx -> conn || !ctx -> conn -> proto_handler )
221426 return ;
222427
223- ctx -> message_id ++ ;
224428 mg_mqtt_publish (ctx -> conn , topic , ctx -> message_id , ctx -> publish_flags , str , strlen (str ));
225429}
226430
227431static void mqtt_client_free (mqtt_client_t * ctx )
228432{
433+ if (ctx && ctx -> timer ) {
434+ mg_set_timer (ctx -> timer , 0 ); // Clear retry timer
435+ }
229436 if (ctx && ctx -> conn ) {
230437 ctx -> conn -> user_data = NULL ;
231438 ctx -> conn -> flags |= MG_F_CLOSE_IMMEDIATELY ;
232439 }
440+ if (ctx ) {
441+ inflight_free (& ctx -> inflight );
442+ }
443+
233444 free (ctx );
234445}
235446
0 commit comments