Skip to content

Commit ae865f8

Browse files
Merge pull request #16243 from rabbitmq/mergify/bp/v4.2.x/pr-16239
By @tete17: Federation: handle `{error, already_present}` in link supervisors (backport #16234) (backport #16239)
2 parents 30c5c79 + 11cdf95 commit ae865f8

4 files changed

Lines changed: 55 additions & 2 deletions

File tree

deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link_sup_sup.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ start_child(X) ->
5050
?LOG_DEBUG("Federation link for exchange ~tp was already started",
5151
[rabbit_misc:rs(ExchangeName)]),
5252
ok;
53+
{error, already_present} ->
54+
#exchange{name = ExchangeName} = X,
55+
?LOG_WARNING("Federation link for exchange ~tp had a stale child spec; "
56+
"removing it so the link can be restarted",
57+
[rabbit_misc:rs(ExchangeName)]),
58+
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)),
59+
ok;
5360
%% A link returned {stop, gone}, the link_sup shut down, that's OK.
5461
{error, {shutdown, _}} -> ok
5562
catch

deps/rabbitmq_exchange_federation/test/unit_SUITE.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(unit_SUITE).
99
-include_lib("common_test/include/ct.hrl").
1010
-include_lib("eunit/include/eunit.hrl").
11+
-include_lib("rabbit_common/include/rabbit.hrl").
1112

1213
-include("rabbit_exchange_federation.hrl").
1314

@@ -17,7 +18,8 @@ all() -> [
1718
reconnect_all_empty_scope,
1819
reconnect_all_broadcasts_to_members,
1920
adjust_when_supervisor_not_running,
20-
adjust_clear_upstream_when_supervisor_not_running
21+
adjust_clear_upstream_when_supervisor_not_running,
22+
start_child_handles_already_present
2123
].
2224

2325
init_per_suite(Config) ->
@@ -77,3 +79,21 @@ adjust_clear_upstream_when_supervisor_not_running(_Config) ->
7779
%% adjust/1 with clear_upstream should not fail
7880
?assertEqual(ok, rabbit_federation_exchange_link_sup_sup:adjust({clear_upstream, <<"/">>, <<"test">>})),
7981
?assertEqual(ok, rabbit_federation_exchange_link_sup_sup:adjust({clear_upstream_set, <<"test">>})).
82+
83+
start_child_handles_already_present(_Config) ->
84+
XName = #resource{virtual_host = <<"/">>, kind = exchange, name = <<"x">>},
85+
X = #exchange{name = XName, type = direct, durable = true,
86+
auto_delete = false, internal = false, arguments = []},
87+
ExpectedId = (rabbit_exchange:immutable(X))#exchange{policy = X#exchange.policy},
88+
ok = meck:new(mirrored_supervisor, [unstick, passthrough]),
89+
ok = meck:expect(mirrored_supervisor, start_child,
90+
fun(_Sup, _ChildSpec) -> {error, already_present} end),
91+
ok = meck:expect(mirrored_supervisor, delete_child,
92+
fun(_Sup, _Id) -> ok end),
93+
try
94+
?assertEqual(ok, rabbit_federation_exchange_link_sup_sup:start_child(X)),
95+
?assert(meck:called(mirrored_supervisor, delete_child,
96+
[rabbit_federation_exchange_link_sup_sup, ExpectedId]))
97+
after
98+
ok = meck:unload(mirrored_supervisor)
99+
end.

deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link_sup_sup.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ start_child(Q) ->
4848
?LOG_WARNING("Federation link for queue ~tp was already started",
4949
[rabbit_misc:rs(QueueName)]),
5050
ok;
51+
{error, already_present} ->
52+
QueueName = amqqueue:get_name(Q),
53+
?LOG_WARNING("Federation link for queue ~tp had a stale child spec; "
54+
"removing it so the link can be restarted",
55+
[rabbit_misc:rs(QueueName)]),
56+
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Q)),
57+
ok;
5158
%% A link returned {stop, gone}, the link_sup shut down, that's OK.
5259
{error, {shutdown, _}} -> ok
5360
catch

deps/rabbitmq_queue_federation/test/unit_SUITE.erl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(unit_SUITE).
99
-include_lib("common_test/include/ct.hrl").
1010
-include_lib("eunit/include/eunit.hrl").
11+
-include_lib("rabbit_common/include/rabbit.hrl").
1112

1213
-include("rabbit_queue_federation.hrl").
1314

@@ -17,7 +18,8 @@ all() -> [
1718
reconnect_all_empty_scope,
1819
reconnect_all_broadcasts_to_members,
1920
adjust_when_supervisor_not_running,
20-
adjust_clear_upstream_when_supervisor_not_running
21+
adjust_clear_upstream_when_supervisor_not_running,
22+
start_child_handles_already_present
2123
].
2224

2325
init_per_suite(Config) ->
@@ -77,3 +79,20 @@ adjust_clear_upstream_when_supervisor_not_running(_Config) ->
7779
%% adjust/1 with clear_upstream should not fail
7880
?assertEqual(ok, rabbit_federation_queue_link_sup_sup:adjust({clear_upstream, <<"/">>, <<"test">>})),
7981
?assertEqual(ok, rabbit_federation_queue_link_sup_sup:adjust({clear_upstream_set, <<"test">>})).
82+
83+
start_child_handles_already_present(_Config) ->
84+
Name = #resource{virtual_host = <<"/">>, kind = queue, name = <<"q">>},
85+
Q = amqqueue:new(Name, none, true, false, none, [], <<"/">>, #{}),
86+
ExpectedId = amqqueue:set_policy(amqqueue:set_immutable(Q), amqqueue:get_policy(Q)),
87+
ok = meck:new(mirrored_supervisor, [unstick, passthrough]),
88+
ok = meck:expect(mirrored_supervisor, start_child,
89+
fun(_Sup, _ChildSpec) -> {error, already_present} end),
90+
ok = meck:expect(mirrored_supervisor, delete_child,
91+
fun(_Sup, _Id) -> ok end),
92+
try
93+
?assertEqual(ok, rabbit_federation_queue_link_sup_sup:start_child(Q)),
94+
?assert(meck:called(mirrored_supervisor, delete_child,
95+
[rabbit_federation_queue_link_sup_sup, ExpectedId]))
96+
after
97+
ok = meck:unload(mirrored_supervisor)
98+
end.

0 commit comments

Comments
 (0)