Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
5 changes: 4 additions & 1 deletion api/src/main/java/marquez/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import javax.validation.Valid;
Expand Down Expand Up @@ -85,10 +86,11 @@ public Response getDataset(
@PathParam("dataset") DatasetName datasetName) {
throwIfNotExists(namespaceName);

final Dataset dataset =
Dataset dataset =
datasetService
.findWithTags(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
columnLineageService.enrichWithColumnLineage(Arrays.asList(dataset));
return Response.ok(dataset).build();
}

Expand Down Expand Up @@ -147,6 +149,7 @@ public Response list(

final List<Dataset> datasets =
datasetService.findAllWithTags(namespaceName.getValue(), limit, offset);
columnLineageService.enrichWithColumnLineage(datasets);
final int totalCount = datasetService.countFor(namespaceName.getValue());
return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build();
}
Expand Down
50 changes: 50 additions & 0 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,54 @@ Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);

@SqlQuery(
"""
WITH selected_column_lineage AS (
SELECT DISTINCT ON (cl.output_dataset_field_uuid, cl.input_dataset_field_uuid) cl.*
FROM column_lineage cl
JOIN dataset_fields df ON df.uuid = cl.output_dataset_field_uuid
JOIN datasets_view dv ON dv.uuid = df.dataset_uuid
WHERE ARRAY[<values>]::DATASET_NAME[] && dv.dataset_symlinks -- array of string pairs is cast onto array of DATASET_NAME types to be checked if it has non-empty intersection with dataset symlinks
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
),
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
FROM selected_column_lineage c
INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
""")
/**
* Each dataset is identified by a pair of strings (namespace and name). A query returns column
* lineage for multiple datasets, that's why a list of pairs is expected as an argument. "left"
* and "right" properties correspond to Java Pair class properties defined to bind query template
* with values
*/
Set<ColumnLineageNodeData> getLineageRowsForDatasets(
@BindBeanList(
propertyNames = {"left", "right"},
Comment thread
wslulciuc marked this conversation as resolved.
value = "values")
List<Pair<String, String>> datasets);
}
27 changes: 15 additions & 12 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
@RegisterRowMapper(FieldDataMapper.class)
public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
+ "SELECT 1 FROM dataset_fields AS df "
+ "INNER JOIN datasets_view AS d "
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName "
+ "WHERE df.name = :name)")
"""
SELECT EXISTS (
SELECT 1 FROM dataset_fields AS df
INNER JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
AND df.name = :name
)
""")
boolean exists(String namespaceName, String datasetName, String name);

