-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathutils.py
More file actions
299 lines (220 loc) · 12.2 KB
/
Copy pathutils.py
File metadata and controls
299 lines (220 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import functools
import logging
import multiprocessing
import os
import pathlib
from collections import defaultdict
from typing import DefaultDict, Dict, List
import networkx as nx
from common import Configuration
from include_analysis import IncludeAnalysisOutput
def get_worker_count():
    """Return the number of CPUs to size a worker pool with.

    When the platform exposes ``os.sched_getaffinity`` the size of this
    process's affinity set is used; otherwise the value reported by
    ``multiprocessing.cpu_count()`` is returned.
    """
    affinity_query = getattr(os, "sched_getaffinity", None)

    if affinity_query is None:
        # Platform has no affinity API, so fall back to the machine-wide count
        return multiprocessing.cpu_count()

    return len(affinity_query(0))
def load_config(name: str):
    """Load ``configs/<name>.json`` next to this module and merge dependency ignores.

    Each key of ``config.dependencies`` is appended to ``config.includeDirs``.
    The value is either a string (a path, relative to the main config file,
    of another config whose ``ignores`` are used) or an object carrying its
    own ``ignores``. Those ignores are folded into ``config.ignores`` with
    file paths re-rooted under the dependency key.

    Returns the merged configuration, or the integer ``1`` (after printing an
    error) when the main config file or a referenced dependency config file
    does not exist.
    """
    config_path = (pathlib.Path(__file__).parent / "configs" / name).with_suffix(".json")

    if not config_path.exists():
        print(f"error: no config file found: {config_path}")
        return 1

    config = Configuration.parse_file(config_path)

    # TODO - Make it recursive so it can handle deeply nested configs
    # TODO - Maybe warn about duplicates when merging?

    # Merge the dependencies (maybe this should be made generic to sub-configs?)
    for dependency in config.dependencies:
        # Dependency paths are automatically added to include directories
        config.includeDirs.append(dependency)

        dependency_spec = config.dependencies[dependency]

        if isinstance(dependency_spec, str):
            nested_config_path = config_path.parent / dependency_spec

            if not nested_config_path.exists():
                print(f"error: no config file found: {nested_config_path}")
                return 1

            inherited = Configuration.parse_file(nested_config_path).ignores
        else:
            inherited = dependency_spec.ignores

        dependency_root = pathlib.Path(dependency)

        # Files to skip are relative to the source root
        config.ignores.skip.extend(str(dependency_root / skipped) for skipped in inherited.skip)

        for op in ("add", "remove"):
            source = getattr(inherited, op)
            target = getattr(config.ignores, op)

            # Filenames are relative to the source root
            target.filenames.extend(str(dependency_root / filename) for filename in source.filenames)

            # Headers are accessible both internally and externally, so include them as-is and
            # also include them relative to the source root for top-level inclusion
            for header in source.headers:
                target.headers.extend((header, str(dependency_root / header)))

            # Edges are only processed if their file is, and that file is relative to the source root
            target.edges.extend((str(dependency_root / filename), header) for filename, header in source.edges)

    return config
def get_include_analysis_edge_sizes(include_analysis: IncludeAnalysisOutput):
    """Copy ``include_analysis["esizes"]`` into a ``defaultdict`` of dicts.

    The result maps includer filename -> include -> the value stored for that
    edge. Includers whose entry is empty do not appear in the result.
    """
    edge_sizes: DefaultDict[str, Dict[str, int]] = defaultdict(dict)

    for includer, sizes_by_include in include_analysis["esizes"].items():
        for included, size in sizes_by_include.items():
            edge_sizes[includer][included] = size

    return edge_sizes
def get_include_analysis_edge_expanded_sizes(include_analysis: IncludeAnalysisOutput):
    """Map every include edge to the ``tsizes`` entry of the included file.

    Walks ``include_analysis["includes"]`` for each name in
    ``include_analysis["files"]``. Edges whose include has no ``tsizes``
    entry are silently left out of the result.
    """
    edge_expanded_sizes: DefaultDict[str, Dict[str, int]] = defaultdict(dict)

    for includer in include_analysis["files"]:
        for included in include_analysis["includes"][includer]:
            try:
                expanded_size = include_analysis["tsizes"][included]
            except KeyError:
                # No expanded size recorded for this include, so skip the edge
                continue

            edge_expanded_sizes[includer][included] = expanded_size

    return edge_expanded_sizes
def get_include_analysis_edge_file_sizes(include_analysis: IncludeAnalysisOutput):
    """Map every include edge to the ``sizes`` entry of the included file.

    The mapping is built once and stored on the analysis itself under the
    ``"_edge_file_sizes"`` key; later calls return that stored object. Edges
    whose include has no ``sizes`` entry are omitted and a warning is logged.
    """
    if "_edge_file_sizes" not in include_analysis:
        edge_file_sizes: DefaultDict[str, Dict[str, int]] = defaultdict(dict)

        for filename in include_analysis["files"]:
            for include in include_analysis["includes"][filename]:
                try:
                    file_size = include_analysis["sizes"][include]
                except KeyError:
                    logging.warning(f"Couldn't get include file size for {include}")
                else:
                    edge_file_sizes[filename][include] = file_size

        # Stash the result on the analysis so it is only computed once
        include_analysis["_edge_file_sizes"] = edge_file_sizes

    return include_analysis["_edge_file_sizes"]
def get_include_file_size(include_analysis: IncludeAnalysisOutput, include: str):
    """Return ``include_analysis["sizes"][include]``, defaulting to 0.

    When the include has no recorded size a warning is logged and a size of
    0 is written back into ``include_analysis["sizes"]`` for that include,
    so the same include does not warn again on a later call.
    """
    try:
        file_size = include_analysis["sizes"][include]
    except KeyError:
        logging.warning(f"Couldn't get include file size for {include}")
        file_size = include_analysis["sizes"][include] = 0

    return file_size
def get_include_analysis_edge_prevalence(include_analysis: IncludeAnalysisOutput):
    """Map every include edge to its includer's prevalence as a percentage.

    For each edge the value is ``100.0 * prevalence[includer] / len(roots)``,
    so all edges leaving the same includer share one value. The
    ``prevalence`` entry is only read for includers with at least one include.
    """
    files = include_analysis["files"]
    root_count = len(include_analysis["roots"])
    edge_prevalence: DefaultDict[str, Dict[str, float]] = defaultdict(dict)

    for includer in files:
        includes = include_analysis["includes"][includer]

        if not includes:
            continue

        # Every edge out of this includer gets the same percentage
        percentage = (100.0 * include_analysis["prevalence"][includer]) / root_count
        edge_prevalence[includer].update(dict.fromkeys(includes, percentage))

    return edge_prevalence
def create_graph_from_include_analysis(include_analysis: IncludeAnalysisOutput):
    """Build a ``networkx.DiGraph`` of the include relationships.

    Nodes are the positions of filenames in ``include_analysis["files"]`` and
    carry ``filename`` and ``filesize`` attributes (``filesize`` is 0 when the
    file has no ``sizes`` entry). There is one edge, with ``capacity=1``, from
    each includer to each file it includes.
    """
    graph = nx.DiGraph()

    file_sizes = include_analysis["sizes"]
    index_of = {filename: idx for idx, filename in enumerate(include_analysis["files"])}

    # Add nodes and edges to the graph
    for filename, idx in index_of.items():
        graph.add_node(idx, filename=filename, filesize=file_sizes.get(filename, 0))
        graph.add_edges_from(
            ((idx, index_of[include]) for include in include_analysis["includes"][filename]),
            capacity=1,
        )

    return graph
def get_include_analysis_edges_centrality(include_analysis: IncludeAnalysisOutput):
    """Score every include edge with a degree-centrality based heuristic.

    The score of an edge is ``100000 * out_degree_centrality(included) *
    in_degree_centrality(includer)``, computed on the graph produced by
    ``create_graph_from_include_analysis``.
    """
    graph: nx.DiGraph = create_graph_from_include_analysis(include_analysis)
    in_centrality = nx.in_degree_centrality(graph)
    out_centrality = nx.out_degree_centrality(graph)

    files = include_analysis["files"]
    index_of = {filename: idx for idx, filename in enumerate(files)}
    edges_centrality: DefaultDict[str, Dict[str, float]] = defaultdict(dict)

    # Centrality is a metric for a node, but we want to create a metric for an edge.
    # For the moment, this will use a heuristic which combines the in-degree centrality
    # of the node where the edge starts, and the out-degree centrality of the node the
    # edge is pulling into the graph. This hopefully creates a metric which lets us find
    # edges in commonly included nodes, which pull lots of nodes into the graph.
    for includer_idx, includer in enumerate(files):
        for included in include_analysis["includes"][includer]:
            # Scale the value up so it's more human-friendly instead of having lots of leading zeroes
            score = 100000 * out_centrality[index_of[included]] * in_centrality[includer_idx]
            edges_centrality[includer][included] = score

    return edges_centrality
def get_include_analysis_edge_includer_size(include_analysis: IncludeAnalysisOutput):
    """Map every edge listed in ``esizes`` to the ``sizes`` entry of its includer.

    Only the keys of ``include_analysis["esizes"]`` are used; the values
    stored there are ignored.
    """
    includer_sizes: DefaultDict[str, Dict[str, int]] = defaultdict(dict)

    for includer, sizes_by_include in include_analysis["esizes"].items():
        for included in sizes_by_include:
            includer_sizes[includer][included] = include_analysis["sizes"][includer]

    return includer_sizes
@functools.cache
def _init_path(value: str):
    """Return a ``pathlib.Path`` for ``value``, memoized per distinct string."""
    return pathlib.Path(value)


# Cache for normalized include paths to avoid repeated normalization. Keys are
# include spellings exactly as passed by the caller (angle brackets included).
_normalized_include_paths: Dict[str, str] = {}


def _resolve_angle_include(
    include_analysis: IncludeAnalysisOutput, files, include: str, include_directories: List[str]
):
    """Return the best candidate path for a bracket-stripped ``<...>`` include.

    The returned candidate is not guaranteed to be in ``files``; the caller
    checks that and falls back if it is not.
    """
    # Angle bracket headers might be in either libc++ or the sysroot
    sysroot = _init_path(include_analysis["sysroot"])
    normalized = f"third_party/libc++/src/include/{include}"

    if normalized not in files:
        normalized = str(sysroot.joinpath("usr", "include", include))

    if normalized not in files:
        normalized = str(sysroot.joinpath("usr", "include", include_analysis["sysroot_platform"], include))

    if normalized in files:
        return normalized

    for include_directory in include_directories:
        # Replace {sysroot} and {sysroot-platform} with the actual path from the include analysis.
        # The slices skip the placeholder plus the separator that follows it.
        if include_directory.startswith("{sysroot}"):
            candidate_directories = [sysroot.joinpath(include_directory[10:])]
        elif include_directory.startswith("{sysroot-platform}"):
            candidate_directories = [
                sysroot.joinpath("usr", part, include_analysis["sysroot_platform"], include_directory[19:])
                for part in ("include", "lib")
            ]
        else:
            # Other include directories are only consulted for quoted includes
            continue

        for candidate_directory in candidate_directories:
            full_path = str(candidate_directory.joinpath(include))

            if full_path in files:
                return full_path

    # Nothing matched: hand back the last sysroot candidate so the caller can report it
    return normalized


def _resolve_quoted_include(
    include_analysis: IncludeAnalysisOutput, files, includer: str, include: str, include_directories: List[str]
):
    """Return the path in ``files`` that a quoted include refers to, or ``None``."""
    # First check if it might be a relative file
    relative_file_path = str(_init_path(includer).parent.joinpath(include))

    if relative_file_path in files:
        return relative_file_path

    # Then check if it might be a generated file
    gen_prefix = _init_path(include_analysis["gen_prefix"])
    generated_file_path = str(gen_prefix.joinpath(include))

    if generated_file_path in files:
        return generated_file_path

    # If not, try to find it in the include directories
    for include_directory in include_directories:
        if include_directory.startswith("{gen}"):
            # Replace {gen} with the actual gen prefix from the include analysis
            include_directory = str(gen_prefix.joinpath(include_directory[6:]))
        elif include_directory.startswith(("{sysroot}", "{sysroot-platform}")):
            continue  # These are handled as angle brackets

        full_path = str(_init_path(include_directory).joinpath(include))

        if full_path in files:
            return full_path

    return None


def normalize_include_path(
    include_analysis: IncludeAnalysisOutput, includer: str, include: str, include_directories: List[str] = None
) -> str:
    """Normalize an include path to long form (e.g. full file path).

    Args:
        include_analysis: The include analysis; ``"files"`` is always read, and
            ``"sysroot"``, ``"sysroot_platform"`` or ``"gen_prefix"`` are read
            depending on the kind of include. A ``"files_set"`` entry is added
            to it on first use.
        includer: Path of the file containing the include; used to resolve
            quoted includes relative to the includer's directory.
        include: The include as spelled, either a plain path or ``<header>``.
        include_directories: Extra directories to search. Entries may start
            with ``{gen}``, ``{sysroot}`` or ``{sysroot-platform}``.

    Returns:
        The matching entry of ``include_analysis["files"]``. If none is found
        a warning is logged and the include itself is returned (without angle
        brackets if it had them).
    """
    # Remember the caller's spelling: ``include`` is rewritten below for angle
    # bracket headers, and storing the result under the rewritten name would
    # both miss the cache for "<foo>" forever and poison lookups of a quoted "foo".
    cache_key = include

    # NOTE(review): the cache ignores includer, include_directories and the
    # identity of include_analysis, so the first resolution of a given spelling
    # is reused for all later calls - confirm that is acceptable for relative includes.
    if cache_key in _normalized_include_paths:
        return _normalized_include_paths[cache_key]

    # TODO - Make include_analysis["files"] a set always, after verifying no existing code relies on it being a list
    if "files_set" not in include_analysis:
        include_analysis["files_set"] = set(include_analysis["files"])

    files = include_analysis["files_set"]

    if include_directories is None:
        include_directories = []

    # Normalization may not be necessary
    if include in files:
        normalized = include
    else:
        if include.startswith("<") and include.endswith(">"):
            include = include.strip("<>")
            normalized = _resolve_angle_include(include_analysis, files, include, include_directories)
        else:
            normalized = _resolve_quoted_include(include_analysis, files, includer, include, include_directories)

        if normalized is None:
            logging.warning(f"Could not normalize include path: {include}.")
            normalized = include
        elif normalized not in files:
            logging.warning(
                f"Normalized include path not found in include analysis files: {normalized}. Falling back to original include path."
            )
            normalized = include

    _normalized_include_paths[cache_key] = normalized

    return normalized