Skip to content

Commit 30867b7

Browse files
committed
rlm_kafka: accept a key as the middle xlat argument
%kafka.produce now takes (topic, key, value) instead of (topic, value), so xlat callers can pick a partition the same way the method form does via a declared topic `key = ...`. Zero-length octets (the literal empty string, or an attribute that expands to nothing) mean "no key" on the wire - librdkafka falls back to its configured partitioner. Updated existing xlat tests to pass an explicit '' key, and xlat.unlang now covers the non-empty case too: produce to freeradius-test-xlat-alt with a `"xlat-key"` key and assert it round-trips byte-for-byte through the broker.
1 parent a5138bf commit 30867b7

7 files changed

Lines changed: 61 additions & 33 deletions

File tree

src/modules/rlm_kafka/rlm_kafka.c

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -744,24 +744,32 @@ static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t *
744744

745745
static xlat_arg_parser_t const kafka_xlat_produce_args[] = {
746746
{ .required = true, .concat = true, .type = FR_TYPE_STRING }, /* topic */
747+
{ .required = true, .concat = true, .type = FR_TYPE_OCTETS }, /* key (zero-length octets = no key on the wire) */
747748
{ .required = true, .concat = true, .type = FR_TYPE_OCTETS }, /* value */
748749
XLAT_ARG_PARSER_TERMINATOR
749750
};
750751

751-
/** `%kafka.produce(topic, value)` - runtime-named produce
752+
/** `%kafka.produce(topic, key, value)` - runtime-named produce
752753
*
753754
* Unlike the @ref mod_produce method form (which resolves topics at
754755
* config-parse time), the xlat takes the topic name as a runtime
755756
* argument. Use this when the topic or payload is chosen per-request:
756757
*
757758
* @code
758759
* send Accounting-Response {
759-
* if (!%kafka.produce('accounting', %json.encode(&request.[*]))) {
760+
* if (!%kafka.produce('accounting', %{Acct-Session-Id}, %json.encode(&request.[*]))) {
760761
* reject
761762
* }
762763
* }
763764
* @endcode
764765
*
766+
* `key` is optional: pass an empty string (or an unset attribute) to
767+
* produce without a key - librdkafka then uses the configured
768+
* partitioner to spread records across partitions. When a non-empty
769+
* key is supplied, librdkafka hashes it to pick a partition, so
770+
* records with the same key end up on the same partition and preserve
771+
* per-key produce order on the consumer side.
772+
*
765773
* Returns a bool: `true` on successful delivery, `false` on failure.
766774
* The topic must have been declared in the module config (unknown
767775
* topics fail the xlat) so librdkafka per-topic settings continue to
@@ -778,9 +786,12 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d
778786
rlm_kafka_thread_t *t = talloc_get_type_abort(xctx->mctx->thread, rlm_kafka_thread_t);
779787
rlm_kafka_xlat_thread_inst_t const *t_inst = xctx->thread;
780788
fr_value_box_t *topic_vb = fr_value_box_list_head(in);
781-
fr_value_box_t *value_vb = fr_value_box_list_next(in, topic_vb);
789+
fr_value_box_t *key_vb = fr_value_box_list_next(in, topic_vb);
790+
fr_value_box_t *value_vb = fr_value_box_list_next(in, key_vb);
782791
rd_kafka_topic_t *topic;
783792
rlm_kafka_msg_ctx_t *pctx;
793+
uint8_t const *key = NULL;
794+
size_t key_len = 0;
784795

785796
/*
786797
* Fast path: a literal topic argument was pre-resolved to
@@ -793,8 +804,19 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d
793804
return XLAT_ACTION_FAIL;
794805
}
795806

807+
/*
808+
* Zero-length octets (e.g. `''` or an attribute expanding
809+
* to nothing) map to "no key" on the wire - librdkafka then
810+
* uses the configured partitioner instead of key-hash
811+
* partitioning.
812+
*/
813+
if (key_vb->vb_length > 0) {
814+
key = key_vb->vb_octets;
815+
key_len = key_vb->vb_length;
816+
}
817+
796818
pctx = kafka_produce_enqueue(t, request, topic,
797-
NULL, 0,
819+
key, key_len,
798820
value_vb->vb_octets, value_vb->vb_length);
799821
if (unlikely(!pctx)) return XLAT_ACTION_FAIL;
800822

