forked from aws/aws-sam-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_error_responses.py
More file actions
310 lines (255 loc) · 10.1 KB
/
lambda_error_responses.py
File metadata and controls
310 lines (255 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
"""Common Lambda Error Responses"""
import json
from collections import OrderedDict
from typing import Any, Dict
from flask import Response
from samcli.local.services.base_local_service import BaseLocalService
class LambdaErrorResponses:
    """
    Builders for the error responses returned by the local Lambda service.

    Every ``*Exception`` / ``ContainerCreationFailed`` attribute below is a
    ``(header error type, status code)`` pair. The first element is sent in the
    ``x-amzn-errortype`` header and the second is handed to
    ``BaseLocalService.service_response`` as the status code of the response.
    """

    # The content type of the Invoke request body is not JSON.
    UnsupportedMediaTypeException = ("UnsupportedMediaType", 415)

    # The AWS Lambda service encountered an internal error.
    ServiceException = ("Service", 500)

    # The resource (for example, a Lambda function or access policy statement) specified in the request does not exist.
    ResourceNotFoundException = ("ResourceNotFound", 404)

    # The request body could not be parsed as JSON.
    InvalidRequestContentException = ("InvalidRequestContent", 400)

    # One or more parameters values were invalid.
    ValidationException = ("ValidationException", 400)

    # Used by not_implemented_locally().
    NotImplementedException = ("NotImplemented", 501)

    # Used by container_creation_failed().
    ContainerCreationFailed = ("ContainerCreationFailed", 501)

    # Used by generic_path_not_found().
    PathNotFoundException = ("PathNotFoundLocally", 404)

    # Used by generic_method_not_allowed().
    MethodNotAllowedException = ("MethodNotAllowedLocally", 405)

    # Error Types (written to the "Type" field of the response body)
    USER_ERROR = "User"
    SERVICE_ERROR = "Service"
    LOCAL_SERVICE_ERROR = "LocalService"

    # Header Information
    CONTENT_TYPE = "application/json"
    CONTENT_TYPE_HEADER_KEY = "Content-Type"

    @staticmethod
    def _service_error_response(exception_tuple: tuple, error_type: str, message: str) -> Response:
        """
        Builds the response shared by every public error factory of this class

        Parameters
        ----------
        exception_tuple tuple
            One of the ``(header error type, status code)`` pairs defined on this class
        error_type str
            Value for the "Type" field of the body (USER_ERROR, SERVICE_ERROR or LOCAL_SERVICE_ERROR)
        message str
            Value for the "Message" field of the body

        Returns
        -------
        Flask.Response
            Whatever ``BaseLocalService.service_response`` returns for the constructed body, headers and status code
        """
        header_error_type, status_code = exception_tuple
        return BaseLocalService.service_response(
            LambdaErrorResponses._construct_error_response_body(error_type, message),
            LambdaErrorResponses._construct_headers(header_error_type),
            status_code,
        )

    @staticmethod
    def resource_not_found(function_name: str) -> Response:
        """
        Creates a Lambda Service ResourceNotFound Response

        Parameters
        ----------
        function_name str
            Name of the function that was requested to invoke

        Returns
        -------
        Flask.Response
            A response object representing the ResourceNotFound Error
        """
        # The region and account id in the ARN are fixed placeholder values; only the function name varies.
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.ResourceNotFoundException,
            LambdaErrorResponses.USER_ERROR,
            f"Function not found: arn:aws:lambda:us-west-2:012345678901:function:{function_name}",
        )

    @staticmethod
    def invalid_request_content(message: str) -> Response:
        """
        Creates a Lambda Service InvalidRequestContent Response

        Parameters
        ----------
        message str
            Message to be added to the body of the response

        Returns
        -------
        Flask.Response
            A response object representing the InvalidRequestContent Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.InvalidRequestContentException,
            LambdaErrorResponses.USER_ERROR,
            message,
        )

    @staticmethod
    def validation_exception(message: str) -> Response:
        """
        Creates a Lambda Service ValidationException Response

        Parameters
        ----------
        message str
            Message to be added to the body of the response

        Returns
        -------
        Flask.Response
            A response object representing the ValidationException Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.ValidationException,
            LambdaErrorResponses.USER_ERROR,
            message,
        )

    @staticmethod
    def unsupported_media_type(content_type: str) -> Response:
        """
        Creates a Lambda Service UnsupportedMediaType Response

        Parameters
        ----------
        content_type str
            Content Type of the request that was made

        Returns
        -------
        Flask.Response
            A response object representing the UnsupportedMediaType Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.UnsupportedMediaTypeException,
            LambdaErrorResponses.USER_ERROR,
            f"Unsupported content type: {content_type}",
        )

    @staticmethod
    def generic_service_exception(*args: Any) -> Response:
        """
        Creates a Lambda Service Generic ServiceException Response

        Parameters
        ----------
        args list
            List of arguments Flask passes to the method. They are accepted but not used.

        Returns
        -------
        Flask.Response
            A response object representing the GenericServiceException Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.ServiceException,
            LambdaErrorResponses.SERVICE_ERROR,
            "ServiceException",
        )

    @staticmethod
    def not_implemented_locally(message: str) -> Response:
        """
        Creates a Lambda Service NotImplementedLocally Response

        Parameters
        ----------
        message str
            Message to be added to the body of the response

        Returns
        -------
        Flask.Response
            A response object representing the NotImplementedLocally Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.NotImplementedException,
            LambdaErrorResponses.LOCAL_SERVICE_ERROR,
            message,
        )

    @staticmethod
    def generic_path_not_found(*args: Any) -> Response:
        """
        Creates a Lambda Service Generic PathNotFound Response

        Parameters
        ----------
        args list
            List of arguments Flask passes to the method. They are accepted but not used.

        Returns
        -------
        Flask.Response
            A response object representing the GenericPathNotFound Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.PathNotFoundException,
            LambdaErrorResponses.LOCAL_SERVICE_ERROR,
            "PathNotFoundException",
        )

    @staticmethod
    def generic_method_not_allowed(*args: Any) -> Response:
        """
        Creates a Lambda Service Generic MethodNotAllowed Response

        Parameters
        ----------
        args list
            List of arguments Flask passes to the method. They are accepted but not used.

        Returns
        -------
        Flask.Response
            A response object representing the GenericMethodNotAllowed Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.MethodNotAllowedException,
            LambdaErrorResponses.LOCAL_SERVICE_ERROR,
            "MethodNotAllowedException",
        )

    @staticmethod
    def container_creation_failed(message: str) -> Response:
        """
        Creates a Container Creation Failed response

        Parameters
        ----------
        message str
            Message to be added to the body of the response

        Returns
        -------
        Flask.Response
            A response object representing the ContainerCreationFailed Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.ContainerCreationFailed,
            LambdaErrorResponses.LOCAL_SERVICE_ERROR,
            message,
        )

    @staticmethod
    def _construct_error_response_body(error_type: str, error_message: str) -> str:
        """
        Constructs a string to be used in the body of the Response that conforms
        to the structure of the Lambda Service Responses

        Parameters
        ----------
        error_type str
            The type of error
        error_message str
            Message of the error that occurred

        Returns
        -------
        str
            JSON object string with the keys "Type" and "Message", in that order
        """
        # OrderedDict pins the key order so the serialized body is byte-for-byte stable.
        return json.dumps(OrderedDict([("Type", error_type), ("Message", error_message)]))

    # Durable Functions Error Responses
    @staticmethod
    def durable_execution_not_found(execution_arn: str) -> Response:
        """
        Creates a ResourceNotFound response for durable executions

        Parameters
        ----------
        execution_arn str
            ARN of the durable execution that was requested; echoed back in the error message

        Returns
        -------
        Flask.Response
            A response object representing the ResourceNotFound Error
        """
        return LambdaErrorResponses._service_error_response(
            LambdaErrorResponses.ResourceNotFoundException,
            LambdaErrorResponses.USER_ERROR,
            f"Durable execution not found: {execution_arn}",
        )

    @staticmethod
    def _construct_headers(error_type: str) -> Dict[str, str]:
        """
        Constructs Headers for the Local Lambda Error Response

        Parameters
        ----------
        error_type str
            Error type that occurred to be put into the 'x-amzn-errortype' header

        Returns
        -------
        dict
            Dict representing the Lambda Error Response Headers
        """
        # Reuse the class-level header constants instead of repeating the literals here.
        return {
            "x-amzn-errortype": error_type,
            LambdaErrorResponses.CONTENT_TYPE_HEADER_KEY: LambdaErrorResponses.CONTENT_TYPE,
        }