default Dataset updateTags(
Expand Down Expand Up @@ -97,20 +100,20 @@ default Dataset updateTags(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
""")
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
WHERE df.name = :name
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
AND df.name = :name
""")
Optional<UUID> findUuid(String namespace, String datasetName, String name);
Optional<UUID> findUuid(String namespaceName, String datasetName, String name);

@SqlQuery(
"SELECT f.*, "
Expand Down
51 changes: 51 additions & 0 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetFieldDao;
import marquez.db.models.ColumnLineageNodeData;
import marquez.service.models.ColumnLineage;
import marquez.service.models.ColumnLineageInputField;
import marquez.service.models.Dataset;
import marquez.service.models.Edge;
import marquez.service.models.Lineage;
import marquez.service.models.Node;
import marquez.service.models.NodeId;
import org.apache.commons.lang3.tuple.Pair;

@Slf4j
public class ColumnLineageService extends DelegatingDaos.DelegatingColumnLineageDao {
Expand Down Expand Up @@ -124,4 +128,51 @@ List<UUID> getColumnNodeUuids(NodeId nodeId) {
}
return columnNodeUuids;
}

/**
 * Attaches column lineage to the given datasets, mutating them in place.
 *
 * <p>Lineage rows for all datasets are fetched with a single query and grouped by the
 * (namespace, name) of their output dataset. Every dataset whose (namespace, name) has at least
 * one lineage row gets its column lineage set; datasets without lineage rows are left untouched.
 *
 * <p>Rows that do not correspond to any of the given datasets by name are ignored instead of
 * failing the whole call. NOTE(review): the DAO selects datasets by symlink but reports the
 * dataset's own namespace and name, so such rows presumably can occur when a dataset is addressed
 * through a symlink - confirm whether those rows should be attached as well.
 *
 * @param datasets datasets to enrich; an empty list results in no query being executed
 */
public void enrichWithColumnLineage(List<Dataset> datasets) {
  if (datasets.isEmpty()) {
    return;
  }

  Set<ColumnLineageNodeData> lineageRowsForDatasets =
      getLineageRowsForDatasets(
          datasets.stream()
              .map(d -> Pair.of(d.getNamespace().getValue(), d.getName().getValue()))
              .collect(Collectors.toList()));

  // Group by a plain (namespace, name) key rather than by the mutable Dataset itself, so that
  // each row needs one hash lookup instead of a scan over all datasets.
  Map<Pair<String, String>, List<ColumnLineage>> lineageByDataset =
      lineageRowsForDatasets.stream()
          .collect(
              Collectors.groupingBy(
                  row -> Pair.of(row.getNamespace(), row.getDataset()),
                  Collectors.mapping(
                      ColumnLineageService::toColumnLineage, Collectors.toList())));

  for (Dataset dataset : datasets) {
    List<ColumnLineage> columnLineage =
        lineageByDataset.get(
            Pair.of(dataset.getNamespace().getValue(), dataset.getName().getValue()));
    if (columnLineage != null) {
      dataset.setColumnLineage(columnLineage);
    }
  }
}

/** Converts a single column lineage database row into its service-model representation. */
private static ColumnLineage toColumnLineage(ColumnLineageNodeData nodeData) {
  return ColumnLineage.builder()
      .name(nodeData.getField())
      .transformationDescription(nodeData.getTransformationDescription())
      .transformationType(nodeData.getTransformationType())
      .inputFields(
          nodeData.getInputFields().stream()
              .map(f -> new ColumnLineageInputField(f.getNamespace(), f.getDataset(), f.getField()))
              .collect(Collectors.toList()))
      .build();
}
}
24 changes: 24 additions & 0 deletions api/src/main/java/marquez/service/models/ColumnLineage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.util.List;
import javax.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/**
 * Column lineage of a single output field of a dataset: the field's name, the input fields it is
 * derived from and a description of the transformation applied.
 *
 * <p>Instances are created through the generated builder and expose no setters, so all fields are
 * final.
 */
@EqualsAndHashCode
@ToString
@Builder
@Getter
public class ColumnLineage {
  /** Name of the output field this lineage entry describes. */
  @NotNull private final String name;

  /** Input fields the output field is derived from. */
  @NotNull private final List<ColumnLineageInputField> inputFields;

  /** Description of the transformation producing the output field. */
  @NotNull private final String transformationDescription;

  /** Type of the transformation producing the output field. */
  @NotNull private final String transformationType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/**
 * Identifies an input field of a column lineage entry by the namespace and name of its dataset
 * and the name of the field itself.
 *
 * <p>Values are supplied once through the generated all-args constructor and never modified
 * afterwards, so all fields are final.
 */
@EqualsAndHashCode
@ToString
@Getter
@AllArgsConstructor
public class ColumnLineageInputField {
  /** Namespace of the dataset the input field belongs to. */
  @NotNull private final String namespace;

  /** Name of the dataset the input field belongs to. */
  @NotNull private final String dataset;

  /** Name of the input field. */
  @NotNull private final String field;
}
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/service/models/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public abstract class Dataset {
@Nullable private final String lastLifecycleState;
@Nullable private final String description;
@Nullable private final UUID currentVersion;
@Getter @Setter @Nullable private List<ColumnLineage> columnLineage;
@Getter ImmutableMap<String, Object> facets;
@Getter private final boolean isDeleted;

Expand All @@ -70,6 +71,7 @@ public Dataset(
@Nullable final String lastLifecycleState,
@Nullable final String description,
@Nullable final UUID currentVersion,
@Nullable final ImmutableList<ColumnLineage> columnLineage,
@Nullable final ImmutableMap<String, Object> facets,
boolean isDeleted) {
this.id = id;
Expand All @@ -86,6 +88,7 @@ public Dataset(
this.lastLifecycleState = lastLifecycleState;
this.description = description;
this.currentVersion = currentVersion;
this.columnLineage = columnLineage;
this.facets = (facets == null) ? ImmutableMap.of() : facets;
this.isDeleted = isDeleted;
}
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/models/DbTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public DbTable(
lastLifecycleState,
description,
currentVersion,
null,
facets,
isDeleted);
}
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/models/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public Stream(
lastLifecycleState,
description,
currentVersion,
null,
facets,
isDeleted);
this.schemaLocation = schemaLocation;
Expand Down
55 changes: 55 additions & 0 deletions api/src/test/java/marquez/DatasetIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package marquez;

import static marquez.db.ColumnLineageTestUtils.getDatasetA;
import static marquez.db.ColumnLineageTestUtils.getDatasetB;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -22,6 +24,8 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import marquez.api.JdbiUtils;
import marquez.client.models.ColumnLineage;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetId;
import marquez.client.models.DatasetVersion;
Expand All @@ -32,12 +36,17 @@
import marquez.client.models.StreamVersion;
import marquez.common.Utils;
import marquez.db.LineageTestUtils;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@org.junit.jupiter.api.Tag("IntegrationTests")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class DatasetIntegrationTest extends BaseIntegrationTest {

@BeforeEach
Expand All @@ -47,6 +56,11 @@ public void setup() {
createSource(STREAM_SOURCE_NAME);
}

@AfterEach
public void tearDown(Jdbi jdbi) {
JdbiUtils.cleanDatabase(jdbi);
}

@Test
public void testApp_testTags() {
DbTableMeta DB_TABLE_META =
Expand Down Expand Up @@ -440,4 +454,45 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep
datasets = client.listDatasets(namespace);
assertThat(datasets).hasSize(1);
}

/**
 * Sends a lineage event with dataset A as input and dataset B as output, then checks that both
 * the dataset listing and the single-dataset lookup expose column lineage for "dataset_b".
 */
@Test
public void testApp_getDatasetContainsColumnLineage() {
  final LineageEvent lineageEvent =
      new LineageEvent(
          "COMPLETE",
          Instant.now().atZone(ZoneId.systemDefault()),
          new LineageEvent.Run(UUID.randomUUID().toString(), null),
          new LineageEvent.Job("namespace", "job_name", null),
          List.of(getDatasetA()),
          List.of(getDatasetB()),
          "the_producer");

  // Block until the event has been accepted; any transport failure fails the test.
  final CompletableFuture<Integer> statusCode =
      this.sendLineage(Utils.toJson(lineageEvent))
          .thenApply(HttpResponse::statusCode)
          .whenComplete(
              (status, failure) -> {
                if (failure != null) {
                  Assertions.fail("Could not complete request");
                }
              });
  statusCode.join();

  // Column lineage must be present when datasets are listed ...
  final List<ColumnLineage> listedLineage =
      client.listDatasets("namespace").stream()
          .filter(listed -> listed.getName().equals("dataset_b"))
          .findAny()
          .orElseThrow()
          .getColumnLineage();
  assertThat(listedLineage).hasSize(1);
  assertThat(listedLineage.get(0).getInputFields()).hasSize(2);

  // ... and when the dataset is fetched directly.
  final List<ColumnLineage> fetchedLineage =
      client.getDataset("namespace", "dataset_b").getColumnLineage();
  assertThat(fetchedLineage).hasSize(1);
  assertThat(fetchedLineage.get(0).getInputFields()).hasSize(2);
}
}
Loading