Skip to content

Commit 0f06153

Browse files
committed
rlm_kafka: mark rare produce error paths unlikely
rd_kafka_produce can only fail on queue pressure or a rejected message, kafka_produce_enqueue then signals that back as NULL. The topic-not-declared check in the xlat fast path and the cancelled- request check in dr_msg_cb are both paths we only take in error or shutdown cases. Hinting the optimiser lets it lay the hot path out straight and push the error bodies out-of-line - matches the unlikely() idiom already used across src/lib/util/.
1 parent cb2ee22 commit 0f06153

1 file changed

Lines changed: 10 additions & 10 deletions

File tree

src/modules/rlm_kafka/rlm_kafka.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,11 @@ rlm_kafka_msg_ctx_t *kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *req
341341
.partition = RD_KAFKA_PARTITION_UA,
342342
.offset = -1
343343
};
344-
if (rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
345-
/* librdkafka copies under MSG_F_COPY */
346-
(void *)(uintptr_t) value, value_len,
347-
key, key_len,
348-
pctx) != 0) {
344+
if (unlikely(rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
345+
/* librdkafka copies under MSG_F_COPY */
346+
(void *)(uintptr_t) value, value_len,
347+
key, key_len,
348+
pctx) != 0)) {
349349
rd_kafka_resp_err_t err = rd_kafka_last_error();
350350

351351
talloc_free(pctx);
@@ -562,7 +562,7 @@ static unlang_action_t CC_HINT(nonnull) mod_produce(UNUSED unlang_result_t *p_re
562562
pctx = kafka_produce_enqueue(t, request, topic,
563563
key, key_len,
564564
env->value->vb_octets, env->value->vb_length);
565-
if (!pctx) RETURN_UNLANG_FAIL;
565+
if (unlikely(!pctx)) RETURN_UNLANG_FAIL;
566566

567567
return unlang_module_yield(request, mod_resume, mod_signal,
568568
~FR_SIGNAL_CANCEL, pctx);
@@ -688,7 +688,7 @@ static xlat_action_t kafka_xlat_produce_resume(TALLOC_CTX *xctx_ctx, fr_dcursor_
688688
fr_value_box_t *vb;
689689
bool delivered = (pctx->err == RD_KAFKA_RESP_ERR_NO_ERROR);
690690

691-
if (!delivered) REDEBUG("Kafka produce failed: %s", rd_kafka_err2str(pctx->err));
691+
if (unlikely(!delivered)) REDEBUG("Kafka produce failed: %s", rd_kafka_err2str(pctx->err));
692692

693693
MEM(vb = fr_value_box_alloc(xctx_ctx, FR_TYPE_BOOL, NULL));
694694
vb->vb_bool = delivered;
@@ -761,15 +761,15 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d
761761
*/
762762
topic = t_inst ? t_inst->topic : NULL;
763763
if (!topic) topic = kafka_thread_topic(t, topic_vb->vb_strvalue);
764-
if (!topic) {
764+
if (unlikely(!topic)) {
765765
REDEBUG("Kafka topic '%s' is not declared in the module config", topic_vb->vb_strvalue);
766766
return XLAT_ACTION_FAIL;
767767
}
768768

769769
pctx = kafka_produce_enqueue(t, request, topic,
770770
NULL, 0,
771771
value_vb->vb_octets, value_vb->vb_length);
772-
if (!pctx) return XLAT_ACTION_FAIL;
772+
if (unlikely(!pctx)) return XLAT_ACTION_FAIL;
773773

774774
return unlang_xlat_yield(request, kafka_xlat_produce_resume, kafka_xlat_produce_signal,
775775
~FR_SIGNAL_CANCEL, pctx);
@@ -801,7 +801,7 @@ static void _kafka_delivery_report_cb(UNUSED rd_kafka_t *rk, rd_kafka_message_t
801801

802802
fr_dlist_remove(&pctx->t->inflight, pctx);
803803

804-
if (!pctx->request) {
804+
if (unlikely(!pctx->request)) {
805805
talloc_free(pctx);
806806
return;
807807
}

0 commit comments

Comments
 (0)