Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 223 additions & 12 deletions src/output_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,122 @@

#include "mongoose.h"

/* MQTT transmission list */

typedef struct mqtt_msg {
char *topic; //!< NULL after QoS 2 release
char *msg; //!< NULL after QoS 2 release
double timeout;
int retries;
uint16_t mid;
} mqtt_msg_t;

/// Dynamically growing list, call list_ensure_size() to alloc elems.
typedef struct inflight {
mqtt_msg_t *elems;
size_t size;
size_t len;
} inflight_t;

static void inflight_ensure_size(inflight_t *list, size_t min_size)
{
if (!list->elems || list->size < min_size) {
// the input pointer is still valid if reallocation fails
void *elems_realloc = realloc(list->elems, min_size * sizeof(*list->elems));
if (!elems_realloc) {
FATAL_REALLOC("list_ensure_size()");
}
list->elems = elems_realloc;
list->size = min_size;
}
}

static void inflight_add(inflight_t *list, char const *topic, uint16_t mid, char const *msg)
{
if (list->len >= list->size) {
inflight_ensure_size(list, list->size < 8 ? 8 : list->size + list->size / 2);
}

char *topic_dup = strdup(topic);
if (!topic_dup) {
WARN_STRDUP("inflight_add()");
return; // this just ignores the error
}
char *msg_dup = strdup(msg);
if (!msg_dup) {
WARN_STRDUP("inflight_add()");
free(topic_dup);
return; // this just ignores the error
}

list->elems[list->len++] = (mqtt_msg_t) {
.topic = topic_dup,
.msg = msg_dup,
.timeout = mg_time() + 1.2,
.retries = 0,
.mid = mid,
};
}

static void inflight_remove_at(inflight_t *list, size_t idx)
{
if (idx >= list->len) {
return; // report error?
}
free(list->elems[idx].topic);
free(list->elems[idx].msg);
list->len--;
if (list->len > 0) {
list->elems[idx] = list->elems[list->len];
}
}

static int inflight_remove(inflight_t *list, uint16_t mid)
{
for (size_t i = 0; i < list->len; ++i) {
if (list->elems[i].mid == mid) {
inflight_remove_at(list, i);
return i;
}
}

return -1;
}

static int inflight_release(inflight_t *list, uint16_t mid)
{
for (size_t i = 0; i < list->len; ++i) {
if (list->elems[i].mid == mid) {
free(list->elems[i].topic);
list->elems[i].topic = NULL;

free(list->elems[i].msg);
list->elems[i].msg = NULL;

return i;
}
}

return -1;
}

static void inflight_clear(inflight_t *list)
{
for (size_t i = 0; i < list->len; ++i) {
free(list->elems[i].topic);
free(list->elems[i].msg);
}
list->len = 0;
}

static void inflight_free(inflight_t *list)
{
inflight_clear(list);
free(list->elems);
list->elems = NULL;
list->size = 0;
}

/* MQTT client abstraction */

typedef struct mqtt_client {
Expand All @@ -36,6 +152,8 @@ typedef struct mqtt_client {
char client_id[256];
uint16_t message_id;
int publish_flags; // MG_MQTT_RETAIN | MG_MQTT_QOS(0)
unsigned qos;
inflight_t inflight;
} mqtt_client_t;

char const *mqtt_availability_online = "online";
Expand All @@ -61,6 +179,9 @@ static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
if (ctx) {
ctx->reconnect_delay = 0;
mg_send_mqtt_handshake_opt(nc, ctx->client_id, ctx->mqtt_opts);

// Send us MG_EV_TIMER event after 500 milliseconds
mg_set_timer(ctx->timer, mg_time() + 0.5);
}
}
else {
Expand All @@ -86,17 +207,56 @@ static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
}
}
break;
case MG_EV_MQTT_PUBACK:
print_logf(LOG_NOTICE, "MQTT", "MQTT Message publishing acknowledged (msg_id: %u)", msg->message_id);
break;

case MG_EV_MQTT_SUBACK:
print_log(LOG_NOTICE, "MQTT", "MQTT Subscription acknowledged.");
break;

