Skip to content

Commit 0e6f707

Browse files
committed
Removed jobs_fqn table and moved FQN into jobs directly in order to enforce unique name constraints
Signed-off-by: Michael Collado <collado.mike@gmail.com>
1 parent 8d28ed5 commit 0e6f707

11 files changed

Lines changed: 301 additions & 149 deletions

File tree

api/src/main/java/marquez/api/JobResource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,7 @@ public Response delete(
187187
.findJobByName(namespaceName.getValue(), jobName.getValue())
188188
.orElseThrow(() -> new JobNotFoundException(jobName));
189189

190-
// Should be simple name from `jobs_fqn`.
191-
jobService.delete(namespaceName.getValue(), job.getSimpleName());
190+
jobService.delete(namespaceName.getValue(), job.getName().getValue());
192191
return Response.ok(job).build();
193192
}
194193

api/src/main/java/marquez/db/JobDao.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ SELECT run_uuid, JSON_AGG(e.facet) AS facets
7272
GROUP BY e.run_uuid
7373
) f ON f.run_uuid=jv.latest_run_uuid
7474
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
75-
AND j.symlink_target_uuid IS NULL
7675
""")
7776
Optional<Job> findJobByName(String namespaceName, String jobName);
7877

@@ -121,7 +120,6 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
121120
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
122121
WHERE j.namespace_name=:namespaceName AND
123122
(j.name=:jobName OR :jobName = ANY(j.aliases))
124-
AND j.symlink_target_uuid IS NULL
125123
""")
126124
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);
127125

@@ -143,7 +141,6 @@ SELECT run_uuid, JSON_AGG(e.facet) AS facets
143141
GROUP BY e.run_uuid
144142
) f ON f.run_uuid=jv.latest_run_uuid
145143
WHERE j.namespace_name = :namespaceName
146-
AND j.symlink_target_uuid IS NULL
147144
ORDER BY j.name LIMIT :limit OFFSET :offset
148145
""")
149146
List<Job> findAll(String namespaceName, int limit, int offset);
@@ -292,7 +289,6 @@ JobRow upsertJob(
292289
INSERT INTO jobs_view AS j (
293290
uuid,
294291
parent_job_uuid,
295-
parent_job_uuid_string,
296292
type,
297293
created_at,
298294
updated_at,
@@ -307,7 +303,6 @@ INSERT INTO jobs_view AS j (
307303
) VALUES (
308304
:uuid,
309305
:parentJobUuid,
310-
COALESCE(:parentJobUuid::text, ''),
311306
:type,
312307
:now,
313308
:now,

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ public interface LineageDao {
4141
WITH RECURSIVE
4242
job_io AS (
4343
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
44+
ARRAY_AGG(DISTINCT j.uuid) AS ids,
4445
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
4546
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
46-
FROM job_versions_io_mapping io
47-
INNER JOIN job_versions v ON io.job_version_uuid=v.uuid
48-
INNER JOIN jobs_view j on j.current_version_uuid = v.uuid
49-
LEFT JOIN jobs_view s ON s.uuid=j.symlink_target_uuid
50-
WHERE s.current_version_uuid IS NULL
47+
FROM jobs j
48+
LEFT JOIN jobs_view s On s.uuid=j.symlink_target_uuid
49+
LEFT JOIN job_versions v on v.uuid=COALESCE(s.current_version_uuid, j.current_version_uuid)
50+
LEFT JOIN job_versions_io_mapping io ON io.job_version_uuid=v.uuid
5151
GROUP BY COALESCE(j.symlink_target_uuid, j.uuid)
5252
),
5353
lineage(job_uuid, inputs, outputs) AS (
@@ -56,8 +56,8 @@ SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
5656
COALESCE(outputs, Array[]::uuid[]) AS outputs,
5757
0 AS depth
5858
FROM jobs_view j
59-
LEFT JOIN job_io io ON io.job_uuid=j.uuid OR j.symlink_target_uuid=io.job_uuid
60-
WHERE j.uuid IN (<jobIds>)
59+
INNER JOIN job_io io ON j.uuid=ANY(io.ids)
60+
WHERE io.ids && ARRAY[<jobIds>]::uuid[]
6161
UNION
6262
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
6363
FROM job_io io,

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -170,19 +170,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
170170
Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
171171

172172
JobRow job =
173-
runDao
174-
.findJobRowByRunUuid(runToUuid(event.getRun().getRunId()))
175-
.orElseGet(
176-
() ->
177-
buildJobFromEvent(
178-
event,
179-
mapper,
180-
jobDao,
181-
now,
182-
namespace,
183-
nominalStartTime,
184-
nominalEndTime,
185-
parentRun));
173+
buildJobFromEvent(
174+
event, mapper, jobDao, now, namespace, nominalStartTime, nominalEndTime, parentRun);
186175

187176
bag.setJob(job);
188177

@@ -812,7 +801,7 @@ private List<ColumnLineageRow> upsertColumnLineage(
812801
log.error(
813802
"Cannot produce column lineage for missing output field in output dataset: {}",
814803
columnName);
815-
return Stream.<ColumnLineageRow>empty();
804+
return Stream.empty();
816805
}
817806

818807
// get field uuids of input columns related to this run

api/src/main/java/marquez/db/RunDao.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -411,22 +411,12 @@ default RunRow upsertRunMeta(
411411
void updateJobVersion(UUID runUuid, UUID jobVersionUuid);
412412

413413
@SqlQuery(
414-
"""
415-
WITH RECURSIVE job_names AS (
416-
SELECT uuid, namespace_name, name, symlink_target_uuid
417-
FROM jobs_view j
418-
WHERE j.namespace_name=:namespace AND j.name=:jobName
419-
UNION
420-
SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
421-
FROM jobs_view j
422-
INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
423-
)
424-
"""
425-
+ BASE_FIND_RUN_SQL
414+
BASE_FIND_RUN_SQL
426415
+ """
427416
WHERE r.uuid=(
428417
SELECT r.uuid FROM runs_view r
429-
INNER JOIN job_names j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
418+
INNER JOIN jobs_view j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
419+
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR j.name=ANY(j.aliases))
430420
ORDER BY transitioned_at DESC
431421
LIMIT 1
432422
)

