Skip to content

Commit 98f3114

Browse files
authored
Update lineage query to only look at jobs with inputs or outputs (#2068)
Signed-off-by: Michael Collado <collado.mike@gmail.com> Signed-off-by: Michael Collado <collado.mike@gmail.com>
1 parent 476e472 commit 98f3114

2 files changed

Lines changed: 65 additions & 29 deletions

File tree

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,35 +37,39 @@ public interface LineageDao {
3737
* @return
3838
*/
3939
@SqlQuery(
40-
// dataset_ids: all the input and output datasets of the current version of the specified jobs
41-
"WITH RECURSIVE\n"
42-
+ " job_io AS (\n"
43-
+ " SELECT j.uuid AS job_uuid,\n"
44-
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,\n"
45-
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs\n"
46-
+ " FROM jobs_view j\n"
47-
+ " LEFT JOIN jobs_view s ON s.symlink_target_uuid=j.uuid\n"
48-
+ " LEFT JOIN job_versions v on COALESCE(j.current_version_uuid, s.current_version_uuid) = v.uuid\n"
49-
+ " LEFT JOIN job_versions_io_mapping io on v.uuid = io.job_version_uuid\n"
50-
+ " GROUP BY j.uuid\n"
51-
+ " ),\n"
52-
+ " lineage(job_uuid, inputs, outputs) AS (\n"
53-
+ " SELECT job_uuid, inputs, outputs, 0 AS depth\n"
54-
+ " FROM job_io\n"
55-
+ " WHERE job_uuid IN (<jobIds>)\n"
56-
+ " UNION\n"
57-
+ " SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1\n"
58-
+ " FROM job_io io,\n"
59-
+ " lineage l\n"
60-
+ " WHERE io.job_uuid != l.job_uuid AND\n"
61-
+ " array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)\n"
62-
+ " AND depth < :depth"
63-
+ " )\n"
64-
+ "SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
65-
+ "FROM lineage l2\n"
66-
+ "INNER JOIN jobs_view s ON s.uuid=l2.job_uuid\n"
67-
+ "INNER JOIN jobs_view j ON j.uuid=COALESCE(s.symlink_target_uuid, s.uuid)\n"
68-
+ "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid")
40+
"""
41+
WITH RECURSIVE
42+
job_io AS (
43+
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
44+
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
45+
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
46+
FROM job_versions_io_mapping io
47+
INNER JOIN job_versions v ON io.job_version_uuid=v.uuid
48+
INNER JOIN jobs_view j on j.current_version_uuid = v.uuid
49+
LEFT JOIN jobs_view s ON s.uuid=j.symlink_target_uuid
50+
WHERE s.current_version_uuid IS NULL
51+
GROUP BY COALESCE(j.symlink_target_uuid, j.uuid)
52+
),
53+
lineage(job_uuid, inputs, outputs) AS (
54+
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
55+
COALESCE(inputs, Array[]::uuid[]) AS inputs,
56+
COALESCE(outputs, Array[]::uuid[]) AS outputs,
57+
0 AS depth
58+
FROM jobs_view j
59+
LEFT JOIN job_io io ON io.job_uuid=j.uuid OR j.symlink_target_uuid=io.job_uuid
60+
WHERE j.uuid IN (<jobIds>)
61+
UNION
62+
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
63+
FROM job_io io,
64+
lineage l
65+
WHERE io.job_uuid != l.job_uuid AND
66+
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
67+
AND depth < :depth)
68+
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context
69+
FROM lineage l2
70+
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid
71+
LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid;
72+
""")
6973
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);
7074

7175
@SqlQuery(

api/src/test/java/marquez/db/LineageDaoTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,38 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
276276
.map(JobData::getUuid)
277277
.collect(Collectors.toSet());
278278
assertThat(lineageForOriginalJob).isEqualTo(jobIds);
279+
280+
UpdateLineageRow updatedTargetJob =
281+
LineageTestUtils.createLineageRow(
282+
openLineageDao,
283+
symlinkTargetJobName,
284+
"COMPLETE",
285+
jobFacet,
286+
Arrays.asList(),
287+
Arrays.asList(
288+
new Dataset(
289+
NAMESPACE,
290+
"a_new_dataset",
291+
newDatasetFacet(new SchemaField("firstname", "string", "the first name")))));
292+
assertThat(updatedTargetJob.getJob().getUuid()).isEqualTo(targetJob.getUuid());
293+
294+
// get lineage for original job - the old datasets/jobs should no longer be present
295+
assertThat(
296+
lineageDao
297+
.getLineage(new HashSet<>(Arrays.asList(writeJob.getJob().getUuid())), 2)
298+
.stream()
299+
.map(JobData::getUuid)
300+
.collect(Collectors.toSet()))
301+
.hasSize(1)
302+
.containsExactlyInAnyOrder(targetJob.getUuid());
303+
304+
// fetching lineage for target job should yield the same results
305+
assertThat(
306+
lineageDao.getLineage(new HashSet<>(Arrays.asList(targetJob.getUuid())), 2).stream()
307+
.map(JobData::getUuid)
308+
.collect(Collectors.toSet()))
309+
.hasSize(1)
310+
.containsExactlyInAnyOrder(targetJob.getUuid());
279311
}
280312

281313
@Test

0 commit comments

Comments
 (0)