Skip to content

Commit d3cbb29

Browse files
committed
refactoring
1 parent 66d50d6 commit d3cbb29

File tree

11 files changed

+182
-377
lines changed

11 files changed

+182
-377
lines changed

docs/docker-example.md

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,19 @@
55
Prefect is able to run flows inside ephemeral docker containers.
66
This requires that a Prefect work pool be created with the `docker` type:
77

8-
```shell
9-
prefect work-pool create --type docker my-pool
10-
```
8+
=== "uv"
9+
10+
```shell
11+
uv run prefect work-pool create --type docker my-pool
12+
```
13+
14+
=== "pip"
15+
16+
```shell
17+
source .venv/bin/activate
18+
19+
prefect work-pool create --type docker my-pool
20+
```
1121

1222
Start a worker that consumes work from this newly-created pool
1323

@@ -20,19 +30,28 @@ Start a worker that consumes work from this newly-created pool
2030

2131
=== "uv"
2232

33+
```shell
34+
uv add prefect-docker
35+
```
36+
37+
=== "pip"
38+
39+
```shell
40+
pip install prefect-docker
41+
```
42+
43+
=== "uv"
44+
2345
```shell
24-
uv add prefect-docker
46+
uv run prefect worker start --pool my-pool
2547
```
2648

27-
=== "pip"
49+
=== "pip"
2850

2951
```shell
30-
pip install prefect-docker
52+
prefect worker start --pool my-pool
3153
```
3254

33-
```shell
34-
prefect worker start --pool my-pool
35-
```
3655

3756
#### Flow deployment
3857

@@ -76,9 +95,17 @@ if __name__ == "__main__":
7695

7796
Deploy it:
7897

79-
```shell
80-
python simple_flow.py
81-
```
98+
=== "uv"
99+
100+
```shell
101+
uv run python simple_flow.py
102+
```
103+
104+
=== "pip"
105+
106+
```shell
107+
python simple_flow.py
108+
```
82109

83110
This creates a docker image named `pygeoapi-prefect/flows/simple-flow:{date}` with the flow contents and registers
84111
a deployment named `simple-flow/first-deployment` with the Prefect server. Because our `simple_flow.deploy()` call
@@ -87,9 +114,17 @@ includes `push=False`, this docker image lives in the local filesystem only.
87114
You can check the Prefect server UI in order to verify that your deployment is now registered. You can also used the
88115
Prefect API:
89116

90-
```shell
91-
prefect deployment ls
92-
```
117+
=== "uv"
118+
119+
```shell
120+
uv run prefect deployment ls
121+
```
122+
123+
=== "pip"
124+
125+
```shell
126+
prefect deployment ls
127+
```
93128

94129

95130
#### Pygeoapi processor configuration

docs/user-guide.md

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,64 @@ either sync or async, as both types are supported by the Prefect manager.
151151
If you prefer, you can write Prefect flows, deploy them using any of the multiple techniques supported by Prefect
152152
and then adapt them to run as pygeoapi processors.
153153

