Skip to content

Commit 1d1fec4

Browse files
OL facets - PR2 - read facets from views pointing to lineage_events table
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 7f546ed commit 1d1fec4

17 files changed

Lines changed: 264 additions & 99 deletions

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ jobs:
166166
working_directory: ~/marquez
167167
machine:
168168
image: ubuntu-2004:current
169+
resource_class: large
169170
steps:
170171
- checkout
171172
- run: ./.circleci/get-docker-compose.sh

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -76,22 +76,22 @@ WITH selected_datasets AS (
7676
FROM datasets_view d
7777
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
7878
), dataset_runs AS (
79-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
79+
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
8080
FROM selected_datasets d
81-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
81+
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
8282
LEFT JOIN LATERAL (
83-
SELECT run_uuid, event_time, event FROM lineage_events
84-
WHERE run_uuid = dv.run_uuid
85-
) e ON e.run_uuid = dv.run_uuid
83+
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
84+
WHERE dataset_uuid = dv.dataset_uuid
85+
) df ON df.run_uuid = dv.run_uuid
8686
UNION
87-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
87+
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
8888
FROM selected_datasets d
8989
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
9090
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
9191
LEFT JOIN LATERAL (
92-
SELECT run_uuid, event_time, event FROM lineage_events
93-
WHERE run_uuid = rim.run_uuid
94-
) e ON e.run_uuid = rim.run_uuid
92+
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
93+
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
94+
) df ON df.run_uuid = rim.run_uuid
9595
)
9696
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
9797
FROM selected_datasets d
@@ -104,13 +104,9 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
104104
GROUP BY m.dataset_uuid
105105
) t ON t.dataset_uuid = d.uuid
106106
LEFT JOIN (
107-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
108-
FROM dataset_runs d2,
109-
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
110-
WHERE d2.run_uuid = d2.run_uuid
111-
AND ds -> 'facets' IS NOT NULL
112-
AND ds ->> 'name' = d2.name
113-
AND ds ->> 'namespace' = d2.namespace_name
107+
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
108+
FROM dataset_runs AS d2
109+
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
114110
GROUP BY d2.uuid
115111
) f ON f.dataset_uuid = d.uuid""")
116112
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
@@ -148,22 +144,22 @@ WITH selected_datasets AS (
148144
ORDER BY d.name
149145
LIMIT :limit OFFSET :offset
150146
), dataset_runs AS (
151-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
147+
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
152148
FROM selected_datasets d
153149
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
154150
LEFT JOIN LATERAL (
155-
SELECT run_uuid, event_time, event FROM lineage_events
156-
WHERE run_uuid = dv.run_uuid
157-
) e ON e.run_uuid = dv.run_uuid
151+
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
152+
WHERE dataset_uuid = dv.dataset_uuid
153+
) df ON df.run_uuid = dv.run_uuid
158154
UNION
159-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
155+
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
160156
FROM selected_datasets d
161157
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
162158
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
163159
LEFT JOIN LATERAL (
164-
SELECT run_uuid, event_time, event FROM lineage_events
165-
WHERE run_uuid = rim.run_uuid
166-
) e ON e.run_uuid = rim.run_uuid
160+
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
161+
WHERE dataset_uuid = dv.dataset_uuid
162+
) df ON df.run_uuid = rim.run_uuid
167163
)
168164
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
169165
FROM selected_datasets d
@@ -176,13 +172,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
176172
GROUP BY m.dataset_uuid
177173
) t ON t.dataset_uuid = d.uuid
178174
LEFT JOIN (
179-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets
180-
FROM dataset_runs d2,
181-
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
175+
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
176+
FROM dataset_runs AS d2
182177
WHERE d2.run_uuid = d2.run_uuid
183-
AND ds -> 'facets' IS NOT NULL
184-
AND ds ->> 'name' = d2.name
185-
AND ds ->> 'namespace' = d2.namespace_name
178+
AND d2.facet IS NOT NULL
186179
GROUP BY d2.uuid
187180
) f ON f.dataset_uuid = d.uuid
188181
ORDER BY d.name""")

api/src/main/java/marquez/db/DatasetFacetsDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 contributors to the Marquez project
2+
* Copyright 2018-2023 contributors to the Marquez project
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

