Skip to content

Commit 17fb0f7

Browse files
fix column lineage when multiple jobs write to same dataset
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 2ffb8e0 commit 17fb0f7

16 files changed

Lines changed: 327 additions & 100 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
* Allow null column type in column-lineage [`#2272`](https://github.com/MarquezProject/marquez/pull/2272) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1313
* Include error message for JSON processing exception [`#2271`](https://github.com/MarquezProject/marquez/pull/2271) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1414
*In case of JSON processing exceptions, the Marquez API should return the exception message to the client.*
15+
* Fix column lineage when multiple jobs write to same dataset [`#2289`](https://github.com/MarquezProject/marquez/pull/2289) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
16+
*The fix deprecates the way fields `transformationDescription` and `transformationType` are returned. The deprecated way of returning those fields will be removed in 0.30.0.*
1517

1618
## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21
1719

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,15 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
146146
output_fields.dataset_name,
147147
output_fields.field_name,
148148
output_fields.type,
149-
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
150-
clr.output_dataset_version_uuid as dataset_version_uuid,
151-
clr.transformation_description,
152-
clr.transformation_type,
153-
clr.created_at,
154-
clr.updated_at
149+
ARRAY_AGG(DISTINCT ARRAY[
150+
input_fields.namespace_name,
151+
input_fields.dataset_name,
152+
CAST(clr.input_dataset_version_uuid AS VARCHAR),
153+
input_fields.field_name,
154+
clr.transformation_description,
155+
clr.transformation_type
156+
]) AS inputFields,
157+
clr.output_dataset_version_uuid as dataset_version_uuid
155158
FROM column_lineage_recursive clr
156159
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
157160
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
@@ -161,11 +164,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
161164
output_fields.dataset_name,
162165
output_fields.field_name,
163166
output_fields.type,
164-
clr.output_dataset_version_uuid,
165-
clr.transformation_description,
166-
clr.transformation_type,
167-
clr.created_at,
168-
clr.updated_at
167+
clr.output_dataset_version_uuid
169168
""")
170169
Set<ColumnLineageNodeData> getLineage(
171170
int depth,
@@ -193,25 +192,23 @@ dataset_fields_view AS (
193192
output_fields.dataset_name,
194193
output_fields.field_name,
195194
output_fields.type,
196-
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
197-
c.output_dataset_version_uuid as dataset_version_uuid,
198-
c.transformation_description,
199-
c.transformation_type,
200-
c.created_at,
201-
c.updated_at
195+
ARRAY_AGG(DISTINCT ARRAY[
196+
input_fields.namespace_name,
197+
input_fields.dataset_name,
198+
CAST(c.input_dataset_version_uuid AS VARCHAR),
199+
input_fields.field_name,
200+
c.transformation_description,
201+
c.transformation_type
202+
]) AS inputFields,
203+
null as dataset_version_uuid
202204
FROM selected_column_lineage c
203205
INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
204206
LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
205207
GROUP BY
206208
output_fields.namespace_name,
207209
output_fields.dataset_name,
208210
output_fields.field_name,
209-
output_fields.type,
210-
c.output_dataset_version_uuid,
211-
c.transformation_description,
212-
c.transformation_type,
213-
c.created_at,
214-
c.updated_at
211+
output_fields.type
215212
""")
216213
/**
217214
* Each dataset is identified by a pair of strings (namespace and name). A query returns column

api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@
55

66
package marquez.db.mappers;
77

8-
import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
9-
import static marquez.db.Columns.TRANSFORMATION_TYPE;
108
import static marquez.db.Columns.stringOrNull;
119
import static marquez.db.Columns.stringOrThrow;
12-
import static marquez.db.Columns.uuidOrThrow;
10+
import static marquez.db.Columns.uuidOrNull;
1311

1412
import com.fasterxml.jackson.databind.ObjectMapper;
1513
import com.google.common.collect.ImmutableList;
@@ -37,11 +35,9 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
3735
return new ColumnLineageNodeData(
3836
stringOrThrow(results, Columns.NAMESPACE_NAME),
3937
stringOrThrow(results, Columns.DATASET_NAME),
40-
uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
38+
uuidOrNull(results, Columns.DATASET_VERSION_UUID),
4139
stringOrThrow(results, Columns.FIELD_NAME),
4240
stringOrNull(results, Columns.TYPE),
43-
stringOrNull(results, TRANSFORMATION_DESCRIPTION),
44-
stringOrNull(results, TRANSFORMATION_TYPE),
4541
toInputFields(results, "inputFields"));
4642
}
4743

@@ -57,7 +53,10 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
5753
return ImmutableList.copyOf(
5854
Arrays.asList(deserializedArray).stream()
5955
.map(o -> (String[]) o)
60-
.map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
56+
.map(
57+
arr ->
58+
new InputFieldNodeData(
59+
arr[0], arr[1], UUID.fromString(arr[2]), arr[3], arr[4], arr[5]))
6160
.collect(Collectors.toList()));
6261
}
6362
}

api/src/main/java/marquez/db/models/ColumnLineageNodeData.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,63 @@
55

66
package marquez.db.models;
77

8+
import com.google.common.collect.ImmutableList;
89
import java.util.List;
10+
import java.util.Optional;
911
import java.util.UUID;
12+
import java.util.function.Function;
1013
import javax.annotation.Nullable;
11-
import lombok.AllArgsConstructor;
1214
import lombok.Getter;
1315
import lombok.NonNull;
16+
import marquez.service.models.ColumnLineageInputField;
1417

1518
@Getter
16-
@AllArgsConstructor
1719
public class ColumnLineageNodeData implements NodeData {
1820
@NonNull String namespace;
1921
@NonNull String dataset;
2022
@Nullable UUID datasetVersion;
2123
@NonNull String field;
2224
@Nullable String fieldType;
23-
String transformationDescription;
24-
String transformationType;
25+
@Nullable String transformationDescription;
26+
@Nullable String transformationType;
2527
@NonNull List<InputFieldNodeData> inputFields;
28+
29+
public ColumnLineageNodeData(
30+
String namespace,
31+
String dataset,
32+
UUID datasetVersion,
33+
String field,
34+
String fieldType,
35+
ImmutableList<InputFieldNodeData> inputFields) {
36+
this.namespace = namespace;
37+
this.dataset = dataset;
38+
this.datasetVersion = datasetVersion;
39+
this.field = field;
40+
this.fieldType = fieldType;
41+
this.inputFields = inputFields;
42+
}
43+
44+
/**
45+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
46+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
47+
*/
48+
public String getTransformationDescription() {
49+
return Optional.ofNullable(inputFields).map(List::stream).stream()
50+
.flatMap(Function.identity())
51+
.findAny()
52+
.map(d -> d.getTransformationDescription())
53+
.orElse(null);
54+
}
55+
56+
/**
57+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
58+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
59+
*/
60+
public String getTransformationType() {
61+
return Optional.ofNullable(inputFields).map(List::stream).stream()
62+
.flatMap(Function.identity())
63+
.findAny()
64+
.map(d -> d.getTransformationType())
65+
.orElse(null);
66+
}
2667
}

api/src/main/java/marquez/db/models/InputFieldNodeData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,6 @@ public class InputFieldNodeData {
2020
@NonNull String dataset;
2121
@Nullable UUID datasetVersion;
2222
@NonNull String field;
23+
String transformationDescription;
24+
String transformationType;
2325
}

api/src/main/java/marquez/service/ColumnLineageService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,14 +226,16 @@ public void enrichWithColumnLineage(List<Dataset> datasets) {
226226
.add(
227227
ColumnLineage.builder()
228228
.name(nodeData.getField())
229-
.transformationDescription(nodeData.getTransformationDescription())
230-
.transformationType(nodeData.getTransformationType())
231229
.inputFields(
232230
nodeData.getInputFields().stream()
233231
.map(
234232
f ->
235233
new ColumnLineageInputField(
236-
f.getNamespace(), f.getDataset(), f.getField()))
234+
f.getNamespace(),
235+
f.getDataset(),
236+
f.getField(),
237+
f.getTransformationDescription(),
238+
f.getTransformationType()))
237239
.collect(Collectors.toList()))
238240
.build());
239241
});

