Skip to content

Commit 897581f

Browse files
committed
Fix local shovels crashing with plugin queue types
Local shovels crashed with a badarg error when consuming from plugin queue types such as JMS or delayed queues: ``` crasher: initial call: rabbit_shovel_worker:init/1 pid: <0.1237.0> registered_name: [] exception exit: {badarg, [{persistent_term,get, [{rabbit_global_counters,'local-shovel', rabbit_jms_queue}], [{error_info,#{module => erl_erts_errors}}]}, {rabbit_global_counters,fetch,2, [{file,"rabbit_global_counters.erl"},{line,274}]}, {rabbit_global_counters,messages_delivered,3, [{file,"rabbit_global_counters.erl"},{line,223}]}, {rabbit_local_shovel,'-handle_deliver/3-fun-0-',3, [{file,"rabbit_local_shovel.erl"},{line,667}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2466}]}, {rabbit_local_shovel,handle_deliver,3, [{file,"rabbit_local_shovel.erl"},{line,665}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2466}]}, {rabbit_shovel_worker,handle_msg,2, [{file,"rabbit_shovel_worker.erl"},{line,116}]}]} in function gen_server2:terminate/3 (gen_server2.erl:1174) ``` The crash occurred because rabbit_local_shovel:boot_step/0 only initialized global counters for the three core queue types (classic, quorum, stream). When a local shovel delivered a message from a plugin queue type, rabbit_global_counters:messages_delivered/3 attempted to look up a counter that was never created. The root cause was a missing ordering guarantee between queue type registrations and the local shovel counter initialization during node boot. Previously, the boot step dependency chain was: ``` pre_boot -> rabbit_global_counters -. \ -> external_infrastructure / pre_boot -> rabbit_registry --------' ``` Queue type registrations (classic, quorum, stream, JMS, delayed) all required rabbit_registry but had no ordering relationship with the local shovel counter boot step. Both competed to run before external_infrastructure with no mutual dependency. Introduce a new boot step phase "queue_type_registrations" that serves as a synchronization point: - All queue type registration boot steps enable it - The local shovel counter boot step requires it (along with rabbit_global_counters) The resulting dependency chain is: ``` pre_boot -> rabbit_global_counters -. \ pre_boot -> rabbit_registry ----. -> local shovel counters \ / queue type regs --> queue_type_registrations ``` This guarantees that rabbit_queue_type:known_queue_type_modules() returns all registered queue types when boot_step/0 runs, so counters are initialized for every queue type without hardcoding. Here are the relevant boot step logs: ``` 2026-03-23 12:51:11.142127+01:00 [info] <0.217.0> Running boot step rabbit_classic_queue_type defined by app rabbit 2026-03-23 12:51:11.142138+01:00 [debug] <0.217.0> Applying MFA: M = rabbit_registry, F = register, A = [queue,<<"classic">>, 2026-03-23 12:51:11.142138+01:00 [debug] <0.217.0> rabbit_classic_queue] 2026-03-23 12:51:11.142191+01:00 [debug] <0.217.0> Finished MFA: M = rabbit_registry, F = register, A = [queue,<<"classic">>, 2026-03-23 12:51:11.142191+01:00 [debug] <0.217.0> rabbit_classic_queue] 2026-03-23 12:51:11.142210+01:00 [info] <0.217.0> Running boot step rabbit_quorum_queue_type defined by app rabbit 2026-03-23 12:51:11.142229+01:00 [debug] <0.217.0> Applying MFA: M = rabbit_registry, F = register, A = [queue,<<"quorum">>, 2026-03-23 12:51:11.142229+01:00 [debug] <0.217.0> rabbit_quorum_queue] 2026-03-23 12:51:11.142268+01:00 [debug] <0.217.0> Finished MFA: M = rabbit_registry, F = register, A = [queue,<<"quorum">>, 2026-03-23 12:51:11.142268+01:00 [debug] <0.217.0> rabbit_quorum_queue] 2026-03-23 12:51:11.142286+01:00 [info] <0.217.0> Running boot step rabbit_stream_queue defined by app rabbit 2026-03-23 12:51:11.142300+01:00 [debug] <0.217.0> Applying MFA: M = rabbit_registry, F = register, A = [queue,<<"stream">>, 2026-03-23 12:51:11.142300+01:00 [debug] <0.217.0> rabbit_stream_queue] 2026-03-23 12:51:11.142335+01:00 [debug] <0.217.0> Finished MFA: M = rabbit_registry, F = register, A = [queue,<<"stream">>, 2026-03-23 12:51:11.142335+01:00 [debug] <0.217.0> rabbit_stream_queue] ... 2026-03-23 12:51:11.142702+01:00 [info] <0.217.0> Running boot step delayed_queue_type defined by app rabbitmq_delayed_queue 2026-03-23 12:51:11.142709+01:00 [debug] <0.217.0> Applying MFA: M = rabbit_registry, F = register, A = [queue,<<"delayed">>, 2026-03-23 12:51:11.142709+01:00 [debug] <0.217.0> rabbitmq_delayed_queue] 2026-03-23 12:51:11.142733+01:00 [debug] <0.217.0> Finished MFA: M = rabbit_registry, F = register, A = [queue,<<"delayed">>, 2026-03-23 12:51:11.142733+01:00 [debug] <0.217.0> rabbitmq_delayed_queue] 2026-03-23 12:51:11.142745+01:00 [info] <0.217.0> Running boot step jms_queue_type defined by app rabbitmq_jms 2026-03-23 12:51:11.142795+01:00 [debug] <0.217.0> Applying MFA: M = rabbit_registry, F = register, A = [queue,<<"jms">>, 2026-03-23 12:51:11.142795+01:00 [debug] <0.217.0> rabbit_jms_queue] 2026-03-23 12:51:11.142825+01:00 [debug] <0.217.0> Finished MFA: M = rabbit_registry, F = register, A = [queue,<<"jms">>, 2026-03-23 12:51:11.142825+01:00 [debug] <0.217.0> rabbit_jms_queue] 2026-03-23 12:51:11.142837+01:00 [info] <0.217.0> Running boot step queue_type_registrations defined by app rabbit 2026-03-23 12:51:11.142847+01:00 [info] <0.217.0> Running boot step rabbit_global_local_shovel_counters defined by app rabbitmq_shovel 2026-03-23 12:51:11.142876+01:00 [debug] <0.217.0> Applying MFA: M = rabbit_local_shovel, F = boot_step, A = [] 2026-03-23 12:51:11.143000+01:00 [debug] <0.217.0> Finished MFA: M = rabbit_local_shovel, F = boot_step, A = [] ``` This commit makes the following test suite green: ``` make -C deps/rabbitmq_jms ct-jms_queue_shovel ```
1 parent 33409c3 commit 897581f

