Skip to content

Commit e325862

Browse files
fix downstream recursion
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 7b6265d commit e325862

4 files changed

Lines changed: 30 additions & 9 deletions

File tree

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -99,19 +99,27 @@ void doUpsertColumnLineageRow(
9999
@SqlQuery(
100100
"""
101101
WITH RECURSIVE
102+
column_lineage_latest AS (
103+
SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *
104+
FROM column_lineage
105+
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
106+
),
102107
dataset_fields_view AS (
103108
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
104109
FROM dataset_fields df
105110
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
106111
),
107112
column_lineage_recursive AS (
108113
(
109-
SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *, 0 as depth
110-
FROM column_lineage
114+
SELECT
115+
*,
116+
0 as depth,
117+
false as is_cycle,
118+
ARRAY[ROW(output_dataset_field_uuid, input_dataset_field_uuid)] as path -- path and is_cycle mechanism as described here: https://www.postgresql.org/docs/current/queries-with.html (the CYCLE clause is not available in PostgreSQL 12)
119+
FROM column_lineage_latest
111120
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
112-
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
113121
)
114-
UNION
122+
UNION ALL
115123
SELECT
116124
adjacent_node.output_dataset_version_uuid,
117125
adjacent_node.output_dataset_field_uuid,
@@ -121,27 +129,31 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
121129
adjacent_node.transformation_type,
122130
adjacent_node.created_at,
123131
adjacent_node.updated_at,
124-
node.depth + 1 as depth
125-
FROM column_lineage adjacent_node, column_lineage_recursive node
132+
node.depth + 1 as depth,
133+
ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) = ANY(path) as is_cycle,
134+
path || ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) as path
135+
FROM column_lineage_latest adjacent_node, column_lineage_recursive node
126136
WHERE (
127137
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
128138
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
129139
)
130140
AND node.depth < :depth
141+
AND NOT is_cycle
131142
)
132143
SELECT
133144
output_fields.namespace_name,
134145
output_fields.dataset_name,
135146
output_fields.field_name,
136147
output_fields.type,
137-
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
148+
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
138149
clr.transformation_description,
139150
clr.transformation_type,
140151
clr.created_at,
141152
clr.updated_at
142153
FROM column_lineage_recursive clr
143154
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
144155
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
156+
WHERE NOT clr.is_cycle
145157
GROUP BY
146158
output_fields.namespace_name,
147159
output_fields.dataset_name,

api/src/main/java/marquez/service/ColumnLineageService.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,7 @@ private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {
7171
DatasetFieldId.of(i.getNamespace(), i.getDataset(), i.getField())))
7272
.forEach(
7373
inputNodeId -> {
74-
graphNodes.put(inputNodeId, Node.datasetField().id(inputNodeId));
74+
graphNodes.putIfAbsent(inputNodeId, Node.datasetField().id(inputNodeId));
7575
Optional.ofNullable(outEdges.get(inputNodeId))
7676
.ifPresentOrElse(
7777
nodeEdges -> nodeEdges.add(nodeId),

api/src/test/java/marquez/PostgresContainer.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
import org.testcontainers.utility.DockerImageName;
1212

1313
public final class PostgresContainer extends PostgreSQLContainer<PostgresContainer> {
14-
private static final DockerImageName POSTGRES = DockerImageName.parse("postgres:11.8");
14+
private static final DockerImageName POSTGRES = DockerImageName.parse("postgres:12.1");
1515
private static final int JDBC = 5;
1616

1717
private static final Map<String, PostgresContainer> containers = new HashMap<>();

api/src/test/java/marquez/service/ColumnLineageServiceTest.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -318,6 +318,15 @@ public void testGetLineageWithDownstream() {
318318
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
319319
.findAny())
320320
.isPresent();
321+
322+
ColumnLineageNodeData nodeData_C =
323+
(ColumnLineageNodeData)
324+
lineage.getGraph().stream()
325+
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_c"))
326+
.findAny()
327+
.get()
328+
.getData();
329+
assertThat(nodeData_C.getInputFields()).hasSize(2);
321330
}
322331

323332
@Test

0 commit comments

Comments
 (0)