Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ celerybeat.pid
*.sage.py

# Environments
.env
*.env
.venv
env/
venv/
Expand Down
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ repos:
- id: end-of-file-fixer
- id: check-added-large-files
args: ["--maxkb=1000"]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.10
hooks:
- id: ruff-check
- id: ruff-format
# the below hooks are disabled for now
# - repo: https://github.com/astral-sh/ruff-pre-commit
# rev: v0.9.6
# hooks:
# - id: ruff
# - id: ruff-format
# - repo: local
# hooks:
# - id: ty
Expand Down
9 changes: 5 additions & 4 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ uv run pygeoapi openapi generate example-config.yml > example-openapi.yml
### Flow Function Signature

Custom Prefect processors must implement a flow function with this signature:

```python
@flow(persist_result=True, ...)
def process_flow(
job_id: str,
result_storage_block: str | None,
process_description: schemas.ProcessDescription,
execution_request: schemas.ExecuteRequest
job_id: str,
result_storage_block: str | None,
process_description: schemas.InternalProcessDescription,
execution_request: schemas.ExecuteRequest
) -> schemas.JobStatusInfoInternal:
```

Expand Down
94 changes: 69 additions & 25 deletions docs/development.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,62 @@
# Development

## Installation

Install this project with uv

```shell
git clone
uv sync
```

Start the prefect server:

```shell
uv run prefect server start
```
## Set-up

Update the prefect config, as directed by the prefect server:
Prepare a `.env` suitable for development, similar to:

```shell
uv run prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
```dotenv
# prefect variables
PREFECT_API_URL=http://127.0.0.1:4200/api
PREFECT_SERVER_ANALYTICS_ENABLED=false
DO_NOT_TRACK=1

# pygeoapi variables
PYGEOAPI_CONFIG=example-config.yml
PYGEOAPI_OPENAPI=example-openapi.yml
```

If you want to deploy a pygeoapi process locally with prefect, then also start a prefect agent:
!!! WARNING

We recommend naming the env file `pygeoapi-prefect.env` and sourcing it like this:

```shell
set -o allexport; source pygeoapi-prefect.env; set +o allexport
```

All following examples assume the env file has already been sourced.


Start the prefect server:

```shell
uv run prefect agent start --work-queue pygeoapi
uv run prefect server start
```

Now stand up pygeoapi with the provided config files:
The prefect UI shall now be available at http://localhost:4200


If you want to deploy a pygeoapi process locally with prefect, then also start a prefect worker. Here we specify a
worker of type `process` (i.e. flow runs execute locally, by spawning aditional Python processes) which consumes from
a pool named `pygeoapi`:

```shell
PYGEOAPI_CONFIG=example-config.yml PYGEOAPI_OPENAPI=example-openapi.yml uv run pygeoapi serve
uv run prefect worker start --type process --pool pygeoapi
```

If you need to regenerate the openapi description file, run:
Now stand up pygeoapi

```shell
uv run pygeoapi openapi generate example-config.yml > example-openapi.yml
uv run pygeoapi serve --starlette
```

Deploy the `hi-prefect-world` process:
Expand All @@ -50,20 +72,42 @@ uv run prefect deployment run --param
```


- List processes
## Operations


!!! TIP

All examples below use:

