Skip to content

Commit 942222b

Browse files
committed
adapting code to match current pygeoapi
1 parent 08ff0bf commit 942222b

File tree

1 file changed

+70
-68
lines changed

1 file changed

+70
-68
lines changed

src/pygeoapi_prefect/manager.py

Lines changed: 70 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
MediaType = NewType("MediaType", str)
6060
ProcessId = NewType("ProcessId", str)
6161
ResponseHeaders = NewType("ResponseHeaders", dict[str, str])
62+
JobOutputs = str | dict | list | bytes
6263

6364

6465

@@ -158,6 +159,7 @@ def execute_process(
158159
) -> tuple[
159160
PygeoapiPrefectJobId,
160161
MediaType,
162+
JobOutputs,
161163
JobStatus,
162164
ResponseHeaders | None
163165
]:
@@ -360,55 +362,17 @@ def execute_process(
360362
subscriber: Subscriber | None = None,
361363
requested_response: RequestedResponse | None = RequestedResponse.raw
362364
) -> tuple[
363-
PygeoapiPrefectJobId, MediaType, JobStatus, ResponseHeaders | None
365+
PygeoapiPrefectJobId,
366+
MediaType,
367+
JobOutputs,
368+
JobStatus,
369+
ResponseHeaders | None
364370
]:
365-
"""pygeoapi compatibility method.
366-
367-
Contrary to pygeoapi, which stores requested execution parameters as
368-
a plain dictionary, pygeoapi-prefect rather uses a
369-
`schemas.ExecuteRequest` instance instead - this allows parsing the
370-
input data with the pydantic models crafted from the OGC API -
371-
Processes schemas. Thus, this method performs a light validation of the
372-
input data, converts it from a `dict` to an `ExecuteRequest` and
373-
forwards it to the `_execute` method, where execution is handled.
374-
Finally, it receives whatever results are generated and converts
375-
back to the data structure expected by pygeoapi.
376-
"""
377-
logger.debug(f"inside execute_process {locals()=}")
378-
execution_result = self._execute(
379-
processor=self.get_processor(process_id),
380-
execution_request=schemas.ExecuteRequest(
381-
inputs=data_,
382-
outputs={
383-
k: schemas.ExecutionOutput(**v)
384-
for k, v in requested_outputs.items()
385-
} if requested_outputs else None,
386-
response=requested_response,
387-
subscriber=subscriber,
388-
),
389-
requested_execution_mode=execution_mode,
390-
)
391-
(job_id, output_media_type, generated_output, status, additional_headers) = (
392-
execution_result
393-
)
394-
return (
395-
job_id,
396-
output_media_type,
397-
status,
398-
additional_headers,
399-
)
400-
401-
def _execute(
402-
self,
403-
processor: PygeoapiProcessorProtocol,
404-
execution_request: schemas.ExecuteRequest,
405-
requested_execution_mode: RequestedProcessExecutionMode | None = None,
406-
) -> tuple[PygeoapiPrefectJobId, MediaType, Any, JobStatus, ResponseHeaders]:
407371
"""Process execution handler.
408372
409373
This manager is able to execute two types of processes:
410374
411-
- Normal pygeoapi processes, i.e. those that derive from
375+
- Vanilla pygeoapi processes, i.e. those that derive from
412376
`pygeoapi.process.base.BaseProcessor`. These are made into prefect
413377
flows and are run with prefect. These always run locally.
414378
@@ -417,31 +381,69 @@ def _execute(
417381
able to take full advantage of prefect's features, which includes
418382
running elsewhere, as defined by deployments.
419383
"""
384+
logger.debug(f"inside execute_process {locals()=}")
385+
execution_request = schemas.ExecuteRequest(
386+
inputs=data_,
387+
outputs={
388+
k: schemas.ExecutionOutput(**v)
389+
for k, v in requested_outputs.items()
390+
} if requested_outputs else None,
391+
response=requested_response,
392+
subscriber=subscriber,
393+
),
420394
job_id = PygeoapiPrefectJobId(str(uuid.uuid4()))
421-
if isinstance(processor, BasePrefectProcessor):
422-
chosen_mode, additional_headers = select_prefect_processor_execution_mode(
423-
requested_execution_mode, processor)
424-
internal_job_status = self._execute_prefect_processor(
425-
job_id, processor, chosen_mode, execution_request
426-
)
427-
return (
428-
job_id,
429-
None,
430-
None,
431-
internal_job_status.status,
432-
additional_headers
433-
)
434-
else:
435-
output_media_type, generated_output, current_job_status = (
436-
self._execute_vanilla_processor_sync(job_id, processor, execution_request)
437-
)
438-
return (
439-
job_id,
440-
output_media_type,
441-
generated_output,
442-
current_job_status,
443-
{"Preference-applied": RequestedProcessExecutionMode.wait.value},
444-
)
395+
# processor = self.get_processor(process_id)
396+
match processor := self.get_processor(process_id):
397+
case BasePrefectProcessor():
398+
chosen_mode, headers = select_prefect_processor_execution_mode(
399+
execution_mode, processor)
400+
internal_job_status = self._execute_prefect_processor(
401+
job_id, processor, chosen_mode, execution_request
402+
)
403+
return (
404+
job_id,
405+
None,
406+
None,
407+
internal_job_status.status,
408+
additional_headers
409+
)
410+
case BaseProcessor():
411+
output_media_type, generated_output, current_job_status = (
412+
self._execute_vanilla_processor_sync(job_id, processor, execution_request)
413+
)
414+
return (
415+
job_id,
416+
output_media_type,
417+
generated_output,
418+
current_job_status,
419+
{"Preference-applied": RequestedProcessExecutionMode.wait.value},
420+
)
421+
case _:
422+
raise ProcessorExecuteError("Unknown processor type")
423+
# if isinstance(processor, BasePrefectProcessor):
424+
# chosen_mode, additional_headers = select_prefect_processor_execution_mode(
425+
# execution_mode, processor)
426+
# internal_job_status = self._execute_prefect_processor(
427+
# job_id, processor, chosen_mode, execution_request
428+
# )
429+
# return (
430+
# job_id,
431+
# None,
432+
# None,
433+
# internal_job_status.status,
434+
# additional_headers
435+
# )
436+
# else:
437+
# output_media_type, generated_output, current_job_status = (
438+
# self._execute_vanilla_processor_sync(job_id, processor, execution_request)
439+
# )
440+
# return (
441+
# job_id,
442+
# output_media_type,
443+
# generated_output,
444+
# current_job_status,
445+
# {"Preference-applied": RequestedProcessExecutionMode.wait.value},
446+
# )
445447

446448
def _execute_prefect_processor(
447449
self,

0 commit comments

Comments
 (0)