Skip to content

Commit c65ecdd

Browse files
add column lineage graph endpoint
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent b6544ec commit c65ecdd

27 files changed

Lines changed: 1281 additions & 22 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Added
66
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
77
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
8+
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
89

910
### Fixed
1011
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)

api/src/main/java/marquez/MarquezContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import marquez.api.TagResource;
2323
import marquez.api.exceptions.JdbiExceptionExceptionMapper;
2424
import marquez.db.BaseDao;
25+
import marquez.db.ColumnLineageDao;
2526
import marquez.db.DatasetDao;
2627
import marquez.db.DatasetFieldDao;
2728
import marquez.db.DatasetVersionDao;
@@ -39,6 +40,7 @@
3940
import marquez.db.TagDao;
4041
import marquez.graphql.GraphqlSchemaBuilder;
4142
import marquez.graphql.MarquezGraphqlServletBuilder;
43+
import marquez.service.ColumnLineageService;
4244
import marquez.service.DatasetFieldService;
4345
import marquez.service.DatasetService;
4446
import marquez.service.DatasetVersionService;
@@ -70,6 +72,7 @@ public final class MarquezContext {
7072
@Getter private final TagDao tagDao;
7173
@Getter private final OpenLineageDao openLineageDao;
7274
@Getter private final LineageDao lineageDao;
75+
@Getter private final ColumnLineageDao columnLineageDao;
7376
@Getter private final SearchDao searchDao;
7477
@Getter private final List<RunTransitionListener> runTransitionListeners;
7578

@@ -81,6 +84,7 @@ public final class MarquezContext {
8184
@Getter private final RunService runService;
8285
@Getter private final OpenLineageService openLineageService;
8386
@Getter private final LineageService lineageService;
87+
@Getter private final ColumnLineageService columnLineageService;
8488
@Getter private final NamespaceResource namespaceResource;
8589
@Getter private final SourceResource sourceResource;
8690
@Getter private final DatasetResource datasetResource;
@@ -115,6 +119,7 @@ private MarquezContext(
115119
this.tagDao = jdbi.onDemand(TagDao.class);
116120
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
117121
this.lineageDao = jdbi.onDemand(LineageDao.class);
122+
this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class);
118123
this.searchDao = jdbi.onDemand(SearchDao.class);
119124
this.runTransitionListeners = runTransitionListeners;
120125

@@ -128,6 +133,7 @@ private MarquezContext(
128133
this.tagService.init(tags);
129134
this.openLineageService = new OpenLineageService(baseDao, runService);
130135
this.lineageService = new LineageService(lineageDao, jobDao);
136+
this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
131137
this.jdbiException = new JdbiExceptionExceptionMapper();
132138
final ServiceFactory serviceFactory =
133139
ServiceFactory.builder()
@@ -139,6 +145,7 @@ private MarquezContext(
139145
.openLineageService(openLineageService)
140146
.sourceService(sourceService)
141147
.lineageService(lineageService)
148+
.columnLineageService(columnLineageService)
142149
.datasetFieldService(new DatasetFieldService(baseDao))
143150
.datasetVersionService(new DatasetVersionService(baseDao))
144151
.build();

api/src/main/java/marquez/api/BaseResource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import marquez.common.models.NamespaceName;
2626
import marquez.common.models.RunId;
2727
import marquez.common.models.SourceName;
28+
import marquez.service.ColumnLineageService;
2829
import marquez.service.DatasetFieldService;
2930
import marquez.service.DatasetService;
3031
import marquez.service.DatasetVersionService;
@@ -50,6 +51,7 @@ public class BaseResource {
5051
protected DatasetVersionService datasetVersionService;
5152
protected DatasetFieldService datasetFieldService;
5253
protected LineageService lineageService;
54+
protected ColumnLineageService columnLineageService;
5355

5456
public BaseResource(ServiceFactory serviceFactory) {
5557
this.serviceFactory = serviceFactory;
@@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
6365
this.datasetVersionService = serviceFactory.getDatasetVersionService();
6466
this.datasetFieldService = serviceFactory.getDatasetFieldService();
6567
this.lineageService = serviceFactory.getLineageService();
68+
this.columnLineageService = serviceFactory.getColumnLineageService();
6669
}
6770

6871
void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.api;
7+
8+
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
9+
10+
import com.codahale.metrics.annotation.ExceptionMetered;
11+
import com.codahale.metrics.annotation.ResponseMetered;
12+
import com.codahale.metrics.annotation.Timed;
13+
import java.time.Instant;
14+
import java.util.concurrent.ExecutionException;
15+
import javax.validation.constraints.NotNull;
16+
import javax.ws.rs.Consumes;
17+
import javax.ws.rs.DefaultValue;
18+
import javax.ws.rs.GET;
19+
import javax.ws.rs.Path;
20+
import javax.ws.rs.Produces;
21+
import javax.ws.rs.QueryParam;
22+
import javax.ws.rs.core.Response;
23+
import lombok.NonNull;
24+
import lombok.extern.slf4j.Slf4j;
25+
import marquez.service.ServiceFactory;
26+
import marquez.service.models.NodeId;
27+
28+
@Slf4j
29+
@Path("/api/v1/column-lineage")
30+
public class ColumnLineageResource extends BaseResource {
31+
32+
private static final String DEFAULT_DEPTH = "20";
33+
34+
public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
35+
super(serviceFactory);
36+
}
37+
38+
@Timed
39+
@ResponseMetered
40+
@ExceptionMetered
41+
@GET
42+
@Consumes(APPLICATION_JSON)
43+
@Produces(APPLICATION_JSON)
44+
public Response getLineage(
45+
@QueryParam("nodeId") @NotNull NodeId nodeId,
46+
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
47+
throws ExecutionException, InterruptedException {
48+
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
49+
}
50+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.common.models;
7+
8+
import lombok.AllArgsConstructor;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.Getter;
11+
import lombok.ToString;
12+
13+
/** ID for {@code DatasetField}. */
14+
@EqualsAndHashCode
15+
@AllArgsConstructor
16+
@ToString
17+
public class DatasetFieldId {
18+
19+
@Getter private final DatasetId datasetId;
20+
@Getter private final FieldName fieldName;
21+
22+
public static DatasetFieldId of(String namespace, String datasetName, String field) {
23+
return new DatasetFieldId(
24+
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
25+
FieldName.of(field));
26+
}
27+
}

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,27 @@
55

66
package marquez.db;
77

8+
import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING;
9+
810
import java.time.Instant;
911
import java.util.Collections;
1012
import java.util.List;
13+
import java.util.Set;
1114
import java.util.UUID;
1215
import java.util.stream.Collectors;
16+
import marquez.db.mappers.ColumnLineageNodeDataMapper;
1317
import marquez.db.mappers.ColumnLineageRowMapper;
18+
import marquez.db.models.ColumnLineageNodeData;
1419
import marquez.db.models.ColumnLineageRow;
1520
import org.apache.commons.lang3.tuple.Pair;
1621
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
1722
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
23+
import org.jdbi.v3.sqlobject.customizer.BindList;
1824
import org.jdbi.v3.sqlobject.statement.SqlQuery;
1925
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
2026

2127
@RegisterRowMapper(ColumnLineageRowMapper.class)
28+
@RegisterRowMapper(ColumnLineageNodeDataMapper.class)
2229
public interface ColumnLineageDao extends BaseDao {
2330

2431
default List<ColumnLineageRow> upsertColumnLineageRow(
@@ -88,4 +95,59 @@ void doUpsertColumnLineageRow(
8895
},
8996
value = "values")
9097
List<ColumnLineageRow> rows);
98+
99+
@SqlQuery(
100+
"""
101+
WITH RECURSIVE
102+
dataset_fields_view AS (
103+
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
104+
FROM dataset_fields df
105+
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
106+
),
107+
column_lineage_recursive AS (
108+
SELECT *, 0 as depth
109+
FROM column_lineage
110+
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
111+
UNION
112+
SELECT
113+
upstream_node.output_dataset_version_uuid,
114+
upstream_node.output_dataset_field_uuid,
115+
upstream_node.input_dataset_version_uuid,
116+
upstream_node.input_dataset_field_uuid,
117+
upstream_node.transformation_description,
118+
upstream_node.transformation_type,
119+
upstream_node.created_at,
120+
upstream_node.updated_at,
121+
node.depth + 1 as depth
122+
FROM column_lineage upstream_node, column_lineage_recursive node
123+
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
124+
AND node.depth < :depth
125+
)
126+
SELECT
127+
output_fields.namespace_name,
128+
output_fields.dataset_name,
129+
output_fields.field_name,
130+
output_fields.type,
131+
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
132+
clr.transformation_description,
133+
clr.transformation_type,
134+
clr.created_at,
135+
clr.updated_at
136+
FROM column_lineage_recursive clr
137+
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
138+
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
139+
GROUP BY
140+
output_fields.namespace_name,
141+
output_fields.dataset_name,
142+
output_fields.field_name,
143+
output_fields.type,
144+
clr.transformation_description,
145+
clr.transformation_type,
146+
clr.created_at,
147+
clr.updated_at
148+
""")
149+
Set<ColumnLineageNodeData> getLineage(
150+
int depth,
151+
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
152+
Instant createdAtUntil);
91153
}

api/src/main/java/marquez/db/DatasetFieldDao.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,25 @@ default Dataset updateTags(
9393
+ "WHERE dataset_uuid = :datasetUuid AND name = :name")
9494
Optional<UUID> findUuid(UUID datasetUuid, String name);
9595

96+
@SqlQuery(
97+
"""
98+
SELECT df.uuid
99+
FROM dataset_fields df
100+
INNER JOIN datasets_view AS d
101+
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
102+
""")
103+
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);
104+
105+
@SqlQuery(
106+
"""
107+
SELECT df.uuid
108+
FROM dataset_fields df
109+
INNER JOIN datasets_view AS d
110+
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
111+
WHERE df.name = :name
112+
""")
113+
Optional<UUID> findUuid(String namespace, String datasetName, String name);
114+
96115
@SqlQuery(
97116
"SELECT f.*, "
98117
+ "ARRAY(SELECT t.name "

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -757,10 +757,9 @@ private List<ColumnLineageRow> upsertColumnLineage(
757757
outputColumn.getInputFields().stream()
758758
.filter(
759759
of ->
760-
of.getDatasetNamespace().equals(fieldData.getNamespace())
761-
&& of.getDatasetName()
762-
.equals(fieldData.getDatasetName())
763-
&& of.getFieldName().equals(fieldData.getField()))
760+
of.getNamespace().equals(fieldData.getNamespace())
761+
&& of.getName().equals(fieldData.getDatasetName())
762+
&& of.getField().equals(fieldData.getField()))
764763
.findAny()
765764
.isPresent())
766765
.map(
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package marquez.db.mappers;
2+
3+
import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
4+
import static marquez.db.Columns.TRANSFORMATION_TYPE;
5+
import static marquez.db.Columns.stringOrThrow;
6+
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import com.google.common.collect.ImmutableList;
9+
import java.sql.ResultSet;
10+
import java.sql.SQLException;
11+
import java.util.Arrays;
12+
import java.util.stream.Collectors;
13+
import lombok.extern.slf4j.Slf4j;
14+
import marquez.common.Utils;
15+
import marquez.db.Columns;
16+
import marquez.db.models.ColumnLineageNodeData;
17+
import marquez.db.models.InputFieldNodeData;
18+
import org.jdbi.v3.core.mapper.RowMapper;
19+
import org.jdbi.v3.core.statement.StatementContext;
20+
import org.postgresql.jdbc.PgArray;
21+
22+
@Slf4j
23+
public class ColumnLineageNodeDataMapper implements RowMapper<ColumnLineageNodeData> {
24+
25+
private static final ObjectMapper MAPPER = Utils.getMapper();
26+
27+
@Override
28+
public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws SQLException {
29+
return new ColumnLineageNodeData(
30+
stringOrThrow(results, Columns.NAMESPACE_NAME),
31+
stringOrThrow(results, Columns.DATASET_NAME),
32+
stringOrThrow(results, Columns.FIELD_NAME),
33+
stringOrThrow(results, Columns.TYPE),
34+
stringOrThrow(results, TRANSFORMATION_DESCRIPTION),
35+
stringOrThrow(results, TRANSFORMATION_TYPE),
36+
toInputFields(results, "inputFields"));
37+
}
38+
39+
public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results, String column)
40+
throws SQLException {
41+
if (results.getObject(column) == null) {
42+
return ImmutableList.of();
43+
}
44+
45+
PgArray pgArray = (PgArray) results.getObject(column);
46+
Object[] deserializedArray = (Object[]) pgArray.getArray();
47+
48+
return ImmutableList.copyOf(
49+
Arrays.asList(deserializedArray).stream()
50+
.map(o -> (String[]) o)
51+
.map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2]))
52+
.collect(Collectors.toList()));
53+
}
54+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db.models;
7+
8+
import java.util.List;
9+
import lombok.AllArgsConstructor;
10+
import lombok.Getter;
11+
import lombok.NonNull;
12+
13+
@Getter
14+
@AllArgsConstructor
15+
public class ColumnLineageNodeData implements NodeData {
16+
@NonNull String namespace;
17+
@NonNull String dataset;
18+
@NonNull String field;
19+
@NonNull String fieldType;
20+
@NonNull String transformationDescription;
21+
@NonNull String transformationType;
22+
@NonNull List<InputFieldNodeData> inputFields;
23+
}

0 commit comments

Comments
 (0)