Skip to content

Commit 6f602b7

Browse files
committed
rlm_kafka: Simpler, and arguably more correct way of handling kafka shutdown
1 parent c9ff289 commit 6f602b7

1 file changed

Lines changed: 55 additions & 96 deletions

File tree

src/modules/rlm_kafka/rlm_kafka.c

Lines changed: 55 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,6 @@ USES_APPLE_DEPRECATED_API
9292
* by talloc when the module instance is torn down (the library attaches
9393
* a lifecycle sentinel during config parse).
9494
*/
95-
/** Coordinates the flush / purge / flush dance across worker thread detaches
96-
*
97-
* Lives in its own `malloc`'d allocation because the surrounding
98-
* `rlm_kafka_t` is `mprotect`'d read-only after `mod_instantiate`, and
99-
* `pthread_mutex_lock` writes to the mutex memory (mprotect would
100-
* SIGBUS). The pointer on `rlm_kafka_t` is immutable once set.
101-
*/
102-
typedef struct {
103-
pthread_mutex_t mu; //!< serialises the flush/purge/flush dance so
104-
//!< only the first worker through performs it.
105-
bool done; //!< set once the dance has run; guarded by `mu`.
106-
} rlm_kafka_detach_sync_t;
107-
10895
typedef struct {
10996
fr_kafka_conf_t kconf; //!< parsed producer conf - MUST be first
11097
fr_time_delta_t flush_timeout; //!< How long `mod_detach` waits for in-flight
@@ -117,8 +104,6 @@ typedef struct {
117104
rd_kafka_t *rk; //!< shared producer, created at mod_instantiate.
118105
fr_rb_tree_t *topics; //!< rlm_kafka_topic_t keyed by name, read-only
119106
//!< after mod_instantiate.
120-
rlm_kafka_detach_sync_t *detach; //!< flush/purge/flush coordination at shutdown;
121-
//!< see `rlm_kafka_detach_sync_t`.
122107
} rlm_kafka_t;
123108

124109
/** Topic handle
@@ -273,34 +258,6 @@ static void _kafka_log_cb(rd_kafka_t const *rk, int level, char const *fac, char
273258
}
274259
}
275260

276-
/** Handle one pctx dequeued from a worker's mailbox
277-
*
278-
* Runs on the originating worker. Cancelled requests (pctx->request
279-
* NULLed by the signal handler) are freed here; otherwise the request
280-
* is marked runnable and the interpreter runs `mod_resume` /
281-
* `kafka_xlat_produce_resume` on this worker to translate the stashed
282-
* DR into an rcode and free the pctx.
283-
*/
284-
static inline CC_HINT(always_inline)
285-
void kafka_delivery_notification(rlm_kafka_msg_ctx_t *pctx)
286-
{
287-
/*
288-
* Relaxed: `pctx->request` is only ever stored by this
289-
* worker thread (initial set at produce, NULL at cancel),
290-
* so program order already guarantees we see our own
291-
* latest write. Cross-thread synchronisation with the bg
292-
* cb's writes to pctx->err / partition / offset has
293-
* already happened via fr_atomic_ring_pop's acquire.
294-
*/
295-
request_t *request = atomic_load_explicit(&pctx->request, memory_order_relaxed);
296-
297-
if (!request) {
298-
free(pctx);
299-
return;
300-
}
301-
unlang_interpret_mark_runnable(request);
302-
}
303-
304261
/** Worker wake-up callback - the bg cb triggered our EVFILT_USER event
305262
*
306263
* Pops everything sitting in the mailbox and dispatches each pctx on
@@ -979,9 +936,17 @@ static int kafka_topics_alloc(rlm_kafka_t *inst)
979936
* bg cb with an unbounded wait (purge makes the drain finite without
980937
* needing a user-configured timeout).
981938
*
982-
* The dance is producer-wide, so only the first worker into detach
983-
* actually runs it; `detach->mu` serialises, `detach->done` short-
984-
* circuits subsequent callers.
939+
* Every worker flushes. The first one through actually drains
940+
* librdkafka's queues; subsequent calls return immediately because
941+
* `outq_len` is already zero. The cost is one extra flush call per
942+
* worker (cheap when there's nothing to wait for), the gain is that
943+
* each worker has its own barrier guaranteeing no bg cb invocation
944+
* is mid-flight against this worker's `t->queue` / `t->wake`.
945+
*
946+
* Order: flush -> drain mailbox -> free wake. Freeing the wake
947+
* before draining would race a bg cb that loaded a non-NULL
948+
* `pctx->request` just before cancellation propagated and is about
949+
* to call `fr_event_user_trigger(t->wake)`.
985950
*
986951
* @param[in] mctx thread-instance ctx.
987952
* @return 0 (never fails fatally).
@@ -991,40 +956,55 @@ static int mod_thread_detach(module_thread_inst_ctx_t const *mctx)
991956
rlm_kafka_t const *inst = talloc_get_type_abort_const(mctx->mi->data, rlm_kafka_t);
992957
rlm_kafka_thread_t *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
993958
rlm_kafka_msg_ctx_t *pctx;
959+
rd_kafka_resp_err_t err;
994960

995-
pthread_mutex_lock(&inst->detach->mu);
996-
if (!inst->detach->done) {
997-
rd_kafka_resp_err_t err;
998-
999-
err = rd_kafka_flush(inst->rk, fr_time_delta_to_msec(inst->flush_timeout));
1000-
if (unlikely(err != RD_KAFKA_RESP_ERR_NO_ERROR)) {
1001-
WARN("%s - Shutdown flush timed out, purging %d in-flight message(s)",
1002-
inst->log_prefix, rd_kafka_outq_len(inst->rk));
1003-
1004-
rd_kafka_purge(inst->rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
961+
/*
962+
* Flush is thread safe, and only returns after
963+
* all in flight kafka requests have had their
964+
* delivery reports run through the callback.
965+
*
966+
* At the point where thread_detach is called
967+
* there are no more request_t in progress, so
968+
* we gurantee the callback will never add additional
969+
* delivery reports to this thread's queue.
970+
*
971+
* We call kafka flush in every thread, because
972+
* there is no explicit synchronisation which
973+
* gurantees all workers have stopped processing
974+
* reuests by the time the first thread is being
975+
* detached, so theoretically new requests can
976+
* be enqueued by other threads after the first
977+
* thread has called flush.
978+
*/
979+
err = rd_kafka_flush(inst->rk, fr_time_delta_to_msec(inst->flush_timeout));
980+
if (unlikely(err != RD_KAFKA_RESP_ERR_NO_ERROR)) {
981+
WARN("Shutdown flush timed out, purging %d in-flight message(s)",
982+
rd_kafka_outq_len(inst->rk));
1005983

1006-
/*
1007-
* Drain the purge-generated DRs. No broker
1008-
* round-trip left; drain time is bounded by
1009-
* bg cb processing speed (us per pctx).
1010-
* -1 == wait indefinitely.
1011-
*/
1012-
(void) rd_kafka_flush(inst->rk, -1);
1013-
}
984+
rd_kafka_purge(inst->rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
1014985

1015-
inst->detach->done = true;
986+
/*
987+
* Drain the purge-generated DRs. No broker
988+
* round-trip left; drain time is bounded by
989+
* bg cb processing speed (us per pctx).
990+
* -1 == wait indefinitely.
991+
*/
992+
(void) rd_kafka_flush(inst->rk, -1);
1016993
}
1017-
pthread_mutex_unlock(&inst->detach->mu);
1018-
1019-
TALLOC_FREE(t->wake);
1020994

1021995
/*
1022-
* Drain anything the bg cb pushed onto us before the flush
1023-
* returned. Each pctx has either `request == NULL`
1024-
* (cancelled) or a still-live request whose owning worker
1025-
* is us; the normal dispatch path does the right thing.
996+
* Drain anything the bg cb pushed onto us. Every pctx
997+
* here must have `request == NULL` because the framework
998+
* cancels every yielded request this worker owned before
999+
* calling thread_detach - assert that to catch any future
1000+
* change to that ordering immediately.
10261001
*/
1027-
while (fr_atomic_ring_pop(t->queue, (void **)&pctx)) kafka_delivery_notification(pctx);
1002+
while (fr_atomic_ring_pop(t->queue, (void **)&pctx)) {
1003+
fr_assert(atomic_load_explicit(&pctx->request, memory_order_relaxed) == NULL);
1004+
free(pctx);
1005+
}
1006+
1007+
TALLOC_FREE(t->wake);
10281008

10291009
return 0;
10301010
}
@@ -1066,13 +1046,6 @@ static int mod_thread_instantiate(module_thread_inst_ctx_t const *mctx)
10661046
return 0;
10671047
}
10681048

