Skip to content

Commit aaa0cf2

Browse files
authored
Merge pull request #15760 from rabbitmq/mergify/bp/v4.3.x/pr-15731
Add function to clean up and re-evaluate stream SAC group (backport #15731)
2 parents 20caa4d + c7a6130 commit aaa0cf2

4 files changed

Lines changed: 283 additions & 4 deletions

File tree

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@
5959
key_metrics_rpc/1]).
6060

6161
%% for SAC coordinator
62-
-export([sac_state/1]).
62+
-export([sac_state/1,
63+
evaluate_sac_group/3]).
6364

6465
%% for testing and debugging
6566
-export([eval_listeners/3,
@@ -274,6 +275,13 @@ update_config(Q, Config)
274275
%% Extracts the single active consumer (SAC) sub-state stored in the
%% coordinator state record.
sac_state(#?MODULE{single_active_consumer = Sac}) ->
    Sac.
276277

278+
%% Asks the SAC coordinator to clean up and re-evaluate the consumer group
%% identified by virtual host, stream and consumer name. The three
%% arguments are forwarded unchanged to
%% rabbit_stream_sac_coordinator:evaluate_group/3 and its result is
%% returned as is.
-spec evaluate_sac_group(binary(), binary(), binary()) ->
    ok | {error, term()}.
evaluate_sac_group(VHost, StreamName, GroupName) ->
    rabbit_stream_sac_coordinator:evaluate_group(VHost, StreamName, GroupName).
284+
277285
%% for debugging
278286
state() ->
279287
case ra_local_query(fun(State) -> State end) of

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
#command_activate_consumer{} |
2525
#command_connection_reconnected{} |
2626
#command_purge_nodes{} |
27-
#command_update_conf{}.
27+
#command_update_conf{} |
28+
#command_evaluate_group{}.
2829

2930
-opaque state() :: #?MODULE{}.
3031

@@ -54,7 +55,8 @@
5455
check_conf_change/1,
5556
list_nodes/1,
5657
state_enter/2,
57-
is_sac_error/1
58+
is_sac_error/1,
59+
evaluate_group/3
5860
]).
5961
-export([make_purge_nodes/1,
6062
make_update_conf/1]).
@@ -145,13 +147,84 @@ activate_consumer(VH, Stream, Name) ->
145147
%% Submits a #command_connection_reconnected{} command for the given
%% connection pid and returns whatever process_command/1 returns.
connection_reconnected(Pid) ->
    Command = #command_connection_reconnected{pid = Pid},
    process_command(Command).
147149

150+
%% Cleans up and re-evaluates a single active consumer group.
%%
%% Only runs when the 'rabbitmq_4.3.0' feature flag is enabled; otherwise
%% {error, feature_not_enabled} is returned without touching the group.
%% When enabled, the connection pids of the group are read with a local
%% query, the ones that are no longer alive are collected, and a
%% #command_evaluate_group{} carrying those dead pids is submitted.
-spec evaluate_group(binary(), binary(), binary()) ->
    ok | {error, sac_error() | term()}.
evaluate_group(VirtualHost, Stream, ConsumerName) ->
    case rabbit_feature_flags:is_enabled('rabbitmq_4.3.0') of
        true ->
            submit_group_evaluation(VirtualHost, Stream, ConsumerName);
        _ ->
            {error, feature_not_enabled}
    end.

%% Looks up the pids of the group, works out which are dead and submits
%% the evaluation command. A failed lookup is returned unchanged.
submit_group_evaluation(VirtualHost, Stream, ConsumerName) ->
    case group_pids(VirtualHost, Stream, ConsumerName) of
        {error, _} = LookupError ->
            LookupError;
        {ok, ConnectionPids} ->
            Command = #command_evaluate_group{
                         vhost = VirtualHost,
                         stream = Stream,
                         consumer_name = ConsumerName,
                         dead_pids = filter_dead_pids(ConnectionPids)},
            process_command(Command)
    end.
