-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
69 lines (51 loc) · 2.01 KB
/
test.py
File metadata and controls
69 lines (51 loc) · 2.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import random
import time
import multiprocessing
import os
import mp
# very important processing stuff comes here.
# Note: annotated by @mp.catch_exception - that's how we keep track of errors during task execution
@mp.catch_exception
def example_worker_task(task):
sleep_time = random.uniform(2, 5)
time.sleep(sleep_time)
do_crash = random.uniform(1, 20)
if do_crash < 2:
# crash the party, ocassionally
return 1/0
# task callback has to conform to the same form:
# can't abstract that away, as
# - per worker task must init its own resources (in the real world)
# - passing the task class around processes is.. challenging
# - want to keep task callback local to the worker process, don't want to loose that.
def task_cb(task_queue, counters):
print(f"Worker: {os.getpid()} start worker")
while True:
task = task_queue.get()
if task is None:
print(f"Worker: {os.getpid()} - get null task")
task_queue.task_done()
break
start = time.monotonic_ns()
# the function that does the worker task
res = example_worker_task(task)
duration = time.monotonic_ns() - start
duration_ms = duration // 1000000
task_queue.task_done()
counters.inc_completed(duration_ms, res)
print(f"Worker: {os.getpid()} exit worker")
counters.dec_workers()
def run_system():
with multiprocessing.Manager() as manager:
# multiprocessor helper with statistics + http server to query stats.
mph = mp.MultiprocessingHelper(num_process=10, process_cb=task_cb, manager=manager, http_port=8080)
# generate workload
for idx in range(0, 100):
task_msg = (idx)
mph.post(task_msg)
# wrap it up - tell workers to finish & close workers & wait for completion
mph.finish()
# with multiprocessing: must run the command via this check.
# doesn't run without it.
if __name__ == '__main__':
run_system()