Skip to content

Commit 7aa9951

Browse files
committed
rabbit_stream_coordinator: Make new_stream command idempotent
1 parent 0b951b8 commit 7aa9951

2 files changed

Lines changed: 37 additions & 2 deletions

File tree

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,15 @@ filter_command(_Meta, {delete_stream, _StreamId, #{}}, undefined) ->
13801380
%% Attempting to delete a stream which does not exist. Reply 'ok' to the
13811381
%% caller so that this action is idempotent.
13821382
{reply, ok};
1383+
filter_command(_Meta, {new_stream, _StreamId, #{}}, #stream{members = Members}) ->
1384+
MaybeLeader = [Pid || _Node := #member{state = {running, _, Pid},
1385+
role = {writer, _}} <- Members],
1386+
case MaybeLeader of
1387+
[LeaderPid] ->
1388+
{reply, {ok, LeaderPid}};
1389+
[] ->
1390+
{reply, '$ra_no_reply'}
1391+
end;
13831392
filter_command(_, _, _) ->
13841393
ok.
13851394

@@ -1389,8 +1398,8 @@ update_stream(Meta, Cmd, Stream) ->
13891398
catch
13901399
_:E:Stacktrace ->
13911400
?LOG_WARNING(
1392-
"~ts failed to update stream:~n~W~n~W",
1393-
[?MODULE, E, 10, Stacktrace, 10]),
1401+
"~ts failed to update stream:~n~P~n~P",
1402+
[?MODULE, E, 10, Stacktrace, 30]),
13941403
Stream
13951404
end.
13961405

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ all_tests() ->
2828
machine_version_upgrade_to_2,
2929
machine_version_upgrade_to_3,
3030
new_stream,
31+
new_stream_idempotent,
3132
leader_down,
3233
leader_down_scenario_1,
3334
replica_down,
@@ -361,6 +362,31 @@ new_stream(_) ->
361362

362363
ok.
363364

365+
new_stream_idempotent(_) ->
366+
S0 = rabbit_stream_coordinator:init(#{machine_version => 5}),
367+
StreamId = atom_to_list(?FUNCTION_NAME),
368+
369+
TypeState = #{name => StreamId,
370+
retention => [],
371+
nodes => [node()]},
372+
Q = new_q(list_to_binary(StreamId), TypeState),
373+
NewStream = {new_stream, StreamId, #{leader_node => node(),
374+
retention => [],
375+
queue => Q}},
376+
From = {self(), make_ref()},
377+
StartIdx = ?LINE,
378+
Meta = (meta(StartIdx))#{from => From},
379+
{S1, '$ra_no_reply', _} = apply_cmd(Meta, NewStream, S0),
380+
{S1, '$ra_no_reply', []} = apply_cmd(Meta#{index := ?LINE}, NewStream, S1),
381+
Pid = self(),
382+
{S2, _, _} = apply_cmd(meta(?LINE), {member_started, StreamId,
383+
#{epoch => 1,
384+
index => StartIdx,
385+
pid => Pid}}, S1),
386+
{_, {ok, Pid}, []} = apply_cmd(Meta#{index := ?LINE}, NewStream, S2),
387+
388+
ok.
389+
364390
leader_down(_) ->
365391
E = 1,
366392
StreamId = atom_to_list(?FUNCTION_NAME),

0 commit comments

Comments
 (0)