api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql

Lines changed: 48 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,26 @@
11
CREATE OR REPLACE VIEW jobs_view
22
AS
3-
SELECT f.uuid,
4-
f.job_fqn AS name,
5-
f.namespace_name,
6-
j.name AS simple_name,
3+
SELECT j.uuid,
4+
j.name,
5+
j.namespace_name,
6+
j.simple_name AS simple_name,
77
j.parent_job_uuid,
8-
f.parent_job_name::text,
8+
p.name::text AS parent_job_name,
99
j.type,
1010
j.created_at,
1111
j.updated_at,
12-
f.namespace_uuid,
12+
j.namespace_uuid,
1313
j.description,
1414
j.current_version_uuid,
1515
j.current_job_context_uuid,
1616
j.current_location,
1717
j.current_inputs,
1818
j.symlink_target_uuid,
19-
j.parent_job_uuid_string,
20-
f.aliases
21-
FROM jobs_fqn f,
22-
jobs j
23-
WHERE j.uuid = f.uuid
24-
AND j.is_hidden IS FALSE;
25-
19+
j.parent_job_uuid::char(36) AS parent_job_uuid_string,
20+
j.aliases
21+
FROM jobs j
22+
LEFT JOIN jobs p ON j.parent_job_uuid=p.uuid
23+
WHERE j.is_hidden IS FALSE AND j.symlink_target_uuid IS NULL;
2624

2725
CREATE OR REPLACE FUNCTION rewrite_jobs_fqn_table() RETURNS TRIGGER AS
2826
$$
@@ -32,16 +30,24 @@ DECLARE
3230
new_symlink_target_uuid uuid;
3331
old_symlink_target_uuid uuid;
3432
inserted_job jobs_view%rowtype;
33+
full_name varchar;
3534
BEGIN
36-
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, description,
35+
full_name = NEW.name;
36+
IF NEW.parent_job_uuid IS NOT NULL THEN
37+
SELECT p.name || '.' || NEW.name INTO full_name
38+
FROM jobs p
39+
WHERE p.uuid=NEW.parent_job_uuid;
40+
END IF;
41+
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, simple_name, description,
3742
current_version_uuid, namespace_name, current_job_context_uuid,
3843
current_location, current_inputs, symlink_target_uuid, parent_job_uuid,
39-
parent_job_uuid_string, is_hidden)
44+
is_hidden)
4045
SELECT NEW.uuid,
4146
NEW.type,
4247
NEW.created_at,
4348
NEW.updated_at,
4449
NEW.namespace_uuid,
50+
full_name,
4551
NEW.name,
4652
NEW.description,
4753
NEW.current_version_uuid,
@@ -51,10 +57,14 @@ BEGIN
5157
NEW.current_inputs,
5258
NEW.symlink_target_uuid,
5359
NEW.parent_job_uuid,
54-
COALESCE(NEW.parent_job_uuid::char(36), ''),
5560
false
56-
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
61+
ON CONFLICT (namespace_uuid, name)
5762
DO UPDATE SET updated_at = now(),
63+
parent_job_uuid = COALESCE(jobs.parent_job_uuid, EXCLUDED.parent_job_uuid),
64+
simple_name = CASE
65+
WHEN EXCLUDED.parent_job_uuid IS NOT NULL THEN EXCLUDED.name
66+
ELSE jobs.name
67+
END,
5868
type = EXCLUDED.type,
5969
description = EXCLUDED.description,
6070
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
@@ -73,94 +83,38 @@ BEGIN
7383
INTO job_uuid, job_updated_at, new_symlink_target_uuid, old_symlink_target_uuid;
7484

