Skip to content

Commit 5a0ee77

Browse files
author
Ricardo Garcia Silva
committed
Execute vanilla pygeoapi processes
Brought the code in line with the expectations of the current implementation of pygeoapi
1 parent 5bc5f6b commit 5a0ee77

File tree

1 file changed

+136
-19
lines changed

1 file changed

+136
-19
lines changed

src/pygeoapi_prefect/manager.py

Lines changed: 136 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""pygeoapi process manager based on Prefect."""
22

3+
import json
34
import logging
45
import uuid
56
from typing import (
@@ -21,7 +22,10 @@
2122
from prefect.task_runners import ConcurrentTaskRunner
2223

2324
from pygeoapi.process import exceptions
24-
from pygeoapi.process.base import BaseProcessor
25+
from pygeoapi.process.base import (
26+
BaseProcessor,
27+
ProcessorExecuteError,
28+
)
2529
from pygeoapi.process.manager.base import BaseManager
2630
from pygeoapi.util import JobStatus
2731

@@ -165,12 +169,51 @@ def delete_job( # type: ignore [empty-body]
165169
"""Delete a job and associated results/outputs."""
166170
pass
167171

172+
173+
168174
def _select_execution_mode(
169-
self, requested: RequestedProcessExecutionMode | None, processor: BaseProcessor
175+
self,
176+
requested: Optional[RequestedProcessExecutionMode],
177+
processor: BaseProcessor
170178
) -> tuple[ProcessExecutionMode, dict[str, str]]:
171-
chosen_mode, additional_headers = super()._select_execution_mode(
172-
requested, processor
173-
)
179+
"""Select the execution mode to be employed
180+
181+
The execution mode to use depends on a number of factors:
182+
183+
- what mode, if any, was requested by the client?
184+
- does the process support sync and async execution modes?
185+
- does the process manager support sync and async modes?
186+
"""
187+
if requested == RequestedProcessExecutionMode.respond_async:
188+
# client wants async - do we support it?
189+
process_supports_async = (
190+
ProcessExecutionMode.async_execute.value in
191+
processor.process_description.job_control_options
192+
)
193+
if self.is_async and process_supports_async:
194+
chosen_mode = ProcessExecutionMode.async_execute
195+
additional_headers = {
196+
'Preference-Applied': (
197+
RequestedProcessExecutionMode.respond_async.value)
198+
}
199+
else:
200+
chosen_mode = ProcessExecutionMode.sync_execute
201+
additional_headers = {
202+
'Preference-Applied': (
203+
RequestedProcessExecutionMode.wait.value)
204+
}
205+
elif requested == RequestedProcessExecutionMode.wait:
206+
# client wants sync - pygeoapi implicitly supports sync mode
207+
logger.debug('Synchronous execution')
208+
chosen_mode = ProcessExecutionMode.sync_execute
209+
additional_headers = {
210+
'Preference-Applied': RequestedProcessExecutionMode.wait.value}
211+
else: # client has no preference
212+
# according to OAPI - Processes spec we ought to respond with sync
213+
logger.debug('Synchronous execution')
214+
chosen_mode = ProcessExecutionMode.sync_execute
215+
additional_headers = {}
216+
174217
has_deployment = getattr(processor, "deployment_info", None) is not None
175218
if chosen_mode == ProcessExecutionMode.async_execute and not has_deployment:
176219
logger.warning(
@@ -254,10 +297,22 @@ def _execute_base_processor(
254297
job_id: str,
255298
processor: BaseProcessor,
256299
execution_request: ExecuteRequest,
257-
) -> JobStatusInfoInternal:
258-
"""Execute a regular pygeoapi process via prefect."""
300+
# ) -> JobStatusInfoInternal:
301+
) -> tuple[str, Any, JobStatus]:
302+
"""Execute a regular pygeoapi process via prefect.
259303
260-
execution_parameters = execution_request.dict(by_alias=True)
304+
This wraps the pygeoapi processor.execute() call in a prefect flow,
305+
which is then run locally.
306+
307+
After the process is executed, this method mimics the default pygeoapi
308+
manager's behavior of saving generated outputs to disk.
309+
"""
310+
311+
execution_parameters = execution_request.dict(
312+
by_alias=True, exclude_none=True)
313+
input_parameters = execution_parameters.get("inputs", {})
314+
logger.warning(f"{execution_parameters=}")
315+
logger.warning(f"{input_parameters=}")
261316

262317
@flow(
263318
name=processor.metadata["id"],
@@ -280,7 +335,33 @@ def executor(data_: dict):
280335
"""
281336
return processor.execute(data_)
282337