169+
170+
%% Runs a local query that returns the distinct connection pids of the
%% consumers in the given group.
%%
%% Returns the result of group_pids0/4 when the query succeeds. A noproc
%% error is reported as {error, not_found}, any other error is returned
%% unchanged, and a query timeout becomes {error, timeout}.
group_pids(VirtualHost, Stream, ConsumerName) ->
    Query = fun(CoordinatorState) ->
                    Sac = rabbit_stream_coordinator:sac_state(CoordinatorState),
                    group_pids0(VirtualHost, Stream, ConsumerName, Sac)
            end,
    case ra_local_query(Query) of
        {timeout, _} ->
            {error, timeout};
        {error, noproc} ->
            {error, not_found};
        {error, _} = QueryError ->
            QueryError;
        {ok, {_, Reply}, _} ->
            Reply
    end.
186+
187+
%% Returns {ok, Pids} with the de-duplicated pids of the consumers of
%% group {VH, Stream, ConsumerName}, or {error, not_found} when the group
%% does not exist or the state is not a record accepted by ?IS_STATE_REC.
group_pids0(VH, Stream, ConsumerName,
            #?MODULE{groups = AllGroups} = SacState)
  when ?IS_STATE_REC(SacState) ->
    Key = {VH, Stream, ConsumerName},
    case AllGroups of
        #{Key := #group{consumers = Members}} ->
            %% several consumers can share a connection pid, a map keyed
            %% by pid keeps each pid only once
            Seen = lists:foldl(fun(#consumer{pid = Pid}, Acc) ->
                                       maps:put(Pid, true, Acc)
                               end, maps:new(), Members),
            {ok, maps:keys(Seen)};
        _ ->
            {error, not_found}
    end;
group_pids0(_, _, _, _) ->
    {error, not_found}.
201+
202+
%% Keeps only the pids that is_pid_alive/1 reports as not alive, in their
%% original order.
filter_dead_pids(Pids) ->
    [Pid || Pid <- Pids, not is_pid_alive(Pid)].
204+
205+
%% Tells whether the process behind Pid should be considered alive.
%%
%% - A pid of the local node is checked with erlang:is_process_alive/1.
%% - A pid of a node that is not in rabbit_nodes:list_members() is
%%   reported as not alive.
%% - A pid of a member node is checked on that node through erpc, with a
%%   5 second timeout. If that call raises (timeout, node unreachable,
%%   ...), liveness cannot be determined: the pid is then assumed to be
%%   alive, so that a consumer is never removed on a guess, and the
%%   failure is logged instead of being silently dropped.
is_pid_alive(Pid) when node(Pid) =:= node() ->
    erlang:is_process_alive(Pid);
is_pid_alive(Pid) ->
    PidNode = node(Pid),
    case lists:member(PidNode, rabbit_nodes:list_members()) of
        true ->
            try
                erpc:call(PidNode, erlang, is_process_alive, [Pid], 5000)
            catch
                Class:Reason ->
                    ?LOG_WARNING("SAC coordinator could not check liveness "
                                 "of ~tp on node ~tp (~tp:~tp), "
                                 "assuming it is alive",
                                 [Pid, PidNode, Class, Reason]),
                    true
            end;
        false ->
            false
    end.
220+
148221
%% Wraps the SAC command and hands it to the stream coordinator.
%% Returns the command result on success. On error, logs a warning with
%% the command and the error, then returns the error tuple unchanged.
process_command(Cmd) ->
    Wrapped = wrap_cmd(Cmd),
    case rabbit_stream_coordinator:process_command(Wrapped) of
        {error, _} = Err ->
            ?LOG_WARNING("SAC coordinator command ~tp returned error ~tp",
                         [Cmd, Err]),
            Err;
        {ok, Res, _} ->
            Res
    end.
157230

@@ -341,6 +414,30 @@ apply(#command_connection_reconnected{pid = Pid},
341414
end, {State0, []}, Groups0),
342415

343416
{State1, ok, Eff};
417+
%% Evaluate-group command: drops the stale consumers of one group and
%% lets the group pick its active consumer again.
%% A consumer is stale when its connectivity is ?DISCONNECTED or ?PDOWN,
%% or when its pid is listed in the dead_pids carried by the command.
%% Replies {error, not_found} and leaves the state untouched when the
%% group does not exist.
apply(#command_evaluate_group{vhost = VH, stream = S,
                              consumer_name = Name,
                              dead_pids = DeadPids},
      #?MODULE{groups = Groups0} = State0) ->
    case lookup_group(VH, S, Name, Groups0) of
        undefined ->
            {State0, {error, not_found}, []};
        #group{consumers = Consumers0} = G0 ->
            %% map used as a set for constant-time membership checks
            DeadPidSet = maps:from_list([{P, true} || P <- DeadPids]),
            IsStale = fun(#consumer{pid = P, status = {Cnty, _}}) ->
                              Cnty =:= ?DISCONNECTED orelse
                              Cnty =:= ?PDOWN orelse
                              is_map_key(P, DeadPidSet);
                         (_) ->
                              false
                      end,
            Consumers1 = [C || C <- Consumers0, not IsStale(C)],
            {G1, Effects} =
                maybe_rebalance_group(G0#group{consumers = Consumers1},
                                      {VH, S, Name}),
            Groups1 = update_groups(VH, S, Name, G1, Groups0),
            {State0#?MODULE{groups = Groups1}, ok, Effects}
    end;
344441
apply(#command_purge_nodes{nodes = Nodes}, State0) ->
345442
{State1, Eff} = lists:foldl(fun(N, {S0, Eff0}) ->
346443
{S1, Eff1} = purge_node(N, S0),
@@ -718,6 +815,14 @@ ensure_monitors(#command_purge_nodes{},
718815
{State#?MODULE{pids_groups = AllPidsGroups},
719816
Monitors,
720817
Effects};
818+
%% After an evaluate-group command the pid-to-groups index is recomputed
%% from the current groups; monitors and effects are passed through
%% unchanged.
ensure_monitors(#command_evaluate_group{},
                #?MODULE{groups = CurrentGroups} = SacState,
                Monitors,
                Effects) ->
    PidsGroups = compute_pid_group_dependencies(CurrentGroups),
    {SacState#?MODULE{pids_groups = PidsGroups}, Monitors, Effects};
721826
ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) ->
722827
{State0, Monitors, Effects}.
723828

deps/rabbit/src/rabbit_stream_sac_coordinator.hrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,8 @@
7474
{nodes :: [node()]}).
7575
-record(command_update_conf,
7676
{conf :: conf()}).
77+
%% Command to clean up and re-evaluate one consumer group.
%% vhost, stream and consumer_name identify the group; dead_pids lists
%% the connection pids whose consumers must be removed from it.
-record(command_evaluate_group,
78+
{vhost :: vhost(),
79+
stream :: stream(),
80+
consumer_name :: consumer_name(),
81+
dead_pids :: [connection_pid()]}).

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1907,6 +1907,161 @@ state_enter_leader(MapState) ->
19071907
%% Builds a state from MapState and returns the sorted result of
%% ?MOD:list_nodes/1 for it.
list_nodes(MapState) ->
    SacState = state(MapState),
    Nodes = ?MOD:list_nodes(SacState),
    lists:sort(Nodes).
19091909

1910+
%% Evaluating a group that does not exist replies {error, not_found},
%% leaves the state unchanged and produces no effects.
evaluate_group_not_found_test(_) ->
    EmptyState = state(),
    Result = ?MOD:apply(evaluate_group_command(<<"stream">>, <<"app">>, []),
                        EmptyState),
    {EmptyState, {error, not_found}, []} = Result,
    ok.
1915+
1916+
%% With no stale consumer and no dead pid, the group is left as it was
%% and no effect is emitted.
evaluate_group_all_connected_no_dead_pids_test(_) ->
    ActivePid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId => grp([csr(ActivePid, 0, active),
                                     csr(WaitingPid, 1, waiting)])}),
    Cmd = evaluate_group_command(stream(), name(), []),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    assertHasGroup(GroupId,
                   grp([csr(ActivePid, 0, active),
                        csr(WaitingPid, 1, waiting)]),
                   After),
    assertEmpty(Effects),
    ok.
1930+
1931+
%% The active consumer's pid is reported dead: it is removed and the
%% remaining consumer is activated.
evaluate_group_remove_dead_pid_consumers_test(_) ->
    DeadPid = new_process(),
    SurvivorPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId => grp([csr(DeadPid, 0, active),
                                     csr(SurvivorPid, 1, waiting)])}),
    Cmd = evaluate_group_command(stream(), name(), [DeadPid]),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    assertHasGroup(GroupId, grp([csr(SurvivorPid, 1, active)]), After),
    assertSendMessageActivateEffect(SurvivorPid, 1, stream(), name(), true,
                                    Effects),
    ok.
1944+
1945+
%% A disconnected waiting consumer is removed; the active consumer stays
%% active, so no effect is emitted.
evaluate_group_remove_disconnected_consumers_test(_) ->
    ActivePid = new_process(),
    DisconnectedPid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId =>
                         grp([csr(ActivePid, 0, {connected, active}),
                              csr(DisconnectedPid, 1, {disconnected, waiting}),
                              csr(WaitingPid, 2, {connected, waiting})])}),
    Cmd = evaluate_group_command(stream(), name(), []),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    assertHasGroup(GroupId,
                   grp([csr(ActivePid, 0, {connected, active}),
                        csr(WaitingPid, 2, {connected, waiting})]),
                   After),
    assertEmpty(Effects),
    ok.
