-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Expand file tree
/
Copy pathtest_load_flows_concurrently.py
More file actions
40 lines (33 loc) · 1.22 KB
/
test_load_flows_concurrently.py
File metadata and controls
40 lines (33 loc) · 1.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import tempfile
from pathlib import Path
from typing import Any
from prefect import Flow
from prefect.runner.storage import GitRepository
async def load_flow(entrypoint: str, base_dir: Path) -> Flow[..., Any]:
source = GitRepository(
url="https://github.com/PrefectHQ/examples.git",
)
source.set_base_path(base_dir)
return await Flow.from_source( # type: ignore # sync_compatible causes issues
source=source,
entrypoint=entrypoint,
)
async def run_iteration():
with tempfile.TemporaryDirectory() as tmpdir:
entrypoints = [
"flows/hello_world.py:hello",
"flows/whoami.py:whoami",
] * 5 # Load each flow 5 times concurrently
futures = [load_flow(entrypoint, Path(tmpdir)) for entrypoint in entrypoints]
flows = await asyncio.gather(*futures)
return len(flows)
async def test_load_flows_concurrently():
for i in range(10): # Run 10 iterations
try:
count = await run_iteration()
print(f"Iteration {i + 1}: Successfully loaded {count} flows")
except Exception as e:
print(f"Iteration {i + 1}: Failed with error: {str(e)}")
return False
return True