Skip to content

Commit 57820f7

Browse files
committed
adapting code to match current pygeoapi
1 parent 228aae6 commit 57820f7

File tree

7 files changed

+204
-81
lines changed

7 files changed

+204
-81
lines changed

docs/development.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ uv run prefect server start
4545
The prefect UI shall now be available at http://localhost:4200
4646

4747

48-
If you want to deploy a pygeoapi process locally with prefect, then also start a prefect agent:
48+
If you want to deploy a pygeoapi process locally with prefect, then also start a prefect worker. Here we specify a
49+
worker of type `process` (i.e. flow runs execute locally, by spawning additional Python processes) which consumes from
50+
a pool named `pygeoapi`:
4951

5052
```shell
51-
uv run prefect agent start --type process --pool pygeoapi
53+
uv run prefect worker start --type process --pool pygeoapi
5254
```
5355

5456
Now stand up pygeoapi

src/pygeoapi_prefect/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from .manager import PrefectManager
2+
from .process import BasePrefectProcessor
3+
4+
__all__ = [
5+
"BasePrefectProcessor",
6+
"PrefectManager",
7+
]

src/pygeoapi_prefect/cli.py

Lines changed: 109 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
1+
import logging
12
from pathlib import Path
23

34
import click
4-
from prefect.blocks.core import Block
5-
from prefect.deployments import Deployment
5+
from prefect import serve
66
from prefect.filesystems import RemoteFileSystem
7-
from pygeoapi.process.manager import get_manager
8-
from pygeoapi.process import exceptions
7+
from pygeoapi.process.manager.base import get_manager
98
from pygeoapi.util import yaml_load
109

11-
from .process import BasePrefectProcessor
10+
from . import (
11+
BasePrefectProcessor,
12+
PrefectManager,
13+
)
14+
from .manager import ProcessId
15+
from .vanilla_flow import (
16+
generate_flow_run_name,
17+
run_vanilla_processor,
18+
)
19+
20+
logger = logging.getLogger(__name__)
1221

1322

