|
1 | 1 | """IWC (Intergalactic Workflows Commission) manifest fetching and search helpers. |
2 | 2 |
|
3 | | -The previous attempt at IWC integration kept its manifest cache on |
4 | | -``AgentOperationsManager``, which is instantiated per request, so the cache |
5 | | -never hit. Putting the cache at module scope keeps it shared across requests |
6 | | -within a worker process. |
| 3 | +Manifest fetches go through a process-wide TTL cache so the per-request |
| 4 | +``AgentOperationsManager`` instances all share the same hit. Pre-warming |
| 5 | +the cache via celery beat is a reasonable follow-up. |
7 | 6 | """ |
8 | 7 |
|
9 | 8 | import logging |
10 | 9 | import re |
11 | | -import threading |
12 | | -import time |
| 10 | +from threading import Lock |
13 | 11 | from typing import ( |
14 | 12 | Any, |
15 | 13 | Optional, |
16 | 14 | ) |
17 | 15 |
|
| 16 | +from cachetools import ( |
| 17 | + cached, |
| 18 | + TTLCache, |
| 19 | +) |
| 20 | + |
18 | 21 | from galaxy.util import requests |
19 | 22 |
|
20 | 23 | log = logging.getLogger(__name__) |
21 | 24 |
|
22 | 25 | IWC_MANIFEST_URL = "https://iwc.galaxyproject.org/workflow_manifest.json" |
23 | 26 | CACHE_TTL_SECONDS = 60 * 60 # one hour |
24 | 27 |
|
25 | | -_cache_lock = threading.Lock() |
26 | | -_cached_manifest: Optional[list[dict[str, Any]]] = None |
27 | | -_cached_at: float = 0.0 |
| 28 | +_manifest_cache: TTLCache = TTLCache(maxsize=1, ttl=CACHE_TTL_SECONDS) |
| 29 | +_manifest_cache_lock = Lock() |
28 | 30 |
|
29 | 31 |
|
30 | 32 | def clear_manifest_cache() -> None: |
31 | 33 | """Reset the manifest cache. Tests use this; production normally won't.""" |
32 | | - global _cached_manifest, _cached_at |
33 | | - with _cache_lock: |
34 | | - _cached_manifest = None |
35 | | - _cached_at = 0.0 |
| 34 | + with _manifest_cache_lock: |
| 35 | + _manifest_cache.clear() |
36 | 36 |
|
37 | 37 |
|
| 38 | +@cached(cache=_manifest_cache, lock=_manifest_cache_lock) |
38 | 39 | def fetch_manifest(timeout: float = 30.0) -> list[dict[str, Any]]: |
39 | 40 | """Fetch the IWC manifest, returning a cached copy when fresh.""" |
40 | | - global _cached_manifest, _cached_at |
41 | | - with _cache_lock: |
42 | | - now = time.monotonic() |
43 | | - if _cached_manifest is not None and (now - _cached_at) < CACHE_TTL_SECONDS: |
44 | | - return _cached_manifest |
45 | | - |
46 | | - response = requests.get(IWC_MANIFEST_URL, timeout=timeout) |
47 | | - response.raise_for_status() |
48 | | - manifest = response.json() |
49 | | - if not isinstance(manifest, list): |
50 | | - raise ValueError(f"IWC manifest at {IWC_MANIFEST_URL} did not return a JSON array") |
51 | | - _cached_manifest = manifest |
52 | | - _cached_at = now |
53 | | - return manifest |
| 41 | + response = requests.get(IWC_MANIFEST_URL, timeout=timeout) |
| 42 | + response.raise_for_status() |
| 43 | + manifest = response.json() |
| 44 | + if not isinstance(manifest, list): |
| 45 | + raise ValueError(f"IWC manifest at {IWC_MANIFEST_URL} did not return a JSON array") |
| 46 | + return manifest |
54 | 47 |
|
55 | 48 |
|
56 | 49 | def all_workflows(manifest: list[dict[str, Any]]) -> list[dict[str, Any]]: |
|
0 commit comments