Skip to content

Commit d1027fa

Browse files
improve dataset facets access (#2407)
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
1 parent c6b5ed0 commit d1027fa

3 files changed

Lines changed: 53 additions & 110 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
*Makes the button visible only if the link is not empty.*
2626

2727

28+
* Improve dataset facets access [`#2407`](https://github.com/MarquezProject/marquez/pull/2407) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
29+
*Improves database query performance for accessing datasets and dataset versions.*
30+
2831
## [0.30.0](https://github.com/MarquezProject/marquez/compare/0.29.0...0.30.0) - 2023-01-31
2932

3033
### Added

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 22 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -71,30 +71,8 @@ void updateLastModifiedAt(
7171

7272
@SqlQuery(
7373
"""
74-
WITH selected_datasets AS (
75-
SELECT d.*
76-
FROM datasets_view d
77-
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
78-
), dataset_runs AS (
79-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
80-
FROM selected_datasets d
81-
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
82-
LEFT JOIN LATERAL (
83-
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
84-
WHERE dataset_uuid = dv.dataset_uuid
85-
) df ON df.run_uuid = dv.run_uuid
86-
UNION
87-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
88-
FROM selected_datasets d
89-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
90-
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
91-
LEFT JOIN LATERAL (
92-
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
93-
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
94-
) df ON df.run_uuid = rim.run_uuid
95-
)
9674
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
97-
FROM selected_datasets d
75+
FROM datasets_view d
9876
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
9977
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
10078
LEFT JOIN (
@@ -104,11 +82,15 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
10482
GROUP BY m.dataset_uuid
10583
) t ON t.dataset_uuid = d.uuid
10684
LEFT JOIN (
107-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
108-
FROM dataset_runs AS d2
109-
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
110-
GROUP BY d2.uuid
111-
) f ON f.dataset_uuid = d.uuid""")
85+
SELECT
86+
df.dataset_version_uuid,
87+
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
88+
FROM dataset_facets_view AS df
89+
WHERE df.facet IS NOT NULL
90+
GROUP BY df.dataset_version_uuid
91+
) f ON f.dataset_version_uuid = d.current_version_uuid
92+
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
93+
""")
11294
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
11395

11496
default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
@@ -137,32 +119,8 @@ default void setFields(Dataset ds) {
137119

138120
@SqlQuery(
139121
"""
140-
WITH selected_datasets AS (
141-
SELECT d.*
142-
FROM datasets_view d
143-
WHERE d.namespace_name = :namespaceName
144-
ORDER BY d.name
145-
LIMIT :limit OFFSET :offset
146-
), dataset_runs AS (
147-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
148-
FROM selected_datasets d
149-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
150-
LEFT JOIN LATERAL (
151-
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
152-
WHERE dataset_uuid = dv.dataset_uuid
153-
) df ON df.run_uuid = dv.run_uuid
154-
UNION
155-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
156-
FROM selected_datasets d
157-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
158-
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
159-
LEFT JOIN LATERAL (
160-
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
161-
WHERE dataset_uuid = dv.dataset_uuid
162-
) df ON df.run_uuid = rim.run_uuid
163-
)
164122
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
165-
FROM selected_datasets d
123+
FROM datasets_view d
166124
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
167125
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
168126
LEFT JOIN (
@@ -172,13 +130,17 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
172130
GROUP BY m.dataset_uuid
173131
) t ON t.dataset_uuid = d.uuid
174132
LEFT JOIN (
175-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
176-
FROM dataset_runs AS d2
177-
WHERE d2.run_uuid = d2.run_uuid
178-
AND d2.facet IS NOT NULL
179-
GROUP BY d2.uuid
180-
) f ON f.dataset_uuid = d.uuid
181-
ORDER BY d.name""")
133+
SELECT
134+
df.dataset_version_uuid,
135+
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
136+
FROM dataset_facets_view AS df
137+
WHERE df.facet IS NOT NULL
138+
GROUP BY df.dataset_version_uuid
139+
) f ON f.dataset_version_uuid = d.current_version_uuid
140+
WHERE d.namespace_name = :namespaceName
141+
ORDER BY d.name
142+
LIMIT :limit OFFSET :offset
143+
""")
182144
List<Dataset> findAll(String namespaceName, int limit, int offset);
183145

184146
@SqlQuery("SELECT count(*) FROM datasets_view")

api/src/main/java/marquez/db/DatasetVersionDao.java

Lines changed: 28 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,10 @@ default void updateDatasetVersionMetric(
156156

157157
@SqlQuery(
158158
"""
159-
WITH selected_dataset_versions AS (
160-
SELECT dv.*
161-
FROM dataset_versions dv
162-
WHERE dv.version = :version
163-
), selected_dataset_version_facets AS (
164-
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
165-
FROM selected_dataset_versions dv
166-
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
167-
)
168159
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
169160
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
170161
t.tags, f.facets
171-
FROM selected_dataset_versions dv
162+
FROM dataset_versions dv
172163
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
173164
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
174165
LEFT JOIN (
@@ -178,28 +169,21 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
178169
GROUP BY m.dataset_uuid
179170
) t ON t.dataset_uuid = dv.dataset_uuid
180171
LEFT JOIN (
181-
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
182-
FROM selected_dataset_version_facets dvf
183-
WHERE dvf.run_uuid = dvf.run_uuid
184-
GROUP BY dvf.uuid
185-
) f ON f.dataset_uuid = dv.uuid""")
172+
SELECT dvf.dataset_version_uuid,
173+
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
174+
FROM dataset_facets_view dvf
175+
GROUP BY dataset_version_uuid
176+
) f ON f.dataset_version_uuid = dv.uuid
177+
WHERE dv.version = :version
178+
""")
186179
Optional<DatasetVersion> findBy(UUID version);
187180

188181
@SqlQuery(
189182
"""
190-
WITH selected_dataset_versions AS (
191-
SELECT dv.*
192-
FROM dataset_versions dv
193-
WHERE dv.uuid = :uuid
194-
), selected_dataset_version_facets AS (
195-
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
196-
FROM selected_dataset_versions dv
197-
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
198-
)
199183
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
200184
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
201185
t.tags, f.facets
202-
FROM selected_dataset_versions dv
186+
FROM dataset_versions dv
203187
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
204188
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
205189
LEFT JOIN (
@@ -208,12 +192,14 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
208192
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
209193
GROUP BY m.dataset_uuid
210194
) t ON t.dataset_uuid = dv.dataset_uuid
211-
LEFT JOIN (
212-
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
213-
FROM selected_dataset_version_facets dvf
214-
WHERE dvf.run_uuid = dvf.run_uuid
215-
GROUP BY dvf.uuid
216-
) f ON f.dataset_uuid = dv.uuid""")
195+
LEFT JOIN (
196+
SELECT dvf.dataset_version_uuid,
197+
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
198+
FROM dataset_facets_view dvf
199+
GROUP BY dataset_version_uuid
200+
) f ON f.dataset_version_uuid = dv.uuid
201+
WHERE dv.uuid = :uuid
202+
""")
217203
Optional<DatasetVersion> findByUuid(UUID uuid);
218204