src/tests/modules/kafka/base.unlang

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
%exec('/bin/sh', '-c', '"$KAFKA_SUBSCRIBE" freeradius-test-base $ENV{OUTPUT_DIR}kafka-base.json 1')
1111

1212
control.Tmp-String-0 := "hello from rlm_kafka base test"
13-
if (!%kafka.produce('freeradius-test-base', control.Tmp-String-0)) {
13+
if (!%kafka.produce('freeradius-test-base', '', control.Tmp-String-0)) {
1414
test_fail
1515
}
1616

src/tests/modules/kafka/binary.unlang

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# control so the xlat path exercises an attribute reference.
1313
#
1414
control.Tmp-Octets-0 := 0x48656c6c6f00576f726c6400ff00ff
15-
if (!%kafka.produce('freeradius-test-binary', control.Tmp-Octets-0)) {
15+
if (!%kafka.produce('freeradius-test-binary', '', control.Tmp-Octets-0)) {
1616
test_fail
1717
}
1818

@@ -27,7 +27,7 @@ if (!ok) {
2727
#
2828
# Empty payload via the xlat form.
2929
#
30-
if (!%kafka.produce('freeradius-test-binary', "")) {
30+
if (!%kafka.produce('freeradius-test-binary', '', "")) {
3131
test_fail
3232
}
3333

src/tests/modules/kafka/invalid.unlang

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#
99
# Empty string payload.
1010
#
11-
if (!%kafka.produce('freeradius-test-invalid', "")) {
11+
if (!%kafka.produce('freeradius-test-invalid', '', "")) {
1212
test_fail
1313
}
1414

@@ -22,7 +22,7 @@ if (!%kafka.produce('freeradius-test-invalid', "")) {
2222
# sixteen 1024-byte binary classes.
2323
#
2424
control.Tmp-Octets-4 := (octets)%str.rand("1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b")
25-
if (!%kafka.produce('freeradius-test-invalid', control.Tmp-Octets-4)) {
25+
if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-Octets-4)) {
2626
test_fail
2727
}
2828

@@ -31,15 +31,15 @@ if (!%kafka.produce('freeradius-test-invalid', control.Tmp-Octets-4)) {
3131
# string, all valid bytes.
3232
#
3333
control.Tmp-String-1 := "line1\nline2\ttabbed\r\n\"quoted\""
34-
if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-1)) {
34+
if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-1)) {
3535
test_fail
3636
}
3737

3838
#
3939
# UTF-8 / high-bit characters.
4040
#
4141
control.Tmp-String-2 := "héllo — wörld 🚀"
42-
if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-2)) {
42+
if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-2)) {
4343
test_fail
4444
}
4545