1961+
1962+
%% The presumed-down active consumer is removed and the first remaining
%% consumer is activated.
evaluate_group_remove_presumed_down_consumers_test(_) ->
    FirstPid = new_process(),
    PresumedDownPid = new_process(),
    LastPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId =>
                         grp([csr(FirstPid, 0, {connected, waiting}),
                              csr(PresumedDownPid, 1, {presumed_down, active}),
                              csr(LastPid, 2, {connected, waiting})])}),
    Cmd = evaluate_group_command(stream(), name(), []),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    assertHasGroup(GroupId,
                   grp([csr(FirstPid, 0, {connected, active}),
                        csr(LastPid, 2, {connected, waiting})]),
                   After),
    assertSendMessageActivateEffect(FirstPid, 0, stream(), name(), true,
                                    Effects),
    ok.
1978+
1979+
%% One consumer is disconnected and another has a dead pid: both are
%% removed. The active consumer is untouched, so no effect is emitted.
evaluate_group_mix_dead_and_disconnected_test(_) ->
    ActivePid = new_process(),
    DisconnectedPid = new_process(),
    DeadPid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId =>
                         grp([csr(ActivePid, 0, {connected, active}),
                              csr(DisconnectedPid, 1, {disconnected, waiting}),
                              csr(DeadPid, 2, {connected, waiting}),
                              csr(WaitingPid, 3, {connected, waiting})])}),
    Cmd = evaluate_group_command(stream(), name(), [DeadPid]),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    assertHasGroup(GroupId,
                   grp([csr(ActivePid, 0, {connected, active}),
                        csr(WaitingPid, 3, {connected, waiting})]),
                   After),
    assertEmpty(Effects),
    ok.