case MG_EV_MQTT_PUBLISH: {
print_logf(LOG_NOTICE, "MQTT", "MQTT Incoming message %.*s: %.*s", (int)msg->topic.len,
// This is not expected to happen
print_logf(LOG_WARNING, "MQTT", "MQTT Incoming message %.*s: %.*s", (int)msg->topic.len,
msg->topic.p, (int)msg->payload.len, msg->payload.p);
break;
}

// QoS 1
// > Publish message (id)
// < Publish acknowledged (id)
case MG_EV_MQTT_PUBACK:
if (inflight_remove(&ctx->inflight, msg->message_id) >= 0) {
print_logf(LOG_DEBUG, "MQTT", "MQTT Message publishing acknowledged (msg_id %u)", msg->message_id);
} else {
print_logf(LOG_NOTICE, "MQTT", "MQTT Message unknown publishing acknowledged (msg_id %u)", msg->message_id);
}
break;

// QoS 2
// > Publish message (id)
// < Publish received (id)
// > Publish release (id)
// < Publish complete (id)
case MG_EV_MQTT_PUBREC:
if (inflight_release(&ctx->inflight, msg->message_id) >= 0) {
print_logf(LOG_DEBUG, "MQTT", "MQTT Message publishing received (msg_id %u)", msg->message_id);
mg_mqtt_pubrel(ctx->conn, msg->message_id);
}
else {
print_logf(LOG_NOTICE, "MQTT", "MQTT Message unknown publishing received (msg_id %u)", msg->message_id);
}
break;
case MG_EV_MQTT_PUBREL:
// This is not expected to happen
print_logf(LOG_WARNING, "MQTT", "MQTT Incoming release (msg_id %u)", msg->message_id);
break;
case MG_EV_MQTT_PUBCOMP:
if (inflight_remove(&ctx->inflight, msg->message_id) >= 0) {
print_logf(LOG_DEBUG, "MQTT", "MQTT Message publishing complete (msg_id %u)", msg->message_id);
}
else {
print_logf(LOG_NOTICE, "MQTT", "MQTT Message unknown publishing complete (msg_id %u)", msg->message_id);
}
break;

case MG_EV_CLOSE:
if (!ctx) {
break; // shutting down
Expand Down Expand Up @@ -129,14 +289,52 @@ static void mqtt_client_timer(struct mg_connection *nc, int ev, void *ev_data)

switch (ev) {
case MG_EV_TIMER: {
// Try to reconnect
char const *error_string = NULL;
ctx->connect_opts.error_string = &error_string;
ctx->conn = mg_connect_opt(nc->mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
ctx->connect_opts.error_string = NULL;
if (!ctx->conn) {
print_logf(LOG_WARNING, "MQTT", "MQTT connect (%s) failed%s%s", ctx->address,
error_string ? ": " : "", error_string ? error_string : "");
// Try to reconnect
char const *error_string = NULL;
ctx->connect_opts.error_string = &error_string;
ctx->conn = mg_connect_opt(nc->mgr, ctx->address, mqtt_client_event, ctx->connect_opts);
ctx->connect_opts.error_string = NULL;
if (!ctx->conn) {
print_logf(LOG_WARNING, "MQTT", "MQTT connect (%s) failed%s%s", ctx->address,
error_string ? ": " : "", error_string ? error_string : "");
}
}
else {
// Process the QoS timer
double now = *(double *)ev_data;
double next = mg_time() + 0.5;
// fprintf(stderr, "timer event, current time: %.2lf, next timer: %.2lf\n", now, next);
mg_set_timer(nc, next); // Send us timer event again after 500 milliseconds

if (!ctx->conn || !ctx->conn->proto_handler) {
break;
}

// check inflight...
for (size_t i = 0; i < ctx->inflight.len; ++i) {
mqtt_msg_t *elem = &ctx->inflight.elems[i];
if (elem->timeout < now) {
if (ctx->qos == 1) {
print_logf(LOG_NOTICE, "MQTT", "MQTT resending (msg_id %u, retry %d)", elem->mid, elem->retries + 1);
mg_mqtt_publish(ctx->conn, elem->topic, elem->mid, ctx->publish_flags | MG_MQTT_DUP, elem->msg, strlen(elem->msg));
elem->timeout = now + 1.2;
elem->retries += 1;
}
else if (elem->topic) { // qos 2, first half
print_logf(LOG_NOTICE, "MQTT", "MQTT resending (msg_id %u, retry %d)", elem->mid, elem->retries + 1);
mg_mqtt_publish(ctx->conn, elem->topic, elem->mid, ctx->publish_flags | MG_MQTT_DUP, elem->msg, strlen(elem->msg));
elem->timeout = now + 1.2;
elem->retries += 1;
}
else { // qos 2, second half
print_logf(LOG_NOTICE, "MQTT", "MQTT reconfirming (msg_id %u, retry %d)", elem->mid, elem->retries + 1);
mg_mqtt_pubrel(ctx->conn, elem->mid);
elem->timeout = now + 1.2;
elem->retries += 1;
}
}
}
}
break;
}
Expand All @@ -149,6 +347,7 @@ static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts,
if (!ctx)
FATAL_CALLOC("mqtt_client_init()");

ctx->qos = qos;
ctx->mqtt_opts.user_name = user;
ctx->mqtt_opts.password = pass;
ctx->mqtt_opts.will_topic = availability;
Expand Down Expand Up @@ -217,19 +416,31 @@ static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, tls_opts_t *tls_opts,

static void mqtt_client_publish(mqtt_client_t *ctx, char const *topic, char const *str)
{
ctx->message_id++;
if (ctx->qos > 0) {
inflight_add(&ctx->inflight, topic, ctx->message_id, str);
print_logf(LOG_DEBUG, "MQTT", "MQTT publishing: %d (%zu inflight)", ctx->message_id, ctx->inflight.len);
}

if (!ctx->conn || !ctx->conn->proto_handler)
return;

ctx->message_id++;
mg_mqtt_publish(ctx->conn, topic, ctx->message_id, ctx->publish_flags, str, strlen(str));
}

static void mqtt_client_free(mqtt_client_t *ctx)
{
if (ctx && ctx->timer) {
mg_set_timer(ctx->timer, 0); // Clear retry timer
}
if (ctx && ctx->conn) {
ctx->conn->user_data = NULL;
ctx->conn->flags |= MG_F_CLOSE_IMMEDIATELY;
}
if (ctx) {
inflight_free(&ctx->inflight);
}

free(ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion src/rtl_433.c
Original file line number Diff line number Diff line change
Expand Up @@ -2104,7 +2104,7 @@ int main(int argc, char **argv) {

time(&cfg->hop_start_time);

// add dummy socket to receive broadcasts
// add dummy socket to receive timer broadcasts
struct mg_add_sock_opts opts = {.user_data = cfg};
struct mg_connection *nc = mg_add_sock_opt(get_mgr(cfg), INVALID_SOCKET, timer_handler, opts);
// Send us MG_EV_TIMER event after 2.5 seconds
Expand Down
Loading