api/src/main/java/marquez/db/DatasetVersionDao.java

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ WITH selected_dataset_versions AS (
168168
FROM selected_dataset_versions dv
169169
LEFT JOIN runs_input_mapping rim
170170
ON rim.dataset_version_uuid = dv.uuid
171-
), selected_dataset_version_events AS (
172-
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
171+
), selected_dataset_version_facets AS (
172+
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet
173173
FROM selected_dataset_version_runs dv
174-
LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
174+
LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid
175175
)
176176
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
177177
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
@@ -186,14 +186,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
186186
GROUP BY m.dataset_uuid
187187
) t ON t.dataset_uuid = dv.dataset_uuid
188188
LEFT JOIN (
189-
SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
190-
FROM selected_dataset_version_events dve,
191-
jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
192-
WHERE dve.run_uuid = dve.run_uuid
193-
AND ds -> 'facets' IS NOT NULL
194-
AND ds ->> 'name' = dve.dataset_name
195-
AND ds ->> 'namespace' = dve.namespace_name
196-
GROUP BY dve.uuid
189+
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
190+
FROM selected_dataset_version_facets dvf
191+
WHERE dvf.run_uuid = dvf.run_uuid
192+
GROUP BY dvf.uuid
197193
) f ON f.dataset_uuid = dv.uuid""")
198194
Optional<DatasetVersion> findBy(UUID version);
199195

@@ -211,10 +207,10 @@ WITH selected_dataset_versions AS (
211207
FROM selected_dataset_versions dv
212208
LEFT JOIN runs_input_mapping rim
213209
ON rim.dataset_version_uuid = dv.uuid
214-
), selected_dataset_version_events AS (
215-
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
210+
), selected_dataset_version_facets AS (
211+
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet
216212
FROM selected_dataset_version_runs dv
217-
LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
213+
LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid
218214
)
219215
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
220216
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
@@ -229,14 +225,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
229225
GROUP BY m.dataset_uuid
230226
) t ON t.dataset_uuid = dv.dataset_uuid
231227
LEFT JOIN (
232-
SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
233-
FROM selected_dataset_version_events dve,
234-
jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
235-
WHERE dve.run_uuid = dve.run_uuid
236-
AND ds -> 'facets' IS NOT NULL
237-
AND ds ->> 'name' = dve.dataset_name
238-
AND ds ->> 'namespace' = dve.namespace_name
239-
GROUP BY dve.uuid
228+
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
229+
FROM selected_dataset_version_facets dvf
230+
WHERE dvf.run_uuid = dvf.run_uuid
231+
GROUP BY dvf.uuid
240232
) f ON f.dataset_uuid = dv.uuid""")
241233
Optional<DatasetVersion> findByUuid(UUID uuid);
242234

