diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index a0b8bcd8ba49..6041cfc6bab6 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -1411,6 +1411,17 @@ filter_command(_Meta, {delete_stream, _StreamId, #{}}, undefined) -> %% Attempting to delete a stream which does not exist. Reply 'ok' to the %% caller so that this action is idempotent. {reply, ok}; +filter_command(#{machine_version := Vsn}, + {new_stream, _StreamId, #{}}, + #stream{members = Members}) when ?V7_OR_MORE(Vsn) -> + MaybeLeader = [Pid || _Node := #member{state = {running, _, Pid}, + role = {writer, _}} <- Members], + case MaybeLeader of + [LeaderPid] -> + {reply, {ok, LeaderPid}}; + [] -> + {reply, '$ra_no_reply'} + end; filter_command(_, _, _) -> ok. @@ -1420,8 +1431,8 @@ update_stream(Meta, Cmd, Stream) -> catch _:E:Stacktrace -> ?LOG_WARNING( - "~ts failed to update stream:~n~W~n~W", - [?MODULE, E, 10, Stacktrace, 10]), + "~ts failed to update stream:~n~P~n~P", + [?MODULE, E, 10, Stacktrace, 30]), Stream end. diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl index 3fb94dabfa6a..f14c7c8f3f21 100644 --- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl @@ -33,6 +33,7 @@ all_tests() -> sac_pre_v7_down_handler_should_use_monitors_map, sac_pre_v7_ensure_monitors_should_use_monitors_map, new_stream, + new_stream_idempotent, leader_down, leader_down_scenario_1, replica_down, @@ -497,6 +498,31 @@ new_stream(_) -> ok. +new_stream_idempotent(_) -> + S0 = rabbit_stream_coordinator:init(#{machine_version => 7}), + StreamId = atom_to_list(?FUNCTION_NAME), + + TypeState = #{name => StreamId, + retention => [], + nodes => [node()]}, + Q = new_q(list_to_binary(StreamId), TypeState), + NewStream = {new_stream, StreamId, #{leader_node => node(), + retention => [], + queue => Q}}, + From = {self(), make_ref()}, + StartIdx = ?LINE, + Meta = (meta(StartIdx))#{from => From}, + {S1, '$ra_no_reply', _} = apply_cmd(Meta, NewStream, S0), + {S1, '$ra_no_reply', []} = apply_cmd(Meta#{index := ?LINE}, NewStream, S1), + Pid = self(), + {S2, _, _} = apply_cmd(meta(?LINE), {member_started, StreamId, + #{epoch => 1, + index => StartIdx, + pid => Pid}}, S1), + {_, {ok, Pid}, []} = apply_cmd(Meta#{index := ?LINE}, NewStream, S2), + + ok. + leader_down(_) -> E = 1, StreamId = atom_to_list(?FUNCTION_NAME),