Skip to content

Commit 00ac2ae

Browse files
Merge pull request #15388 from amazon-mq/fix/aws-peer-discovery-instance-states
AWS peer discovery: filter EC2 instances by state
2 parents 0c21d68 + 41d5030 commit 00ac2ae

4 files changed

Lines changed: 169 additions & 5 deletions

File tree

deps/rabbitmq_peer_discovery_aws/priv/schema/rabbitmq_peer_discovery_aws.schema

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,17 @@ fun(Conf) ->
108108
Value -> rabbit_peer_discovery_util:as_list(Value)
109109
end
110110
end}.
111+
112+
%% ec2_instance_states
113+
114+
{mapping, "cluster_formation.aws.ec2_instance_states.$state", "rabbit.cluster_formation.peer_discovery_aws.aws_ec2_instance_states", [
115+
{datatype, string}
116+
]}.
117+
118+
{translation, "rabbit.cluster_formation.peer_discovery_aws.aws_ec2_instance_states",
119+
fun(Conf) ->
120+
case cuttlefish_variable:filter_by_prefix("cluster_formation.aws.ec2_instance_states", Conf) of
121+
[] -> ["running", "pending"];
122+
Settings -> [V || {_K, V} <- Settings]
123+
end
124+
end}.

deps/rabbitmq_peer_discovery_aws/src/rabbit_peer_discovery_aws.erl

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
-define(BACKEND_CONFIG_KEY, peer_discovery_aws).
3232

33+
-define(VALID_EC2_INSTANCE_STATES, ["pending", "running", "shutting-down", "terminated", "stopping", "stopped"]).
34+
3335
-define(CONFIG_MAPPING,
3436
#{
3537
aws_autoscaling => #peer_discovery_config_entry_meta{
@@ -66,6 +68,11 @@
6668
type = atom,
6769
env_variable = "AWS_USE_PRIVATE_IP",
6870
default_value = false
71+
},
72+
aws_ec2_instance_states => #peer_discovery_config_entry_meta{
73+
type = list,
74+
env_variable = "AWS_EC2_INSTANCE_STATES",
75+
default_value = ["running", "pending"]
6976
}
7077
}).
7178

@@ -280,8 +287,10 @@ get_hostname_by_instance_ids(Instances, Tag) ->
280287
QArgs = build_instance_list_qargs(Instances,
281288
[{"Action", "DescribeInstances"},
282289
{"Version", "2015-10-01"}]),
283-
QArgs2 = lists:keysort(1, maybe_add_tag_filters(Tag, QArgs, 1)),
284-
Path = "/?" ++ rabbitmq_aws_urilib:build_query_string(QArgs2),
290+
QArgs1 = maybe_add_tag_filters(Tag, QArgs, 1),
291+
QArgs2 = maybe_add_instance_state_filters(QArgs1, length(QArgs1) + 1),
292+
QArgs3 = lists:keysort(1, QArgs2),
293+
Path = "/?" ++ rabbitmq_aws_urilib:build_query_string(QArgs3),
285294
get_hostname_names(Path).
286295

287296
-spec build_instance_list_qargs(Instances :: list(), Accum :: list()) -> list().
@@ -306,6 +315,53 @@ maybe_add_tag_filters(Tags, QArgs, Num) ->
306315
end, {QArgs, Num}, Tags),
307316
Filters.
308317

