|
2 | 2 | API for asynchronous job running mechanisms can use to fetch or put files related to running and queued jobs. |
3 | 3 | """ |
4 | 4 |
|
| 5 | +import asyncio |
5 | 6 | import logging |
6 | 7 | import os |
7 | 8 | import re |
|
19 | 20 | Path, |
20 | 21 | Query, |
21 | 22 | Request, |
| 23 | + Response, |
22 | 24 | UploadFile, |
23 | 25 | ) |
24 | 26 | from fastapi.params import Depends |
|
36 | 38 | DependsOnTrans, |
37 | 39 | Router, |
38 | 40 | ) |
| 41 | +from galaxy.work.context import SessionRequestContext |
39 | 42 | from . import BaseGalaxyAPIController |
40 | 43 |
|
41 | 44 | __all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") |
@@ -179,6 +182,63 @@ def index( |
179 | 182 |
|
180 | 183 | return GalaxyFileResponse(path) |
181 | 184 |
|
| 185 | + # The ARC remote job runner (`lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects a `PUT` endpoint to stage |
| 186 | + # out result files back to Galaxy. |
| 187 | + @router.put( |
| 188 | + "/api/jobs/{job_id}/files", |
| 189 | + summary="Populate an output file.", |
| 190 | + responses={ |
| 191 | + 201: {"description": "A new file has been created."}, |
| 192 | + 204: {"description": "An existing file has been replaced."}, |
| 193 | + 400: {"description": "Bad request."}, |
| 194 | + }, |
| 195 | + ) |
| 196 | + def populate( |
| 197 | + self, |
| 198 | + job_id: Annotated[str, Path(description="Encoded id string of the job.")], |
| 199 | + path: Annotated[str, Query(description="Path to file to create/replace.")], |
| 200 | + job_key: Annotated[ |
| 201 | + str, |
| 202 | + Query( |
| 203 | + description=( |
| 204 | + "A key used to authenticate this request as acting on behalf of a job runner for the specified job." |
| 205 | + ), |
| 206 | + ), |
| 207 | + ], |
| 208 | + trans: SessionRequestContext = DependsOnTrans, |
| 209 | + ): |
| 210 | + path = unquote(path) |
| 211 | + |
| 212 | + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) |
| 213 | + self.__check_job_can_write_to_path(trans, job, path) |
| 214 | + |
| 215 | + destination_file_exists = os.path.exists(path) |
| 216 | + |
| 217 | + # FastAPI can only read the file contents from the request body in an async context. To write the file without |
| 218 | + # using an async endpoint, the async code that reads the file from the body and writes it to disk will have to |
| 219 | + # run within the sync endpoint. Since the code that writes the data to disk is blocking |
| 220 | + # `destination_file.write(chunk)`, it has to run on its own event loop within the thread spawned to answer the |
| 221 | + # request to the sync endpoint. |
| 222 | + async def write(): |
| 223 | + with open(path, "wb") as destination_file: |
| 224 | + async for chunk in trans.request.stream(): |
| 225 | + destination_file.write(chunk) |
| 226 | + |
| 227 | + target_dir = os.path.dirname(path) |
| 228 | + util.safe_makedirs(target_dir) |
| 229 | + event_loop = asyncio.new_event_loop() |
| 230 | + try: |
| 231 | + asyncio.set_event_loop(event_loop) |
| 232 | + event_loop.run_until_complete(write()) |
| 233 | + finally: |
| 234 | + event_loop.close() |
| 235 | + |
| 236 | + return ( |
| 237 | + Response(status_code=201, headers={"Location": str(trans.request.url)}) |
| 238 | + if not destination_file_exists |
| 239 | + else Response(status_code=204) |
| 240 | + ) |
| 241 | + |
182 | 242 | @router.post( |
183 | 243 | "/api/jobs/{job_id}/files", |
184 | 244 | summary="Populate an output file.", |
|
0 commit comments