@@ -27,6 +27,11 @@ all_tests() ->
2727 listeners ,
2828 machine_version_upgrade_to_2 ,
2929 machine_version_upgrade_to_3 ,
30+ machine_version_upgrade_to_7 ,
31+ sac_v7_down_handler_should_not_use_monitors_map ,
32+ sac_v7_ensure_monitors_should_not_use_monitors_map ,
33+ sac_pre_v7_down_handler_should_use_monitors_map ,
34+ sac_pre_v7_ensure_monitors_should_use_monitors_map ,
3035 new_stream ,
3136 leader_down ,
3237 leader_down_scenario_1 ,
@@ -60,9 +65,23 @@ init_per_group(_Group, Config) ->
6065end_per_group (_Group , _Config ) ->
6166 ok .
6267
68+ init_per_testcase (TestCase , Config )
69+ when TestCase =:= sac_v7_down_handler_should_not_use_monitors_map ;
70+ TestCase =:= sac_v7_ensure_monitors_should_not_use_monitors_map ;
71+ TestCase =:= sac_pre_v7_down_handler_should_use_monitors_map ;
72+ TestCase =:= sac_pre_v7_ensure_monitors_should_use_monitors_map ->
73+ ok = meck :new (rabbit_stream_sac_coordinator , [no_link ]),
74+ Config ;
6375init_per_testcase (_TestCase , Config ) ->
6476 Config .
6577
78+ end_per_testcase (TestCase , _Config )
79+ when TestCase =:= sac_v7_down_handler_should_not_use_monitors_map ;
80+ TestCase =:= sac_v7_ensure_monitors_should_not_use_monitors_map ;
81+ TestCase =:= sac_pre_v7_down_handler_should_use_monitors_map ;
82+ TestCase =:= sac_pre_v7_ensure_monitors_should_use_monitors_map ->
83+ meck :unload (rabbit_stream_sac_coordinator ),
84+ ok ;
6685end_per_testcase (_TestCase , _Config ) ->
6786 ok .
6887
@@ -253,6 +272,123 @@ machine_version_to_3(From) ->
253272 ? assertEqual (Effects , []),
254273 ok .
255274
275+ machine_version_upgrade_to_7 (_ ) ->
276+ Pid1 = spawn (fun () -> ok end ),
277+ Pid2 = spawn (fun () -> ok end ),
278+ Pid3 = spawn (fun () -> ok end ),
279+ S = <<" stream" >>,
280+ Monitors0 = #{Pid1 => sac ,
281+ Pid2 => {S , member },
282+ Pid3 => sac },
283+ State0 = #? STATE {monitors = Monitors0 },
284+
285+ {State1 , ok , Effects } = apply_cmd (#{index => 42 }, {machine_version , 6 , 7 }, State0 ),
286+
287+ ? assertEqual (#{Pid2 => {S , member }}, State1 #? STATE .monitors ),
288+ ? assertEqual ([], Effects ),
289+ ok .
290+
291+ sac_v7_down_handler_should_not_use_monitors_map (_ ) ->
292+ ConnectionPid = spawn (fun () -> ok end ),
293+ SacState0 = fake_sac_state ,
294+ SacState1 = updated_sac_state ,
295+ meck :expect (rabbit_stream_sac_coordinator , handle_connection_down ,
296+ fun (_Meta , Pid , normal , State ) when Pid =:= ConnectionPid ,
297+ State =:= SacState0 ->
298+ {SacState1 , []}
299+ end ),
300+
301+ OtherPid = spawn (fun () -> ok end ),
302+ Monitors0 = #{OtherPid => {<<" other" >>, member }},
303+ State0 = #? STATE {single_active_consumer = SacState0 ,
304+ monitors = Monitors0 },
305+
306+ {State1 , ok , _Effects } = apply_cmd (meta (#{index => 42 , machine_version => 7 }),
307+ {down , ConnectionPid , normal }, State0 ),
308+
309+ ? assert (meck :called (rabbit_stream_sac_coordinator , handle_connection_down ,
310+ ['_' , ConnectionPid , normal , SacState0 ])),
311+ ? assertEqual (SacState1 , State1 #? STATE .single_active_consumer ),
312+ ? assertEqual (Monitors0 , State1 #? STATE .monitors ),
313+ ok .
314+
315+ sac_v7_ensure_monitors_should_not_use_monitors_map (_ ) ->
316+ ConnectionPid = self (),
317+ SacCmd = fake_sac_cmd ,
318+ SacState0 = fake_sac_state ,
319+ SacState1 = updated_sac_state ,
320+ meck :expect (rabbit_stream_sac_coordinator , apply ,
321+ fun (Cmd , State ) when Cmd =:= SacCmd ,
322+ State =:= SacState0 ->
323+ {SacState1 , {ok , true }, []}
324+ end ),
325+ meck :expect (rabbit_stream_sac_coordinator , ensure_monitors ,
326+ fun (Cmd , State , Monitors , Effects ) when Cmd =:= SacCmd ,
327+ State =:= SacState1 ->
328+ {State , Monitors #{ConnectionPid => sac }, Effects }
329+ end ),
330+
331+ State0 = #? STATE {single_active_consumer = SacState0 ,
332+ monitors = #{}},
333+
334+ {State1 , {ok , true }, _Effects } = apply_cmd (meta (#{index => 42 , machine_version => 7 }),
335+ {sac , SacCmd }, State0 ),
336+
337+ ? assertEqual (#{}, State1 #? STATE .monitors ),
338+ ? assertEqual (SacState1 , State1 #? STATE .single_active_consumer ),
339+ ok .
340+
341+ sac_pre_v7_down_handler_should_use_monitors_map (_ ) ->
342+ ConnectionPid = spawn (fun () -> ok end ),
343+ SacState0 = fake_sac_state ,
344+ SacState1 = updated_sac_state ,
345+ meck :expect (rabbit_stream_sac_coordinator , handle_connection_down ,
346+ fun (_Meta , Pid , normal , State ) when Pid =:= ConnectionPid ,
347+ State =:= SacState0 ->
348+ {SacState1 , []}
349+ end ),
350+
351+ OtherPid = spawn (fun () -> ok end ),
352+ Monitors0 = #{ConnectionPid => sac ,
353+ OtherPid => {<<" other" >>, member }},
354+ State0 = #? STATE {single_active_consumer = SacState0 ,
355+ monitors = Monitors0 },
356+
357+ {State1 , ok , _Effects } = apply_cmd (meta (#{index => 42 , machine_version => 6 }),
358+ {down , ConnectionPid , normal }, State0 ),
359+
360+ ? assert (meck :called (rabbit_stream_sac_coordinator , handle_connection_down ,
361+ ['_' , ConnectionPid , normal , SacState0 ])),
362+ ? assertEqual (SacState1 , State1 #? STATE .single_active_consumer ),
363+ ? assertEqual (#{OtherPid => {<<" other" >>, member }}, State1 #? STATE .monitors ),
364+ ok .
365+
366+ sac_pre_v7_ensure_monitors_should_use_monitors_map (_ ) ->
367+ ConnectionPid = self (),
368+ SacCmd = fake_sac_cmd ,
369+ SacState0 = fake_sac_state ,
370+ SacState1 = updated_sac_state ,
371+ meck :expect (rabbit_stream_sac_coordinator , apply ,
372+ fun (Cmd , State ) when Cmd =:= SacCmd ,
373+ State =:= SacState0 ->
374+ {SacState1 , {ok , true }, []}
375+ end ),
376+ meck :expect (rabbit_stream_sac_coordinator , ensure_monitors ,
377+ fun (Cmd , State , Monitors , Effects ) when Cmd =:= SacCmd ,
378+ State =:= SacState1 ->
379+ {State , Monitors #{ConnectionPid => sac }, Effects }
380+ end ),
381+
382+ State0 = #? STATE {single_active_consumer = SacState0 ,
383+ monitors = #{}},
384+
385+ {State1 , {ok , true }, _Effects } = apply_cmd (meta (#{index => 42 , machine_version => 6 }),
386+ {sac , SacCmd }, State0 ),
387+
388+ ? assertEqual (#{ConnectionPid => sac }, State1 #? STATE .monitors ),
389+ ? assertEqual (SacState1 , State1 #? STATE .single_active_consumer ),
390+ ok .
391+
256392new_stream (_ ) ->
257393 [N1 , N2 , N3 ] = Nodes = [r@n1 , r@n2 , r@n3 ],
258394 StreamId = atom_to_list (? FUNCTION_NAME ),
0 commit comments