Skip to content

Commit aa7a47d

Browse files
Fix column lineage returning multiple entries when a job is run multiple times (#2176)
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 496566e commit aa7a47d

2 files changed

Lines changed: 34 additions & 3 deletions

File tree

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,12 @@ dataset_fields_view AS (
105105
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
106106
),
107107
column_lineage_recursive AS (
108-
SELECT *, 0 as depth
109-
FROM column_lineage
110-
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
108+
(
109+
SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *, 0 as depth
110+
FROM column_lineage
111+
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
112+
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
113+
)
111114
UNION
112115
SELECT
113116
upstream_node.output_dataset_version_uuid,

api/src/test/java/marquez/db/ColumnLineageDaoTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,4 +565,32 @@ void testGetLineagePointInTime() {
565565
20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1)))
566566
.hasSize(1);
567567
}
568+
569+
@Test
570+
void testGetLineageWhenJobRunMultipleTimes() {
571+
Dataset dataset_A = getDatasetA();
572+
Dataset dataset_B = getDatasetB();
573+
574+
LineageTestUtils.createLineageRow(
575+
openLineageDao,
576+
"job1",
577+
"COMPLETE",
578+
jobFacet,
579+
Arrays.asList(dataset_A),
580+
Arrays.asList(dataset_B));
581+
UpdateLineageRow lineageRow =
582+
LineageTestUtils.createLineageRow(
583+
openLineageDao,
584+
"job1",
585+
"COMPLETE",
586+
jobFacet,
587+
Arrays.asList(dataset_A),
588+
Arrays.asList(dataset_B));
589+
590+
UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0);
591+
UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get();
592+
593+
assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), Instant.now()))
594+
.hasSize(1);
595+
}
568596
}

0 commit comments

Comments
 (0)