1069-
/** Destructor for the detach sync struct: tear down the pthread mutex */
1070-
static int _kafka_detach_sync_free(rlm_kafka_detach_sync_t *d)
1071-
{
1072-
pthread_mutex_destroy(&d->mu);
1073-
return 0;
1074-
}
1075-
10761049
/** Module-instance setup
10771050
*
10781051
* Builds the log prefix, wires up the log + background event callbacks
@@ -1090,13 +1063,13 @@ static int mod_instantiate(module_inst_ctx_t const *mctx)
10901063
rd_kafka_conf_t *conf;
10911064
char errstr[512];
10921065

1093-
MEM(inst->log_prefix = talloc_typed_asprintf(inst, "rlm_kafka (%s)", mctx->mi->name));
1094-
10951066
/*
10961067
* rd_kafka_new consumes the conf on success. The original
10971068
* lives under a talloc sentinel that destroys it at inst
10981069
* teardown, so dup it before handing ownership off.
10991070
*/
1071+
MEM(inst->log_prefix = talloc_typed_asprintf(inst, "rlm_kafka (%s)", mctx->mi->name));
1072+
11001073
MEM(conf = rd_kafka_conf_dup(inst->kconf.conf));
11011074
rd_kafka_conf_set_log_cb(conf, _kafka_log_cb);
11021075
rd_kafka_conf_set_background_event_cb(conf, _kafka_background_event_cb);
@@ -1125,25 +1098,11 @@ static int mod_instantiate(module_inst_ctx_t const *mctx)
11251098
}
11261099

11271100
if (kafka_topics_alloc(inst) < 0) {
1128-
error:
11291101
rd_kafka_destroy(inst->rk);
11301102
inst->rk = NULL;
11311103
return -1;
11321104
}
11331105

1134-
/*
1135-
* Detach-sync lives outside the mprotected instance region so
1136-
* `pthread_mutex_lock` can write to it. Allocate orphaned and
1137-
* link to `inst`'s lifetime so teardown is automatic.
1138-
*/
1139-
MEM(inst->detach = talloc_zero(NULL, rlm_kafka_detach_sync_t));
1140-
pthread_mutex_init(&inst->detach->mu, NULL);
1141-
talloc_set_destructor(inst->detach, _kafka_detach_sync_free);
1142-
if (talloc_link_ctx(inst, inst->detach) < 0) {
1143-
TALLOC_FREE(inst->detach);
1144-
goto error;
1145-
}
1146-
11471106
return 0;
11481107
}
11491108

0 commit comments

Comments
 (0)