Skip to content

Commit ffad24d

Browse files
committed
rlm_kafka: debug-build sanity check that dr / error cbs run on-thread
Capture pthread_self() into rlm_kafka_thread_t at thread_instantiate, then assert in _kafka_delivery_report_cb and _kafka_error_cb that the callback is running on that same thread. librdkafka only wakes us via the main queue (which we poll from the worker's event loop), so a cross-thread hit would mean an event arrived by a different path, which would make the lock-free handling of the inflight list unsafe. The field and its assignment are under #ifndef NDEBUG and the checks use fr_assert(), so release builds carry neither the extra thread ID nor the check - fr_assert(_x) expands to nothing under NDEBUG, so the assertions never reference the absent field.
1 parent 26eb236 commit ffad24d

1 file changed

Lines changed: 33 additions & 1 deletion

File tree

src/modules/rlm_kafka/rlm_kafka.c

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,16 @@ struct rlm_kafka_thread_s {
129129
fr_rb_tree_t *topics; //!< rlm_kafka_topic_t per declared topic
130130

131131
fr_dlist_head_t inflight; //!< outstanding rlm_kafka_msg_ctx_t
132+
133+
#ifndef NDEBUG
134+
pthread_t worker_tid; //!< pthread_self() captured at thread_instantiate.
135+
//!< Debug-build sanity check: the delivery-report and
136+
//!< error callbacks must run on this thread, never
137+
//!< cross-thread - a cross-thread hit would mean
138+
//!< librdkafka dispatched through something other than
139+
//!< our polled main queue and invalidate the no-lock
140+
//!< assumption around the inflight list.
141+
#endif
132142
};
133143

134144
/** Call env for `kafka.produce.<topic>`
@@ -184,8 +194,18 @@ static rd_kafka_topic_t *kafka_thread_topic(rlm_kafka_thread_t *t, char const *n
184194
* @param[in] reason human-readable description.
185195
* @param[in] uctx thread instance pointer we passed to rd_kafka_conf_set_opaque().
186196
*/
187-
static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, UNUSED void *uctx)
197+
static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, void *uctx)
188198
{
199+
rlm_kafka_thread_t *t = talloc_get_type_abort(uctx, rlm_kafka_thread_t);
200+
201+
/*
202+
* librdkafka dispatches error events via our polled main
203+
* queue, so this must fire on the worker thread that
204+
* called rd_kafka_poll - otherwise the no-lock assumption
205+
* around our per-thread state would be unsafe.
206+
*/
207+
fr_assert(pthread_equal(pthread_self(), t->worker_tid) != 0);
208+
189209
ERROR("%s", rd_kafka_err2name(err), reason ? reason : "<UNKNOWN ERROR>");
190210
}
191211

@@ -804,6 +824,14 @@ static void _kafka_delivery_report_cb(UNUSED rd_kafka_t *rk, rd_kafka_message_t
804824
if (!msg->_private) return;
805825
pctx = talloc_get_type_abort(msg->_private, rlm_kafka_msg_ctx_t);
806826

827+
/*
828+
* DR dispatch must happen on the thread that owns the
829+
* producer - librdkafka is only allowed to wake us via
830+
* the polled main queue, so a cross-thread hit here would
831+
* invalidate the no-lock handling of the inflight list.
832+
*/
833+
fr_assert(pthread_equal(pthread_self(), pctx->t->worker_tid) != 0);
834+
807835
fr_dlist_remove(&pctx->t->inflight, pctx);
808836

809837
if (unlikely(!pctx->request)) {
@@ -959,6 +987,10 @@ static int mod_thread_instantiate(module_thread_inst_ctx_t const *mctx)
959987
t->el = mctx->el;
960988
t->wake_pipe[0] = t->wake_pipe[1] = -1;
961989

990+
#ifndef NDEBUG
991+
t->worker_tid = pthread_self();
992+
#endif
993+
962994
fr_dlist_talloc_init(&t->inflight, rlm_kafka_msg_ctx_t, entry);
963995

964996
/*

0 commit comments

Comments
 (0)