forked from datajoint/datajoint-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlineage.py
More file actions
375 lines (303 loc) · 11.6 KB
/
lineage.py
File metadata and controls
375 lines (303 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
"""
Lineage management for semantic matching in DataJoint.
Lineage identifies the origin of an attribute - where it was first defined.
It is represented as a string in the format: schema_name.table_name.attribute_name
Semantic matching is applied to all binary operations that match attributes by name:
- Join (A * B): matches on homologous namesakes
- Restriction (A & B, A - B): matches on homologous namesakes
- Aggregation (A.aggr(B, ...)): requires homologous namesakes for grouping
- Union (A + B): requires all namesakes to have matching lineage
If namesake attributes have different lineages (including either being None),
a DataJointError is raised.
If the ~lineage table doesn't exist for a schema, a warning is issued and
semantic checking is disabled for operations involving that schema.
The ~lineage table stores lineage information for each schema, populated at table
declaration time. Use schema.rebuild_lineage() to restore lineage for legacy schemas.
"""
import logging
from .errors import DataJointError
# Package-level logger: named after the first dotted component of this module's __name__.
logger = logging.getLogger(__name__.split(".")[0])
def ensure_lineage_table(connection, database):
    """
    Issue a ``CREATE TABLE IF NOT EXISTS`` statement for the schema's ~lineage table.

    The table has three non-nullable string columns (``table_name``,
    ``attribute_name``, ``lineage``) and a composite primary key on
    ``(table_name, attribute_name)``. All identifier quoting, column
    formatting and table options are delegated to the connection's adapter.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.
    """
    adapter = connection.adapter
    quote = adapter.quote_identifier

    # Fully qualified, adapter-quoted name of the hidden lineage table
    qualified_name = f"{quote(database)}.{quote('~lineage')}"

    # (column name, SQL type, comment) for every column of the table
    column_specs = (
        ("table_name", "VARCHAR(64)", "table name within the schema"),
        ("attribute_name", "VARCHAR(64)", "attribute name"),
        ("lineage", "VARCHAR(255)", "origin: schema.table.attribute"),
    )
    body_lines = [
        adapter.format_column_definition(name, sql_type, nullable=False, comment=comment)
        for name, sql_type, comment in column_specs
    ]

    # Composite primary key: one lineage row per (table, attribute)
    key_columns = ", ".join(quote(name) for name in ("table_name", "attribute_name"))
    body_lines.append(f"PRIMARY KEY ({key_columns})")

    statement = "CREATE TABLE IF NOT EXISTS {} (\n{}\n) {}".format(
        qualified_name,
        ",\n".join(body_lines),
        adapter.table_options_clause(),
    )
    connection.query(statement)
def lineage_table_exists(connection, database):
    """
    Check if the ~lineage table exists in the schema.

    The check is best-effort: any error raised while building or running
    the probe query is treated as "table not found". The error is logged
    at debug level so that such failures remain diagnosable instead of
    being silently discarded.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.

    Returns
    -------
    bool
        True if the probe query returned a row, False if it returned no
        row or raised an exception.
    """
    try:
        probe_sql = connection.adapter.get_table_info_sql(database, "~lineage")
        row = connection.query(probe_sql).fetchone()
    except Exception as err:
        # Deliberately broad: callers rely on a plain boolean answer here.
        logger.debug("Could not check for ~lineage table in schema %r: %s", database, err)
        return False
    return row is not None
def get_lineage(connection, database, table_name, attribute_name):
    """
    Look up the lineage of a single attribute in the schema's ~lineage table.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.
    table_name : str
        The table name.
    attribute_name : str
        The attribute name.

    Returns
    -------
    str or None
        The stored lineage string; None when the ~lineage table does not
        exist or holds no row for this table/attribute pair.
    """
    if lineage_table_exists(connection, database):
        quote = connection.adapter.quote_identifier
        qualified_name = f"{quote(database)}.{quote('~lineage')}"
        sql = (
            f"\nSELECT lineage FROM {qualified_name}\n"
            "WHERE table_name = %s AND attribute_name = %s\n"
        )
        row = connection.query(sql, args=(table_name, attribute_name)).fetchone()
        if row:
            return row[0]
    return None
def get_table_lineages(connection, database, table_name):
    """
    Collect every lineage entry recorded for one table.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.
    table_name : str
        The table name.

    Returns
    -------
    dict[str, str]
        Mapping of attribute name to lineage string; empty when the
        ~lineage table does not exist.
    """
    lineages = {}
    if not lineage_table_exists(connection, database):
        return lineages

    quote = connection.adapter.quote_identifier
    qualified_name = f"{quote(database)}.{quote('~lineage')}"
    sql = (
        f"\nSELECT attribute_name, lineage FROM {qualified_name}\n"
        "WHERE table_name = %s\n"
    )
    for row in connection.query(sql, args=(table_name,)).fetchall():
        lineages[row[0]] = row[1]
    return lineages
def get_schema_lineages(connection, database):
    """
    Collect every lineage entry recorded in a schema's ~lineage table.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.

    Returns
    -------
    dict[str, str]
        Mapping of 'schema.table.attribute' to that attribute's lineage;
        empty when the ~lineage table does not exist.
    """
    lineages = {}
    if not lineage_table_exists(connection, database):
        return lineages

    quote = connection.adapter.quote_identifier
    qualified_name = f"{quote(database)}.{quote('~lineage')}"
    sql = f"\nSELECT table_name, attribute_name, lineage FROM {qualified_name}\n"
    for table, attr, lineage in connection.query(sql).fetchall():
        # Keys are qualified with the schema name passed by the caller
        lineages[f"{database}.{table}.{attr}"] = lineage
    return lineages
def insert_lineages(connection, database, entries):
    """
    Insert multiple lineage entries in the ~lineage table with one upsert statement.

    The ~lineage table is created first if it does not exist. Nothing is
    executed when there are no entries.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.
    entries : iterable of tuple[str, str, str]
        (table_name, attribute_name, lineage) tuples. Any iterable is
        accepted (list, tuple, generator); ``None`` or an empty iterable
        is a no-op.
    """
    # Materialize once: a generator is always truthy and has no len(),
    # so both the emptiness check and the row count need a real sequence.
    rows = list(entries or ())
    if not rows:
        return

    ensure_lineage_table(connection, database)
    adapter = connection.adapter
    lineage_table = f"{adapter.quote_identifier(database)}.{adapter.quote_identifier('~lineage')}"

    # Build backend-agnostic upsert statement sized for the number of rows
    columns = ["table_name", "attribute_name", "lineage"]
    primary_key = ["table_name", "attribute_name"]
    sql = adapter.upsert_on_duplicate_sql(
        lineage_table,
        columns,
        primary_key,
        len(rows),
    )

    # Flatten the rows into a single args tuple, in row order
    args = tuple(value for row in rows for value in row)
    connection.query(sql, args=args)
def delete_table_lineages(connection, database, table_name):
    """
    Remove every lineage entry recorded for one table.

    Does nothing when the schema has no ~lineage table.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.
    table_name : str
        The table name.
    """
    if lineage_table_exists(connection, database):
        quote = connection.adapter.quote_identifier
        qualified_name = f"{quote(database)}.{quote('~lineage')}"
        sql = f"\nDELETE FROM {qualified_name}\n" "WHERE table_name = %s\n"
        connection.query(sql, args=(table_name,))
def rebuild_schema_lineage(connection, database):
    """
    Rebuild the ~lineage table for all tables in a schema.

    This utility recomputes lineage for all primary-key and foreign-key
    attributes in all non-hidden tables by querying key and FK
    relationships from the information_schema. Use this to restore
    lineage after corruption or for schemas that predate the lineage
    system.

    This function assumes that any upstream schemas (referenced via
    cross-schema foreign keys) have already had their lineage rebuilt.
    If a referenced attribute in another schema has no lineage entry,
    a DataJointError is raised.

    Parameters
    ----------
    connection : Connection
        A DataJoint connection object.
    database : str
        The schema/database name.

    Raises
    ------
    DataJointError
        If a referenced attribute in another schema has no lineage entry,
        or if same-schema foreign keys form a cycle.
    """
    # Ensure the lineage table exists
    ensure_lineage_table(connection, database)
    adapter = connection.adapter
    lineage_table = f"{adapter.quote_identifier(database)}.{adapter.quote_identifier('~lineage')}"

    # Clear all existing lineage entries for this schema.
    # NOTE(review): this runs before resolution; if resolution raises below,
    # the table is presumably left empty unless the caller wraps this call
    # in a transaction - confirm.
    connection.query(f"DELETE FROM {lineage_table}")

    # Get all tables in the schema (excluding hidden tables, whose names start with '~')
    tables_result = connection.query(
        "\nSELECT TABLE_NAME FROM information_schema.tables\n"
        "WHERE TABLE_SCHEMA = %s AND TABLE_NAME NOT LIKE '~%%'\n",
        args=(database,),
    ).fetchall()
    all_tables = {row[0] for row in tables_result}
    if not all_tables:
        return

    # Get all primary key columns for all tables
    pk_result = connection.query(
        "\nSELECT TABLE_NAME, COLUMN_NAME FROM information_schema.KEY_COLUMN_USAGE\n"
        "WHERE TABLE_SCHEMA = %s AND CONSTRAINT_NAME = 'PRIMARY'\n",
        args=(database,),
    ).fetchall()
    # table -> set of PK columns
    pk_columns = {}
    for table, col in pk_result:
        pk_columns.setdefault(table, set()).add(col)

    # Get all FK relationships within and across schemas
    fk_result = connection.query(
        "\nSELECT TABLE_NAME, COLUMN_NAME,\n"
        "REFERENCED_TABLE_SCHEMA, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME\n"
        "FROM information_schema.KEY_COLUMN_USAGE\n"
        "WHERE TABLE_SCHEMA = %s AND REFERENCED_TABLE_NAME IS NOT NULL\n",
        args=(database,),
    ).fetchall()
    # Build FK map: (table, column) -> (parent_schema, parent_table, parent_column)
    fk_map = {(table, col): (ref_schema, ref_table, ref_col) for table, col, ref_schema, ref_table, ref_col in fk_result}

    # table -> set of FK columns, grouped once so the per-table loop below
    # does not rescan the whole FK map for every table
    fk_columns = {}
    for fk_table, fk_col in fk_map:
        fk_columns.setdefault(fk_table, set()).add(fk_col)

    # Lineage cache: (table, column) -> lineage string (for this schema)
    lineage_cache = {}
    # Same-schema FK attributes currently being resolved, used to detect cycles
    in_progress = set()

    def resolve_lineage(table, col):
        """Recursively resolve lineage for an attribute, memoizing the result."""
        key = (table, col)
        if key in lineage_cache:
            return lineage_cache[key]

        if key not in fk_map:
            # Native PK attribute - lineage is self
            lineage = f"{database}.{table}.{col}"
        else:
            # FK attribute - get parent's lineage
            parent_schema, parent_table, parent_col = fk_map[key]
            if parent_schema == database:
                # Same schema - recurse, failing clearly on a cycle instead
                # of recursing until the interpreter's recursion limit
                if key in in_progress:
                    raise DataJointError(
                        f"Cannot rebuild lineage for `{database}`.`{table}`: "
                        f"circular foreign key reference detected at attribute `{col}`."
                    )
                in_progress.add(key)
                try:
                    lineage = resolve_lineage(parent_table, parent_col)
                finally:
                    in_progress.discard(key)
            else:
                # Cross-schema - query parent's lineage table
                lineage = get_lineage(connection, parent_schema, parent_table, parent_col)
                if not lineage:
                    raise DataJointError(
                        f"Cannot rebuild lineage for `{database}`.`{table}`: "
                        f"referenced attribute `{parent_schema}`.`{parent_table}`.`{parent_col}` "
                        f"has no lineage. Rebuild lineage for schema `{parent_schema}` first."
                    )

        lineage_cache[key] = lineage
        return lineage

    # Resolve lineage for all PK and FK attributes
    no_columns = frozenset()
    for table in all_tables:
        table_pk = pk_columns.get(table, no_columns)
        table_fk_cols = fk_columns.get(table, no_columns)
        # Process all attributes that need lineage (PK and FK),
        # skipping columns whose names start with an underscore
        for col in table_pk | table_fk_cols:
            if not col.startswith("_"):
                resolve_lineage(table, col)

    # Insert all lineages in one batch
    if lineage_cache:
        entries = [(table, col, lineage) for (table, col), lineage in lineage_cache.items()]
        insert_lineages(connection, database, entries)