Skip to content

Commit 50adb00

Browse files
downstream column lineage
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 8b669df commit 50adb00

5 files changed

Lines changed: 123 additions & 9 deletions

File tree

api/src/main/java/marquez/api/ColumnLineageResource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
4343
@Produces(APPLICATION_JSON)
4444
public Response getLineage(
4545
@QueryParam("nodeId") @NotNull NodeId nodeId,
46-
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
46+
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
47+
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
4748
throws ExecutionException, InterruptedException {
48-
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
49+
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
50+
.build();
4951
}
5052
}

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,64 @@ Set<ColumnLineageNodeData> getLineage(
151151
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
152152
Instant createdAtUntil);
153153

154+
@SqlQuery(
155+
"""
156+
WITH RECURSIVE
157+
dataset_fields_view AS (
158+
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
159+
FROM dataset_fields df
160+
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
161+
),
162+
column_lineage_recursive AS (
163+
SELECT *, 0 as depth
164+
FROM column_lineage
165+
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
166+
UNION
167+
SELECT
168+
adjacent_node.output_dataset_version_uuid,
169+
adjacent_node.output_dataset_field_uuid,
170+
adjacent_node.input_dataset_version_uuid,
171+
adjacent_node.input_dataset_field_uuid,
172+
adjacent_node.transformation_description,
173+
adjacent_node.transformation_type,
174+
adjacent_node.created_at,
175+
adjacent_node.updated_at,
176+
node.depth + 1 as depth
177+
FROM column_lineage adjacent_node, column_lineage_recursive node
178+
WHERE (
179+
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid)
180+
OR (adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid)
181+
)
182+
AND node.depth < :depth
183+
)
184+
SELECT
185+
output_fields.namespace_name,
186+
output_fields.dataset_name,
187+
output_fields.field_name,
188+
output_fields.type,
189+
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
190+
clr.transformation_description,
191+
clr.transformation_type,
192+
clr.created_at,
193+
clr.updated_at
194+
FROM column_lineage_recursive clr
195+
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
196+
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
197+
GROUP BY
198+
output_fields.namespace_name,
199+
output_fields.dataset_name,
200+
output_fields.field_name,
201+
output_fields.type,
202+
clr.transformation_description,
203+
clr.transformation_type,
204+
clr.created_at,
205+
clr.updated_at
206+
""")
207+
Set<ColumnLineageNodeData> getLineageWithDownstream(
208+
int depth,
209+
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
210+
Instant createdAtUntil);
211+
154212
@SqlQuery(
155213
"""
156214
WITH selected_column_lineage AS (

api/src/main/java/marquez/service/ColumnLineageService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,16 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa
4040
this.datasetFieldDao = datasetFieldDao;
4141
}
4242

43-
public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) {
43+
public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) {
4444
List<UUID> columnNodeUuids = getColumnNodeUuids(nodeId);
4545

4646
if (columnNodeUuids.isEmpty()) {
4747
throw new NodeIdNotFoundException("Could not find node");
48+
} else if (withDownstream) {
49+
return toLineage(getLineageWithDownstream(depth, columnNodeUuids, createdAtUntil));
50+
} else {
51+
return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil));
4852
}
49-
50-
return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil));
5153
}
5254

5355
private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {

api/src/test/java/marquez/api/ColumnLineageResourceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.mockito.ArgumentMatchers.any;
10-
import static org.mockito.ArgumentMatchers.anyInt;
10+
import static org.mockito.ArgumentMatchers.eq;
1111
import static org.mockito.Mockito.mock;
1212
import static org.mockito.Mockito.when;
1313

@@ -40,7 +40,7 @@ public class ColumnLineageResourceTest {
4040
ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"),
4141
new TypeReference<>() {});
4242
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
43-
when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class)))
43+
when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class)))
4444
.thenReturn(LINEAGE);
4545

4646
ServiceFactory serviceFactory =

api/src/test/java/marquez/service/ColumnLineageServiceTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public void testLineageByDatasetFieldId() {
9090

9191
Lineage lineage =
9292
lineageService.lineage(
93-
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
93+
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
94+
20,
95+
false,
96+
Instant.now());
9497

9598
assertThat(lineage.getGraph()).hasSize(3);
9699

@@ -156,12 +159,16 @@ public void testLineageByDatasetId() {
156159

157160
Lineage lineageByField =
158161
lineageService.lineage(
159-
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
162+
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
163+
20,
164+
false,
165+
Instant.now());
160166

161167
Lineage lineageByDataset =
162168
lineageService.lineage(
163169
NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))),
164170
20,
171+
false,
165172
Instant.now());
166173

167174
// lineage of dataset and column should be equal
@@ -195,6 +202,7 @@ public void testLineageWhenLineageEmpty() {
195202
lineageService.lineage(
196203
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")),
197204
20,
205+
false,
198206
Instant.now()));
199207

200208
assertThrows(
@@ -204,13 +212,15 @@ public void testLineageWhenLineageEmpty() {
204212
NodeId.of(
205213
new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))),
206214
20,
215+
false,
207216
Instant.now()));
208217

209218
assertThat(
210219
lineageService
211220
.lineage(
212221
NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")),
213222
20,
223+
false,
214224
Instant.now())
215225
.getGraph())
216226
.hasSize(0);
@@ -268,6 +278,48 @@ public void testEnrichDatasets() {
268278
.contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c"));
269279
}
270280

281+
@Test
282+
public void testGetLineageWithDownstream() {
283+
LineageEvent.Dataset dataset_A = getDatasetA();
284+
LineageEvent.Dataset dataset_B = getDatasetB();
285+
LineageEvent.Dataset dataset_C = getDatasetC();
286+
287+
LineageTestUtils.createLineageRow(
288+
openLineageDao,
289+
"job1",
290+
"COMPLETE",
291+
jobFacet,
292+
Arrays.asList(dataset_A),
293+
Arrays.asList(dataset_B));
294+
295+
LineageTestUtils.createLineageRow(
296+
openLineageDao,
297+
"job2",
298+
"COMPLETE",
299+
jobFacet,
300+
Arrays.asList(dataset_B),
301+
Arrays.asList(dataset_C));
302+
303+
Lineage lineage =
304+
lineageService.lineage(
305+
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
306+
20,
307+
true,
308+
Instant.now());
309+
310+
// assert that getting the lineage of dataset_B also returns dataset_A and dataset_C
311+
assertThat(
312+
lineage.getGraph().stream()
313+
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_a"))
314+
.findAny())
315+
.isPresent();
316+
assertThat(
317+
lineage.getGraph().stream()
318+
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
319+
.findAny())
320+
.isPresent();
321+
}
322+
271323
private Optional<Node> getNode(Lineage lineage, String datasetName, String fieldName) {
272324
return lineage.getGraph().stream()
273325
.filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName))

0 commit comments

Comments
 (0)