Skip to content

Commit 281da00

Browse files
committed
Update insert job function to avoid joining on symlinks for jobs that have no symlinks
Signed-off-by: Michael Collado <collado.mike@gmail.com>
1 parent 5431dab commit 281da00

2 files changed

Lines changed: 46 additions & 11 deletions

File tree

api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql

Lines changed: 43 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -21,13 +21,14 @@ SELECT f.uuid,
2121
FROM jobs_fqn f,
2222
jobs j
2323
WHERE j.uuid = f.uuid
24-
AND j.is_hidden IS FALSE;
24+
AND j.is_hidden IS FALSE;
2525

2626

2727
CREATE OR REPLACE FUNCTION rewrite_jobs_fqn_table() RETURNS TRIGGER AS
2828
$$
2929
DECLARE
3030
job_uuid uuid;
31+
job_updated_at timestamp with time zone;
3132
new_symlink_target_uuid uuid;
3233
old_symlink_target_uuid uuid;
3334
inserted_job jobs_view%rowtype;
@@ -53,7 +54,7 @@ BEGIN
5354
COALESCE(NEW.parent_job_uuid::char(36), ''),
5455
false
5556
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
56-
DO UPDATE SET updated_at = EXCLUDED.updated_at,
57+
DO UPDATE SET updated_at = now(),
5758
type = EXCLUDED.type,
5859
description = EXCLUDED.description,
5960
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
@@ -64,14 +65,47 @@ BEGIN
6465
EXCLUDED.symlink_target_uuid),
6566
is_hidden = false
6667
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
67-
-- version in case of insert
68-
RETURNING uuid, symlink_target_uuid, (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
69-
INTO job_uuid, new_symlink_target_uuid, old_symlink_target_uuid;
68+
-- version in case of insert
69+
RETURNING uuid,
70+
updated_at,
71+
symlink_target_uuid,
72+
(SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
73+
INTO job_uuid, job_updated_at, new_symlink_target_uuid, old_symlink_target_uuid;
7074

71-
-- update the jobs_fqn table only when inserting a new record (NEW.uuid will equal the job_uuid
72-
-- when inserting a new record) or when the symlink_target_uuid is being updated.
73-
IF NEW.uuid = job_uuid OR
74-
(new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
75+
76+
-- update the jobs_fqn table when inserting a new record
77+
-- (NEW.uuid will equal the job_uuid when inserting a new record)
78+
-- AND if the symlink target is null AND the returned updated_at still equals NEW.updated_at
79+
-- Avoid constructing the symlinks and aliases, as that is expensive
80+
IF TG_OP='INSERT'
81+
AND NEW.uuid = job_uuid
82+
AND NEW.symlink_target_uuid IS NULL
83+
AND NEW.updated_at=job_updated_at THEN
84+
RAISE LOG 'Inserting into jobs_fqn for new job % (%)', NEW.name, job_uuid;
85+
WITH fqn AS (SELECT j.uuid,
86+
CASE
87+
WHEN j.parent_job_uuid IS NULL THEN j.name
88+
ELSE jf.job_fqn || '.' || j.name
89+
END AS name,
90+
j.namespace_uuid,
91+
j.namespace_name,
92+
jf.job_fqn AS parent_job_name,
93+
j.parent_job_uuid
94+
FROM jobs j
95+
LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid
96+
WHERE j.uuid=job_uuid)
97+
INSERT
98+
INTO jobs_fqn
99+
SELECT j.uuid,
100+
jf.namespace_uuid,
101+
jf.namespace_name,
102+
jf.parent_job_name,
103+
ARRAY[jf.name]::text[],
104+
jf.name AS job_fqn
105+
FROM jobs j
106+
INNER JOIN fqn jf ON jf.uuid = j.uuid;
107+
-- or when the symlink_target_uuid is being updated.
108+
ELSIF (new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
75109
RAISE LOG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid;
76110
WITH RECURSIVE
77111
jobs_symlink AS (SELECT j.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid

api/src/test/java/marquez/db/LineageDaoTest.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.base.Functions;
1717
import java.sql.SQLException;
18+
import java.time.Instant;
1819
import java.util.Arrays;
1920
import java.util.Collections;
2021
import java.util.HashSet;
@@ -220,7 +221,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
220221
.upsertJob(
221222
UUID.randomUUID(),
222223
JobType.valueOf(writeJob.getJob().getType()),
223-
writeJob.getJob().getCreatedAt(),
224+
Instant.now(),
224225
namespaceRow.getUuid(),
225226
writeJob.getJob().getNamespaceName(),
226227
symlinkTargetJobName,
@@ -233,7 +234,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
233234
.upsertJob(
234235
writeJob.getJob().getUuid(),
235236
JobType.valueOf(writeJob.getJob().getType()),
236-
writeJob.getJob().getCreatedAt(),
237+
Instant.now(),
237238
namespaceRow.getUuid(),
238239
writeJob.getJob().getNamespaceName(),
239240
writeJob.getJob().getName(),

0 commit comments

Comments
 (0)