Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
key_metrics_rpc/1]).

%% for SAC coordinator
-export([sac_state/1]).
-export([sac_state/1,
evaluate_sac_group/3]).

%% for testing and debugging
-export([eval_listeners/3,
Expand Down Expand Up @@ -274,6 +275,13 @@ update_config(Q, Config)
sac_state(#?MODULE{single_active_consumer = SacState}) ->
SacState.

-spec evaluate_sac_group(binary(), binary(), binary()) ->
ok | {error, term()}.
evaluate_sac_group(VirtualHost, Stream, ConsumerName) ->
rabbit_stream_sac_coordinator:evaluate_group(VirtualHost,
Stream,
ConsumerName).

%% for debugging
state() ->
case ra_local_query(fun(State) -> State end) of
Expand Down
111 changes: 108 additions & 3 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#command_activate_consumer{} |
#command_connection_reconnected{} |
#command_purge_nodes{} |
#command_update_conf{}.
#command_update_conf{} |
#command_evaluate_group{}.

-opaque state() :: #?MODULE{}.

Expand Down Expand Up @@ -54,7 +55,8 @@
check_conf_change/1,
list_nodes/1,
state_enter/2,
is_sac_error/1
is_sac_error/1,
evaluate_group/3
]).
-export([make_purge_nodes/1,
make_update_conf/1]).
Expand Down Expand Up @@ -145,13 +147,84 @@ activate_consumer(VH, Stream, Name) ->
connection_reconnected(Pid) ->
process_command(#command_connection_reconnected{pid = Pid}).

-spec evaluate_group(binary(), binary(), binary()) ->
ok | {error, sac_error() | term()}.
evaluate_group(VirtualHost, Stream, ConsumerName) ->
case rabbit_feature_flags:is_enabled('rabbitmq_4.3.0') of
true ->
case group_pids(VirtualHost, Stream, ConsumerName) of
{ok, Pids} ->
DeadPids = filter_dead_pids(Pids),
process_command(
#command_evaluate_group{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName,
dead_pids = DeadPids});
{error, _} = Err ->
Err
end;
_ ->
{error, feature_not_enabled}
end.

group_pids(VirtualHost, Stream, ConsumerName) ->
case ra_local_query(
fun(State) ->
SacState = rabbit_stream_coordinator:sac_state(State),
group_pids0(VirtualHost, Stream, ConsumerName, SacState)
end)
of
{ok, {_, Result}, _} ->
Result;
{error, noproc} ->
{error, not_found};
{error, _} = Err ->
Err;
{timeout, _} ->
{error, timeout}
end.

group_pids0(VH, Stream, ConsumerName,
#?MODULE{groups = Groups} = S) when ?IS_STATE_REC(S) ->
GroupId = {VH, Stream, ConsumerName},
case Groups of
#{GroupId := #group{consumers = Consumers}} ->
PidMap = lists:foldl(fun(#consumer{pid = P}, Acc) ->
Acc#{P => true}
end, #{}, Consumers),
{ok, maps:keys(PidMap)};
_ ->
{error, not_found}
end;
group_pids0(_, _, _, _) ->
{error, not_found}.

filter_dead_pids(Pids) ->
lists:filter(fun(Pid) -> not is_pid_alive(Pid) end, Pids).

is_pid_alive(Pid) when node(Pid) =:= node() ->
erlang:is_process_alive(Pid);
is_pid_alive(Pid) ->
PidNode = node(Pid),
case lists:member(PidNode, rabbit_nodes:list_members()) of
true ->
try
erpc:call(PidNode, erlang, is_process_alive, [Pid], 5000)
catch
_:_ ->
true
end;
false ->
false
end.

process_command(Cmd) ->
case rabbit_stream_coordinator:process_command(wrap_cmd(Cmd)) of
{ok, Res, _} ->
Res;
{error, _} = Err ->
?LOG_WARNING("SAC coordinator command ~tp returned error ~tp",
[Cmd, Err]),
[Cmd, Err]),
Err
end.

Expand Down Expand Up @@ -341,6 +414,30 @@ apply(#command_connection_reconnected{pid = Pid},
end, {State0, []}, Groups0),

{State1, ok, Eff};
apply(#command_evaluate_group{vhost = VH, stream = S,
consumer_name = Name,
dead_pids = DeadPids},
#?MODULE{groups = Groups0} = State0) ->
case lookup_group(VH, S, Name, Groups0) of
undefined ->
{State0, {error, not_found}, []};
#group{consumers = Consumers0} = G0 ->
DeadPidSet = maps:from_list([{P, true} || P <- DeadPids]),
Consumers1 =
lists:filter(
fun(#consumer{pid = P, status = {Cnty, _}})
when Cnty =:= ?DISCONNECTED orelse
Cnty =:= ?PDOWN orelse
is_map_key(P, DeadPidSet) ->
false;
(_) ->
true
end, Consumers0),
G1 = G0#group{consumers = Consumers1},
{G2, Effects} = maybe_rebalance_group(G1, {VH, S, Name}),
Groups1 = update_groups(VH, S, Name, G2, Groups0),
{State0#?MODULE{groups = Groups1}, ok, Effects}
end;
apply(#command_purge_nodes{nodes = Nodes}, State0) ->
{State1, Eff} = lists:foldl(fun(N, {S0, Eff0}) ->
{S1, Eff1} = purge_node(N, S0),
Expand Down Expand Up @@ -718,6 +815,14 @@ ensure_monitors(#command_purge_nodes{},
{State#?MODULE{pids_groups = AllPidsGroups},
Monitors,
Effects};
ensure_monitors(#command_evaluate_group{},
#?MODULE{groups = Groups} = State,
Monitors,
Effects) ->
AllPidsGroups = compute_pid_group_dependencies(Groups),
{State#?MODULE{pids_groups = AllPidsGroups},
Monitors,
Effects};
ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) ->
{State0, Monitors, Effects}.

Expand Down
5 changes: 5 additions & 0 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,8 @@
{nodes :: [node()]}).
-record(command_update_conf,
{conf :: conf()}).
-record(command_evaluate_group,
{vhost :: vhost(),
stream :: stream(),
consumer_name :: consumer_name(),
dead_pids :: [connection_pid()]}).
161 changes: 161 additions & 0 deletions deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,161 @@ state_enter_leader(MapState) ->
list_nodes(MapState) ->
lists:sort(?MOD:list_nodes(state(MapState))).

evaluate_group_not_found_test(_) ->
State0 = state(),
Cmd = evaluate_group_command(<<"stream">>, <<"app">>, []),
{State0, {error, not_found}, []} = ?MOD:apply(Cmd, State0),
ok.

evaluate_group_all_connected_no_dead_pids_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = grp([csr(Pid0, 0, active),
csr(Pid1, 1, waiting)]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), []),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp([csr(Pid0, 0, active),
csr(Pid1, 1, waiting)]),
Groups1),
assertEmpty(Eff),
ok.

evaluate_group_remove_dead_pid_consumers_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = grp([csr(Pid0, 0, active),
csr(Pid1, 1, waiting)]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), [Pid0]),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp([csr(Pid1, 1, active)]),
Groups1),
assertSendMessageActivateEffect(Pid1, 1, stream(), name(), true, Eff),
ok.

evaluate_group_remove_disconnected_consumers_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = grp([csr(Pid0, 0, {connected, active}),
csr(Pid1, 1, {disconnected, waiting}),
csr(Pid2, 2, {connected, waiting})]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), []),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp([csr(Pid0, 0, {connected, active}),
csr(Pid2, 2, {connected, waiting})]),
Groups1),
assertEmpty(Eff),
ok.