219205
default Optional<DatasetVersion> findByWithRun(UUID version) {
@@ -244,22 +230,10 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
244230

245231
@SqlQuery(
246232
"""
247-
WITH selected_dataset_versions AS (
248-
SELECT dv.*
249-
FROM dataset_versions dv
250-
WHERE dv.namespace_name = :namespaceName
251-
AND dv.dataset_name = :datasetName
252-
ORDER BY dv.created_at DESC
253-
LIMIT :limit OFFSET :offset
254-
), selected_dataset_version_facets AS (
255-
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
256-
FROM selected_dataset_versions dv
257-
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
258-
)
259233
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
260234
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
261235
t.tags, f.facets
262-
FROM selected_dataset_versions dv
236+
FROM dataset_versions dv
263237
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
264238
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
265239
LEFT JOIN (
@@ -269,12 +243,16 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
269243
GROUP BY m.dataset_uuid
270244
) t ON t.dataset_uuid = dv.dataset_uuid
271245
LEFT JOIN (
272-
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
273-
FROM selected_dataset_version_facets dvf
274-
WHERE dvf.run_uuid = dvf.run_uuid
275-
GROUP BY dvf.uuid
276-
) f ON f.dataset_uuid = dv.uuid
277-
ORDER BY dv.created_at DESC""")
246+
SELECT dvf.dataset_version_uuid,
247+
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
248+
FROM dataset_facets_view dvf
249+
GROUP BY dataset_version_uuid
250+
) f ON f.dataset_version_uuid = dv.uuid
251+
WHERE dv.namespace_name = :namespaceName
252+
AND dv.dataset_name = :datasetName
253+
ORDER BY dv.created_at DESC
254+
LIMIT :limit OFFSET :offset
255+
""")
278256
List<DatasetVersion> findAll(String namespaceName, String datasetName, int limit, int offset);
279257

280258
default List<DatasetVersion> findAllWithRun(

0 commit comments

Comments
 (0)