Skip to content

Commit 8d6e907

Browse files
point-in-time for column-level lineage
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 0c78cd8 commit 8d6e907

17 files changed

Lines changed: 467 additions & 357 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Changelog
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.28.0...HEAD)
4+
* Column-lineage endpoint supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
5+
*Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.*
46

57
## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21
68

api/src/main/java/marquez/api/ColumnLineageResource.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.codahale.metrics.annotation.ExceptionMetered;
1111
import com.codahale.metrics.annotation.ResponseMetered;
1212
import com.codahale.metrics.annotation.Timed;
13-
import java.time.Instant;
1413
import java.util.concurrent.ExecutionException;
1514
import javax.validation.constraints.NotNull;
1615
import javax.ws.rs.DefaultValue;
@@ -44,7 +43,10 @@ public Response getLineage(
4443
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
4544
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
4645
throws ExecutionException, InterruptedException {
47-
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
48-
.build();
46+
if (nodeId.hasVersion() && withDownstream) {
47+
return Response.status(400, "Node version cannot be specified when withDownstream is true")
48+
.build();
49+
}
50+
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream)).build();
4951
}
5052
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.common.models;
7+
8+
import java.util.UUID;
9+
import lombok.AllArgsConstructor;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.ToString;
13+
14+
/** ID for {@code DatasetField} with a version of {@code Dataset}. */
15+
@EqualsAndHashCode
16+
@AllArgsConstructor
17+
@ToString
18+
public class DatasetFieldVersionId {
19+
20+
@Getter private final DatasetId datasetId;
21+
@Getter private final FieldName fieldName;
22+
@Getter private final UUID version;
23+
24+
public static DatasetFieldVersionId of(
25+
String namespace, String datasetName, String field, UUID version) {
26+
return new DatasetFieldVersionId(
27+
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
28+
FieldName.of(field),
29+
version);
30+
}
31+
}