1997+
1998+
%% The only consumer of the group is disconnected: after the cleanup no
%% group is left and no effect is emitted.
evaluate_group_empty_after_cleanup_test(_) ->
    OnlyPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId => grp([csr(OnlyPid, 0, {disconnected, active})])}),
    Cmd = evaluate_group_command(stream(), name(), []),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    assertEmpty(After),
    assertEmpty(Effects),
    ok.
2008+
2009+
%% Super stream group (partition index 1): removing a dead waiting
%% consumer changes which consumer should be active, so the current
%% active one is asked to step down.
evaluate_group_super_stream_rebalance_test(_) ->
    DeadPid = new_process(),
    ActivePid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId =>
                         grp(1, [csr(DeadPid, 0, {connected, waiting}),
                                 csr(ActivePid, 1, {connected, active}),
                                 csr(WaitingPid, 2, {connected, waiting})])}),
    Cmd = evaluate_group_command(stream(), name(), [DeadPid]),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    %% DeadPid is gone, two consumers remain.
    %% partition index 1 and 1 rem 2 = 1, so the consumer at position 1
    %% (WaitingPid) should be the active one. The current active consumer
    %% (ActivePid) sits at position 0 and therefore starts deactivating.
    assertHasGroup(GroupId,
                   grp(1, [csr(ActivePid, 1, {connected, deactivating}),
                           csr(WaitingPid, 2, {connected, waiting})]),
                   After),
    assertSendMessageSteppingDownEffect(ActivePid, 1, stream(), name(),
                                        Effects),
    ok.
