33
44import click
55from prefect import serve
6- from prefect . filesystems import RemoteFileSystem
6+ from pygeoapi . process . base import BaseProcessor
77from pygeoapi .process .manager .base import get_manager
88from pygeoapi .util import yaml_load
99
1010from . import (
1111 BasePrefectProcessor ,
1212 PrefectManager ,
13+ vanilla_flow ,
1314)
14- from .manager import ProcessId
15- from .vanilla_flow import (
16- generate_flow_run_name ,
17- run_vanilla_processor ,
18- )
15+ from .schemas import ProcessId
1916
2017logger = logging .getLogger (__name__ )
2118
2421def root (): ...
2522
2623
27- @root .command ()
28- @click .argument ("block_name" )
29- @click .argument ("base_path" )
30- @click .argument ("endpoint_url" )
31- @click .argument ("key" )
32- @click .argument ("secret" )
33- def create_storage_block (
34- block_name : str , base_path : str , endpoint_url : str , key : str , secret : str
35- ):
36- """Create storage block of type 'remote-file-system' on the prefect server
37-
38- PARAMETERS
39-
40- block_name - A name for the storage block. Note that it will then be referenced
41- by prefect as 'remote-file-system/<block_name>'
42-
43- base_path - The base path for the storage block. This depends on the remote storage
44- being used. As an example, with S3-compatible storage you can use
45- 's3://<bucket-name>'
46-
47- endpoint_url - Base URL of the remote storage. For example, a local minIO instance
48- could use 'http://localhost:9000'
49-
50- key - User id of the remote storage
51-
52- secret - User password of the remote storage
53-
54- """
55- print (f"Creating block remote-file-system/{ block_name } ..." )
56- block = RemoteFileSystem (
57- basepath = base_path ,
58- settings = {
59- "key" : key ,
60- "secret" : secret ,
61- "client_kwargs" : {"endpoint_url" : endpoint_url },
62- },
63- )
64- block .save (block_name , overwrite = True )
65- print ("Done!" )
66-
67-
6824@root .command (name = "deploy-static" )
6925@click .option ("-c" , "--pygeoapi-config" , type = Path , envvar = "PYGEOAPI_CONFIG" )
7026def deploy_processors_locally (
@@ -88,81 +44,19 @@ def deploy_processors_locally(
8844 f"deployment configuration"
8945 )
9046 continue
91- case BasePrefectProcessor ():
92- configured_flow = run_vanilla_processor .with_options (
47+ case BaseProcessor ():
48+ configured_flow = vanilla_flow . run_vanilla_processor .with_options (
9349 name = processor .metadata ["id" ],
9450 version = processor .metadata .get ("version" ),
95- flow_run_name = generate_flow_run_name ,
51+ flow_run_name = vanilla_flow . generate_flow_run_name ,
9652 validate_parameters = True ,
9753 )
9854 flow_deployment = configured_flow .to_deployment (
99- name = f"pygeoapi- { processor_id } -local" ,
55+ name = vanilla_flow . get_deployment_name ( processor_id ) ,
10056 parameters = {"processor_id" : processor_id },
10157 )
10258 to_serve .append (flow_deployment )
10359 case _:
10460 logger .warning (f"Unknown processor type { processor_id } , ignoring..." )
10561 continue
106- # processor = manager.get_processor(ProcessId(processor_id))
107- # if isinstance(processor, BasePrefectProcessor):
108- # logger.debug(
109- # f"Skipping process {processor_id!r} - it provides its own "
110- # f"deployment configuration"
111- # )
112- # continue
113- # else:
114- # configured_flow = run_vanilla_processor.with_options(
115- # name=processor.metadata["id"],
116- # version=processor.metadata.get("version"),
117- # flow_run_name=generate_flow_run_name,
118- # validate_parameters=True,
119- # )
120- # flow_deployment = configured_flow.to_deployment(
121- # name=f"pygeoapi-{processor_id}-local",
122- # parameters={"processor_id": processor_id},
123- # )
124- # to_serve.append(flow_deployment)
12562 serve (* to_serve )
126-
127-
128- # @root.command()
129- # @click.argument("process_id")
130- # @click.option("-c", "--pygeoapi-config", type=Path, envvar="PYGEOAPI_CONFIG")
131- # def deploy_process(
132- # process_id: str,
133- # pygeoapi_config: Path,
134- # ):
135- # """Create and apply prefect deployment for PROCESS_ID.
136- #
137- # Configure deployment parameters for the process in pygeoapi's configuration
138- # file.
139- # """
140- # with pygeoapi_config.open() as fh:
141- # config = yaml_load(fh)
142- # manager = get_manager(config)
143- # try:
144- # processor = manager.get_processor(process_id)
145- # except UnknownProcessError as err:
146- # raise click.BadParameter(f"Process {process_id!r} not found") from err
147- # else:
148- # if isinstance(processor, BasePrefectProcessor):
149- # if processor.deployment_info is not None:
150- # print(f"Deploying process {process_id!r} with prefect...")
151- # if (sb := processor.deployment_info.storage_block) is not None:
152- # storage = Block.load(sb)
153- # else:
154- # storage = None
155- # deployment = Deployment.build_from_flow(
156- # processor.process_flow,
157- # name=processor.deployment_info.name,
158- # work_queue_name=processor.deployment_info.queue,
159- # storage=storage,
160- # path=processor.deployment_info.storage_sub_path,
161- # ignore_file=None,
162- # )
163- # deployment.apply()
164- # print("Done!")
165- # else:
166- # raise click.Abort("Deployment not specified in pygeoapi config file")
167- # else:
168- # print(f"Process {process_id!r} is not deployable with prefect, skipping...")
0 commit comments