283-
return executor(execution_parameters)
338+
try:
339+
output_media_type, generated_output = executor(input_parameters)
340+
except RuntimeError as err:
341+
# TODO: Change the exception once pygeoapi gets
342+
# process-execution-related exceptions in its main process.exceptions
343+
# module
344+
raise ProcessorExecuteError(str(err)) from err
345+
# raise exceptions.ProcessError() from err
346+
else:
347+
# now try to save outputs to local disk, similarly to what the
348+
# `pygeoapi.BaseManager._execute_handler_sync()` method does
349+
filename = f"{processor.metadata['id']}-{job_id}"
350+
job_path = self.output_dir / filename if self.output_dir is not None else None
351+
352+
if job_path is not None:
353+
logger.debug(f'writing output to {job_path}')
354+
if isinstance(generated_output, dict):
355+
mode = 'w'
356+
data = json.dumps(generated_output, sort_keys=True, indent=4)
357+
encoding = 'utf-8'
358+
else:
359+
mode = 'wb'
360+
data = generated_output
361+
encoding = None
362+
with job_path.open(mode=mode, encoding=encoding) as fh:
363+
fh.write(data)
364+
return output_media_type, generated_output, JobStatus.successful
284365

285366
def execute_process(
286367
self,
@@ -299,20 +380,48 @@ def execute_process(
299380
forwards it to the `_execute` method, where execution is handled.
300381
Finally, it receives whatever results are generated and converts
301382
back to the data structure expected by pygeoapi.
383+
384+
Also, note that current versions of pygeoapi only pass the `inputs`
385+
property of the execute request to the process manager. Therefore it
386+
is not possible to respond to additional execution request parameters,
387+
even if pygeoapi-prefect does support them.
388+
389+
This means that, for the moment, pygeoapi does not pass other keys in
390+
the OAPIP `execute.yaml` schema, which are:
391+
392+
- outputs
393+
- response
394+
- subscriber
395+
396+
for more on this see:
397+
398+
https://github.com/geopython/pygeoapi/issues/1285
399+
302400
"""
401+
# execution_request = ExecuteRequest(**data_dict)
402+
303403
# this can raise a pydantic validation error
304-
execution_request = ExecuteRequest(**data_dict)
404+
execution_request = ExecuteRequest(inputs=data_dict)
405+
logger.warning(f"{data_dict=}")
406+
logger.warning(f"{execution_request=}")
305407

306-
job_status, additional_headers = self._execute(
408+
execution_result = self._execute(
307409
process_id=process_id,
308410
execution_request=execution_request,
309411
requested_execution_mode=execution_mode,
310412
)
413+
(
414+
job_id,
415+
output_media_type,
416+
generated_output,
417+
status,
418+
additional_headers
419+
) = execution_result
311420
return (
312-
job_status.job_id,
313-
None, # mimetype?
314-
None, # outputs?
315-
job_status.status,
421+
job_id,
422+
output_media_type,
423+
generated_output,
424+
status,
316425
additional_headers,
317426
)
318427

@@ -321,7 +430,7 @@ def _execute(
321430
process_id: str,
322431
execution_request: ExecuteRequest,
323432
requested_execution_mode: RequestedProcessExecutionMode | None = None,
324-
) -> tuple[JobStatusInfoInternal, dict[str, str] | None]:
433+
) -> tuple[str, str, Any, JobStatus, dict[str, str]]:
325434
"""Process execution handler.
326435
327436
This manager is able to execute two types of processes:
@@ -344,11 +453,19 @@ def _execute(
344453
job_status = self._execute_prefect_processor(
345454
job_id, processor, chosen_mode, execution_request
346455
)
456+
output_media_type, generated_output, current_job_status = None
347457
else:
348-
job_status = self._execute_base_processor(
349-
job_id, processor, execution_request
458+
output_media_type, generated_output, current_job_status = (
459+
self._execute_base_processor(job_id, processor, execution_request)
350460
)
351-
return job_status, additional_headers
461+
# return job_status, additional_headers
462+
return (
463+
job_id,
464+
output_media_type,
465+
generated_output,
466+
current_job_status,
467+
additional_headers
468+
)
352469

353470
def get_output_data_raw(
354471
self, generated_output: OutputExecutionResultInternal, process_id: str

0 commit comments

Comments
 (0)