7585

76-
-- update the jobs_fqn table when inserting a new record
77-
-- (NEW.uuid will equal the job_uuid when inserting a new record)
78-
-- AND if the symlink target is null
79-
-- Avoid constructing the symlinks and aliases, as that is expensive
80-
IF TG_OP='INSERT'
81-
AND NEW.uuid = job_uuid
82-
AND NEW.symlink_target_uuid IS NULL
83-
AND NEW.updated_at=job_updated_at THEN
84-
RAISE DEBUG 'Inserting into jobs_fqn for new job % (%)', NEW.name, job_uuid;
85-
WITH fqn AS (SELECT j.uuid,
86-
CASE
87-
WHEN j.parent_job_uuid IS NULL THEN j.name
88-
ELSE jf.job_fqn || '.' || j.name
89-
END AS name,
90-
j.namespace_uuid,
91-
j.namespace_name,
92-
jf.job_fqn AS parent_job_name,
93-
j.parent_job_uuid
94-
FROM jobs j
95-
LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid
96-
WHERE j.uuid=job_uuid)
97-
INSERT
98-
INTO jobs_fqn
99-
SELECT j.uuid,
100-
jf.namespace_uuid,
101-
jf.namespace_name,
102-
jf.parent_job_name,
103-
ARRAY[jf.name]::text[],
104-
jf.name AS job_fqn
105-
FROM jobs j
106-
INNER JOIN fqn jf ON jf.uuid = j.uuid;
107-
-- or when the symlink_target_uuid is being updated.
108-
ELSIF (new_symlink_target_uuid IS NOT NULL AND new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
109-
RAISE DEBUG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid;
86+
-- update the jobs table when updating a job's symlink target
87+
IF (new_symlink_target_uuid IS NOT NULL AND new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
88+
RAISE INFO 'Updating jobs aliases and symlinks due to % to job % (%)', TG_OP, NEW.name, job_uuid;
11089
WITH RECURSIVE
11190
jobs_symlink AS (SELECT j.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
11291
FROM jobs j
113-
-- include only jobs that have symlinks pointing to them to keep this table small
114-
INNER JOIN jobs js ON js.symlink_target_uuid=j.uuid
92+
-- include only jobs that have symlinks pointing to them to keep this table small
93+
INNER JOIN jobs js ON js.symlink_target_uuid=j.uuid
11594
WHERE j.symlink_target_uuid IS NULL
11695
UNION
11796
SELECT j.uuid, jn.link_target_uuid, j.symlink_target_uuid
11897
FROM jobs j
119-
INNER JOIN jobs_symlink jn ON j.symlink_target_uuid = jn.uuid),
120-
fqn AS (SELECT j.uuid,
121-
CASE
122-
WHEN j.parent_job_uuid IS NULL THEN j.name
123-
ELSE jf.job_fqn || '.' || j.name
124-
END AS name,
125-
j.namespace_uuid,
126-
j.namespace_name,
127-
jf.job_fqn AS parent_job_name,
128-
j.parent_job_uuid
129-
FROM jobs j
130-
LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid
131-
LEFT JOIN jobs_symlink js ON js.link_target_uuid=j.uuid
132-
WHERE j.uuid=job_uuid OR j.symlink_target_uuid=job_uuid OR js.uuid=job_uuid
133-
UNION
134-
SELECT j1.uuid,
135-
f.name || '.' || j1.name AS name,
136-
f.namespace_uuid AS namespace_uuid,
137-
f.namespace_name AS namespace_name,
138-
f.name AS parent_job_name,
139-
j1.parent_job_uuid
140-
FROM jobs j1
141-
INNER JOIN fqn f ON f.uuid = j1.parent_job_uuid),
98+
INNER JOIN jobs_symlink jn ON j.symlink_target_uuid = jn.uuid),
14299
aliases AS (SELECT s.link_target_uuid,
143-
ARRAY_AGG(DISTINCT f.job_fqn) FILTER (WHERE f.job_fqn IS NOT NULL) AS aliases
100+
ARRAY_AGG(DISTINCT f.name) AS aliases
144101
FROM jobs_symlink s
145-
INNER JOIN jobs_fqn f ON f.uuid = s.uuid
102+
INNER JOIN jobs f ON f.uuid = s.uuid
146103
GROUP BY s.link_target_uuid)
147-
INSERT
148-
INTO jobs_fqn
149-
SELECT j.uuid,
150-
jf.namespace_uuid,
151-
jf.namespace_name,
152-
jf.parent_job_name,
153-
a.aliases,
154-
jf.name AS job_fqn
155-
FROM jobs j
156-
LEFT JOIN jobs_symlink js ON j.uuid = js.uuid
157-
LEFT JOIN aliases a ON a.link_target_uuid = js.link_target_uuid
158-
INNER JOIN fqn jf ON jf.uuid = COALESCE(js.link_target_uuid, j.uuid)
159-
ON CONFLICT (uuid) DO UPDATE
160-
SET job_fqn=EXCLUDED.job_fqn,
161-
aliases = (SELECT array_agg(DISTINCT a) FROM (SELECT unnest(jobs_fqn.aliases) AS a UNION SELECT unnest(EXCLUDED.aliases) AS a) al);
104+
UPDATE jobs
105+
SET aliases = j.aliases, symlink_target_uuid=j.link_target_uuid
106+
FROM (
107+
SELECT j.uuid,
108+
CASE WHEN j.uuid=s.link_target_uuid THEN NULL ELSE s.link_target_uuid END AS link_target_uuid,
109+
a.aliases
110+
FROM jobs j
111+
LEFT JOIN jobs_symlink s ON s.uuid=j.uuid
112+
LEFT JOIN aliases a ON a.link_target_uuid = j.uuid
113+
) j
114+
WHERE jobs.uuid=j.uuid;
162115
END IF;
163-
SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid;
116+
SELECT * INTO inserted_job FROM jobs_view
117+
WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid);
164118
return inserted_job;
165119
END;
166120
$$ LANGUAGE plpgsql;