2029+
2030+
%% Super stream group (partition index 1): the disconnected active
%% consumer is removed and the only remaining consumer becomes active.
evaluate_group_super_stream_active_removed_test(_) ->
    DisconnectedPid = new_process(),
    RemainingPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId =>
                         grp(1, [csr(DisconnectedPid, 0, {disconnected, active}),
                                 csr(RemainingPid, 1, {connected, waiting})])}),
    Cmd = evaluate_group_command(stream(), name(), []),
    {#?STATE{groups = After}, ok, Effects} = ?MOD:apply(Cmd, Before),
    assertHasGroup(GroupId,
                   grp(1, [csr(RemainingPid, 1, {connected, active})]),
                   After),
    assertSendMessageActivateEffect(RemainingPid, 1, stream(), name(), true,
                                    Effects),
    ok.
2045+
2046+
%% After the command is applied, ensure_monitors/4 rebuilds pids_groups
%% so that it only contains the pids of the consumers that were kept.
evaluate_group_ensure_monitors_test(_) ->
    ActivePid = new_process(),
    DisconnectedPid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    Before = state(#{GroupId =>
                         grp([csr(ActivePid, 0, {connected, active}),
                              csr(DisconnectedPid, 1, {disconnected, waiting}),
                              csr(WaitingPid, 2, {connected, waiting})])}),
    Cmd = evaluate_group_command(stream(), name(), []),
    {Applied, ok, _} = ?MOD:apply(Cmd, Before),
    {#?STATE{pids_groups = PidsGroups}, _, _} =
        ?MOD:ensure_monitors(Cmd, Applied, #{}, []),
    assertSize(2, PidsGroups),
    ?assert(maps:is_key(ActivePid, PidsGroups)),
    ?assert(maps:is_key(WaitingPid, PidsGroups)),
    ?assertNot(maps:is_key(DisconnectedPid, PidsGroups)),
    ok.
2064+
19102065
start_node(Name) ->
19112066
{ok, NodePid, Node} = peer:start(#{
19122067
name => Name,
@@ -2041,6 +2196,12 @@ connection_reconnected_command(Pid) ->
20412196
%% Builds a #command_purge_nodes{} record for the given nodes.
purge_nodes_command(NodeList) ->
    #command_purge_nodes{nodes = NodeList}.
20432198

2199+
%% Builds a #command_evaluate_group{} record for the <<"/">> virtual host
%% with the given stream, consumer name and list of dead pids.
evaluate_group_command(StreamName, GroupName, DeadConnectionPids) ->
    #command_evaluate_group{dead_pids = DeadConnectionPids,
                            consumer_name = GroupName,
                            stream = StreamName,
                            vhost = <<"/">>}.
2204+
20442205
%% Asserts that Effects contains a send-message effect delivering
%% {sac, check_connection, #{}} to Pid.
assertContainsCheckConnectionEffect(Pid, Effects) ->
    CheckConnection = {sac, check_connection, #{}},
    assertContainsSendMessageEffect(Pid, CheckConnection, Effects).
20462207

0 commit comments

Comments
 (0)