src/tests/modules/kafka/race.unlang

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919
redundant {
2020
timeout 300ms {
2121
parallel {
22-
group { %kafka_unreachable.produce('freeradius-test-race', "race 1") }
23-
group { %kafka_unreachable.produce('freeradius-test-race', "race 2") }
24-
group { %kafka_unreachable.produce('freeradius-test-race', "race 3") }
25-
group { %kafka_unreachable.produce('freeradius-test-race', "race 4") }
26-
group { %kafka_unreachable.produce('freeradius-test-race', "race 5") }
27-
group { %kafka_unreachable.produce('freeradius-test-race', "race 6") }
28-
group { %kafka_unreachable.produce('freeradius-test-race', "race 7") }
29-
group { %kafka_unreachable.produce('freeradius-test-race', "race 8") }
22+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 1") }
23+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 2") }
24+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 3") }
25+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 4") }
26+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 5") }
27+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 6") }
28+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 7") }
29+
group { %kafka_unreachable.produce('freeradius-test-race', '', "race 8") }
3030
}
3131
}
3232
group {

src/tests/modules/kafka/unreachable.unlang

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ redundant {
3232
timeout 200ms {
3333
parallel {
3434
redundant {
35-
%kafka_unreachable.produce('freeradius-test-unreachable', "cancelled mid-flight")
35+
%kafka_unreachable.produce('freeradius-test-unreachable', '', "cancelled mid-flight")
3636
}
3737
group {
3838
%cancel(0)
@@ -53,15 +53,15 @@ redundant {
5353
# RD_KAFKA_RESP_ERR__MSG_TIMED_OUT. kafka_produce_resume translates
5454
# that to a false return from the xlat.
5555
#
56-
if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) {
56+
if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "doomed")) {
5757
test_fail
5858
}
5959

6060
#
6161
# And one more, to prove the worker's produce path is still healthy
6262
# after both the cancel and the natural failure above.
6363
#
64-
if (%kafka_unreachable.produce('freeradius-test-unreachable', "also doomed")) {
64+
if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "also doomed")) {
6565
test_fail
6666
}
6767

src/tests/modules/kafka/xlat.unlang

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Exercise the %kafka.produce(topic, value) xlat across several
2+
# Exercise the %kafka.produce(topic, key, value) xlat across several
33
# produce cycles. Each produce yields waiting for a delivery report;
44
# the resume path translates it to a bool (true = delivered).
55
#
@@ -15,7 +15,7 @@
1515
# before being handed to librdkafka.
1616
#
1717
control.Tmp-String-0 := "user=%{User-Name},packet=%{Packet-Type}"
18-
if (!%kafka.produce('freeradius-test-xlat', control.Tmp-String-0)) {
18+
if (!%kafka.produce('freeradius-test-xlat', '', control.Tmp-String-0)) {
1919
test_fail
2020
}
2121

@@ -26,23 +26,25 @@ if (!%kafka.produce('freeradius-test-xlat', control.Tmp-String-0)) {
2626
# without dropping any. Unrolled because the unlang parser here doesn't
2727
# accept `while` as a loop construct.
2828
#
29-
if (!%kafka.produce('freeradius-test-xlat', "sequential 1")) { test_fail }
30-
if (!%kafka.produce('freeradius-test-xlat', "sequential 2")) { test_fail }
31-
if (!%kafka.produce('freeradius-test-xlat', "sequential 3")) { test_fail }
32-
if (!%kafka.produce('freeradius-test-xlat', "sequential 4")) { test_fail }
33-
if (!%kafka.produce('freeradius-test-xlat', "sequential 5")) { test_fail }
29+
if (!%kafka.produce('freeradius-test-xlat', '', "sequential 1")) { test_fail }
30+
if (!%kafka.produce('freeradius-test-xlat', '', "sequential 2")) { test_fail }
31+
if (!%kafka.produce('freeradius-test-xlat', '', "sequential 3")) { test_fail }
32+
if (!%kafka.produce('freeradius-test-xlat', '', "sequential 4")) { test_fail }
33+
if (!%kafka.produce('freeradius-test-xlat', '', "sequential 5")) { test_fail }
3434

3535
#
36-
# Produce to a distinct topic to exercise a second topic handle cache entry.
36+
# Produce to a distinct topic to exercise a second topic handle cache
37+
# entry, and pass a non-empty key so we also cover the xlat key path.
38+
# Same key bytes should round-trip unaltered on the consumer side.
3739
#
38-
if (!%kafka.produce('freeradius-test-xlat-alt', "alternate topic")) {
40+
if (!%kafka.produce('freeradius-test-xlat-alt', "xlat-key", "alternate topic")) {
3941
test_fail
4042
}
4143

4244
#
4345
# Empty value - librdkafka accepts zero-length payloads.
4446
#
45-
if (!%kafka.produce('freeradius-test-xlat', "")) {
47+
if (!%kafka.produce('freeradius-test-xlat', '', "")) {
4648
test_fail
4749
}
4850

@@ -94,15 +96,19 @@ control.Tmp-Octets-6 := %base64.decode(control.Tmp-String-7)
9496
if (%length(control.Tmp-Octets-6) != 0) { test_fail }
9597

9698
#
97-
# Alternate topic: one record.
99+
# Alternate topic: one record, value + key.
98100
#
99101
timeout 10s {
100102
map json %exec('/bin/sh', '-c', '"$KAFKA_WAIT_AND_CLEANUP" $ENV{OUTPUT_DIR}kafka-xlat-alt.json 1') {
101103
control.Tmp-String-8 := '$[0].value'
104+
control.Tmp-String-9 := '$[0].key'
102105
}
103106
}
104107

105108
control.Tmp-Octets-7 := %base64.decode(control.Tmp-String-8)
106109
if ((string)control.Tmp-Octets-7 != "alternate topic") { test_fail }
107110

111+
control.Tmp-Octets-8 := %base64.decode(control.Tmp-String-9)
112+
if ((string)control.Tmp-Octets-8 != "xlat-key") { test_fail }
113+
108114
test_pass

0 commit comments

Comments
 (0)