-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Expand file tree
/
Copy pathlambda_function.py
More file actions
249 lines (205 loc) · 8.1 KB
/
lambda_function.py
File metadata and controls
249 lines (205 loc) · 8.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""Integrations with AWS Lambda.
Examples:
Run a lambda function with a payload
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(payload={"foo": "bar"})
```
Specify a version of a lambda function
```python
LambdaFunction(
function_name="test-function",
qualifier="1",
aws_credentials=aws_credentials,
).invoke()
```
Invoke a lambda function asynchronously
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(invocation_type="Event")
```
Invoke a lambda function and return the last 4 KB of logs
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(tail=True)
```
Invoke a lambda function with a client context
```python
LambdaFunction(
function_name="test-function",
aws_credentials=aws_credentials,
).invoke(client_context={"bar": "foo"})
```
"""
import json
from typing import Any, Literal, Optional
from pydantic import Field
from pydantic_core import to_json
from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect.blocks.core import Block
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect_aws.credentials import AwsCredentials
class LambdaFunction(Block):
"""Invoke a Lambda function. This block is part of the prefect-aws
collection. Install prefect-aws with `pip install prefect-aws` to use this
block.
Attributes:
function_name: The name, ARN, or partial ARN of the Lambda function to
run. This must be the name of a function that is already deployed
to AWS Lambda.
qualifier: The version or alias of the Lambda function to use when
invoked. If not specified, the latest (unqualified) version of the
Lambda function will be used.
aws_credentials: The AWS credentials to use to connect to AWS Lambda
with a default factory of AwsCredentials.
"""
_block_type_name = "Lambda Function"
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # noqa
_documentation_url = "https://docs.prefect.io/integrations/prefect-aws" # noqa
function_name: str = Field(
title="Function Name",
description=(
"The name, ARN, or partial ARN of the Lambda function to run. This"
" must be the name of a function that is already deployed to AWS"
" Lambda."
),
)
qualifier: Optional[str] = Field(
default=None,
title="Qualifier",
description=(
"The version or alias of the Lambda function to use when invoked. "
"If not specified, the latest (unqualified) version of the Lambda "
"function will be used."
),
)
aws_credentials: AwsCredentials = Field(
title="AWS Credentials",
default_factory=AwsCredentials,
description="The AWS credentials to invoke the Lambda with.",
)
def _get_lambda_client(self):
"""
Retrieve a boto3 session and Lambda client
"""
boto_session = self.aws_credentials.get_boto3_session()
lambda_client = boto_session.client("lambda")
return lambda_client
async def ainvoke(
self,
payload: Optional[dict] = None,
invocation_type: Literal[
"RequestResponse", "Event", "DryRun"
] = "RequestResponse",
tail: bool = False,
client_context: Optional[dict] = None,
) -> dict:
"""
Asynchronously invoke the Lambda function with the given payload.
Args:
payload: The payload to send to the Lambda function.
invocation_type: The invocation type of the Lambda function. This
can be one of "RequestResponse", "Event", or "DryRun". Uses
"RequestResponse" by default.
tail: If True, the response will include the base64-encoded last 4
KB of log data produced by the Lambda function.
client_context: The client context to send to the Lambda function.
Limited to 3583 bytes.
Returns:
The response from the Lambda function.
Examples:
```python
from prefect import flow
from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials
@flow
async def example_flow():
credentials = AwsCredentials()
lambda_function = LambdaFunction(
function_name="test_lambda_function",
aws_credentials=credentials,
)
response = await lambda_function.ainvoke(
payload={"foo": "bar"},
invocation_type="RequestResponse",
)
return response["Payload"].read()
```
"""
# Add invocation arguments
kwargs: dict[str, Any] = dict(FunctionName=self.function_name)
if payload:
kwargs["Payload"] = to_json(payload)
# Let boto handle invalid invocation types
kwargs["InvocationType"] = invocation_type
if self.qualifier is not None:
kwargs["Qualifier"] = self.qualifier
if tail:
kwargs["LogType"] = "Tail"
if client_context is not None:
# For some reason this is string, but payload is bytes
kwargs["ClientContext"] = json.dumps(client_context)
# Get client and invoke
lambda_client = await run_sync_in_worker_thread(self._get_lambda_client)
return await run_sync_in_worker_thread(lambda_client.invoke, **kwargs)
@async_dispatch(ainvoke)
def invoke(
self,
payload: Optional[dict] = None,
invocation_type: Literal[
"RequestResponse", "Event", "DryRun"
] = "RequestResponse",
tail: bool = False,
client_context: Optional[dict] = None,
) -> dict:
"""
Invoke the Lambda function with the given payload.
Args:
payload: The payload to send to the Lambda function.
invocation_type: The invocation type of the Lambda function. This
can be one of "RequestResponse", "Event", or "DryRun". Uses
"RequestResponse" by default.
tail: If True, the response will include the base64-encoded last 4
KB of log data produced by the Lambda function.
client_context: The client context to send to the Lambda function.
Limited to 3583 bytes.
Returns:
The response from the Lambda function.
Examples:
```python
from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials
credentials = AwsCredentials()
lambda_function = LambdaFunction(
function_name="test_lambda_function",
aws_credentials=credentials,
)
response = lambda_function.invoke(
payload={"foo": "bar"},
invocation_type="RequestResponse",
)
response["Payload"].read()
```
"""
# Add invocation arguments
kwargs: dict[str, Any] = dict(FunctionName=self.function_name)
if payload:
kwargs["Payload"] = to_json(payload)
# Let boto handle invalid invocation types
kwargs["InvocationType"] = invocation_type
if self.qualifier is not None:
kwargs["Qualifier"] = self.qualifier
if tail:
kwargs["LogType"] = "Tail"
if client_context is not None:
# For some reason this is string, but payload is bytes
kwargs["ClientContext"] = json.dumps(client_context)
# Get client and invoke
lambda_client = self._get_lambda_client()
return lambda_client.invoke(**kwargs)