5 files changed

Lines changed: 12 additions & 7 deletions

File tree

deps/rabbit/src/rabbit.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@
7979
{requires, pre_boot},
8080
{enables, database}]}).
8181

82+
-rabbit_boot_step({queue_type_registrations,
83+
[{description, "queue type registrations"},
84+
{requires, rabbit_registry}]}).
85+
8286
-rabbit_boot_step({database,
8387
[{mfa, {rabbit_db, init, []}},
8488
{requires, rabbit_registry},

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
{cleanup, {rabbit_registry, unregister,
9393
[queue, <<"classic">>]}},
9494
{requires, rabbit_registry},
95-
{enables, [?MODULE, rabbit_policy]}]}).
95+
{enables, [?MODULE, rabbit_policy, queue_type_registrations]}]}).
9696

9797
-rabbit_boot_step(
9898
{?MODULE,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@
120120
{cleanup, {rabbit_registry, unregister,
121121
[queue, <<"quorum">>]}},
122122
{requires, rabbit_registry},
123-
{enables, rabbit_policy}]}).
123+
{enables, [rabbit_policy, queue_type_registrations]}]}).
124124

125125
-rabbit_boot_step(
126126
{quorum_memory_alarm_handler,

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@
130130
{cleanup, {rabbit_registry, unregister,
131131
[queue, <<"stream">>]}},
132132
{requires, rabbit_registry},
133-
{enables, rabbit_policy}
133+
{enables, [rabbit_policy, queue_type_registrations]}
134134
]}).
135135

136136
-type client() :: #stream_client{}.

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
[{description, "global local shovel counters"},
2020
{mfa, {?MODULE, boot_step,
2121
[]}},
22-
{requires, rabbit_global_counters},
22+
{requires, [rabbit_global_counters, queue_type_registrations]},
2323
{enables, external_infrastructure}]}).
2424

2525
-rabbit_boot_step(
@@ -96,9 +96,10 @@
9696
boot_step() ->
9797
Labels = #{protocol => ?PROTOCOL},
9898
rabbit_global_counters:init(Labels),
99-
rabbit_global_counters:init(Labels#{queue_type => rabbit_classic_queue}),
100-
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
101-
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
99+
lists:foreach(
100+
fun(QType) ->
101+
rabbit_global_counters:init(Labels#{queue_type => QType})
102+
end, rabbit_queue_type:known_queue_type_modules()).
102103

103104
-spec conserve_resources(pid(),
104105
rabbit_alarm:resource_alarm_source(),

0 commit comments

Comments
 (0)