Skip to content

Commit f7405d3

Browse files
committed
perf: use multiprocessing to speed up recalculate_expanded_sizes.py
1 parent 99e9304 commit f7405d3

1 file changed

Lines changed: 125 additions & 31 deletions

File tree

recalculate_expanded_sizes.py

Lines changed: 125 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,103 @@
33
import argparse
44
import csv
55
import logging
6+
import math
7+
import multiprocessing
68
import os
9+
import queue
710
import sys
11+
import time
12+
from itertools import batched
813

914
from common import FilteredIncludeChangeList, IgnoresConfiguration, IncludeChange
1015
from filter_include_changes import Change, filter_changes
1116
from list_transitive_includes import list_transitive_includes
1217
from include_analysis import IncludeAnalysisOutput, ParseError, parse_raw_include_analysis_output
1318
from typing import Callable, Dict, Iterator, List, Tuple
14-
from utils import load_config
19+
from utils import get_worker_count, load_config
1520

1621

17-
def recalculate_expanded_sizes(
22+
def recalculate_expanded_size(
    include_analysis: IncludeAnalysisOutput,
    filename: str,
    changes: List[Change],
    ignores: IgnoresConfiguration = None,
    filter_generated_files=True,
    filter_mojom_headers=True,
    filter_third_party=False,
    header_mappings: Dict[str, str] = None,
    include_directories: List[str] = None,
    remove_only=False,
) -> Tuple[str, int]:
    """Recompute the expanded size of a single file with ``changes`` applied.

    The transitive includes of ``filename`` are listed via
    ``list_transitive_includes`` (metric ``"file_size"``, with
    ``apply_changes=True`` and ``full=True``); all filter and mapping options
    are forwarded to that call unchanged.

    Returns a ``(filename, expanded_size)`` tuple. The result never exceeds
    ``include_analysis["tsizes"][filename]``: a larger computed value is
    logged as a warning and replaced by that recorded value.
    """
    transitive = list_transitive_includes(
        include_analysis,
        filename,
        metric="file_size",
        changes=changes,
        ignores=ignores,
        filter_generated_files=filter_generated_files,
        filter_mojom_headers=filter_mojom_headers,
        filter_third_party=filter_third_party,
        header_mappings=header_mappings,
        include_directories=include_directories,
        apply_changes=True,
        full=True,
        remove_only=remove_only,
    )

    # De-duplicate on the entry[1:3] slice so a repeated pair is only counted
    # once, then add up the second element of each unique pair.
    # NOTE(review): presumably that slice is (included file, size) - confirm
    # against list_transitive_includes.
    unique_pairs = {item[1:3] for item in transitive}
    total = sum(pair[1] for pair in unique_pairs)

    # The file's own size counts towards its expanded size too
    total += include_analysis["sizes"][filename]

    recorded = include_analysis["tsizes"][filename]

    if total > recorded:
        # Growing past the recorded value is unexpected; keep the recorded one
        logging.warning(
            f"{filename} unexpectedly increased in size from {recorded} to {total} - ignoring"
        )
        total = recorded

    return (filename, total)
62+
63+
64+
def work_func(
    filenames: List[str],
    result_queue: multiprocessing.JoinableQueue,
    include_analysis: IncludeAnalysisOutput,
    changes: List[Change],
    ignores: IgnoresConfiguration = None,
    filter_generated_files=True,
    filter_mojom_headers=True,
    filter_third_party=False,
    header_mappings: Dict[str, str] = None,
    include_directories: List[str] = None,
    remove_only=False,
):
    """Compute the expanded size of each file in ``filenames`` and queue it.

    Every ``(filename, expanded_size)`` result from
    ``recalculate_expanded_size`` is placed on ``result_queue`` without
    blocking. Once all files are processed, the function blocks on
    ``result_queue.join()`` so it does not return before the queued results
    have been marked done by the consumer.

    A ``KeyboardInterrupt`` raised at any point is swallowed silently.
    """
    try:
        # Options forwarded verbatim to every per-file computation
        options = {
            "ignores": ignores,
            "filter_generated_files": filter_generated_files,
            "filter_mojom_headers": filter_mojom_headers,
            "filter_third_party": filter_third_party,
            "header_mappings": header_mappings,
            "include_directories": include_directories,
            "remove_only": remove_only,
        }

        for current in filenames:
            outcome = recalculate_expanded_size(include_analysis, current, changes, **options)
            result_queue.put_nowait(outcome)

        # Stay alive until the result queue has been fully consumed
        result_queue.join()
    except KeyboardInterrupt:
        # Exit quietly - don't show the user anything
        return
98+
99+
100+
def recalculate_expanded_sizes(
19101
filenames: List[str],
102+
include_analysis: IncludeAnalysisOutput,
20103
changes: List[Change],
21104
progress_callback: Callable[[str], None] = None,
22105
ignores: IgnoresConfiguration = None,
@@ -27,6 +110,8 @@ def recalculate_expanded_sizes(
27110
include_directories: List[str] = None,
28111
remove_only=False,
29112
) -> Iterator[Tuple[str, int]]:
113+
result_queue: multiprocessing.JoinableQueue[Tuple[str, int]] = multiprocessing.JoinableQueue()
114+
30115
include_changes = FilteredIncludeChangeList(
31116
filter_changes(
32117
changes,
@@ -38,38 +123,47 @@ def recalculate_expanded_sizes(
38123
)
39124
)
40125

41-
for filename in filenames:
42-
# Recalculate all of the transitive includes for this file
43-
includes = list_transitive_includes(
44-
include_analysis,
45-
filename,
46-
metric="file_size",
47-
changes=include_changes,
48-
ignores=ignores,
49-
filter_generated_files=filter_generated_files,
50-
filter_mojom_headers=filter_mojom_headers,
51-
filter_third_party=filter_third_party,
52-
header_mappings=header_mappings,
53-
include_directories=include_directories,
54-
apply_changes=True,
55-
full=True,
56-
remove_only=remove_only,
57-
)
126+
worker_count = min(len(filenames), get_worker_count())
127+
chunk_size = math.ceil(float(len(filenames)) / worker_count)
58128

59-
# The expanded size for the file is all of its include sizes, and its own size
60-
expanded_size = sum(map(lambda entry: entry[1], set(map(lambda entry: entry[1:3], includes))))
61-
expanded_size += include_analysis["sizes"][filename]
129+
chunked = list(batched(filenames, chunk_size))
62130

63-
if expanded_size > include_analysis["tsizes"][filename]:
64-
logging.warning(
65-
f"{filename} unexpectedly increased in size from {include_analysis["tsizes"][filename]} to {expanded_size} - ignoring"
66-
)
67-
expanded_size = include_analysis["tsizes"][filename]
131+
workers = [
132+
multiprocessing.Process(
133+
target=work_func,
134+
args=(chunked[idx], result_queue, include_analysis, include_changes),
135+
kwargs={
136+
"ignores": ignores,
137+
"filter_generated_files": filter_generated_files,
138+
"filter_mojom_headers": filter_mojom_headers,
139+
"filter_third_party": filter_third_party,
140+
"header_mappings": header_mappings,
141+
"include_directories": include_directories,
142+
"remove_only": remove_only,
143+
},
144+
)
145+
for idx in range(worker_count)
146+
]
147+
148+
for worker in workers:
149+
worker.start()
68150

69-
yield (filename, expanded_size)
151+
done = False
70152

71-
if progress_callback:
153+
while not done:
154+
try:
155+
(filename, expanded_size) = result_queue.get_nowait()
156+
yield (filename, expanded_size)
72157
progress_callback(filename)
158+
result_queue.task_done()
159+
except queue.Empty:
160+
pass
161+
162+
# If all workers have exited, then there should be no more results
163+
if any((worker.is_alive() for worker in workers)):
164+
time.sleep(0.01)
165+
else:
166+
done = True
73167

74168

75169
def main():
@@ -141,8 +235,8 @@ def main():
141235
disable=len(filenames) == 1, total=len(filenames), unit="file"
142236
) as progress_output:
143237
for row in recalculate_expanded_sizes(
144-
include_analysis,
145238
filenames,
239+
include_analysis,
146240
list(csv.reader(args.changes_file)),
147241
progress_callback=lambda _: progress_output.update(),
148242
ignores=ignores,
@@ -151,7 +245,7 @@ def main():
151245
filter_third_party=args.filter_third_party,
152246
header_mappings=config.headerMappings if config else None,
153247
include_directories=config.includeDirs if config else None,
154-
remove_only=args.remove_only
248+
remove_only=args.remove_only,
155249
):
156250
csv_writer.writerow(row)
157251

0 commit comments

Comments
 (0)