- [httpie](https://httpie.io/) as a CLI HTTP client;
- [jq](https://jqlang.org/) as a CLI JSON processor;


- List available process ids

```shell
http localhost:5000/processes | jq '.processes[].id'
```

- Retrieve details about a process (for example the process with id `hello-world`):

```shell
http localhost:5000/processes/hello-world | jq '.'
```

```shell
http -v localhost:5000/processes
```
- Execute the pygeoapi `hello-world` process, via prefect. In this example we pass a JSON object
with the inputs that the process needs:

- Execute the standard `hello-world` process via prefect:
```shell
http localhost:5000/processes/hello-world/execution \
inputs:='{"message": "Yo", "name": "planet Earth"}'
```

```shell
http -v localhost:5000/processes/hello-world/execution inputs:='{"message": "Yo", "name": "planet Earth"}'
```
Prefect records flow run executions - you can check them in the Prefect UI, under `runs`

- Execute our `hi-prefect-world` process:
- Execute our `hi-prefect-world` process:

```shell
http -v localhost:5000/processes/hi-prefect-world/execution inputs:='{"message": "Yo", "name": "planet Earth"}'
```
```shell
http -v localhost:5000/processes/hi-prefect-world/execution \
inputs:='{"message": "Yo", "name": "planet Earth"}'
```
207 changes: 207 additions & 0 deletions docs/docker-example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
# Docker example

#### Work pool

Prefect is able to run flows inside ephemeral docker containers.
This requires that a Prefect work pool be created with the `docker` type:

=== "uv"

```shell
uv run prefect work-pool create --type docker my-pool
```

=== "pip"

```shell
source .venv/bin/activate

prefect work-pool create --type docker my-pool
```

Start a worker that consumes work from this newly-created pool

??? note "Install prefect-docker"

In order to start a Prefect worker that executes flow runs in docker containers, you need to have the
[prefect-docker](https://pypi.org/project/prefect-docker/) package installed.

Add it with:

=== "uv"

```shell
uv add prefect-docker
```

=== "pip"

```shell
pip install prefect-docker
```

=== "uv"

```shell
uv run prefect worker start --pool my-pool
```

=== "pip"

```shell
prefect worker start --pool my-pool
```


#### Flow deployment

Write your processing flow, for example:

```python
from prefect import flow, task


@flow(log_prints=True)
def simple_flow(
processor_id: str,
pygeoapi_job_id: str, # noqa, this is used for naming flow_runs
inputs: dict,
outputs: dict | None = None,
) -> None:
print(f"Hi from simple_flow locals: {locals()}")
generate_greeting(name=inputs["name"], message=inputs.get("message"))


@task(
persist_result=True,
result_storage_key="{parameters[pygeoapi_job_id]}.pickle",
log_prints=True,
)
def generate_greeting(name: str, message: str | None = None) -> str:
result = f"Hi {name}!"
if message:
result += f" {message}"
return result


if __name__ == "__main__":
simple_flow.deploy(
name="first-deployment",
work_pool_name="my-pool",
image="pygeoapi-prefect/flows/simple-flow",
push=False
)
```

Deploy it:

=== "uv"

```shell
uv run python simple_flow.py
```

=== "pip"

```shell
python simple_flow.py
```

This creates a docker image named `pygeoapi-prefect/flows/simple-flow:{date}` with the flow contents and registers
a deployment named `simple-flow/first-deployment` with the Prefect server. Because our `simple_flow.deploy()` call
includes `push=False`, this docker image lives in the local filesystem only.

You can check the Prefect server UI in order to verify that your deployment is now registered. You can also used the
Prefect API:

=== "uv"

```shell
uv run prefect deployment ls
```

=== "pip"

```shell
prefect deployment ls
```


#### Pygeoapi processor configuration

Configure pygeoapi with this newly deployed flow:

```yaml
# snippet of pygeoapi configuration file
# (the rest of the configuration has been omitted for brevity)
resources:
simple-flow:
type: process
processor:
prefect:
deployment:
name: simple-flow/first-deployment
result_storage_key_template: "{parameters[pygeoapi_job_id]}.pickle"
metadata:
version: 0.0.1
title: Hi world prefect example
description: >
An example processor that is powered by a Prefect deployment.
inputs:
name:
description: Some name you think is cool. It will be used to greet you
schema:
type: string
message:
title: My message
description: An optional additional message to be echoed
schema:
type: string
minOccurs: 0
outputs:
result:
schema:
type: string
contentMediaType: text/plain
example:
inputs:
name: Joe
```

Finally, start pygeoapi:

```shell
pygeoapi serve
```


#### Process execution via pygeoapi

Check that pygeoapi recognizes our Prefect flow:

```shell
http localhost:5000/processes/simple-flow
```

The response should include the description of the `simple-flow` processor and also a link for executing it. Execution
can be requested with:

```shell
curl \
-sSi \
-X POST "http://localhost:5000/processes/simple-flow/execution" \
-H "Content-Type: application/json" \
-d '{"inputs": {"name": "Joe"}}'
```

The response:

```shell
HTTP/1.1 201 CREATED
Content-Type: None
Content-Language: en-US
Preference-Applied: respond-async
Location: http://localhost:5000/jobs/af101d53-7dcc-4882-a15b-a85fd4381153

{"jobID":"af101d53-7dcc-4882-a15b-a85fd4381153","type":"process","status":"accepted"}
```
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ A [pygeoapi] job/process manager that enables running pygeoapi jobs as [prefect]

## Quickstart

Follow the [user guide](user-guide/user-guide.md) for installation and initial usage instructions.
Follow the [user guide](user-guide/installation.md) for installation and initial usage instructions.


## License
Expand Down
Loading