-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathcloudformation.py
More file actions
269 lines (230 loc) · 9.11 KB
/
cloudformation.py
File metadata and controls
269 lines (230 loc) · 9.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
This utility file contains methods to read information from certain CFN stack
"""
import logging
import posixpath
from typing import Any, Dict, Iterable, Optional, Set
from attr import dataclass
from botocore.exceptions import ClientError
from samcli.lib.utils.boto_utils import BotoProviderType, get_client_error_code
from samcli.lib.utils.resources import AWS_CLOUDFORMATION_STACK
LOG = logging.getLogger(__name__)
# list of possible values for active stacks
# CFN console has a way to display active stacks but it is not possible in API calls
STACK_ACTIVE_STATUS = [
"CREATE_IN_PROGRESS",
"CREATE_COMPLETE",
"ROLLBACK_IN_PROGRESS",
"ROLLBACK_FAILED",
"ROLLBACK_COMPLETE",
"DELETE_IN_PROGRESS",
"DELETE_FAILED",
"UPDATE_IN_PROGRESS",
"UPDATE_COMPLETE_CLEANUP_IN_PROGRESS",
"UPDATE_COMPLETE",
"UPDATE_ROLLBACK_IN_PROGRESS",
"UPDATE_ROLLBACK_FAILED",
"UPDATE_ROLLBACK_COMPLETE_CLEANUP_IN_PROGRESS",
"UPDATE_ROLLBACK_COMPLETE",
"REVIEW_IN_PROGRESS",
]
@dataclass
class CloudFormationResourceSummary:
"""
Keeps information about CFN resource
"""
resource_type: str
logical_resource_id: str
physical_resource_id: str
def get_resource_summaries(
boto_resource_provider: BotoProviderType,
boto_client_provider: BotoProviderType,
stack_name: str,
resource_types: Optional[Set[str]] = None,
nested_stack_prefix: Optional[str] = None,
) -> Dict[str, CloudFormationResourceSummary]:
"""
Collects information about CFN resources and return their summary as list
Parameters
----------
boto_resource_provider : BotoProviderType
A callable which will return boto3 resource
boto_client_provider : BotoProviderType
A callable which will return boto3 client
stack_name : str
Name of the stack which is deployed to CFN
resource_types : Optional[Set[str]]
List of resource types, which will filter the results
nested_stack_prefix: Optional[str]
This will contain logical id of the parent stack. So that ChildStackA/GrandChildStackB so that resources
under GrandChildStackB can create their keys like ChildStackA/GrandChildStackB/MyFunction
Returns
-------
List of CloudFormationResourceSummary which contains information about resources in the given stack
"""
LOG.debug("Fetching stack (%s) resources", stack_name)
try:
cfn_resource_summaries = list(
boto_resource_provider("cloudformation").Stack(stack_name).resource_summaries.all()
)
except ClientError as ex:
if get_client_error_code(ex) == "ValidationError" and LOG.isEnabledFor(logging.DEBUG):
LOG.debug(
"Invalid stack name (%s). Available stack names: %s",
stack_name,
", ".join(list_active_stack_names(boto_client_provider)),
)
raise ex
resource_summaries: Dict[str, CloudFormationResourceSummary] = {}
for cfn_resource_summary in cfn_resource_summaries:
resource_summary = CloudFormationResourceSummary(
cfn_resource_summary.resource_type,
cfn_resource_summary.logical_resource_id,
cfn_resource_summary.physical_resource_id,
)
if resource_summary.resource_type == AWS_CLOUDFORMATION_STACK:
new_nested_stack_prefix = resource_summary.logical_resource_id
if nested_stack_prefix:
new_nested_stack_prefix = posixpath.join(nested_stack_prefix, new_nested_stack_prefix)
resource_summaries.update(
get_resource_summaries(
boto_resource_provider,
boto_client_provider,
resource_summary.physical_resource_id,
resource_types,
new_nested_stack_prefix,
)
)
if resource_types and resource_summary.resource_type not in resource_types:
LOG.debug(
"Skipping resource %s since its type %s is not supported. Supported types %s",
resource_summary.logical_resource_id,
resource_summary.resource_type,
resource_types,
)
continue
resource_key = resource_summary.logical_resource_id
if nested_stack_prefix:
resource_key = posixpath.join(nested_stack_prefix, resource_key)
resource_summaries[resource_key] = resource_summary
return resource_summaries
def get_resource_summary(
boto_resource_provider: BotoProviderType,
boto_client_provider: BotoProviderType,
stack_name: str,
resource_logical_id: str,
) -> Optional[CloudFormationResourceSummary]:
"""
Returns resource summary of given single resource with its logical id
Parameters
----------
boto_resource_provider : BotoProviderType
A callable which will return boto3 resource
boto_client_provider : BotoProviderType
A callable which will return boto3 client
stack_name : str
Name of the stack which is deployed to CFN
resource_logical_id : str
Logical ID of the resource that will be returned as resource summary
Returns
-------
CloudFormationResourceSummary of the resource which is identified by given logical id
"""
cfn_resource_summaries = get_resource_summaries(boto_resource_provider, boto_client_provider, stack_name)
for logical_id, cfn_resource_summary in cfn_resource_summaries.items():
if logical_id == resource_logical_id:
return cfn_resource_summary
return None
def get_resource_summary_from_physical_id(
boto_client_provider: BotoProviderType, resource_physical_id: str
) -> Optional[CloudFormationResourceSummary]:
"""
Returns resource summary from the physical id of the resource. Returns None if no resource can be found
Parameters
----------
boto_client_provider : BotoProviderType
A callable which will return boto3 client
resource_physical_id : str
Physical ID of the resource that will be returned as resource summary
Returns
-------
CloudFormationResourceSummary of the resource which is identified by given logical id
"""
try:
cfn_client = boto_client_provider("cloudformation")
describe_stack_response = cfn_client.describe_stack_resources(PhysicalResourceId=resource_physical_id)
stack_resources = describe_stack_response.get("StackResources", [])
for stack_resource in stack_resources:
if stack_resource.get("PhysicalResourceId") == resource_physical_id:
return CloudFormationResourceSummary(
stack_resource.get("ResourceType"),
stack_resource.get("LogicalResourceId"),
stack_resource.get("PhysicalResourceId"),
)
return None
except ClientError as e:
LOG.debug("Failed to pull resource (%s) information with its physical id.", exc_info=e)
return None
def list_active_stack_names(boto_client_provider: BotoProviderType, show_nested_stacks: bool = False) -> Iterable[str]:
"""
Returns list of active cloudformation stack names
Parameters
----------
boto_client_provider : BotoProviderType
A callable which will return boto3 client
show_nested_stacks : bool
True; will display nested stack names as well. False; will hide nested stack names from the list.
Returns
-------
Iterable[str] List of stack names that is currently active
"""
cfn_client = boto_client_provider("cloudformation")
first_call = True
next_token: Optional[str] = None
while first_call or next_token:
first_call = False
kwargs: Dict[str, Any] = {"StackStatusFilter": STACK_ACTIVE_STATUS}
if next_token:
kwargs["NextToken"] = next_token
list_stacks_result = cfn_client.list_stacks(**kwargs)
for stack_summary in list_stacks_result.get("StackSummaries", []):
if not show_nested_stacks and stack_summary.get("RootId"):
continue
yield stack_summary.get("StackName")
next_token = list_stacks_result.get("NextToken")
# CloudFormation intrinsic function names
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference.html
CLOUDFORMATION_INTRINSIC_FUNCTIONS = {
"Fn::Base64",
"Fn::Cidr",
"Fn::FindInMap",
"Fn::ForEach",
"Fn::GetAtt",
"Fn::GetAZs",
"Fn::ImportValue",
"Fn::Join",
"Fn::Length",
"Fn::Select",
"Fn::Split",
"Fn::Sub",
"Fn::ToJsonString",
"Fn::Transform",
"Fn::And",
"Fn::Equals",
"Fn::If",
"Fn::Not",
"Fn::Or",
"Ref",
}
def is_intrinsic_function(value: Any) -> bool:
"""
Checks if a value is a CloudFormation intrinsic function.
When YAML templates are parsed, intrinsic functions are represented as OrderedDict
with a single key that matches one of the CloudFormation intrinsic function names.
"""
if not isinstance(value, dict):
return False
if len(value) != 1:
return False
key = next(iter(value.keys()))
return key in CLOUDFORMATION_INTRINSIC_FUNCTIONS