@@ -283,10 +275,10 @@ WITH selected_dataset_versions AS (
283275
FROM selected_dataset_versions dv
284276
LEFT JOIN runs_input_mapping rim
285277
ON rim.dataset_version_uuid = dv.uuid
286-
), selected_dataset_version_events AS (
287-
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
278+
), selected_dataset_version_facets AS (
279+
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet
288280
FROM selected_dataset_version_runs dv
289-
LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
281+
LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid
290282
)
291283
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
292284
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
@@ -301,14 +293,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
301293
GROUP BY m.dataset_uuid
302294
) t ON t.dataset_uuid = dv.dataset_uuid
303295
LEFT JOIN (
304-
SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
305-
FROM selected_dataset_version_events dve,
306-
jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
307-
WHERE dve.run_uuid = dve.run_uuid
308-
AND ds -> 'facets' IS NOT NULL
309-
AND ds ->> 'name' = dve.dataset_name
310-
AND ds ->> 'namespace' = dve.namespace_name
311-
GROUP BY dve.uuid
296+
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
297+
FROM selected_dataset_version_facets dvf
298+
WHERE dvf.run_uuid = dvf.run_uuid
299+
GROUP BY dvf.uuid
312300
) f ON f.dataset_uuid = dv.uuid
313301
ORDER BY dv.created_at DESC""")
314302
List<DatasetVersion> findAll(String namespaceName, String datasetName, int limit, int offset);

api/src/main/java/marquez/db/FacetUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 contributors to the Marquez project
2+
* Copyright 2018-2023 contributors to the Marquez project
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

api/src/main/java/marquez/db/JobDao.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ SELECT EXISTS (
6363
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
6464
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
6565
LEFT OUTER JOIN (
66-
SELECT run_uuid, JSON_AGG(e.facets) AS facets
66+
SELECT run_uuid, JSON_AGG(e.facet) AS facets
6767
FROM (
68-
SELECT run_uuid, event->'job'->'facets' AS facets
69-
FROM lineage_events AS le
70-
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid
68+
SELECT jf.run_uuid, jf.facet
69+
FROM job_facets_view AS jf
70+
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
7171
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
7272
WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
73-
ORDER BY event_time ASC
73+
ORDER BY lineage_event_time ASC
7474
) e
7575
GROUP BY e.run_uuid
7676
) f ON f.run_uuid=jv.latest_run_uuid
@@ -135,14 +135,14 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
135135
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
136136
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
137137
LEFT OUTER JOIN (
138-
SELECT run_uuid, JSON_AGG(e.facets) AS facets
138+
SELECT run_uuid, JSON_AGG(e.facet) AS facets
139139
FROM (
140-
SELECT run_uuid, event->'job'->'facets' AS facets
141-
FROM lineage_events AS le
142-
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid
140+
SELECT jf.run_uuid, jf.facet
141+
FROM job_facets_view AS jf
142+
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
143143
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
144144
WHERE j2.namespace_name=:namespaceName
145-
ORDER BY event_time ASC
145+
ORDER BY lineage_event_time ASC
146146
) e
147147
GROUP BY e.run_uuid
148148
) f ON f.run_uuid=jv.latest_run_uuid

api/src/main/java/marquez/db/JobFacetsDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 contributors to the Marquez project
2+
* Copyright 2018-2023 contributors to the Marquez project
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

api/src/main/java/marquez/db/JobVersionDao.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,10 @@ WITH job_version_io AS (
102102
LEFT JOIN job_version_io dsio ON dsio.job_version_uuid = jv.uuid
103103
LEFT OUTER JOIN runs r ON r.uuid = jv.latest_run_uuid
104104
LEFT JOIN LATERAL (
105-
SELECT le.run_uuid, JSON_AGG(event -> 'run' -> 'facets') AS facets
106-
FROM lineage_events le
107-
WHERE le.run_uuid=jv.latest_run_uuid
108-
GROUP BY le.run_uuid
105+
SELECT jf.run_uuid, JSON_AGG(jf.facet ORDER BY jf.lineage_event_time ASC) AS facets
106+
FROM job_facets_view AS jf
107+
WHERE jf.run_uuid=jv.latest_run_uuid AND jf.job_uuid = jv.job_uuid
108+
GROUP BY jf.run_uuid
109109
) AS f ON r.uuid = f.run_uuid
110110
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
111111
LEFT JOIN LATERAL (

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,6 @@ WHERE ds.uuid IN (<dsUuids>)""")
114114
+ "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
115115
+ "LEFT JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
116116
+ "LEFT JOIN LATERAL (\n"
117-
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
118-
+ " FROM lineage_events le\n"
119-
+ " WHERE le.run_uuid=r.uuid\n"
120-
+ " GROUP BY le.run_uuid\n"
121-
+ ") AS f ON r.uuid=f.run_uuid\n"
122-
+ "LEFT JOIN LATERAL (\n"
123117
+ " SELECT im.run_uuid,\n"
124118
+ " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
125119
+ " 'name', dv.dataset_name,\n"
@@ -130,6 +124,12 @@ WHERE ds.uuid IN (<dsUuids>)""")
130124
+ " GROUP BY im.run_uuid\n"
131125
+ ") ri ON ri.run_uuid=r.uuid\n"
132126
+ "LEFT JOIN LATERAL (\n"
127+
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
128+
+ " FROM run_facets_view AS rf\n"
129+
+ " WHERE rf.run_uuid=r.uuid\n"
130+
+ " GROUP BY rf.run_uuid\n"
131+
+ ") AS f ON r.uuid=f.run_uuid\n"
132+
+ "LEFT JOIN LATERAL (\n"
133133
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
134134
+ " 'name', dataset_name,\n"
135135
+ " 'version', version)) AS output_versions\n"

api/src/main/java/marquez/db/RunDao.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public interface RunDao extends BaseDao {
8080
+ "FROM runs_view AS r\n"
8181
+ "LEFT OUTER JOIN\n"
8282
+ "(\n"
83-
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
84-
+ " FROM lineage_events le\n"
85-
+ " GROUP BY le.run_uuid\n"
83+
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
84+
+ " FROM run_facets_view rf\n"
85+
+ " GROUP BY rf.run_uuid\n"
8686
+ ") AS f ON r.uuid=f.run_uuid\n"
8787
+ "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
8888
+ "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
@@ -129,10 +129,10 @@ public interface RunDao extends BaseDao {
129129
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
130130
LEFT JOIN LATERAL
131131
(
132-
SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets
133-
FROM lineage_events le
134-
WHERE le.run_uuid=r.uuid
135-
GROUP BY le.run_uuid
132+
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
133+
FROM run_facets_view rf
134+
WHERE rf.run_uuid=r.uuid
135+
GROUP BY rf.run_uuid
136136
) AS f ON r.uuid=f.run_uuid
137137
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
138138
LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid

0 commit comments

Comments
 (0)