
Performance issue for subscriptions with the same topic filter #15588

@ansd

Description


Repro Steps

Start RabbitMQ from the main branch:

make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test"  RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 6 +JPperf true" RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/mqtt-rabbitmq.conf" ENABLED_PLUGINS="rabbitmq_management rabbitmq_mqtt rabbitmq_prometheus" FULL=1

with the following mqtt-rabbitmq.conf:

mqtt.tcp_listen_options.sndbuf = 2048
mqtt.tcp_listen_options.recbuf = 2048
mqtt.tcp_listen_options.buffer = 2048
mqtt.tcp_listen_options.backlog = 512
mqtt.tcp_listen_options.nodelay = true
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
disk_free_limit.absolute = 200MB
management_agent.disable_metrics_collector = true
loopback_users = none
log.file.level = warning

Connect 20k MQTT clients in parallel. Each client has only one subscription, and all clients use the same topic filter:

./omq --uri mqtt://localhost:1883 mqtt -x 0 -y 20000 --parallel-consumer-start --consume-from "mytopic"

Problem:

Clients time out when creating the subscriptions.

RabbitMQ logs many errors like this:

[error] <0.47157.0> Failed to add binding between exchange 'amq.topic' in vhost '/' and queue 'mqtt-subscription-omq-consumer-12543qos0' in vhost '/' for topic filter mytopic: timeout

Root Cause

The CPU flame graph shows that the Khepri Ra server process is busy copying data to and from ETS:

[Image: CPU flame graph of the Khepri Ra server process]

If you run with only 3 clients subscribing to the same topic filter:

./omq --uri mqtt://localhost:1883 mqtt -x 0 -y 3 --parallel-consumer-start --consume-from "mytopic"

you can see that all 3 bindings are stored in the same ETS row:

1> rp(ets:tab2list(rabbit_khepri_topic_trie_v3)).
[{topic_trie_edge_v2,{trie_edge,{resource,<<"/">>,exchange,
                                          <<"amq.topic">>},
                                #Ref<0.770359334.1341915139.10425>,bindings},
                     {bindings,#{{binding,{resource,<<"/">>,exchange,
                                                    <<"amq.topic">>},
                                          <<"mytopic">>,
                                          {resource,<<"/">>,queue,
                                                    <<"mqtt-subscription-omq-consumer-1qos0">>},
                                          [{<<"x-binding-key">>,longstr,<<"mytopic">>},
                                           {<<"x-mqtt-subscription-opts">>,table,
                                            [{<<"no-local">>,bool,false},
                                             {<<"qos">>,unsignedbyte,0},
                                             {<<"retain-as-published">>,bool,false},
                                             {<<"retain-handling">>,unsignedbyte,0}]}]} =>
                                     [],
                                 {binding,{resource,<<"/">>,exchange,<<"amq.topic">>},
                                          <<"mytopic">>,
                                          {resource,<<"/">>,queue,
                                                    <<"mqtt-subscription-omq-consumer-2qos0">>},
                                          [{<<"x-binding-key">>,longstr,<<"mytopic">>},
                                           {<<"x-mqtt-subscription-opts">>,table,
                                            [{<<"no-local">>,bool,false},
                                             {<<"qos">>,unsignedbyte,0},
                                             {<<"retain-as-published">>,bool,false},
                                             {<<"retain-handling">>,unsignedbyte,0}]}]} =>
                                     [],
                                 {binding,{resource,<<"/">>,exchange,<<"amq.topic">>},
                                          <<"mytopic">>,
                                          {resource,<<"/">>,queue,
                                                    <<"mqtt-subscription-omq-consumer-3qos0">>},
                                          [{<<"x-binding-key">>,longstr,<<"mytopic">>},
                                           {<<"x-mqtt-subscription-opts">>,table,
                                            [{<<"no-local">>,bool,false},
                                             {<<"qos">>,unsignedbyte,0},
                                             {<<"retain-as-published">>,bool,false},
                                             {<<"retain-handling">>,unsignedbyte,0}]}]} =>
                                     []}},
                     undefined},
 {topic_trie_edge_v2,{trie_edge,{resource,<<"/">>,exchange,
                                          <<"amq.topic">>},
                                root,<<"mytopic">>},
                     #Ref<0.770359334.1341915139.10425>,1}]
ok
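To watch how large this single row grows as subscriptions are added, it can be measured from the broker's Erlang shell. This is a minimal sketch: the module name trie_row_stats is hypothetical, and it assumes the row layout shown above ({bindings, Map} as the third element of the row), which is inferred from the output rather than from any documented format. Note that ets:tab2list/1 itself copies the whole table to the calling process, so avoid running it in a loop on a loaded node.

```erlang
-module(trie_row_stats).
-export([bindings_rows/1]).

%% For every row in Table whose third element is {bindings, Map}
%% (the layout of the topic_trie_edge_v2 row printed above, ASSUMED
%% from that output), return {NumberOfBindings, ApproxBytes}.
%% ApproxBytes is the flat size of the copied term, not the exact
%% ETS memory footprint.
bindings_rows(Table) ->
    WordSize = erlang:system_info(wordsize),
    [{map_size(Bindings), erts_debug:flat_size(Row) * WordSize}
     || Row <- ets:tab2list(Table),
        tuple_size(Row) >= 3,
        %% Rows whose third element does not match are skipped.
        {bindings, Bindings} <- [element(3, Row)],
        is_map(Bindings)].
```

After compiling it in the shell (for example with c(trie_row_stats).), calling trie_row_stats:bindings_rows(rabbit_khepri_topic_trie_v3). at different client counts shows whether the binding count and byte size of that one row grow with every new subscriber.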

In contrast, with Mnesia on v4.2.x the bindings are stored in separate ETS rows (I'm not suggesting this is the correct solution):

(rabbit@nuc)2> ets:tab2list(rabbit_topic_trie_edge).
[{topic_trie_edge,{trie_edge,{resource,<<"/">>,exchange,
                                       <<"amq.topic">>},
                             root,"mytopic"},
                  <<111,200,14,198,26,220,215,250,84,12,77,239,121,191,95,
                    159>>}]

(rabbit@nuc)3> ets:tab2list(rabbit_topic_trie_node).
[{topic_trie_node,{trie_node,{resource,<<"/">>,exchange,
                                       <<"amq.topic">>},
                             root},
                  1,0},
 {topic_trie_node,{trie_node,{resource,<<"/">>,exchange,
                                       <<"amq.topic">>},
                             <<111,200,14,198,26,220,215,250,84,12,77,239,121,191,95,
                               159>>},
                  0,3}]

(rabbit@nuc)4> ets:tab2list(rabbit_topic_trie_binding).
[{topic_trie_binding,{trie_binding,{resource,<<"/">>,
                                             exchange,<<"amq.topic">>},
                                   <<111,200,14,198,26,220,215,250,84,12,77,239,121,191,95,
                                     159>>,
                                   {resource,<<"/">>,queue,
                                             <<"mqtt-subscription-omq-consumer-1qos0">>},
                                   [{<<"x-binding-key">>,longstr,<<"mytopic">>},
                                    {<<"x-mqtt-subscription-opts">>,table,
                                     [{<<"no-local">>,bool,false},
                                      {<<"qos">>,unsignedbyte,0},
                                      {<<"retain-as-published">>,bool,false},
                                      {<<"retain-handling">>,unsignedbyte,0}]}]},
                     const},
 {topic_trie_binding,{trie_binding,{resource,<<"/">>,
                                             exchange,<<"amq.topic">>},
                                   <<111,200,14,198,26,220,215,250,84,12,77,239,121,191,95,
                                     159>>,
                                   {resource,<<"/">>,queue,
                                             <<"mqtt-subscription-omq-consumer-2qos0">>},
                                   [{<<"x-binding-key">>,longstr,<<"mytopic">>},
                                    {<<"x-mqtt-subscription-opts">>,table,
                                     [{<<"no-local">>,bool,false},
                                      {<<"qos">>,unsignedbyte,0},
                                      {<<"retain-as-published">>,bool,false},
                                      {<<"retain-handling">>,unsignedbyte,0}]}]},
                     const},
 {topic_trie_binding,{trie_binding,{resource,<<"/">>,
                                             exchange,<<"amq.topic">>},
                                   <<111,200,14,198,26,220,215,250,84,12,77,239,121,191,95,
                                     159>>,
                                   {resource,<<"/">>,queue,
                                             <<"mqtt-subscription-omq-consumer-3qos0">>},
                                   [{<<"x-binding-key">>,longstr,<<"mytopic">>},
                                    {<<"x-mqtt-subscription-opts">>,table,
                                     [{<<"no-local">>,bool,false},
                                      {<<"qos">>,unsignedbyte,0},
                                      {<<"retain-as-published">>,bool,false},
                                      {<<"retain-handling">>,unsignedbyte,0}]}]},
                     const}]

This means that if many clients subscribe with the same topic filter, the Khepri process will eventually be overloaded.

20k clients all subscribing in parallel, as shown above, is a lot. But even when 1k clients subscribe per second in batches, each added subscription incurs more copy overhead than the previous one, because the single ETS row holding all bindings for that topic filter keeps growing. This eventually leads to timeouts.
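The growing copy cost can be illustrated outside RabbitMQ with plain ETS. The sketch below is a simplified model, not Khepri's actual code path: it assumes that adding a binding reads the whole bindings map out of one row and writes the grown map back, and compares that with the one-row-per-binding layout seen with Mnesia. The module name ets_row_model and its functions are hypothetical.

```erlang
-module(ets_row_model).
-export([single_row/1, separate_rows/1, compare/1]).

-define(KEY, {<<"amq.topic">>, <<"mytopic">>}).

%% ASSUMED model: all bindings for one topic filter live in a map in a
%% single ETS row. ets:lookup/2 copies the whole map out of ETS and
%% ets:insert/2 copies the grown map back in, so the I-th insert copies
%% O(I) data and N inserts copy O(N^2) data in total.
single_row(N) ->
    T = ets:new(single_row, [set, private]),
    true = ets:insert(T, {?KEY, #{}}),
    lists:foreach(
      fun(I) ->
              [{_, Bindings}] = ets:lookup(T, ?KEY),
              true = ets:insert(T, {?KEY, Bindings#{I => []}})
      end,
      lists:seq(1, N)),
    [{_, Final}] = ets:lookup(T, ?KEY),
    true = ets:delete(T),
    map_size(Final).

%% Mnesia-like layout: one small row per binding, so each insert copies
%% a constant amount of data and N inserts copy O(N) data in total.
separate_rows(N) ->
    T = ets:new(separate_rows, [set, private]),
    lists:foreach(
      fun(I) -> true = ets:insert(T, {{?KEY, I}, const}) end,
      lists:seq(1, N)),
    Size = ets:info(T, size),
    true = ets:delete(T),
    Size.

%% Returns {SingleRowMicros, SeparateRowsMicros} for N bindings.
compare(N) ->
    {SingleUs, N} = timer:tc(fun() -> single_row(N) end),
    {SeparateUs, N} = timer:tc(fun() -> separate_rows(N) end),
    {SingleUs, SeparateUs}.
```

Calling ets_row_model:compare(N) for increasing N in an Erlang shell should show the single-row time growing much faster than the separate-rows time, since the single-row variant copies on the order of N^2 map entries in total.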

Use case

The use cases are cloud-to-device fan-outs, as described in https://www.rabbitmq.com/blog/2023/03/21/native-mqtt#benefit-1-large-fan-outs, where many clients subscribe to the same topic.

(This is not specific to MQTT; it also affects AMQP clients subscribing to the same topic.)