318+
-spec maybe_add_instance_state_filters(filters(), integer()) -> filters().
319+
maybe_add_instance_state_filters(QArgs, Num) ->
320+
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
321+
States = get_config_key(aws_ec2_instance_states, M),
322+
case States of
323+
[] ->
324+
QArgs;
325+
[_|_] ->
326+
ValidStates = validate_instance_states(States),
327+
add_instance_state_filters(ValidStates, QArgs, Num)
328+
end.
329+
330+
-spec validate_instance_states(list()) -> list().
331+
validate_instance_states(States) ->
332+
NormalizedStates = [normalize_state(State) || State <- States],
333+
{Valid, Invalid} = lists:partition(
334+
fun(State) -> lists:member(State, ?VALID_EC2_INSTANCE_STATES) end,
335+
NormalizedStates),
336+
case Invalid of
337+
[] ->
338+
ok;
339+
[_|_] ->
340+
rabbit_log:warning(
341+
"Ignoring invalid EC2 instance states in configuration: ~tp. "
342+
"Valid states are: ~tp", [Invalid, ?VALID_EC2_INSTANCE_STATES])
343+
end,
344+
Valid.
345+
346+
-spec normalize_state(atom() | string()) -> string().
347+
normalize_state(State) when is_atom(State) ->
348+
atom_to_list(State);
349+
normalize_state(State) when is_list(State) ->
350+
State.
351+
352+
-spec add_instance_state_filters(list(), filters(), integer()) -> filters().
353+
add_instance_state_filters(States, QArgs, Num) ->
354+
FilterName = {"Filter." ++ integer_to_list(Num) ++ ".Name", "instance-state-name"},
355+
StateFilters = lists:foldl(
356+
fun(State, {Acc, Index}) ->
357+
Filter = {"Filter." ++ integer_to_list(Num) ++ ".Value." ++ integer_to_list(Index), State},
358+
{[Filter | Acc], Index + 1}
359+
end,
360+
{[], 1},
361+
States),
362+
{StateValues, _} = StateFilters,
363+
[FilterName | StateValues] ++ QArgs.
364+
309365
-spec get_node_list_from_tags(tags()) -> {ok, {[node()], disc}}.
310366
get_node_list_from_tags(M) when map_size(M) =:= 0 ->
311367
?LOG_WARNING("Cannot discover any nodes because AWS tags are not configured!", []),
@@ -336,8 +392,10 @@ get_hostname_names(Path) ->
336392

337393
get_hostname_by_tags(Tags) ->
338394
QArgs = [{"Action", "DescribeInstances"}, {"Version", "2015-10-01"}],
339-
QArgs2 = lists:keysort(1, maybe_add_tag_filters(Tags, QArgs, 1)),
340-
Path = "/?" ++ rabbitmq_aws_urilib:build_query_string(QArgs2),
395+
QArgs1 = maybe_add_tag_filters(Tags, QArgs, 1),
396+
QArgs2 = maybe_add_instance_state_filters(QArgs1, length(QArgs1) + 1),
397+
QArgs3 = lists:keysort(1, QArgs2),
398+
Path = "/?" ++ rabbitmq_aws_urilib:build_query_string(QArgs3),
341399
case get_hostname_names(Path) of
342400
error ->
343401
?LOG_WARNING("Cannot discover any nodes because AWS "

deps/rabbitmq_peer_discovery_aws/test/config_schema_SUITE_data/rabbitmq_peer_discovery_aws.snippets

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,30 @@
103103
]}
104104
]}
105105
], [rabbitmq_peer_discovery_aws]
106+
},
107+
{aws_ec2_instance_states,
108+
"cluster_formation.aws.ec2_instance_states.1 = running
109+
cluster_formation.aws.ec2_instance_states.2 = pending",
110+
[
111+
{rabbit, [
112+
{cluster_formation, [
113+
{peer_discovery_aws, [
114+
{aws_ec2_instance_states, ["running", "pending"]}
115+
]}
116+
]}
117+
]}
118+
], [rabbitmq_peer_discovery_aws]
119+
},
120+
{aws_ec2_instance_states_single,
121+
"cluster_formation.aws.ec2_instance_states.1 = running",
122+
[
123+
{rabbit, [
124+
{cluster_formation, [
125+
{peer_discovery_aws, [
126+
{aws_ec2_instance_states, ["running"]}
127+
]}
128+
]}
129+
]}
130+
], [rabbitmq_peer_discovery_aws]
106131
}
107132
].