evaluate_group_remove_presumed_down_consumers_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = grp([csr(Pid0, 0, {connected, waiting}),
csr(Pid1, 1, {presumed_down, active}),
csr(Pid2, 2, {connected, waiting})]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), []),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp([csr(Pid0, 0, {connected, active}),
csr(Pid2, 2, {connected, waiting})]),
Groups1),
assertSendMessageActivateEffect(Pid0, 0, stream(), name(), true, Eff),
ok.

evaluate_group_mix_dead_and_disconnected_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
Pid3 = new_process(),
GId = group_id(),
Group = grp([csr(Pid0, 0, {connected, active}),
csr(Pid1, 1, {disconnected, waiting}),
csr(Pid2, 2, {connected, waiting}),
csr(Pid3, 3, {connected, waiting})]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), [Pid2]),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp([csr(Pid0, 0, {connected, active}),
csr(Pid3, 3, {connected, waiting})]),
Groups1),
assertEmpty(Eff),
ok.

evaluate_group_empty_after_cleanup_test(_) ->
Pid0 = new_process(),
GId = group_id(),
Group = grp([csr(Pid0, 0, {disconnected, active})]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), []),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertEmpty(Groups1),
assertEmpty(Eff),
ok.

evaluate_group_super_stream_rebalance_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = grp(1, [csr(Pid0, 0, {connected, waiting}),
csr(Pid1, 1, {connected, active}),
csr(Pid2, 2, {connected, waiting})]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), [Pid0]),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
%% Pid0 removed (dead), 2 consumers left
%% partition_index=1, 1 % 2 = 1, so Pid2 should be active
%% current active is Pid1 at index 0, new active should be Pid2 at index 1
assertHasGroup(GId,
grp(1, [csr(Pid1, 1, {connected, deactivating}),
csr(Pid2, 2, {connected, waiting})]),
Groups1),
assertSendMessageSteppingDownEffect(Pid1, 1, stream(), name(), Eff),
ok.