api/src/main/java/marquez/service/models/ColumnLineage.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
package marquez.service.models;
77

88
import java.util.List;
9+
import java.util.Optional;
10+
import java.util.function.Function;
11+
import javax.annotation.Nullable;
912
import javax.validation.constraints.NotNull;
1013
import lombok.Builder;
1114
import lombok.EqualsAndHashCode;
@@ -19,6 +22,31 @@
1922
public class ColumnLineage {
2023
@NotNull private String name;
2124
@NotNull private List<ColumnLineageInputField> inputFields;
22-
@NotNull private String transformationDescription;
23-
@NotNull private String transformationType;
25+
26+
@Nullable private String transformationDescription;
27+
@Nullable private String transformationType;
28+
29+
/**
30+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
31+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
32+
*/
33+
public String getTransformationDescription() {
34+
return Optional.ofNullable(inputFields).map(List::stream).stream()
35+
.flatMap(Function.identity())
36+
.findAny()
37+
.map(d -> d.getTransformationDescription())
38+
.orElse(null);
39+
}
40+
41+
/**
42+
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
43+
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
44+
*/
45+
public String getTransformationType() {
46+
return Optional.ofNullable(inputFields).map(List::stream).stream()
47+
.flatMap(Function.identity())
48+
.findAny()
49+
.map(d -> d.getTransformationType())
50+
.orElse(null);
51+
}
2452
}

api/src/main/java/marquez/service/models/ColumnLineageInputField.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ public class ColumnLineageInputField {
1919
@NotNull private String namespace;
2020
@NotNull private String dataset;
2121
@NotNull private String field;
22+
@NotNull private String transformationDescription;
23+
@NotNull private String transformationType;
2224
}

0 commit comments

Comments
 (0)