Skip to content

Commit 2c21ab0

Browse files
authored
Fix recursive views perf (#2043)
* Changed RunDao to use simple RunRow rather than ExtendedRunRow where possible Signed-off-by: Michael Collado <collado.mike@gmail.com> * Remove need to query JobRow on run completion Signed-off-by: Michael Collado <collado.mike@gmail.com> * Refactor jobs_view to use job_fqn table Signed-off-by: Michael Collado <collado.mike@gmail.com> * Update changelog Signed-off-by: Michael Collado <collado.mike@gmail.com> * Move jobs_view and runs_view to repeatable migrations so that future changes can be easily compared in version control Signed-off-by: Michael Collado <collado.mike@gmail.com> * Address comments for column names and migration files Signed-off-by: Michael Collado <collado.mike@gmail.com>
1 parent ee44ae0 commit 2c21ab0

10 files changed

Lines changed: 318 additions & 147 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.23.0...HEAD)
44

5+
### Changed
6+
* Updated `jobs_view` to stop computing FQN on reads and to compute on _writes_ instead [@collado-mike](https://github.com/collado-mike)
7+
58
## [0.23.0](https://github.com/MarquezProject/marquez/compare/0.22.0...0.23.0) - 2022-06-16
69

710
### Added

api/src/main/java/marquez/db/JobDao.java

Lines changed: 30 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
@RegisterRowMapper(JobRowMapper.class)
3838
@RegisterRowMapper(JobMapper.class)
3939
public interface JobDao extends BaseDao {
40+
4041
@SqlQuery(
4142
"SELECT EXISTS (SELECT 1 FROM jobs_view AS j "
4243
+ "WHERE j.namespace_name= :namespaceName AND "
@@ -52,18 +53,8 @@ public interface JobDao extends BaseDao {
5253

5354
@SqlQuery(
5455
"""
55-
WITH RECURSIVE job_ids AS (
56-
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
57-
FROM jobs_view j
58-
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
59-
UNION
60-
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
61-
FROM jobs_view j
62-
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
63-
)
6456
SELECT j.*, jc.context, f.facets
6557
FROM jobs_view j
66-
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
6758
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
6859
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
6960
LEFT OUTER JOIN (
@@ -78,6 +69,8 @@ SELECT run_uuid, JSON_AGG(e.facets) AS facets
7869
) e
7970
GROUP BY e.run_uuid
8071
) f ON f.run_uuid=jv.latest_run_uuid
72+
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
73+
AND j.symlink_target_uuid IS NULL
8174
""")
8275
Optional<Job> findJobByName(String namespaceName, String jobName);
8376

@@ -93,38 +86,22 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
9386

9487
@SqlQuery(
9588
"""
96-
WITH RECURSIVE job_ids AS (
97-
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
98-
FROM jobs_view j
99-
WHERE j.uuid=:jobUuid
100-
UNION
101-
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
102-
FROM jobs_view j
103-
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
104-
)
105-
SELECT j.*, n.name AS namespace_name
106-
FROM jobs_view AS j
107-
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
108-
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
109-
""")
89+
SELECT j.*, n.name AS namespace_name
90+
FROM jobs_view AS j
91+
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
92+
WHERE j.uuid=:jobUuid
93+
""")
11094
Optional<JobRow> findJobByUuidAsRow(UUID jobUuid);
11195

11296
@SqlQuery(
11397
"""
114-
WITH RECURSIVE job_ids AS (
115-
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
116-
FROM jobs_view j
117-
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
118-
UNION
119-
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
120-
FROM jobs_view j
121-
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
122-
)
123-
SELECT j.*, n.name AS namespace_name
124-
FROM jobs_view AS j
125-
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
126-
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
127-
""")
98+
SELECT j.*, n.name AS namespace_name
99+
FROM jobs_view AS j
100+
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
101+
WHERE j.namespace_name=:namespaceName AND
102+
(j.name=:jobName OR :jobName = ANY(j.aliases))
103+
AND j.symlink_target_uuid IS NULL
104+
""")
128105
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);
129106

130107
@SqlQuery(
@@ -150,11 +127,11 @@ WITH RECURSIVE job_ids AS (
150127
+ "LIMIT :limit OFFSET :offset")
151128
List<Job> findAll(String namespaceName, int limit, int offset);
152129

153-
@SqlQuery("SELECT count(*) FROM jobs AS j WHERE symlink_target_uuid IS NULL")
130+
@SqlQuery("SELECT count(*) FROM jobs_view AS j WHERE symlink_target_uuid IS NULL")
154131
int count();
155132

156133
@SqlQuery(
157-
"SELECT count(*) FROM jobs AS j WHERE j.namespace_name = :namespaceName\n"
134+
"SELECT count(*) FROM jobs_view AS j WHERE j.namespace_name = :namespaceName\n"
158135
+ "AND symlink_target_uuid IS NULL")
159136
int countFor(String namespaceName);
160137

@@ -213,7 +190,6 @@ default JobRow upsertJobMeta(
213190
createdAt,
214191
Utils.toJson(jobMeta.getContext()),
215192
Utils.checksumFor(jobMeta.getContext()));
216-
217193
return upsertJob(
218194
UUID.randomUUID(),
219195
jobMeta.getType(),
@@ -248,7 +224,7 @@ default PGobject toJson(Set<DatasetId> dataset, ObjectMapper mapper) {
248224

249225
@SqlQuery(
250226
"""
251-
INSERT INTO jobs AS j (
227+
INSERT INTO jobs_view AS j (
252228
uuid,
253229
type,
254230
created_at,
@@ -260,7 +236,8 @@ INSERT INTO jobs AS j (
260236
current_job_context_uuid,
261237
current_location,
262238
current_inputs,
263-
symlink_target_uuid
239+
symlink_target_uuid,
240+
parent_job_uuid_string
264241
) VALUES (
265242
:uuid,
266243
:type,
@@ -273,20 +250,11 @@ INSERT INTO jobs AS j (
273250
:jobContextUuid,
274251
:location,
275252
:inputs,
276-
:symlinkTargetId
277-
) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL DO
278-
UPDATE SET
279-
updated_at = EXCLUDED.updated_at,
280-
type = EXCLUDED.type,
281-
description = EXCLUDED.description,
282-
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
283-
current_location = EXCLUDED.current_location,
284-
current_inputs = EXCLUDED.current_inputs,
285-
-- update the symlink target if not null. otherwise, keep the old value
286-
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
287-
RETURNING uuid
253+
:symlinkTargetId,
254+
''
255+
) RETURNING *
288256
""")
289-
UUID upsertJobNoParent(
257+
JobRow upsertJob(
290258
UUID uuid,
291259
JobType type,
292260
Instant now,
@@ -299,39 +267,12 @@ UUID upsertJobNoParent(
299267
UUID symlinkTargetId,
300268
PGobject inputs);
301269

302-
default JobRow upsertJob(
303-
UUID uuid,
304-
JobType type,
305-
Instant now,
306-
UUID namespaceUuid,
307-
String namespaceName,
308-
String name,
309-
String description,
310-
UUID jobContextUuid,
311-
String location,
312-
UUID symlinkTargetId,
313-
PGobject inputs) {
314-
UUID jobUuid =
315-
upsertJobNoParent(
316-
uuid,
317-
type,
318-
now,
319-
namespaceUuid,
320-
namespaceName,
321-
name,
322-
description,
323-
jobContextUuid,
324-
location,
325-
symlinkTargetId,
326-
inputs);
327-
return findJobByUuidAsRow(jobUuid).get();
328-
}
329-
330270
@SqlQuery(
331271
"""
332-
INSERT INTO jobs AS j (
272+
INSERT INTO jobs_view AS j (
333273
uuid,
334274
parent_job_uuid,
275+
parent_job_uuid_string,
335276
type,
336277
created_at,
337278
updated_at,
@@ -346,6 +287,7 @@ INSERT INTO jobs AS j (
346287
) VALUES (
347288
:uuid,
348289
:parentJobUuid,
290+
COALESCE(:parentJobUuid::text, ''),
349291
:type,
350292
:now,
351293
:now,
@@ -357,19 +299,10 @@ INSERT INTO jobs AS j (
357299
:location,
358300
:inputs,
359301
:symlinkTargetId
360-
) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO
361-
UPDATE SET
362-
updated_at = EXCLUDED.updated_at,
363-
type = EXCLUDED.type,
364-
description = EXCLUDED.description,
365-
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
366-
current_location = EXCLUDED.current_location,
367-
current_inputs = EXCLUDED.current_inputs,
368-
-- update the symlink target if not null. otherwise, keep the old value
369-
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
370-
RETURNING uuid
302+
)
303+
RETURNING *
371304
""")
372-
UUID upsertJobWithParent(
305+
JobRow upsertJob(
373306
UUID uuid,
374307
UUID parentJobUuid,
375308
JobType type,
@@ -382,34 +315,4 @@ UUID upsertJobWithParent(
382315
String location,
383316
UUID symlinkTargetId,
384317
PGobject inputs);
385-
386-
default JobRow upsertJob(
387-
UUID uuid,
388-
UUID parentJobUuid,
389-
JobType type,
390-
Instant now,
391-
UUID namespaceUuid,
392-
String namespaceName,
393-
String name,
394-
String description,
395-
UUID jobContextUuid,
396-
String location,
397-
UUID symlinkTargetId,
398-
PGobject inputs) {
399-
UUID jobUuid =
400-
upsertJobWithParent(
401-
uuid,
402-
parentJobUuid,
403-
type,
404-
now,
405-
namespaceUuid,
406-
namespaceName,
407-
name,
408-
description,
409-
jobContextUuid,
410-
location,
411-
symlinkTargetId,
412-
inputs);
413-
return findJobByUuidAsRow(jobUuid).get();
414-
}
415318
}

api/src/main/java/marquez/db/RunDao.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -122,26 +122,16 @@ public interface RunDao extends BaseDao {
122122

123123
@SqlQuery(
124124
"""
125-
WITH RECURSIVE job_names AS (
126-
SELECT uuid, namespace_name, name, symlink_target_uuid
127-
FROM jobs_view j
128-
WHERE j.namespace_name=:namespace AND j.name=:jobName
129-
UNION
130-
SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
131-
FROM jobs_view j
132-
INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
133-
)
134125
SELECT r.*, ra.args, ctx.context, f.facets,
135-
jv.namespace_name, jv.job_name, jv.version AS job_version,
126+
j.namespace_name, j.name, jv.version AS job_version,
136127
ri.input_versions, ro.output_versions
137128
FROM runs_view AS r
138-
INNER JOIN job_names j ON r.namespace_name=j.namespace_name AND r.job_name=j.name
139-
LEFT OUTER JOIN
129+
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
130+
LEFT JOIN LATERAL
140131
(
141132
SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets
142133
FROM lineage_events le
143-
INNER JOIN runs_view r2 ON r2.uuid=le.run_uuid
144-
WHERE r2.job_name=:jobName AND r2.namespace_name=:namespace
134+
WHERE le.run_uuid=r.uuid
145135
GROUP BY le.run_uuid
146136
) AS f ON r.uuid=f.run_uuid
147137
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
@@ -162,6 +152,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
162152
FROM dataset_versions
163153
GROUP BY run_uuid
164154
) ro ON ro.run_uuid=r.uuid
155+
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases))
165156
ORDER BY STARTED_AT DESC NULLS LAST
166157
LIMIT :limit OFFSET :offset
167158
""")

0 commit comments

Comments
 (0)