evaluate_group_super_stream_active_removed_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = grp(1, [csr(Pid0, 0, {disconnected, active}),
csr(Pid1, 1, {connected, waiting})]),
State0 = state(#{GId => Group}),
Cmd = evaluate_group_command(stream(), name(), []),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
%% disconnected active removed, only Pid1 left, should become active
assertHasGroup(GId,
grp(1, [csr(Pid1, 1, {connected, active})]),
Groups1),
assertSendMessageActivateEffect(Pid1, 1, stream(), name(), true, Eff),
ok.

evaluate_group_ensure_monitors_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group0 = grp([csr(Pid0, 0, {connected, active}),
csr(Pid1, 1, {disconnected, waiting}),
csr(Pid2, 2, {connected, waiting})]),
State0 = state(#{GId => Group0}),
Cmd = evaluate_group_command(stream(), name(), []),
{State1, ok, _} = ?MOD:apply(Cmd, State0),
{#?STATE{pids_groups = PidsGroups1}, _, _} =
?MOD:ensure_monitors(Cmd, State1, #{}, []),
assertSize(2, PidsGroups1),
?assert(maps:is_key(Pid0, PidsGroups1)),
?assert(maps:is_key(Pid2, PidsGroups1)),
?assertNot(maps:is_key(Pid1, PidsGroups1)),
ok.

start_node(Name) ->
{ok, NodePid, Node} = peer:start(#{
name => Name,
Expand Down Expand Up @@ -2041,6 +2196,12 @@ connection_reconnected_command(Pid) ->
purge_nodes_command(Nodes) ->
#command_purge_nodes{nodes = Nodes}.

evaluate_group_command(Stream, ConsumerName, DeadPids) ->
#command_evaluate_group{vhost = <<"/">>,
stream = Stream,
consumer_name = ConsumerName,
dead_pids = DeadPids}.

assertContainsCheckConnectionEffect(Pid, Effects) ->
assertContainsSendMessageEffect(Pid, {sac, check_connection, #{}}, Effects).

Expand Down
Loading