Skip to content

Commit ed2fa27

Browse files
downstream column lineage (#2159)
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 04baff3 commit ed2fa27

7 files changed

Lines changed: 95 additions & 29 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
88
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
99
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
10+
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1011

1112
### Fixed
1213
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)

api/src/main/java/marquez/api/ColumnLineageResource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
4141
@Produces(APPLICATION_JSON)
4242
public Response getLineage(
4343
@QueryParam("nodeId") @NotNull NodeId nodeId,
44-
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
44+
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
45+
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
4546
throws ExecutionException, InterruptedException {
46-
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
47+
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
48+
.build();
4749
}
4850
}

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,17 +113,20 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
113113
)
114114
UNION
115115
SELECT
116-
upstream_node.output_dataset_version_uuid,
117-
upstream_node.output_dataset_field_uuid,
118-
upstream_node.input_dataset_version_uuid,
119-
upstream_node.input_dataset_field_uuid,
120-
upstream_node.transformation_description,
121-
upstream_node.transformation_type,
122-
upstream_node.created_at,
123-
upstream_node.updated_at,
116+
adjacent_node.output_dataset_version_uuid,
117+
adjacent_node.output_dataset_field_uuid,
118+
adjacent_node.input_dataset_version_uuid,
119+
adjacent_node.input_dataset_field_uuid,
120+
adjacent_node.transformation_description,
121+
adjacent_node.transformation_type,
122+
adjacent_node.created_at,
123+
adjacent_node.updated_at,
124124
node.depth + 1 as depth
125-
FROM column_lineage upstream_node, column_lineage_recursive node
126-
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
125+
FROM column_lineage adjacent_node, column_lineage_recursive node
126+
WHERE (
127+
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
128+
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
129+
)
127130
AND node.depth < :depth
128131
)
129132
SELECT
@@ -152,6 +155,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
152155
Set<ColumnLineageNodeData> getLineage(
153156
int depth,
154157
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
158+
boolean withDownstream,
155159
Instant createdAtUntil);
156160

157161
@SqlQuery(

api/src/main/java/marquez/service/ColumnLineageService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,12 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa
4040
this.datasetFieldDao = datasetFieldDao;
4141
}
4242

43-
public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) {
43+
public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) {
4444
List<UUID> columnNodeUuids = getColumnNodeUuids(nodeId);
4545
if (columnNodeUuids.isEmpty()) {
4646
throw new NodeIdNotFoundException("Could not find node");
4747
}
48-
49-
return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil));
48+
return toLineage(getLineage(depth, columnNodeUuids, withDownstream, createdAtUntil));
5049
}
5150

5251
private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {

api/src/test/java/marquez/api/ColumnLineageResourceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.mockito.ArgumentMatchers.any;
10-
import static org.mockito.ArgumentMatchers.anyInt;
10+
import static org.mockito.ArgumentMatchers.eq;
1111
import static org.mockito.Mockito.mock;
1212
import static org.mockito.Mockito.when;
1313

@@ -40,7 +40,7 @@ public class ColumnLineageResourceTest {
4040
ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"),
4141
new TypeReference<>() {});
4242
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
43-
when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class)))
43+
when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class)))
4444
.thenReturn(LINEAGE);
4545

4646
ServiceFactory serviceFactory =

api/src/test/java/marquez/db/ColumnLineageDaoTest.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ void testGetLineage() {
257257
UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getOutputs().get().get(0);
258258
UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get();
259259
Set<ColumnLineageNodeData> lineage =
260-
dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now());
260+
dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now());
261261

262262
assertEquals(2, lineage.size());
263263

@@ -326,7 +326,8 @@ void testGetLineageWhenNoLineageForColumn() {
326326
UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get();
327327

328328
// assert lineage is empty
329-
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())).isEmpty();
329+
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now()))
330+
.isEmpty();
330331
}
331332

