Skip to content

Commit eab1c91

Browse files
committed
allow running vanilla processes in-band, when called as sync
1 parent 603c4f4 commit eab1c91

File tree

5 files changed

+94
-28
lines changed

5 files changed

+94
-28
lines changed

docs/guide.md renamed to docs/user-guide.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
---
2+
hide:
3+
- navigation
4+
---
5+
16
# User guide
27

38
## Installation
@@ -69,6 +74,25 @@ server:
6974
name: pygeoapi_prefect.PrefectManager
7075
```
7176
77+
#### Prefect manager configuration
78+
79+
The Prefect Manager accepts the following configuration parameters, all of which are optional.
80+
These should be specified as properties of the `server.manager` object:
81+
82+
- `use_vanilla_processor_deployments: bool = True` - Whether to call native pygeoapi processors via the
83+
Prefect deployment interface. If this is enabled (which is the default):
84+
85+
- Jobs are executed via the Prefect worker, which you must start (as mentioned below)
86+
- Jobs are coordinated by the Prefect scheduler, which means that they may not start immediately
87+
- Jobs are run in a different process than pygeoapi
88+
- This means that job execution is slower, as there is a temporal overhead which is introduced by
89+
the coordination that happens between the Prefect scheduler service and the Prefect worker
90+
91+
If this is disabled, then jobs are executed immediately and run in the same process as pygeoapi
92+
93+
- `sync_job_execution_timeout_seconds: int = 60` - How much time to give a sync job to finish its
94+
processing before declaring it as failed
95+
7296

7397
#### Processors
7498

mkdocs.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ edit_uri: edit/main/docs/
77

88
nav:
99
- Home: 'index.md'
10-
- New User Guide: 'guide.md'
11-
- User Guide:
12-
- Installation: 'user-guide/installation.md'
13-
- Defining processes: 'user-guide/defining-processes.md'
14-
- Running prefect: 'user-guide/prefect.md'
15-
- Deployments: 'user-guide/deployments.md'
16-
- Executing processes: 'user-guide/executing-processes.md'
17-
- Complete example: 'user-guide/complete-example.md'
10+
- User Guide: 'user-guide.md'
11+
# - User Guide:
12+
# - Installation: 'user-guide/installation.md'
13+
# - Defining processes: 'user-guide/defining-processes.md'
14+
# - Running prefect: 'user-guide/prefect.md'
15+
# - Deployments: 'user-guide/deployments.md'
16+
# - Executing processes: 'user-guide/executing-processes.md'
17+
# - Complete example: 'user-guide/complete-example.md'
1818
- Development: 'development.md'
1919

2020
theme:
@@ -27,6 +27,7 @@ theme:
2727
- content.tabs.link
2828
- header.autohide
2929
- navigation.footer
30+
- navigation.indexes
3031
- navigation.tabs
3132
- navigation.tabs.sticky
3233

src/pygeoapi_prefect/cli.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,7 @@ def deploy_processors_locally(
4545
)
4646
continue
4747
case BaseProcessor():
48-
configured_flow = vanilla_flow.run_vanilla_processor.with_options(
49-
name=processor.metadata["id"],
50-
version=processor.metadata.get("version"),
51-
flow_run_name=vanilla_flow.generate_flow_run_name,
52-
validate_parameters=True,
53-
)
48+
configured_flow = vanilla_flow.get_processor_as_flow(processor)
5449
flow_deployment = configured_flow.to_deployment(
5550
name=vanilla_flow.get_deployment_name(processor_id),
5651
parameters={"processor_id": processor_id},

src/pygeoapi_prefect/manager.py

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import copy
44
import importlib
5-
import json
65
import logging
76
import uuid
87
from collections.abc import (
@@ -18,11 +17,9 @@
1817
import httpx
1918
import jsonschema.exceptions
2019
import jsonschema.validators
21-
from prefect import flow
2220
from prefect.blocks.core import Block
2321
from prefect.client.schemas import FlowRun
2422
from prefect.deployments import run_deployment
25-
from prefect.exceptions import MissingResult
2623
from prefect.results import (
2724
ResultRecord,
2825
ResultStore,
@@ -102,6 +99,7 @@ class PrefectManager:
10299
_processor_configurations: dict[ProcessId, dict[str, Any]]
103100

104101
is_async: bool = True
102+
use_vanilla_processor_deployments: bool
105103
supports_subscribing: bool
106104
prefect_state_map = {
107105
StateType.SCHEDULED: JobStatus.accepted,
@@ -120,6 +118,9 @@ class PrefectManager:
120118
sync_job_execution_timeout_seconds: int
121119

122120
def __init__(self, manager_def: dict[str, Any]):
121+
self.use_vanilla_processor_deployments = manager_def.get(
122+
"use_vanilla_processor_deployments", True)
123+
# self.use_vanilla_processor_deployments = False
123124
self.name = ".".join(
124125
(self.__class__.__module__, self.__class__.__qualname__)
125126
)
@@ -322,6 +323,15 @@ def execute_process(
322323
if execution_mode == RequestedProcessExecutionMode.respond_async:
323324
chosen_mode = ProcessExecutionMode.async_execute
324325
response_headers["Preference-Applied"] = RequestedProcessExecutionMode.respond_async.value
326+
if isinstance(requested_outputs, Sequence) and not isinstance(requested_outputs, str):
327+
outs = {out_name: ExecutionOutput() for out_name in requested_outputs}
328+
elif isinstance(requested_outputs, Mapping):
329+
outs = {
330+
out_name: ExecutionOutput(**out_info)
331+
for out_name, out_info in requested_outputs.items()
332+
}
333+
else:
334+
outs = None
325335

326336
match processor := self.get_processor(process_id):
327337
case BasePrefectProcessor():
@@ -334,15 +344,7 @@ def execute_process(
334344
)
335345
case _:
336346
raise ProcessError(f"Unknown processor type {type(processor)!r}")
337-
if isinstance(requested_outputs, Sequence) and not isinstance(requested_outputs, str):
338-
outs = {out_name: ExecutionOutput() for out_name in requested_outputs}
339-
elif isinstance(requested_outputs, Mapping):
340-
outs = {
341-
out_name: ExecutionOutput(**out_info)
342-
for out_name, out_info in requested_outputs.items()
343-
}
344-
else:
345-
outs = None
347+
346348
execution_request = ExecuteRequest(
347349
deployment_info=deployment_info,
348350
inputs=data_,
@@ -351,8 +353,15 @@ def execute_process(
351353
subscriber=subscriber,
352354
)
353355
if chosen_mode == ProcessExecutionMode.sync_execute:
354-
media_type, generated_output = self._execute_job_sync(
355-
job_id, processor, execution_request)
356+
if isinstance(processor, BaseProcessor) and not self.use_vanilla_processor_deployments:
357+
print(f"Executing processor {processor.metadata['id']!r} in-process...")
358+
media_type, generated_output = _execute_job_sync_without_deployment(
359+
job_id, processor, execution_request
360+
)
361+
else:
362+
print(f"Executing processor {processor.metadata['id']!r} via Prefect deployment...")
363+
media_type, generated_output = self._execute_job_sync(
364+
job_id, processor, execution_request)
356365
print(f"{media_type=}")
357366
print(f"{generated_output=}")
358367
return job_id, media_type, generated_output, JobStatus.successful, response_headers
@@ -464,3 +473,29 @@ def _retrieve_result_from_prefect(
464473
media_type = MediaType(result_record.result[0])
465474
generated_output = result_record.result[1]
466475
return media_type, generated_output
476+
477+
478+
def _execute_job_sync_without_deployment(
479+
job_id: PygeoapiPrefectJobId,
480+
processor: BaseProcessor,
481+
execution_request: ExecuteRequest,
482+
) -> tuple[MediaType, Any]:
483+
"""Execute a job in sync mode by calling a flow directly
484+
485+
This execution mode is only suitable for:
486+
487+
- flows that wrap vanilla pygeoapi processors
488+
- flows whose source code is reachable in the same process
489+
as this one
490+
"""
491+
configured_flow = vanilla_flow.get_processor_as_flow(processor)
492+
configured_flow(
493+
processor.metadata["id"],
494+
job_id,
495+
execution_request.inputs,
496+
execution_request.outputs
497+
)
498+
return _retrieve_result_from_prefect(
499+
result_storage_block=execution_request.deployment_info.result_storage_block,
500+
result_storage_key_template=execution_request.deployment_info.result_storage_key_template
501+
)

src/pygeoapi_prefect/vanilla_flow.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,25 @@
77
from pathlib import Path
88

99
from prefect import (
10+
Flow,
1011
flow,
1112
task,
1213
)
1314
from prefect.runtime import flow_run
1415
from pygeoapi.process.manager.base import get_manager
16+
from pygeoapi.process.base import BaseProcessor
1517
from pygeoapi.util import yaml_load
1618

1719

20+
def get_processor_as_flow(processor: BaseProcessor) -> Flow:
21+
return run_vanilla_processor.with_options(
22+
name=processor.metadata["id"],
23+
version=processor.metadata.get("version"),
24+
flow_run_name=generate_flow_run_name,
25+
validate_parameters=True,
26+
)
27+
28+
1829
def get_deployment_name(processor_id: str) -> str:
1930
return f"pygeoapi-{processor_id}-local"
2031

0 commit comments

Comments
 (0)