Skip to content

Commit f72ede3

Browse files
committed
rabbit_chaos: easily pick Ra system
1 parent e4acc6a commit f72ede3

1 file changed

Lines changed: 140 additions & 43 deletions

File tree

deps/rabbit/src/rabbit_chaos.erl

Lines changed: 140 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,16 @@
2020
-export([
2121
begin_default/0,
2222
begin_default/1,
23-
begin_chaos/1
23+
begin_quorum_chaos/0,
24+
begin_quorum_chaos/1,
25+
begin_coordination_chaos/0,
26+
begin_coordination_chaos/1,
27+
begin_delayed_chaos/0,
28+
begin_delayed_chaos/1,
29+
begin_jms_chaos/0,
30+
begin_jms_chaos/1,
31+
begin_chaos/1,
32+
stop/0
2433
]).
2534

2635

@@ -40,12 +49,13 @@
4049
non_neg_integer(), [chaos_event()]}
4150
}.
4251

43-
-type chaos_cfg() :: #{ra_system => atom(),
52+
-type chaos_cfg() :: #{ra_systems := [atom()],
4453
interval := non_neg_integer(),
4554
events := [chaos_event()]}.
4655
-define(SERVER, ?MODULE).
4756

48-
-record(?MODULE, {cfg :: chaos_cfg()}).
57+
-record(?MODULE, {cfg :: chaos_cfg(),
58+
timer_ref :: undefined | reference()}).
4959

5060
-export_type([chaos_cfg/0,
5161
chaos_event/0]).
@@ -55,49 +65,79 @@
5565
%% inside the broker.
5666
%%----------------------------------------------------------------------------
5767

68+
-define(ALL_SYSTEMS, [quorum_queues, coordination, delayed_queues, jms_queues]).
69+
-define(DEFAULT_INTERVAL, 20000).
70+
5871
begin_default() ->
59-
begin_default(20000).
72+
begin_default(?DEFAULT_INTERVAL).
6073

6174
begin_default(Interval) ->
62-
Events = [
63-
{kill_qq_wal, 1, {kill_named_proc, ra_log_wal, chaos}},
64-
{kill_qq_seg_writer, 1,
65-
{kill_named_proc, ra_log_segment_writer, kill}},
66-
{kill_qq_member, 2, {kill_ra_member, chaos}},
67-
{restart_qq_member, 2, restart_ra_member},
68-
{flood_a_node, 2, flood_node}
69-
],
70-
begin_chaos(#{ra_system => quorum_queues,
71-
interval => Interval,
72-
events => Events}).
75+
begin_for_systems(?ALL_SYSTEMS, Interval).
76+
77+
begin_quorum_chaos() ->
78+
begin_quorum_chaos(?DEFAULT_INTERVAL).
79+
80+
begin_quorum_chaos(Interval) ->
81+
begin_for_systems([quorum_queues], Interval).
82+
83+
begin_coordination_chaos() ->
84+
begin_coordination_chaos(?DEFAULT_INTERVAL).
85+
86+
begin_coordination_chaos(Interval) ->
87+
begin_for_systems([coordination], Interval).
88+
89+
begin_delayed_chaos() ->
90+
begin_delayed_chaos(?DEFAULT_INTERVAL).
91+
92+
begin_delayed_chaos(Interval) ->
93+
begin_for_systems([delayed_queues], Interval).
94+
95+
begin_jms_chaos() ->
96+
begin_jms_chaos(?DEFAULT_INTERVAL).
97+
98+
begin_jms_chaos(Interval) ->
99+
begin_for_systems([jms_queues], Interval).
73100

74101
begin_chaos(Cfg) ->
75102
gen_server:call(?SERVER, {begin_chaos, Cfg}).
76103

104+
stop() ->
105+
gen_server:call(?SERVER, stop_chaos).
106+
77107
-spec start_link() -> rabbit_types:ok_pid_or_error().
78108
start_link() ->
79109
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
80110