deps/rabbitmq_peer_discovery_aws/test/unit_SUITE.erl

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@ groups() ->
2222
[
2323
{unit, [], [
2424
maybe_add_tag_filters,
25+
maybe_add_instance_state_filters,
26+
validate_instance_states,
2527
get_hostname_name_from_reservation_set,
2628
registration_support,
2729
network_interface_sorting,
28-
private_ip_address_sorting
30+
private_ip_address_sorting,
31+
get_hostname_by_instance_ids_with_state_filter,
32+
get_hostname_by_tags_with_state_filter
2933
]},
3034
{lock, [], [
3135
lock_single_node,
@@ -48,6 +52,27 @@ maybe_add_tag_filters(_Config) ->
4852
Result = lists:sort(rabbit_peer_discovery_aws:maybe_add_tag_filters(Tags, [], 1)),
4953
?assertEqual(Expectation, Result).
5054

55+
maybe_add_instance_state_filters(_Config) ->
56+
application:set_env(rabbit, cluster_formation,
57+
[{peer_discovery_aws, [{aws_ec2_instance_states, ["running", "pending"]}]}]),
58+
QArgs = [{"Action", "DescribeInstances"}, {"Version", "2015-10-01"}],
59+
Result = rabbit_peer_discovery_aws:maybe_add_instance_state_filters(QArgs, 1),
60+
Expectation = [{"Filter.1.Name", "instance-state-name"},
61+
{"Filter.1.Value.2", "pending"},
62+
{"Filter.1.Value.1", "running"},
63+
{"Action", "DescribeInstances"},
64+
{"Version", "2015-10-01"}],
65+
?assertEqual(Expectation, Result),
66+
application:unset_env(rabbit, cluster_formation).
67+
68+
validate_instance_states(_Config) ->
69+
ValidStates = ["pending", "running", "shutting-down", "terminated", "stopping", "stopped"],
70+
?assertEqual(ValidStates, rabbit_peer_discovery_aws:validate_instance_states(ValidStates)),
71+
?assertEqual(["running"], rabbit_peer_discovery_aws:validate_instance_states(["running", "invalid"])),
72+
?assertEqual([], rabbit_peer_discovery_aws:validate_instance_states(["bogus", "invalid"])),
73+
?assertEqual(["running", "pending"], rabbit_peer_discovery_aws:validate_instance_states([running, pending])),
74+
?assertEqual(["running"], rabbit_peer_discovery_aws:validate_instance_states([running, invalid])).
75+
5176
get_hostname_name_from_reservation_set(_Config) ->
5277
ok = eunit:test({
5378
foreach,
@@ -89,6 +114,44 @@ get_hostname_name_from_reservation_set(_Config) ->
89114
end}]
90115
}).
91116

117+
get_hostname_by_instance_ids_with_state_filter(_Config) ->
118+
application:set_env(rabbit, cluster_formation,
119+
[{peer_discovery_aws, [{aws_ec2_instance_states, ["running", "pending"]}]}]),
120+
meck:new(rabbitmq_aws, [passthrough]),
121+
meck:expect(rabbitmq_aws, api_get_request,
122+
fun("ec2", Path) ->
123+
?assert(string:str(Path, "Filter") > 0),
124+
?assert(string:str(Path, "instance-state-name") > 0),
125+
?assert(string:str(Path, "Value.1=running") > 0),
126+
?assert(string:str(Path, "Value.2=pending") > 0),
127+
{ok, mock_describe_instances_response()}
128+
end),
129+
Result = rabbit_peer_discovery_aws:get_hostname_by_instance_ids(
130+
["i-abc123", "i-def456"], #{}),
131+
?assertEqual(["ip-10-0-16-29.eu-west-1.compute.internal",
132+
"ip-10-0-16-31.eu-west-1.compute.internal"], Result),
133+
meck:unload(rabbitmq_aws),
134+
application:unset_env(rabbit, cluster_formation).
135+
136+
get_hostname_by_tags_with_state_filter(_Config) ->
137+
application:set_env(rabbit, cluster_formation,
138+
[{peer_discovery_aws, [{aws_ec2_instance_states, ["running"]}]}]),
139+
meck:new(rabbitmq_aws, [passthrough]),
140+
meck:expect(rabbitmq_aws, api_get_request,
141+
fun("ec2", Path) ->
142+
?assert(string:str(Path, "Filter") > 0),
143+
?assert(string:str(Path, "instance-state-name") > 0),
144+
?assert(string:str(Path, "Value.1=running") > 0),
145+
?assert(string:str(Path, "tag%3Aservice") > 0),
146+
{ok, mock_describe_instances_response()}
147+
end),
148+
Tags = maps:from_list([{"service", "rabbitmq"}]),
149+
Result = rabbit_peer_discovery_aws:get_hostname_by_tags(Tags),
150+
?assertEqual(["ip-10-0-16-29.eu-west-1.compute.internal",
151+
"ip-10-0-16-31.eu-west-1.compute.internal"], Result),
152+
meck:unload(rabbitmq_aws),
153+
application:unset_env(rabbit, cluster_formation).
154+
92155
registration_support(_Config) ->
93156
?assertEqual(false, rabbit_peer_discovery_aws:supports_registration()).
94157

@@ -289,3 +352,7 @@ reservation_set() ->
289352
]}
290353
]}]}]},
291354
{"privateIpAddress","10.0.16.31"}]}]}]}].
355+
356+
mock_describe_instances_response() ->
357+
[{"DescribeInstancesResponse",
358+
[{"reservationSet", reservation_set()}]}].

0 commit comments

Comments
 (0)