forked from Kludex/mangum
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadapter.py
More file actions
70 lines (56 loc) · 2.22 KB
/
adapter.py
File metadata and controls
70 lines (56 loc) · 2.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging
from contextlib import ExitStack
from typing import Any, ContextManager, Dict, TYPE_CHECKING
from .exceptions import ConfigurationError
from .handlers import AbstractHandler
from .protocols import HTTPCycle, LifespanCycle
from .types import ASGIApp
if TYPE_CHECKING: # pragma: no cover
from awslambdaric.lambda_context import LambdaContext
DEFAULT_TEXT_MIME_TYPES = [
"text/",
"application/json",
"application/javascript",
"application/xml",
"application/vnd.api+json",
]
logger = logging.getLogger("mangum")
class Mangum:
"""
Creates an adapter instance.
* **app** - An asynchronous callable that conforms to version 3.0 of the ASGI
specification. This will usually be an ASGI framework application instance.
* **lifespan** - A string to configure lifespan support. Choices are `auto`, `on`,
and `off`. Default is `auto`.
* **log_level** - A string to configure the log level. Choices are: `info`,
`critical`, `error`, `warning`, and `debug`. Default is `info`.
* **text_mime_types** - A list of MIME types to include with the defaults that
should not return a binary response in API Gateway.
"""
app: ASGIApp
lifespan: str = "auto"
def __init__(
self,
app: ASGIApp,
lifespan: str = "auto",
**handler_kwargs: Dict[str, Any],
):
self.app = app
self.lifespan = lifespan
self.handler_kwargs = handler_kwargs
if self.lifespan not in ("auto", "on", "off"):
raise ConfigurationError(
"Invalid argument supplied for `lifespan`. Choices are: auto|on|off"
)
def __call__(self, event: dict, context: "LambdaContext") -> dict:
logger.debug("Event received.")
with ExitStack() as stack:
if self.lifespan != "off":
lifespan_cycle: ContextManager = LifespanCycle(self.app, self.lifespan)
stack.enter_context(lifespan_cycle)
handler = AbstractHandler.from_trigger(
event, context, **self.handler_kwargs
)
http_cycle = HTTPCycle(handler.scope)
response = http_cycle(self.app, handler.body)
return handler.transform_response(response)