332333
/**
@@ -392,11 +393,12 @@ void testGetLineageWithLimitedDepth() {
392393
UUID field_col_e = fieldDao.findUuid(datasetRecord_d.getDatasetRow().getUuid(), "col_e").get();
393394

394395
// make sure dataset are constructed properly
395-
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), Instant.now()))
396+
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), false, Instant.now()))
396397
.hasSize(3);
397398

398399
// verify graph size is 2 when max depth is 1
399-
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), Instant.now())).hasSize(2);
400+
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), false, Instant.now()))
401+
.hasSize(2);
400402
}
401403

402404
@Test
@@ -462,9 +464,9 @@ void testGetLineageWhenCycleExists() {
462464
UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get();
463465

464466
// column lineages for col_a and col_e should be of size 3
465-
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now()))
467+
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now()))
466468
.hasSize(3);
467-
assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now()))
469+
assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now()))
468470
.hasSize(3);
469471
}
470472

@@ -524,7 +526,7 @@ void testGetLineageWhenTwoJobsWriteToSameDataset() {
524526

525527
// assert input fields for col_d contain col_a and col_c
526528
List<String> inputFields =
527-
dao.getLineage(20, Collections.singletonList(field_col_c), Instant.now()).stream()
529+
dao.getLineage(20, Collections.singletonList(field_col_c), false, Instant.now()).stream()
528530
.filter(node -> node.getDataset().equals("dataset_b"))
529531
.flatMap(node -> node.getInputFields().stream())
530532
.map(input -> input.getField())
@@ -558,11 +560,17 @@ void testGetLineagePointInTime() {
558560
// assert lineage is empty before and present after
559561
assertThat(
560562
dao.getLineage(
561-
20, Collections.singletonList(field_col_b), columnLineageCreatedAt.minusSeconds(1)))
563+
20,
564+
Collections.singletonList(field_col_b),
565+
false,
566+
columnLineageCreatedAt.minusSeconds(1)))
562567
.isEmpty();
563568
assertThat(
564569
dao.getLineage(
565-
20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1)))
570+
20,
571+
Collections.singletonList(field_col_b),
572+
false,
573+
columnLineageCreatedAt.plusSeconds(1)))
566574
.hasSize(1);
567575
}
568576

@@ -590,7 +598,7 @@ void testGetLineageWhenJobRunMultipleTimes() {
590598
UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0);
591599
UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get();
592600

593-
assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), Instant.now()))
601+
assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), false, Instant.now()))
594602
.hasSize(1);
595603
}
596604
}

api/src/test/java/marquez/service/ColumnLineageServiceTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public void testLineageByDatasetFieldId() {
9090

9191
Lineage lineage =
9292
lineageService.lineage(
93-
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
93+
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
94+
20,
95+
false,
96+
Instant.now());
9497

9598
assertThat(lineage.getGraph()).hasSize(3);
9699

@@ -156,12 +159,16 @@ public void testLineageByDatasetId() {
156159

157160
Lineage lineageByField =
158161
lineageService.lineage(
159-
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
162+
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
163+
20,
164+
false,
165+
Instant.now());
160166

161167
Lineage lineageByDataset =
162168
lineageService.lineage(
163169
NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))),
164170
20,
171+
false,
165172
Instant.now());
166173

167174
// lineage of dataset and column should be equal
@@ -195,6 +202,7 @@ public void testLineageWhenLineageEmpty() {
195202
lineageService.lineage(
196203
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")),
197204
20,
205+
false,
198206
Instant.now()));
199207

200208
assertThrows(
@@ -204,13 +212,15 @@ public void testLineageWhenLineageEmpty() {
204212
NodeId.of(
205213
new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))),
206214
20,
215+
false,
207216
Instant.now()));
208217

209218
assertThat(
210219
lineageService
211220
.lineage(
212221
NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")),
213222
20,
223+
false,
214224
Instant.now())
215225
.getGraph())
216226
.hasSize(0);
@@ -268,6 +278,48 @@ public void testEnrichDatasets() {
268278
.contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c"));
269279
}
270280

281+
@Test
282+
public void testGetLineageWithDownstream() {
283+
LineageEvent.Dataset dataset_A = getDatasetA();
284+
LineageEvent.Dataset dataset_B = getDatasetB();
285+
LineageEvent.Dataset dataset_C = getDatasetC();
286+
287+
LineageTestUtils.createLineageRow(
288+
openLineageDao,
289+
"job1",
290+
"COMPLETE",
291+
jobFacet,
292+
Arrays.asList(dataset_A),
293+
Arrays.asList(dataset_B));
294+
295+
LineageTestUtils.createLineageRow(
296+
openLineageDao,
297+
"job2",
298+
"COMPLETE",
299+
jobFacet,
300+
Arrays.asList(dataset_B),
301+
Arrays.asList(dataset_C));
302+
303+
Lineage lineage =
304+
lineageService.lineage(
305+
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
306+
20,
307+
true,
308+
Instant.now());
309+
310+
// assert that get lineage of dataset_B should co also return dataset_A and dataset_C
311+
assertThat(
312+
lineage.getGraph().stream()
313+
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_a"))
314+
.findAny())
315+
.isPresent();
316+
assertThat(
317+
lineage.getGraph().stream()
318+
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
319+
.findAny())
320+
.isPresent();
321+
}
322+
271323
@Test
272324
public void testEnrichDatasetsHasNoDuplicates() {
273325
LineageEvent.Dataset dataset_A = getDatasetA();

0 commit comments

Comments
 (0)