154-
In order to do so, your Prefect flows need to implement the
155-
[protocol](https://typing.python.org/en/latest/spec/protocol.html)
156-
defined in `pygeoapi_prefect.PygeoapiPrefectFlowProtocol`
154+
In order to be runnable via pygeoapi, your flows need to:
155+
156+
1. Implement the [protocol](https://typing.python.org/en/latest/spec/protocol.html)
157+
defined in `pygeoapi_prefect.PygeoapiPrefectFlowProtocol`:
158+
159+
```python
160+
from typing import Protocol
161+
162+
class PygeoapiPrefectFlowProtocol(Protocol):
163+
def __call__(
164+
self,
165+
processor_id: str,
166+
pygeoapi_job_id: str,
167+
inputs: dict,
168+
outputs: dict | None = None,
169+
) -> None: ...
170+
```
171+
172+
2. The flow must have at least one Prefect task, which is where the processing output is be generated. This task
173+
must return the generated output, which enables it to be managed by
174+
[Prefect's result management facilities](https://docs.prefect.io/v3/advanced/results). This means this
175+
result-generating task must:
176+
177+
- Be configured to persist results;
178+
- Be configured with a result storage key that uses the pygeoapi job id - this is needed in order to ensure
179+
pygeoapi is able to reconstruct the result's storage key for retrieval;
180+
- The generated output must be a two-element tuple where the first element is the media type and the second
181+
element is the actual output.
182+
183+
3. The flow must already have been deployed in your Prefect environment. You can use any of the Prefect deployment
184+
types (local processes, docker containers, k8s, etc.). Check the [examples](docker-example.md) section for more information
185+
186+
4. Your pygeoapi configuration for the process needs to include a `prefect` section, with `deployment` and `metadata`
187+
sub-sections.
188+
189+
This means that a very minimal flow looks like this:
157190

158191
```python
159-
from prefect import flow
192+
from prefect import flow, task
160193
161194
@flow()
162-
def my_custom_flow()
195+
def my_custom_flow(
196+
processor_id: str,
197+
pygeoapi_job_id: str,
198+
inputs: dict,
199+
outputs: dict | None = None
200+
) -> None:
201+
# perform whatever preparatory steps
202+
generate_result(pygeoapi_job_id)
203+
204+
205+
@task(
206+
persist_result=True,
207+
result_storage_key="{parameters[pygeoapi_job_id].pickle}",
208+
log_prints=True
209+
)
210+
def generate_result(pygeoapi_job_id: str) -> tuple[str, bytes]:
211+
return "text/plain", "Hi there, I am a result".encode()
163212
164213
```
165214

example-config.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,13 @@ resources:
7777
simple-flow:
7878
type: process
7979
processor:
80+
job_control_options:
81+
- sync-execute
82+
- async-execute
8083
prefect:
8184
deployment:
8285
name: simple-flow/first-deployment
83-
# result_storage_block: remote-file-system/test-sb1
84-
result_storage_key_template: "{parameters[pygeoapi_job_id]}.pickle"
86+
result_storage_key_template: "{job_id}.pickle"
8587
metadata:
8688
version: 0.0.1
8789
title: Hi world prefect example

src/pygeoapi_prefect/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from .manager import PrefectManager
2-
from .process import BasePrefectProcessor
32
from .protocols import PygeoapiPrefectFlowProtocol
43

5-
__all__ = ["BasePrefectProcessor", "PrefectManager", "PygeoapiPrefectFlowProtocol"]
4+
__all__ = [
5+
"PrefectManager",
6+
"PygeoapiPrefectFlowProtocol",
7+
]

src/pygeoapi_prefect/cli.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
from pygeoapi.process.manager.base import get_manager
88
from pygeoapi.util import yaml_load
99

10-
from . import (
11-
BasePrefectProcessor,
12-
PrefectManager,
13-
vanilla_flow,
14-
)
10+
from .process import PrefectDeploymentProcessor
11+
from .manager import PrefectManager
1512
from .schemas import ProcessId
13+
from . import vanilla_flow
1614

1715
logger = logging.getLogger(__name__)
1816

@@ -38,7 +36,7 @@ def deploy_processors_locally(
3836
to_serve = []
3937
for processor_id in manager.processes:
4038
match processor := manager.get_processor(ProcessId(processor_id)):
41-
case BasePrefectProcessor():
39+
case PrefectDeploymentProcessor():
4240
logger.debug(
4341
f"Skipping process {processor_id!r} - it provides its own "
4442
f"deployment configuration"
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from prefect import flow, task
2+
3+
4+
@flow(log_prints=True)
5+
def simple_flow(
6+
processor_id: str,
7+
pygeoapi_job_id: str, # noqa, this is used for naming flow_runs
8+
inputs: dict,
9+
outputs: dict | None = None,
10+
) -> None:
11+
print(f"Hi from simple_flow locals: {locals()}")
12+
generate_greeting(
13+
name=inputs["name"],
14+
pygeoapi_job_id=pygeoapi_job_id,
15+
message=inputs.get("message"),
16+
)
17+
18+
19+
@task(
20+
persist_result=True,
21+
result_storage_key="{parameters[pygeoapi_job_id]}.pickle",
22+
log_prints=True,
23+
)
24+
def generate_greeting(
25+
name: str,
26+
pygeoapi_job_id: str, # noqa, this is used for naming flow_runs
27+
message: str | None = None
28+
) -> tuple[str, bytes]:
29+
result = f"Hi {name}!"
30+
if message:
31+
result += f" {message}"
32+
return "text/plain", result.encode()
33+
34+
35+
if __name__ == "__main__":
36+
simple_flow.serve(
37+
name="first-deployment",
38+
)

src/pygeoapi_prefect/examples/simple_prefect.py

Lines changed: 0 additions & 107 deletions
This file was deleted.

0 commit comments

Comments
 (0)