81111
init([]) ->
82112
process_flag(trap_exit, true),
83-
Cfg = #{interval => 20000,
113+
Cfg = #{interval => ?DEFAULT_INTERVAL,
114+
ra_systems => [],
84115
events => []},
85-
{ok, #?MODULE{cfg = Cfg}}.
116+
{ok, #?MODULE{cfg = Cfg, timer_ref = undefined}}.
86117

87-
handle_call({begin_chaos, #{interval := Interval} = Cfg}, _From, State) ->
88-
_ = erlang:send_after(Interval, self(), do_chaos),
118+
handle_call({begin_chaos, #{interval := Interval} = Cfg}, _From, State0) ->
119+
State = cancel_timer(State0),
120+
Ref = erlang:send_after(Interval, self(), do_chaos),
121+
{reply, ok, State#?MODULE{cfg = Cfg, timer_ref = Ref}};
122+
handle_call(stop_chaos, _From, State0) ->
123+
State = cancel_timer(State0),
124+
Cfg = #{interval => ?DEFAULT_INTERVAL,
125+
ra_systems => [],
126+
events => []},
127+
rabbit_log:info("~s: chaos stopped", [?MODULE]),
89128
{reply, ok, State#?MODULE{cfg = Cfg}}.
90129

91130
handle_cast(_Request, State) ->
92131
{noreply, State}.
93132

94-
handle_info(do_chaos, #?MODULE{cfg = #{ra_system := Sys,
133+
handle_info(do_chaos, #?MODULE{cfg = #{ra_systems := Systems,
95134
interval := Interval} = Cfg} = State) ->
96135
Events = maps:get(events, Cfg),
97136
{Name, _, Event} = pick_event(Events),
137+
Sys = pick_random(Systems),
98138
do_event(Sys, Name, Event),
99-
_ = erlang:send_after(Interval, self(), do_chaos),
100-
{noreply, State};
139+
Ref = erlang:send_after(Interval, self(), do_chaos),
140+
{noreply, State#?MODULE{timer_ref = Ref}};
101141
handle_info(_, #?MODULE{} = State) ->
102142
{noreply, State}.
103143

@@ -109,12 +149,54 @@ code_change(_OldVsn, State, _Extra) ->
109149

110150
%% internal
111151

152+
begin_for_systems(Systems, Interval) ->
153+
Events = lists:flatmap(fun events_for_system/1, Systems)
154+
++ common_events(),
155+
begin_chaos(#{ra_systems => Systems,
156+
interval => Interval,
157+
events => Events}).
158+
159+
events_for_system(quorum_queues) ->
160+
[{kill_qq_wal, 1, {kill_named_proc, ra_log_wal, chaos}},
161+
{kill_qq_seg_writer, 1, {kill_named_proc, ra_log_segment_writer, kill}},
162+
{kill_qq_member, 2, {kill_ra_member, chaos}},
163+
{restart_qq_member, 2, restart_ra_member}];
164+
events_for_system(coordination) ->
165+
[{kill_coord_wal, 1, {kill_named_proc, ra_coordination_log_wal, chaos}},
166+
{kill_coord_seg_writer, 1,
167+
{kill_named_proc, ra_coordination_segment_writer, kill}},
168+
{kill_coord_member, 2, {kill_ra_member, chaos}},
169+
{restart_coord_member, 2, restart_ra_member}];
170+
events_for_system(delayed_queues) ->
171+
[{kill_dq_wal, 1, {kill_named_proc, ra_delayed_queues_log_wal, chaos}},
172+
{kill_dq_seg_writer, 1,
173+
{kill_named_proc, ra_delayed_queues_segment_writer, kill}},
174+
{kill_dq_member, 2, {kill_ra_member, chaos}},
175+
{restart_dq_member, 2, restart_ra_member}];
176+
events_for_system(jms_queues) ->
177+
[{kill_jms_wal, 1, {kill_named_proc, ra_jms_queues_log_wal, chaos}},
178+
{kill_jms_seg_writer, 1,
179+
{kill_named_proc, ra_jms_queues_segment_writer, kill}},
180+
{kill_jms_member, 2, {kill_ra_member, chaos}},
181+
{restart_jms_member, 2, restart_ra_member}].
182+
183+
common_events() ->
184+
[{flood_a_node, 2, flood_node}].
185+
186+
cancel_timer(#?MODULE{timer_ref = undefined} = State) ->
187+
State;
188+
cancel_timer(#?MODULE{timer_ref = Ref} = State) ->
189+
erlang:cancel_timer(Ref),
190+
State#?MODULE{timer_ref = undefined}.
191+
192+
pick_random(List) ->
193+
lists:nth(rand:uniform(length(List)), List).
194+
112195
do_event(_Sys, Name, {kill_named_proc, ProcName, ExitReason}) ->
113196
rabbit_log:info("~s: doing event ~s...", [?MODULE, Name]),
114197
catch exit(whereis(ProcName), ExitReason),
115198
ok;
116199
do_event(_Sys, Name, flood_node) ->
117-
%% TODO: avoid if nodes() == []
118200
Nodes = nodes(),
119201
case Nodes of
120202
[] -> ok;
@@ -133,7 +215,6 @@ do_event(_Sys, Name, flood_node) ->
133215
Pid ! Data,
134216
rabbit_log:info("~s: flood of node ~s competed ~s...",
135217
[?MODULE, Selected, Name]),
136-
%% flood complete
137218
ok;
138219
ok ->
139220
F(N-1)
@@ -143,26 +224,35 @@ do_event(_Sys, Name, flood_node) ->
143224
Loop(10000),
144225
ok
145226
end;
146-
do_event(_Sys, Name, {kill_ra_member, ExitReason}) ->
147-
rabbit_log:info("~s: doing event ~s...", [?MODULE, Name]),
148-
Procs = ets:tab2list(ra_leaderboard),
149-
At = rand:uniform(length(Procs)),
150-
{Selected, _, _} = lists:nth(At, Procs),
151-
{ok, _, _} = ra:local_query({Selected, node()},
152-
fun (_) -> process_flag(trap_exit, false) end),
153-
catch exit(whereis(Selected), ExitReason),
154-
ok;
227+
do_event(Sys, Name, {kill_ra_member, ExitReason}) ->
228+
rabbit_log:info("~s: doing event ~s in Ra system ~s...", [?MODULE, Name, Sys]),
229+
case list_registered_safe(Sys) of
230+
[] ->
231+
rabbit_log:info("~s: no Ra members in system ~s, skipping", [?MODULE, Sys]),
232+
ok;
233+
Registered ->
234+
{Selected, _UId} = pick_random(Registered),
235+
{ok, _, _} = ra:local_query({Selected, node()},
236+
fun (_) -> process_flag(trap_exit, false) end),
237+
catch exit(whereis(Selected), ExitReason),
238+
ok
239+
end;
155240
do_event(Sys, Name, restart_ra_member = Type) ->
156-
rabbit_log:info("~s: doing event ~s of type ~s", [?MODULE, Name, Type]),
157-
Procs = ets:tab2list(ra_leaderboard),
158-
At = rand:uniform(length(Procs)),
159-
{ServerName, _, _} = lists:nth(At, Procs),
160-
ServerId = {ServerName, node()},
161-
_ = ra:stop_server(Sys, ServerId),
162-
Sleep = rand:uniform(10000) + 1000,
163-
timer:sleep(Sleep),
164-
_ = ra:restart_server(Sys, ServerId),
165-
ok;
241+
rabbit_log:info("~s: doing event ~s of type ~s in Ra system ~s",
242+
[?MODULE, Name, Type, Sys]),
243+
case list_registered_safe(Sys) of
244+
[] ->
245+
rabbit_log:info("~s: no Ra members in system ~s, skipping", [?MODULE, Sys]),
246+
ok;
247+
Registered ->
248+
{ServerName, _UId} = pick_random(Registered),
249+
ServerId = {ServerName, node()},
250+
_ = ra:stop_server(Sys, ServerId),
251+
Sleep = rand:uniform(10000) + 1000,
252+
timer:sleep(Sleep),
253+
_ = ra:restart_server(Sys, ServerId),
254+
ok
255+
end;
166256
do_event(Sys, Name, {multi, Num, Interval, Event}) ->
167257
rabbit_log:info("~s: doing multi event ~s...",
168258
[?MODULE, Name]),
@@ -172,6 +262,13 @@ do_event(Sys, Name, {multi, Num, Interval, Event}) ->
172262
end || _ <- lists:seq(1, Num)],
173263
ok.
174264

265+
list_registered_safe(Sys) ->
266+
try
267+
ra_directory:list_registered(Sys)
268+
catch
269+
_:_ -> []
270+
end.
271+
175272
pick_event(Events) ->
176273
TotalWeight = lists:sum([element(2, E) || E <- Events]),
177274
Pick = rand:uniform(TotalWeight),

0 commit comments

Comments
 (0)