Skip to content

Commit ace6fe0

Browse files
fix column lineage when multiple jobs write to same dataset
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 0995b0a commit ace6fe0

14 files changed

Lines changed: 218 additions & 100 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* Allow null column type in column-lineage [`#2272`](https://github.com/MarquezProject/marquez/pull/2272) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1313
* Include error message for JSON processing exception [`#2271`](https://github.com/MarquezProject/marquez/pull/2271) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1414
*In case of a JSON processing exception, the Marquez API should return the exception message to the client.*
15+
* Fix column lineage when multiple jobs write to the same dataset [`#2289`](https://github.com/MarquezProject/marquez/pull/2289) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1516

1617
## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21
1718

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,15 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
146146
output_fields.dataset_name,
147147
output_fields.field_name,
148148
output_fields.type,
149-
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
150-
clr.output_dataset_version_uuid as dataset_version_uuid,
151-
clr.transformation_description,
152-
clr.transformation_type,
153-
clr.created_at,
154-
clr.updated_at
149+
ARRAY_AGG(DISTINCT ARRAY[
150+
input_fields.namespace_name,
151+
input_fields.dataset_name,
152+
CAST(clr.input_dataset_version_uuid AS VARCHAR),
153+
input_fields.field_name,
154+
clr.transformation_description,
155+
clr.transformation_type
156+
]) AS inputFields,
157+
clr.output_dataset_version_uuid as dataset_version_uuid
155158
FROM column_lineage_recursive clr
156159
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
157160
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
@@ -161,11 +164,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
161164
output_fields.dataset_name,
162165
output_fields.field_name,
163166
output_fields.type,
164-
clr.output_dataset_version_uuid,
165-
clr.transformation_description,
166-
clr.transformation_type,
167-
clr.created_at,
168-
clr.updated_at
167+
clr.output_dataset_version_uuid
169168
""")
170169
Set<ColumnLineageNodeData> getLineage(
171170
int depth,
@@ -193,25 +192,23 @@ dataset_fields_view AS (
193192
output_fields.dataset_name,
194193
output_fields.field_name,
195194
output_fields.type,
196-
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
197-
c.output_dataset_version_uuid as dataset_version_uuid,
198-
c.transformation_description,
199-
c.transformation_type,
200-
c.created_at,
201-
c.updated_at
195+
ARRAY_AGG(DISTINCT ARRAY[
196+
input_fields.namespace_name,
197+
input_fields.dataset_name,
198+
CAST(c.input_dataset_version_uuid AS VARCHAR),
199+
input_fields.field_name,
200+
c.transformation_description,
201+
c.transformation_type
202+
]) AS inputFields,
203+
null as dataset_version_uuid
202204
FROM selected_column_lineage c
203205
INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
204206
LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
205207
GROUP BY
206208
output_fields.namespace_name,
207209
output_fields.dataset_name,
208210
output_fields.field_name,
209-
output_fields.type,
210-
c.output_dataset_version_uuid,
211-
c.transformation_description,
212-
c.transformation_type,
213-
c.created_at,
214-
c.updated_at
211+
output_fields.type
215212
""")
216213
/**
217214
* Each dataset is identified by a pair of strings (namespace and name). A query returns column

api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@
55

66
package marquez.db.mappers;
77

8-
import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
9-
import static marquez.db.Columns.TRANSFORMATION_TYPE;
108
import static marquez.db.Columns.stringOrNull;
119
import static marquez.db.Columns.stringOrThrow;
12-
import static marquez.db.Columns.uuidOrThrow;
10+
import static marquez.db.Columns.uuidOrNull;
1311

1412
import com.fasterxml.jackson.databind.ObjectMapper;
1513
import com.google.common.collect.ImmutableList;
@@ -37,11 +35,9 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
3735
return new ColumnLineageNodeData(
3836
stringOrThrow(results, Columns.NAMESPACE_NAME),
3937
stringOrThrow(results, Columns.DATASET_NAME),
40-
uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
38+
uuidOrNull(results, Columns.DATASET_VERSION_UUID),
4139
stringOrThrow(results, Columns.FIELD_NAME),
4240
stringOrNull(results, Columns.TYPE),
43-
stringOrNull(results, TRANSFORMATION_DESCRIPTION),
44-
stringOrNull(results, TRANSFORMATION_TYPE),
4541
toInputFields(results, "inputFields"));
4642
}
4743

@@ -57,7 +53,10 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
5753
return ImmutableList.copyOf(
5854
Arrays.asList(deserializedArray).stream()
5955
.map(o -> (String[]) o)
60-
.map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
56+
.map(
57+
arr ->
58+
new InputFieldNodeData(
59+
arr[0], arr[1], UUID.fromString(arr[2]), arr[3], arr[4], arr[5]))
6160
.collect(Collectors.toList()));
6261
}
6362
}

api/src/main/java/marquez/db/models/ColumnLineageNodeData.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,61 @@
55

66
package marquez.db.models;
77

8+
import com.google.common.collect.ImmutableList;
89
import java.util.List;
910
import java.util.UUID;
1011
import javax.annotation.Nullable;
11-
import lombok.AllArgsConstructor;
1212
import lombok.Getter;
1313
import lombok.NonNull;
14+
import marquez.service.models.ColumnLineageInputField;
1415

1516
@Getter
16-
@AllArgsConstructor
1717
public class ColumnLineageNodeData implements NodeData {
1818
@NonNull String namespace;
1919
@NonNull String dataset;
2020
@Nullable UUID datasetVersion;
2121
@NonNull String field;
2222
@Nullable String fieldType;
23-
String transformationDescription;
24-
String transformationType;
23+
@Nullable String transformationDescription;
24+
@Nullable String transformationType;
2525
@NonNull List<InputFieldNodeData> inputFields;
26+
27+
public ColumnLineageNodeData(
28+
String namespace,
29+
String dataset,
30+
UUID datasetVersion,
31+
String field,
32+
String fieldType,
33+
ImmutableList<InputFieldNodeData> inputFields) {
34+
this.namespace = namespace;
35+
this.dataset = dataset;
36+
this.datasetVersion = datasetVersion;
37+
this.field = field;
38+
this.fieldType = fieldType;
39+
this.inputFields = inputFields;
40+
}
41+
42+
/**
43+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
44+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
45+
*/
46+
public String getTransformationDescription() {
47+
if (inputFields == null) {
48+
return null;
49+
} else {
50+
return inputFields.get(0).getTransformationDescription();
51+
}
52+
}
53+
54+
/**
55+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
56+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
57+
*/
58+
public String getTransformationType() {
59+
if (inputFields == null) {
60+
return null;
61+
} else {
62+
return inputFields.get(0).getTransformationType();
63+
}
64+
}
2665
}

api/src/main/java/marquez/db/models/InputFieldNodeData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,6 @@ public class InputFieldNodeData {
2020
@NonNull String dataset;
2121
@Nullable UUID datasetVersion;
2222
@NonNull String field;
23+
String transformationDescription;
24+
String transformationType;
2325
}

api/src/main/java/marquez/service/ColumnLineageService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,14 +226,16 @@ public void enrichWithColumnLineage(List<Dataset> datasets) {
226226
.add(
227227
ColumnLineage.builder()
228228
.name(nodeData.getField())
229-
.transformationDescription(nodeData.getTransformationDescription())
230-
.transformationType(nodeData.getTransformationType())
231229
.inputFields(
232230
nodeData.getInputFields().stream()
233231
.map(
234232
f ->
235233
new ColumnLineageInputField(
236-
f.getNamespace(), f.getDataset(), f.getField()))
234+
f.getNamespace(),
235+
f.getDataset(),
236+
f.getField(),
237+
f.getTransformationDescription(),
238+
f.getTransformationType()))
237239
.collect(Collectors.toList()))
238240
.build());
239241
});