api/src/main/resources/marquez/db/migration/R__2_Runs_view.sql

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ SELECT r.uuid,
1818
ended_at,
1919
job_context_uuid,
2020
job_uuid,
21-
j.name AS job_name,
22-
j.namespace_name
21+
COALESCE(s.name, j.name) AS job_name,
22+
COALESCE(s.namespace_name, j.namespace_name) AS namespace_name
2323
FROM runs r
24-
INNER JOIN jobs_view j ON j.uuid = r.job_uuid;
24+
INNER JOIN jobs j ON j.uuid = r.job_uuid
25+
LEFT JOIN jobs_view s ON j.symlink_target_uuid=s.uuid;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
UPDATE jobs SET symlink_target_uuid=q.target_uuid
2+
FROM (
3+
SELECT j.uuid, j.namespace_name, j.name, j.simple_name, jv.uuid AS target_uuid, jv.simple_name
4+
FROM jobs_view j INNER JOIN jobs_view jv ON j.namespace_name=jv.namespace_name AND j.name=jv.name AND j.simple_name != jv.simple_name
5+
WHERE j.symlink_target_uuid IS NULL
6+
AND jv.symlink_target_uuid IS NULL
7+
AND j.parent_job_uuid IS NULL) q
8+
WHERE jobs.uuid=q.uuid;
9+
10+
ALTER TABLE jobs RENAME COLUMN name TO simple_name;
11+
ALTER TABLE jobs ADD COLUMN name varchar;
12+
ALTER TABLE jobs ADD COLUMN aliases varchar[];
13+
14+
WITH RECURSIVE
15+
job_fqn AS (SELECT j.uuid,
16+
j.simple_name AS simple_name,
17+
j.simple_name AS name
18+
FROM jobs j
19+
WHERE j.parent_job_uuid IS NULL
20+
UNION
21+
SELECT j1.uuid,
22+
j1.simple_name AS simple_name,
23+
f.name || '.' || j1.simple_name AS name
24+
FROM jobs j1
25+
INNER JOIN job_fqn f ON f.uuid = j1.parent_job_uuid)
26+
UPDATE jobs SET simple_name=f.simple_name, name=f.name
27+
FROM job_fqn f
28+
WHERE jobs.uuid=f.uuid;
29+
30+
ALTER TABLE jobs ALTER COLUMN name SET NOT NULL;
31+
32+
ALTER TABLE jobs DROP CONSTRAINT unique_jobs_namespace_uuid_name_parent;
33+
ALTER TABLE jobs DROP COLUMN parent_job_uuid_string;
34+
ALTER TABLE jobs ADD CONSTRAINT unique_jobs_namespace_uuid_name_parent UNIQUE (namespace_uuid, name);

0 commit comments

Comments
 (0)