api/src/main/java/marquez/common/models/JobVersionId.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,9 @@ public class JobVersionId {
1919
@NonNull NamespaceName namespace;
2020
@NonNull JobName name;
2121
@NonNull UUID version;
22+
23+
public static JobVersionId of(
24+
final NamespaceName namespaceName, final JobName jobName, final UUID version) {
25+
return new JobVersionId(namespaceName, jobName, version);
26+
}
2227
}

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
146146
output_fields.dataset_name,
147147
output_fields.field_name,
148148
output_fields.type,
149-
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
149+
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
150+
clr.output_dataset_version_uuid as dataset_version_uuid,
150151
clr.transformation_description,
151152
clr.transformation_type,
152153
clr.created_at,
@@ -160,6 +161,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
160161
output_fields.dataset_name,
161162
output_fields.field_name,
162163
output_fields.type,
164+
clr.output_dataset_version_uuid,
163165
clr.transformation_description,
164166
clr.transformation_type,
165167
clr.created_at,
@@ -191,7 +193,8 @@ dataset_fields_view AS (
191193
output_fields.dataset_name,
192194
output_fields.field_name,
193195
output_fields.type,
194-
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
196+
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
197+
c.output_dataset_version_uuid as dataset_version_uuid,
195198
c.transformation_description,
196199
c.transformation_type,
197200
c.created_at,
@@ -204,6 +207,7 @@ dataset_fields_view AS (
204207
output_fields.dataset_name,
205208
output_fields.field_name,
206209
output_fields.type,
210+
c.output_dataset_version_uuid,
207211
c.transformation_description,
208212
c.transformation_type,
209213
c.created_at,

api/src/main/java/marquez/db/DatasetFieldDao.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
import marquez.db.mappers.DatasetFieldMapper;
1818
import marquez.db.mappers.DatasetFieldRowMapper;
1919
import marquez.db.mappers.FieldDataMapper;
20+
import marquez.db.mappers.PairUuidInstantMapper;
2021
import marquez.db.models.DatasetFieldRow;
2122
import marquez.db.models.DatasetRow;
2223
import marquez.db.models.InputFieldData;
2324
import marquez.db.models.TagRow;
2425
import marquez.service.models.Dataset;
2526
import marquez.service.models.DatasetVersion;
27+
import org.apache.commons.lang3.tuple.Pair;
2628
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
2729
import org.jdbi.v3.sqlobject.customizer.BindBean;
2830
import org.jdbi.v3.sqlobject.statement.SqlBatch;
@@ -32,6 +34,7 @@
3234
@RegisterRowMapper(DatasetFieldRowMapper.class)
3335
@RegisterRowMapper(DatasetFieldMapper.class)
3436
@RegisterRowMapper(FieldDataMapper.class)
37+
@RegisterRowMapper(PairUuidInstantMapper.class)
3538
public interface DatasetFieldDao extends BaseDao {
3639
@SqlQuery(
3740
"""
@@ -98,13 +101,26 @@ default Dataset updateTags(
98101

99102
@SqlQuery(
100103
"""
101-
SELECT df.uuid
102-
FROM dataset_fields df
104+
SELECT df.uuid, max(dv.created_at)
105+
FROM dataset_fields df
103106
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
107+
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
108+
JOIN dataset_versions AS dv ON dv.uuid = fm.dataset_version_uuid
104109
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
110+
GROUP BY df.uuid
105111
""")
106112
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);
107113

114+
@SqlQuery(
115+
"""
116+
SELECT df.uuid, dv.created_at
117+
FROM dataset_fields df
118+
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
119+
JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion
120+
WHERE fm.dataset_version_uuid = :datasetVersion
121+
""")
122+
List<Pair<UUID, Instant>> findDatasetVersionFieldsUuids(UUID datasetVersion);
123+
108124
@SqlQuery(
109125
"""
110126
WITH latest_run AS (
@@ -121,6 +137,15 @@ WITH latest_run AS (
121137
""")
122138
List<UUID> findFieldsUuidsByJob(String namespaceName, String jobName);
123139

140+
@SqlQuery(
141+
"""
142+
SELECT dataset_fields.uuid, r.created_at
143+
FROM dataset_fields
144+
JOIN dataset_versions ON dataset_versions.dataset_uuid = dataset_fields.dataset_uuid
145+
JOIN runs_view r ON r.job_version_uuid = :jobVersion
146+
""")
147+
List<Pair<UUID, Instant>> findFieldsUuidsByJobVersion(UUID jobVersion);
148+
124149
@SqlQuery(
125150
"""
126151
SELECT df.uuid
@@ -131,6 +156,17 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
131156
""")
132157
Optional<UUID> findUuid(String namespaceName, String datasetName, String name);
133158

159+
@SqlQuery(
160+
"""
161+
SELECT df.uuid, dv.created_at
162+
FROM dataset_fields df
163+
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
164+
JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion
165+
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
166+
WHERE fm.dataset_version_uuid = :datasetVersion AND df.name = :fieldName
167+
""")
168+
List<Pair<UUID, Instant>> findDatasetVersionFieldsUuids(String fieldName, UUID datasetVersion);
169+
134170
@SqlQuery(
135171
"SELECT f.*, "
136172
+ "ARRAY(SELECT t.name "

api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99
import static marquez.db.Columns.TRANSFORMATION_TYPE;
1010
import static marquez.db.Columns.stringOrNull;
1111
import static marquez.db.Columns.stringOrThrow;
12+
import static marquez.db.Columns.uuidOrThrow;
1213

1314
import com.fasterxml.jackson.databind.ObjectMapper;
1415
import com.google.common.collect.ImmutableList;
1516
import java.sql.ResultSet;
1617
import java.sql.SQLException;
1718
import java.util.Arrays;
19+
import java.util.UUID;
1820
import java.util.stream.Collectors;
1921
import lombok.extern.slf4j.Slf4j;
2022
import marquez.common.Utils;
@@ -35,6 +37,7 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
3537
return new ColumnLineageNodeData(
3638
stringOrThrow(results, Columns.NAMESPACE_NAME),
3739
stringOrThrow(results, Columns.DATASET_NAME),
40+
uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
3841
stringOrThrow(results, Columns.FIELD_NAME),
3942
stringOrThrow(results, Columns.TYPE),
4043
stringOrNull(results, TRANSFORMATION_DESCRIPTION),
@@ -54,7 +57,7 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
5457
return ImmutableList.copyOf(
5558
Arrays.asList(deserializedArray).stream()
5659
.map(o -> (String[]) o)
57-
.map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2]))
60+
.map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
5861
.collect(Collectors.toList()));
5962
}
6063
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db.mappers;
7+
8+
import static marquez.db.Columns.timestampOrNull;
9+
import static marquez.db.Columns.uuidOrThrow;
10+
11+
import java.sql.ResultSet;
12+
import java.sql.SQLException;
13+
import java.time.Instant;
14+
import java.util.UUID;
15+
import lombok.NonNull;
16+
import marquez.db.Columns;
17+
import org.apache.commons.lang3.tuple.Pair;
18+
import org.jdbi.v3.core.mapper.RowMapper;
19+
import org.jdbi.v3.core.statement.StatementContext;
20+
21+
public final class PairUuidInstantMapper implements RowMapper<Pair<UUID, Instant>> {
22+
@Override
23+
public Pair<UUID, Instant> map(@NonNull ResultSet results, @NonNull StatementContext context)
24+
throws SQLException {
25+
return Pair.of(
26+
uuidOrThrow(results, Columns.ROW_UUID), timestampOrNull(results, Columns.CREATED_AT));
27+
}
28+
}

api/src/main/java/marquez/db/models/ColumnLineageNodeData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package marquez.db.models;
77

88
import java.util.List;
9+
import java.util.UUID;
10+
import javax.annotation.Nullable;
911
import lombok.AllArgsConstructor;
1012
import lombok.Getter;
1113
import lombok.NonNull;
@@ -15,6 +17,7 @@
1517
public class ColumnLineageNodeData implements NodeData {
1618
@NonNull String namespace;
1719
@NonNull String dataset;
20+
@Nullable UUID datasetVersion;
1821
@NonNull String field;
1922
@NonNull String fieldType;
2023
String transformationDescription;

api/src/main/java/marquez/db/models/InputFieldNodeData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55

66
package marquez.db.models;
77

8+
import java.util.UUID;
9+
import javax.annotation.Nullable;
810
import lombok.AllArgsConstructor;
911
import lombok.Getter;
1012
import lombok.NonNull;
13+
import lombok.ToString;
1114

1215
@Getter
1316
@AllArgsConstructor
17+
@ToString
1418
public class InputFieldNodeData {
1519
@NonNull String namespace;
1620
@NonNull String dataset;
21+
@Nullable UUID datasetVersion;
1722
@NonNull String field;
1823
}

0 commit comments

Comments
 (0)