@@ -33,6 +33,7 @@ all_tests() ->
3333 sac_pre_v7_down_handler_should_use_monitors_map ,
3434 sac_pre_v7_ensure_monitors_should_use_monitors_map ,
3535 new_stream ,
36+ new_stream_idempotent ,
3637 leader_down ,
3738 leader_down_scenario_1 ,
3839 replica_down ,
@@ -497,6 +498,31 @@ new_stream(_) ->
497498
498499 ok .
499500
501+ new_stream_idempotent (_ ) ->
502+ S0 = rabbit_stream_coordinator :init (#{machine_version => 7 }),
503+ StreamId = atom_to_list (? FUNCTION_NAME ),
504+
505+ TypeState = #{name => StreamId ,
506+ retention => [],
507+ nodes => [node ()]},
508+ Q = new_q (list_to_binary (StreamId ), TypeState ),
509+ NewStream = {new_stream , StreamId , #{leader_node => node (),
510+ retention => [],
511+ queue => Q }},
512+ From = {self (), make_ref ()},
513+ StartIdx = ? LINE ,
514+ Meta = (meta (StartIdx ))#{from => From },
515+ {S1 , '$ra_no_reply' , _ } = apply_cmd (Meta , NewStream , S0 ),
516+ {S1 , '$ra_no_reply' , []} = apply_cmd (Meta #{index := ? LINE }, NewStream , S1 ),
517+ Pid = self (),
518+ {S2 , _ , _ } = apply_cmd (meta (? LINE ), {member_started , StreamId ,
519+ #{epoch => 1 ,
520+ index => StartIdx ,
521+ pid => Pid }}, S1 ),
522+ {_ , {ok , Pid }, []} = apply_cmd (Meta #{index := ? LINE }, NewStream , S2 ),
523+
524+ ok .
525+
500526leader_down (_ ) ->
501527 E = 1 ,
502528 StreamId = atom_to_list (? FUNCTION_NAME ),
0 commit comments