1423
@click.group(name="prefect")
@@ -56,44 +65,104 @@ def create_storage_block(
5665
print("Done!")
5766

5867

59-
@root.command()
60-
@click.argument("process_id")
68+
@root.command(name="deploy-static")
6169
@click.option("-c", "--pygeoapi-config", type=Path, envvar="PYGEOAPI_CONFIG")
62-
def deploy_process(
63-
process_id: str,
64-
pygeoapi_config: Path,
70+
def deploy_processors_locally(
71+
pygeoapi_config: Path,
6572
):
66-
"""Create and apply prefect deployment for PROCESS_ID.
67-
68-
Configure deployment parameters for the process in pygeoapi's configuration
69-
file.
70-
"""
73+
"""Deploy pygeoapi processes via Prefect, locally."""
7174
with pygeoapi_config.open() as fh:
7275
config = yaml_load(fh)
7376
manager = get_manager(config)
74-
try:
75-
processor = manager.get_processor(process_id)
76-
except exceptions.UnknownProcessError as err:
77-
raise click.BadParameter(f"Process {process_id!r} not found") from err
78-
else:
79-
if isinstance(processor, BasePrefectProcessor):
80-
if processor.deployment_info is not None:
81-
print(f"Deploying process {process_id!r} with prefect...")
82-
if (sb := processor.deployment_info.storage_block) is not None:
83-
storage = Block.load(sb)
84-
else:
85-
storage = None
86-
deployment = Deployment.build_from_flow(
87-
processor.process_flow,
88-
name=processor.deployment_info.name,
89-
work_queue_name=processor.deployment_info.queue,
90-
storage=storage,
91-
path=processor.deployment_info.storage_sub_path,
92-
ignore_file=None,
77+
if not isinstance(manager, PrefectManager):
78+
raise SystemExit(
79+
"Cannot deploy as prefect flows - pygeoapi_prefect.PrefectManager "
80+
"is not being used as the pygeoapi process manager"
81+
)
82+
to_serve = []
83+
for processor_id in manager.processes:
84+
match processor := manager.get_processor(ProcessId(processor_id)):
85+
case BasePrefectProcessor():
86+
logger.debug(
87+
f"Skipping process {processor_id!r} - it provides its own "
88+
f"deployment configuration"
89+
)
90+
continue
91+
case BasePrefectProcessor():
92+
configured_flow = run_vanilla_processor.with_options(
93+
name=processor.metadata["id"],
94+
version=processor.metadata.get("version"),
95+
flow_run_name=generate_flow_run_name,
96+
validate_parameters=True,
97+
)
98+
flow_deployment = configured_flow.to_deployment(
99+
name=f"pygeoapi-{processor_id}-local",
100+
parameters={"processor_id": processor_id},
93101
)
94-
deployment.apply()
95-
print("Done!")
96-
else:
97-
raise click.Abort("Deployment not specified in pygeoapi config file")
98-
else:
99-
print(f"Process {process_id!r} is not deployable with prefect, skipping...")
102+
to_serve.append(flow_deployment)
103+
case _:
104+
logger.warning(f"Unknown processor type {processor_id}, ignoring...")
105+
continue
106+
# processor = manager.get_processor(ProcessId(processor_id))
107+
# if isinstance(processor, BasePrefectProcessor):
108+
# logger.debug(
109+
# f"Skipping process {processor_id!r} - it provides its own "
110+
# f"deployment configuration"
111+
# )
112+
# continue
113+
# else:
114+
# configured_flow = run_vanilla_processor.with_options(
115+
# name=processor.metadata["id"],
116+
# version=processor.metadata.get("version"),
117+
# flow_run_name=generate_flow_run_name,
118+
# validate_parameters=True,
119+
# )
120+
# flow_deployment = configured_flow.to_deployment(
121+
# name=f"pygeoapi-{processor_id}-local",
122+
# parameters={"processor_id": processor_id},
123+
# )
124+
# to_serve.append(flow_deployment)
125+
serve(*to_serve)
126+
127+
128+
# @root.command()
129+
# @click.argument("process_id")
130+
# @click.option("-c", "--pygeoapi-config", type=Path, envvar="PYGEOAPI_CONFIG")
131+
# def deploy_process(
132+
# process_id: str,
133+
# pygeoapi_config: Path,
134+
# ):
135+
# """Create and apply prefect deployment for PROCESS_ID.
136+
#
137+
# Configure deployment parameters for the process in pygeoapi's configuration
138+
# file.
139+
# """
140+
# with pygeoapi_config.open() as fh:
141+
# config = yaml_load(fh)
142+
# manager = get_manager(config)
143+
# try:
144+
# processor = manager.get_processor(process_id)
145+
# except UnknownProcessError as err:
146+
# raise click.BadParameter(f"Process {process_id!r} not found") from err
147+
# else:
148+
# if isinstance(processor, BasePrefectProcessor):
149+
# if processor.deployment_info is not None:
150+
# print(f"Deploying process {process_id!r} with prefect...")
151+
# if (sb := processor.deployment_info.storage_block) is not None:
152+
# storage = Block.load(sb)
153+
# else:
154+
# storage = None
155+
# deployment = Deployment.build_from_flow(
156+
# processor.process_flow,
157+
# name=processor.deployment_info.name,
158+
# work_queue_name=processor.deployment_info.queue,
159+
# storage=storage,
160+
# path=processor.deployment_info.storage_sub_path,
161+
# ignore_file=None,
162+
# )
163+
# deployment.apply()
164+
# print("Done!")
165+
# else:
166+
# raise click.Abort("Deployment not specified in pygeoapi config file")
167+
# else:
168+
# print(f"Process {process_id!r} is not deployable with prefect, skipping...")

src/pygeoapi_prefect/examples/hi_prefect_world.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@
2525
from pygeoapi_prefect.process import BasePrefectProcessor
2626

2727

28-
@flow(
29-
persist_result=True,
30-
log_prints=True,
31-
)
28+
@flow(log_prints=True)
3229
def hi_prefect_world(
3330
job_id: str,
3431
result_storage_block: str | None,

src/pygeoapi_prefect/examples/simple_prefect.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,7 @@
2323
from pygeoapi_prefect.process import BasePrefectProcessor
2424

2525

26-
# When defining a prefect flow that will be deployed by prefect to some
27-
# infrastructure, be sure to specify persist_result=True - otherwise the
28-
# pygeoapi process manager will not be able to work properly
2926
@flow(
30-
persist_result=True,
3127
log_prints=True,
3228
)
3329
def simple_flow(

src/pygeoapi_prefect/manager.py

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
exceptions,
4949
prefect_client,
5050
schemas,
51+
vanilla_flow,
5152
)
5253
from .process import (
5354
BasePrefectProcessor,
@@ -399,33 +400,25 @@ def execute_process(
399400
subscriber=subscriber,
400401
)
401402
job_id = PygeoapiPrefectJobId(str(uuid.uuid4()))
402-
# processor = self.get_processor(process_id)
403-
match processor := self.get_processor(process_id):
404-
case BasePrefectProcessor():
405-
chosen_mode, headers = select_prefect_processor_execution_mode(
406-
execution_mode, processor)
407-
internal_job_status = execute_prefect_processor(
408-
job_id, processor, chosen_mode, execution_request
409-
)
410-
return (
411-
job_id,
412-
None,
413-
None,
414-
internal_job_status.status,
415-
additional_headers
416-
)
417-
case BaseProcessor():
418-
output_media_type, generated_output, current_job_status = (
419-
execute_vanilla_processor_sync(
420-
job_id, processor, execution_request, output_dir=self.output_dir)
421-
)
422-
return (
423-
job_id,
424-
output_media_type,
425-
generated_output,
426-
current_job_status,
427-
{"Preference-applied": RequestedProcessExecutionMode.wait.value},
428-
)
403+
chosen_mode, headers = select_execution_mode(execution_mode)
404+
processor = self.get_processor(process_id)
405+
# https://docs.prefect.io/v3/how-to-guides/deployments/run-deployments#run-a-deployment-from-python
406+
if chosen_mode == ProcessExecutionMode.sync_execute:
407+
# if it's a BaseProcessor type, run our locally deployed flow
408+
# if it's a PrefectProcessor type, run the name of the deployment
409+
# 1. get the name of the deployment
410+
...
411+
else:
412+
...
413+
match processor, chosen_mode:
414+
case BasePrefectProcessor(), ProcessExecutionMode.sync_execute:
415+
...
416+
case BasePrefectProcessor(), ProcessExecutionMode.async_execute:
417+
...
418+
case BaseProcessor(), ProcessExecutionMode.sync_execute:
419+
...
420+
case BaseProcessor(), ProcessExecutionMode.async_execute:
421+
...
429422
case _:
430423
raise ProcessorExecuteError("Unknown processor type")
431424

@@ -546,13 +539,19 @@ def get_job_status_from_flow_run(
546539
prefect_flow: Flow
547540
) -> schemas.JobStatusInfoInternal:
548541
job_id = PygeoapiPrefectJobId.from_flow_run_name(flow_run.name)
542+
logger.info(f"{flow_run=}")
543+
logger.info(f"{flow_run.parameters=}")
544+
logger.info(f"{job_id=}")
549545
try:
550-
partial_info = flow_run.state.result()
551-
generated_outputs = partial_info.generated_outputs
546+
media_type, generated_output = flow_run.state.result()
547+
logger.info(f"{media_type=}")
548+
logger.info(f"{generated_output=}")
552549
except MissingResult as err:
553550
logger.warning(f"Could not get flow_run results: {err}")
554-
generated_outputs = None
551+
generated_output = None
555552
execution_request = schemas.ExecuteRequest(**flow_run.parameters["execution_request"])
553+
logger.info(f"{execution_request=}")
554+
logger.info(f"{flow_run.state_type=}")
556555
return schemas.JobStatusInfoInternal(
557556
jobID=job_id,
558557
status=PREFECT_STATE_MAP[flow_run.state_type],
@@ -562,10 +561,28 @@ def get_job_status_from_flow_run(
562561
finished=flow_run.end_time,
563562
requested_response_type=execution_request.response,
564563
requested_outputs=execution_request.outputs,
565-
generated_outputs=generated_outputs,
564+
generated_outputs=generated_output,
566565
)
567566

568567

568+
def select_execution_mode(
    requested: RequestedProcessExecutionMode | None,
) -> tuple[ProcessExecutionMode, dict[str, str]]:
    """Choose the execution mode to use and the matching response header.

    According to the OGC API - Processes (Section 7.11.2.3, Requirement 25)
    the execution mode shall default to `sync` mode if omitted by the client
    (and supported by the server). The Prefect process manager is able to
    run in either sync or async modes, regardless of the processor.

    Args:
        requested: Execution mode asked for by the client, or ``None`` when
            the client expressed no preference.

    Returns:
        A two-element tuple with the chosen execution mode and a dict holding
        the ``Preference-Applied`` header. Async execution is chosen only
        when the client explicitly requested ``respond_async``; every other
        value, including ``None``, results in sync execution.
    """
    if requested == RequestedProcessExecutionMode.respond_async:
        return (
            ProcessExecutionMode.async_execute,
            {"Preference-Applied": RequestedProcessExecutionMode.respond_async.value},
        )
    return (
        ProcessExecutionMode.sync_execute,
        {"Preference-Applied": RequestedProcessExecutionMode.wait.value},
    )
584+
585+
569586
def select_prefect_processor_execution_mode(
570587
requested: RequestedProcessExecutionMode | None,
571588
processor: BasePrefectProcessor,
@@ -598,6 +615,7 @@ def execute_vanilla_processor_sync(
598615
processor: PygeoapiProcessorProtocol,
599616
execution_request: schemas.ExecuteRequest,
600617
output_dir: Path | None,
618+
prefect_worker_pool: str | None = None,
601619
) -> tuple[MediaType, Any, JobStatus]:
602620
"""Execute a regular pygeoapi processor locally via prefect.
603621
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""Run a pygeoapi processor as a Prefect flow.
2+
3+
Note that this module cannot have any relative imports, as it is intended that
4+
it be importable by the Prefect worker as a script.
5+
"""
6+
import os
7+
from pathlib import Path
8+
from typing import Any
9+
10+
from prefect import flow
11+
from prefect.runtime import flow_run
12+
from pygeoapi.process.manager.base import get_manager
13+
from pygeoapi.util import yaml_load
14+
15+
16+
def generate_flow_run_name():
    """Build the name of the current flow run from its pygeoapi job id.

    Reads the ``pygeoapi_job_id`` parameter of the running flow from
    ``prefect.runtime.flow_run``. When the run has no parameters, or the job
    id is missing or empty, ``"unknown"`` is used in its place.

    Returns:
        A string of the form ``pygeoapi_job_<job id>``.
    """
    run_parameters = flow_run.parameters or {}
    job_id = run_parameters.get("pygeoapi_job_id") or "unknown"
    return f"pygeoapi_job_{job_id}"
19+
20+
21+
@flow(persist_result=True, log_prints=True)
def run_vanilla_processor(
    processor_id: str,
    data_: dict,
    pygeoapi_job_id: str,  # noqa, this is used for naming flow_runs
    outputs: dict | None = None,
) -> tuple[str, Any]:
    """Execute a regular pygeoapi processor inside a Prefect flow.

    The pygeoapi configuration is read from the file named by the
    ``PYGEOAPI_CONFIG`` environment variable, the process manager is
    instantiated from it and the requested processor is looked up and
    executed.

    Args:
        processor_id: Identifier of the processor to look up in the manager.
        data_: Passed as the first argument of the processor's ``execute()``.
        pygeoapi_job_id: Not used in the body; it exists so that the flow
            run name can be derived from the flow parameters.
        outputs: Passed as the second argument of the processor's
            ``execute()``.

    Returns:
        The two values returned by the processor's ``execute()``: the output
        media type and the generated output.

    Raises:
        KeyError: If the ``PYGEOAPI_CONFIG`` environment variable is not set.
    """
    with Path(os.environ["PYGEOAPI_CONFIG"]).open() as config_file:
        pygeoapi_config = yaml_load(config_file)
    processor = get_manager(pygeoapi_config).get_processor(processor_id)
    # Unpack explicitly so that anything other than a two-item result still
    # fails here, exactly as before.
    media_type, generated_output = processor.execute(data_, outputs)
    return media_type, generated_output

0 commit comments

Comments
 (0)