Skip to content

Commit 8f0aac8

Browse files
committed
rabbit_stream_coordinator: Make new_stream command idempotent
1 parent a9094b7 commit 8f0aac8

2 files changed

Lines changed: 39 additions & 2 deletions

File tree

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,17 @@ filter_command(_Meta, {delete_stream, _StreamId, #{}}, undefined) ->
14111411
%% Attempting to delete a stream which does not exist. Reply 'ok' to the
14121412
%% caller so that this action is idempotent.
14131413
{reply, ok};
1414+
filter_command(#{machine_version := Vsn},
1415+
{new_stream, _StreamId, #{}},
1416+
#stream{members = Members}) when ?V7_OR_MORE(Vsn) ->
1417+
MaybeLeader = [Pid || _Node := #member{state = {running, _, Pid},
1418+
role = {writer, _}} <- Members],
1419+
case MaybeLeader of
1420+
[LeaderPid] ->
1421+
{reply, {ok, LeaderPid}};
1422+
[] ->
1423+
{reply, '$ra_no_reply'}
1424+
end;
14141425
filter_command(_, _, _) ->
14151426
ok.
14161427

@@ -1420,8 +1431,8 @@ update_stream(Meta, Cmd, Stream) ->
14201431
catch
14211432
_:E:Stacktrace ->
14221433
?LOG_WARNING(
1423-
"~ts failed to update stream:~n~W~n~W",
1424-
[?MODULE, E, 10, Stacktrace, 10]),
1434+
"~ts failed to update stream:~n~P~n~P",
1435+
[?MODULE, E, 10, Stacktrace, 30]),
14251436
Stream
14261437
end.
14271438

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ all_tests() ->
3333
sac_pre_v7_down_handler_should_use_monitors_map,
3434
sac_pre_v7_ensure_monitors_should_use_monitors_map,
3535
new_stream,
36+
new_stream_idempotent,
3637
leader_down,
3738
leader_down_scenario_1,
3839
replica_down,
@@ -497,6 +498,31 @@ new_stream(_) ->
497498

498499
ok.
499500

501+
new_stream_idempotent(_) ->
502+
S0 = rabbit_stream_coordinator:init(#{machine_version => 7}),
503+
StreamId = atom_to_list(?FUNCTION_NAME),
504+
505+
TypeState = #{name => StreamId,
506+
retention => [],
507+
nodes => [node()]},
508+
Q = new_q(list_to_binary(StreamId), TypeState),
509+
NewStream = {new_stream, StreamId, #{leader_node => node(),
510+
retention => [],
511+
queue => Q}},
512+
From = {self(), make_ref()},
513+
StartIdx = ?LINE,
514+
Meta = (meta(StartIdx))#{from => From},
515+
{S1, '$ra_no_reply', _} = apply_cmd(Meta, NewStream, S0),
516+
{S1, '$ra_no_reply', []} = apply_cmd(Meta#{index := ?LINE}, NewStream, S1),
517+
Pid = self(),
518+
{S2, _, _} = apply_cmd(meta(?LINE), {member_started, StreamId,
519+
#{epoch => 1,
520+
index => StartIdx,
521+
pid => Pid}}, S1),
522+
{_, {ok, Pid}, []} = apply_cmd(Meta#{index := ?LINE}, NewStream, S2),
523+
524+
ok.
525+
500526
leader_down(_) ->
501527
E = 1,
502528
StreamId = atom_to_list(?FUNCTION_NAME),

0 commit comments

Comments
 (0)