api/src/main/java/marquez/service/models/ColumnLineage.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package marquez.service.models;
77

88
import java.util.List;
9+
import javax.annotation.Nullable;
910
import javax.validation.constraints.NotNull;
1011
import lombok.Builder;
1112
import lombok.EqualsAndHashCode;
@@ -19,6 +20,31 @@
1920
public class ColumnLineage {
2021
@NotNull private String name;
2122
@NotNull private List<ColumnLineageInputField> inputFields;
22-
@NotNull private String transformationDescription;
23-
@NotNull private String transformationType;
23+
24+
@Nullable private String transformationDescription;
25+
@Nullable private String transformationType;
26+
27+
/**
28+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
29+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
30+
*/
31+
public String getTransformationDescription() {
32+
if (inputFields == null) {
33+
return null;
34+
} else {
35+
return inputFields.get(0).getTransformationDescription();
36+
}
37+
}
38+
39+
/**
40+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
41+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
42+
*/
43+
public String getTransformationType() {
44+
if (inputFields == null) {
45+
return null;
46+
} else {
47+
return inputFields.get(0).getTransformationType();
48+
}
49+
}
2450
}

api/src/main/java/marquez/service/models/ColumnLineageInputField.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ public class ColumnLineageInputField {
1919
@NotNull private String namespace;
2020
@NotNull private String dataset;
2121
@NotNull private String field;
22+
@NotNull private String transformationDescription;
23+
@NotNull private String transformationType;
2224
}

0 commit comments

Comments
 (0)