Skip to content

Commit 66d50d6

Browse files
committed
working on custom processors
1 parent 5f16f16 commit 66d50d6

File tree

9 files changed

+332
-207
lines changed

9 files changed

+332
-207
lines changed

docs/docker-example.md

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
# Docker example
2+
3+
#### Work pool
4+
5+
Prefect is able to run flows inside ephemeral docker containers.
6+
This requires that a Prefect work pool be created with the `docker` type:
7+
8+
```shell
9+
prefect work-pool create --type docker my-pool
10+
```
11+
12+
Start a worker that consumes work from this newly-created pool
13+
14+
??? note "Install prefect-docker"
15+
16+
In order to start a Prefect worker that executes flow runs in docker containers, you need to have the
17+
[prefect-docker](https://pypi.org/project/prefect-docker/) package installed.
18+
19+
Add it with:
20+
21+
=== "uv"
22+
23+
```shell
24+
uv add prefect-docker
25+
```
26+
27+
=== "pip"
28+
29+
```shell
30+
pip install prefect-docker
31+
```
32+
33+
```shell
34+
prefect worker start --pool my-pool
35+
```
36+
37+
#### Flow deployment
38+
39+
Write your processing flow, for example:
40+
41+
```python
42+
from prefect import flow, task
43+
44+
45+
@flow(log_prints=True)
46+
def simple_flow(
47+
processor_id: str,
48+
pygeoapi_job_id: str, # noqa, this is used for naming flow_runs
49+
inputs: dict,
50+
outputs: dict | None = None,
51+
) -> None:
52+
print(f"Hi from simple_flow locals: {locals()}")
53+
generate_greeting(name=inputs["name"], message=inputs.get("message"))
54+
55+
56+
@task(
57+
persist_result=True,
58+
result_storage_key="{parameters[pygeoapi_job_id]}.pickle",
59+
log_prints=True,
60+
)
61+
def generate_greeting(name: str, message: str | None = None) -> str:
62+
result = f"Hi {name}!"
63+
if message:
64+
result += f" {message}"
65+
return result
66+
67+
68+
if __name__ == "__main__":
69+
simple_flow.deploy(
70+
name="first-deployment",
71+
work_pool_name="my-pool",
72+
image="pygeoapi-prefect/flows/simple-flow",
73+
push=False
74+
)
75+
```
76+
77+
Deploy it:
78+
79+
```shell
80+
python simple_flow.py
81+
```
82+
83+
This creates a docker image named `pygeoapi-prefect/flows/simple-flow:{date}` with the flow contents and registers
84+
a deployment named `simple-flow/first-deployment` with the Prefect server. Because our `simple_flow.deploy()` call
85+
includes `push=False`, this docker image lives in the local filesystem only.
86+
87+
You can check the Prefect server UI in order to verify that your deployment is now registered. You can also used the
88+
Prefect API:
89+
90+
```shell
91+
prefect deployment ls
92+
```
93+
94+
95+
#### Pygeoapi processor configuration
96+
97+
Configure pygeoapi with this newly deployed flow:
98+
99+
```yaml
100+
# snippet of pygeoapi configuration file
101+
# (the rest of the configuration has been omitted for brevity)
102+
resources:
103+
simple-flow:
104+
type: process
105+
processor:
106+
prefect:
107+
deployment:
108+
name: simple-flow/first-deployment
109+
result_storage_key_template: "{parameters[pygeoapi_job_id]}.pickle"
110+
metadata:
111+
version: 0.0.1
112+
title: Hi world prefect example
113+
description: >
114+
An example processor that is powered by a Prefect deployment.
115+
inputs:
116+
name:
117+
description: Some name you think is cool. It will be used to greet you
118+
schema:
119+
type: string
120+
message:
121+
title: My message
122+
description: An optional additional message to be echoed
123+
schema:
124+
type: string
125+
minOccurs: 0
126+
outputs:
127+
result:
128+
schema:
129+
type: string
130+
contentMediaType: text/plain
131+
example:
132+
inputs:
133+
name: Joe
134+
```
135+
136+
Finally, start pygeoapi:
137+
138+
```shell
139+
pygeoapi serve
140+
```
141+
142+
143+
#### Process execution via pygeoapi
144+
145+
Check that pygeoapi recognizes our Prefect flow:
146+
147+
```shell
148+
http localhost:5000/processes/simple-flow
149+
```
150+
151+
The response should include the description of the `simple-flow` processor and also a link for executing it. Execution
152+
can be requested with:
153+
154+
```shell
155+
curl \
156+
-sSi \
157+
-X POST "http://localhost:5000/processes/simple-flow/execution" \
158+
-H "Content-Type: application/json" \
159+
-d '{"inputs": {"name": "Joe"}}'
160+
```
161+
162+
The response:
163+
164+
```shell
165+
HTTP/1.1 201 CREATED
166+
Content-Type: None
167+
Content-Language: en-US
168+
Preference-Applied: respond-async
169+
Location: http://localhost:5000/jobs/af101d53-7dcc-4882-a15b-a85fd4381153
170+
171+
{"jobID":"af101d53-7dcc-4882-a15b-a85fd4381153","type":"process","status":"accepted"}
172+
```

example-config.yml

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,36 @@ resources:
7777
simple-flow:
7878
type: process
7979
processor:
80-
name: pygeoapi_prefect.examples.simple_prefect.SimpleFlowProcessor
8180
prefect:
8281
deployment:
83-
name: simple-flow/deployed-simple-flow
84-
result_storage_block: remote-file-system/test-sb1
85-
result_storage_key_template: "{parameters[pygeoapi_job_id]}.pickle"
86-
87-
hi-prefect-world:
88-
type: process
89-
processor:
90-
name: pygeoapi_prefect.examples.hi_prefect_world.HiPrefectWorldProcessor
91-
prefect:
92-
deployment:
93-
name: hi-prefect-world/deployed-hi-prefect-world
94-
result_storage_block: remote-file-system/test-sb1
82+
name: simple-flow/first-deployment
83+
# result_storage_block: remote-file-system/test-sb1
9584
result_storage_key_template: "{parameters[pygeoapi_job_id]}.pickle"
85+
metadata:
86+
version: 0.0.1
87+
title: Hi world prefect example
88+
description: >
89+
An example processor that is powered by a Prefect deployment.
90+
inputs:
91+
name:
92+
description: Some name you think is cool. It will be used to greet you
93+
schema:
94+
type: string
95+
message:
96+
title: My message
97+
description: An optional additional message to be echoed
98+
schema:
99+
type: string
100+
minOccurs: 0
101+
outputs:
102+
result:
103+
schema:
104+
type: string
105+
contentMediaType: text/plain
106+
keywords:
107+
- processor
108+
- prefect
109+
- example
110+
example:
111+
inputs:
112+
name: Joe

mkdocs.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ nav:
1616
# - Executing processes: 'user-guide/executing-processes.md'
1717
# - Complete example: 'user-guide/complete-example.md'
1818
- Development: 'development.md'
19+
- Examples:
20+
- Run processes inside ephemeral docker containers: 'docker-example.md'
1921

2022
theme:
2123
name: material

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ dev = [
2323
"pytest>=7.3.1",
2424
"ruff>=0.9",
2525
"ty>=0.0.1a5",
26+
"prefect-docker>=0.7.1",
2627
]
2728
docs = [
2829
"mkdocs>=1.4.3",

src/pygeoapi_prefect/examples/hi_prefect_world.py

Lines changed: 0 additions & 132 deletions
This file was deleted.

0 commit comments

Comments
 (0)