Skip to content

Commit ad91189

Browse files
add column lineage graph endpoint
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent bf8c84e commit ad91189

22 files changed

Lines changed: 1173 additions & 16 deletions

api/src/main/java/marquez/MarquezContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import marquez.api.TagResource;
2323
import marquez.api.exceptions.JdbiExceptionExceptionMapper;
2424
import marquez.db.BaseDao;
25+
import marquez.db.ColumnLevelLineageDao;
2526
import marquez.db.DatasetDao;
2627
import marquez.db.DatasetFieldDao;
2728
import marquez.db.DatasetVersionDao;
@@ -39,6 +40,7 @@
3940
import marquez.db.TagDao;
4041
import marquez.graphql.GraphqlSchemaBuilder;
4142
import marquez.graphql.MarquezGraphqlServletBuilder;
43+
import marquez.service.ColumnLineageService;
4244
import marquez.service.DatasetFieldService;
4345
import marquez.service.DatasetService;
4446
import marquez.service.DatasetVersionService;
@@ -70,6 +72,7 @@ public final class MarquezContext {
7072
@Getter private final TagDao tagDao;
7173
@Getter private final OpenLineageDao openLineageDao;
7274
@Getter private final LineageDao lineageDao;
75+
@Getter private final ColumnLevelLineageDao columnLevelLineageDao;
7376
@Getter private final SearchDao searchDao;
7477
@Getter private final List<RunTransitionListener> runTransitionListeners;
7578

@@ -81,6 +84,7 @@ public final class MarquezContext {
8184
@Getter private final RunService runService;
8285
@Getter private final OpenLineageService openLineageService;
8386
@Getter private final LineageService lineageService;
87+
@Getter private final ColumnLineageService columnLineageService;
8488
@Getter private final NamespaceResource namespaceResource;
8589
@Getter private final SourceResource sourceResource;
8690
@Getter private final DatasetResource datasetResource;
@@ -115,6 +119,7 @@ private MarquezContext(
115119
this.tagDao = jdbi.onDemand(TagDao.class);
116120
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
117121
this.lineageDao = jdbi.onDemand(LineageDao.class);
122+
this.columnLevelLineageDao = jdbi.onDemand(ColumnLevelLineageDao.class);
118123
this.searchDao = jdbi.onDemand(SearchDao.class);
119124
this.runTransitionListeners = runTransitionListeners;
120125

@@ -128,6 +133,7 @@ private MarquezContext(
128133
this.tagService.init(tags);
129134
this.openLineageService = new OpenLineageService(baseDao, runService);
130135
this.lineageService = new LineageService(lineageDao, jobDao);
136+
this.columnLineageService = new ColumnLineageService(columnLevelLineageDao, datasetFieldDao);
131137
this.jdbiException = new JdbiExceptionExceptionMapper();
132138
final ServiceFactory serviceFactory =
133139
ServiceFactory.builder()
@@ -139,6 +145,7 @@ private MarquezContext(
139145
.openLineageService(openLineageService)
140146
.sourceService(sourceService)
141147
.lineageService(lineageService)
148+
.columnLineageService(columnLineageService)
142149
.datasetFieldService(new DatasetFieldService(baseDao))
143150
.datasetVersionService(new DatasetVersionService(baseDao))
144151
.build();

api/src/main/java/marquez/api/BaseResource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import marquez.common.models.NamespaceName;
2626
import marquez.common.models.RunId;
2727
import marquez.common.models.SourceName;
28+
import marquez.service.ColumnLineageService;
2829
import marquez.service.DatasetFieldService;
2930
import marquez.service.DatasetService;
3031
import marquez.service.DatasetVersionService;
@@ -50,6 +51,7 @@ public class BaseResource {
5051
protected DatasetVersionService datasetVersionService;
5152
protected DatasetFieldService datasetFieldService;
5253
protected LineageService lineageService;
54+
protected ColumnLineageService columnLineageService;
5355

5456
public BaseResource(ServiceFactory serviceFactory) {
5557
this.serviceFactory = serviceFactory;
@@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
6365
this.datasetVersionService = serviceFactory.getDatasetVersionService();
6466
this.datasetFieldService = serviceFactory.getDatasetFieldService();
6567
this.lineageService = serviceFactory.getLineageService();
68+
this.columnLineageService = serviceFactory.getColumnLineageService();
6669
}
6770

6871
void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.api;
7+
8+
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
9+
10+
import com.codahale.metrics.annotation.ExceptionMetered;
11+
import com.codahale.metrics.annotation.ResponseMetered;
12+
import com.codahale.metrics.annotation.Timed;
13+
import java.time.Instant;
14+
import java.util.concurrent.ExecutionException;
15+
import javax.validation.constraints.NotNull;
16+
import javax.ws.rs.Consumes;
17+
import javax.ws.rs.DefaultValue;
18+
import javax.ws.rs.GET;
19+
import javax.ws.rs.Path;
20+
import javax.ws.rs.Produces;
21+
import javax.ws.rs.QueryParam;
22+
import javax.ws.rs.core.Response;
23+
import lombok.NonNull;
24+
import lombok.extern.slf4j.Slf4j;
25+
import marquez.service.ServiceFactory;
26+
import marquez.service.models.NodeId;
27+
28+
@Slf4j
29+
@Path("/api/v1/column-lineage")
30+
public class ColumnLineageResource extends BaseResource {
31+
32+
private static final String DEFAULT_DEPTH = "20";
33+
34+
public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
35+
super(serviceFactory);
36+
}
37+
38+
@Timed
39+
@ResponseMetered
40+
@ExceptionMetered
41+
@GET
42+
@Consumes(APPLICATION_JSON)
43+
@Produces(APPLICATION_JSON)
44+
public Response getLineage(
45+
@QueryParam("nodeId") @NotNull NodeId nodeId,
46+
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
47+
throws ExecutionException, InterruptedException {
48+
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
49+
}
50+
51+
// TODO: endpoint tests
52+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.common.models;
7+
8+
import lombok.AllArgsConstructor;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.Getter;
11+
import lombok.ToString;
12+
13+
/** ID for {@code DatasetField}. */
14+
@EqualsAndHashCode
15+
@AllArgsConstructor
16+
@ToString
17+
public class DatasetFieldId {
18+
19+
@Getter private final DatasetId datasetId;
20+
@Getter private final FieldName fieldName;
21+
22+
public static DatasetFieldId of(String namespace, String datasetName, String field) {
23+
return new DatasetFieldId(
24+
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
25+
FieldName.of(field));
26+
}
27+
}

api/src/main/java/marquez/db/ColumnLevelLineageDao.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,27 @@
55

66
package marquez.db;
77

8+
import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING;
9+
810
import java.time.Instant;
911
import java.util.Collections;
1012
import java.util.List;
13+
import java.util.Set;
1114
import java.util.UUID;
1215
import java.util.stream.Collectors;
1316
import marquez.db.mappers.ColumnLevelLineageRowMapper;
17+
import marquez.db.mappers.ColumnLineageNodeDataMapper;
1418
import marquez.db.models.ColumnLevelLineageRow;
19+
import marquez.db.models.ColumnLineageNodeData;
1520
import org.apache.commons.lang3.tuple.Pair;
1621
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
1722
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
23+
import org.jdbi.v3.sqlobject.customizer.BindList;
1824
import org.jdbi.v3.sqlobject.statement.SqlQuery;
1925
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
2026

2127
@RegisterRowMapper(ColumnLevelLineageRowMapper.class)
28+
@RegisterRowMapper(ColumnLineageNodeDataMapper.class)
2229
public interface ColumnLevelLineageDao extends BaseDao {
2330

2431
default List<ColumnLevelLineageRow> upsertColumnLevelLineageRow(
@@ -90,4 +97,59 @@ void doUpsertColumnLevelLineageRow(
9097
},
9198
value = "values")
9299
ColumnLevelLineageRow... rows);
100+
101+
@SqlQuery(
102+
"""
103+
WITH RECURSIVE
104+
dataset_fields_view AS (
105+
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
106+
FROM dataset_fields df
107+
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
108+
),
109+
column_lineage_recursive AS (
110+
SELECT *, 0 as depth
111+
FROM column_level_lineage
112+
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
113+
UNION
114+
SELECT
115+
upstream_node.output_dataset_version_uuid,
116+
upstream_node.output_dataset_field_uuid,
117+
upstream_node.input_dataset_version_uuid,
118+
upstream_node.input_dataset_field_uuid,
119+
upstream_node.transformation_description,
120+
upstream_node.transformation_type,
121+
upstream_node.created_at,
122+
upstream_node.updated_at,
123+
node.depth + 1 as depth
124+
FROM column_level_lineage upstream_node, column_lineage_recursive node
125+
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
126+
AND node.depth < :depth
127+
)
128+
SELECT
129+
output_fields.namespace_name,
130+
output_fields.dataset_name,
131+
output_fields.field_name,
132+
output_fields.type,
133+
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
134+
clr.transformation_description,
135+
clr.transformation_type,
136+
clr.created_at,
137+
clr.updated_at
138+
FROM column_lineage_recursive clr
139+
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
140+
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
141+
GROUP BY
142+
output_fields.namespace_name,
143+
output_fields.dataset_name,
144+
output_fields.field_name,
145+
output_fields.type,
146+
clr.transformation_description,
147+
clr.transformation_type,
148+
clr.created_at,
149+
clr.updated_at
150+
""")
151+
Set<ColumnLineageNodeData> getLineage(
152+
int depth,
153+
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
154+
Instant createdAtUntil);
93155
}

api/src/main/java/marquez/db/DatasetFieldDao.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,25 @@ default Dataset updateTags(
9393
+ "WHERE dataset_uuid = :datasetUuid AND name = :name")
9494
Optional<UUID> findUuid(UUID datasetUuid, String name);
9595

96+
@SqlQuery(
97+
"""
98+
SELECT df.uuid
99+
FROM dataset_fields df
100+
INNER JOIN datasets_view AS d
101+
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
102+
""")
103+
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);
104+
105+
@SqlQuery(
106+
"""
107+
SELECT df.uuid
108+
FROM dataset_fields df
109+
INNER JOIN datasets_view AS d
110+
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
111+
WHERE df.name = :name
112+
""")
113+
Optional<UUID> findUuid(String namespace, String datasetName, String name);
114+
96115
@SqlQuery(
97116
"SELECT f.*, "
98117
+ "ARRAY(SELECT t.name "

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ private List<ColumnLevelLineageRow> upsertColumnLineage(
703703
if (outputField.isEmpty()) {
704704
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
705705
log.error(
706-
"Cannot produce column lineage for missing output field in output dataset: %s",
706+
"Cannot produce column lineage for missing output field in output dataset: {}",
707707
outputColumn.getName());
708708
return Stream.empty();
709709
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package marquez.db.mappers;
2+
3+
import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
4+
import static marquez.db.Columns.TRANSFORMATION_TYPE;
5+
import static marquez.db.Columns.stringOrThrow;
6+
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import com.google.common.collect.ImmutableList;
9+
import java.sql.ResultSet;
10+
import java.sql.SQLException;
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.stream.Collectors;
14+
import lombok.extern.slf4j.Slf4j;
15+
import marquez.common.Utils;
16+
import marquez.db.Columns;
17+
import marquez.db.models.ColumnLineageNodeData;
18+
import org.jdbi.v3.core.mapper.RowMapper;
19+
import org.jdbi.v3.core.statement.StatementContext;
20+
import org.postgresql.jdbc.PgArray;
21+
22+
@Slf4j
23+
public class ColumnLineageNodeDataMapper implements RowMapper<ColumnLineageNodeData> {
24+
25+
private static final ObjectMapper MAPPER = Utils.getMapper();
26+
27+
@Override
28+
public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws SQLException {
29+
return new ColumnLineageNodeData(
30+
stringOrThrow(results, Columns.NAMESPACE_NAME),
31+
stringOrThrow(results, Columns.DATASET_NAME),
32+
stringOrThrow(results, Columns.FIELD_NAME),
33+
stringOrThrow(results, Columns.TYPE),
34+
stringOrThrow(results, TRANSFORMATION_DESCRIPTION),
35+
stringOrThrow(results, TRANSFORMATION_TYPE),
36+
toInputFields(results, "inputFields"));
37+
}
38+
39+
public static ImmutableList<List<String>> toInputFields(ResultSet results, String column)
40+
throws SQLException {
41+
if (results.getObject(column) == null) {
42+
return ImmutableList.of();
43+
}
44+
45+
PgArray pgArray = (PgArray) results.getObject(column);
46+
Object[] deserializedArray = (Object[]) pgArray.getArray();
47+
48+
return ImmutableList.copyOf(
49+
Arrays.asList(deserializedArray).stream()
50+
.map(o -> (String[]) o)
51+
.map(
52+
arr ->
53+
Arrays.asList(
54+
arr[0], arr[1],
55+
arr[2])) // TODO: add check array size and write unit test for this
56+
.collect(Collectors.toList()));
57+
}
58+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db.models;
7+
8+
import java.util.List;
9+
import lombok.AllArgsConstructor;
10+
import lombok.Getter;
11+
import lombok.NonNull;
12+
13+
@Getter
14+
@AllArgsConstructor
15+
public class ColumnLineageNodeData implements NodeData {
16+
@NonNull String namespace;
17+
@NonNull String name;
18+
@NonNull String field;
19+
@NonNull String type;
20+
@NonNull String transformationDescription;
21+
@NonNull String transformationType;
22+
@NonNull List<List<String>> inputFields;
23+
}

0 commit comments

Comments
 (0)