-
Notifications
You must be signed in to change notification settings - Fork 403
Model and store column lineage in Marquez DB #2096
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
9e2cd20
Create database representation, model classes
374bdd3
Implement ColumnLevelLineageDao
mzareba382 148836c
Instantiate ColumnLevelLineageDao in updateBaseMarquezModel
mzareba382 e5d9490
Merge branch 'main' into add-column-level-lineage
mzareba382 c28ea86
Merge branch 'main' into add-column-level-lineage
mzareba382 48b1ebf
Upsert ColumnLevelLineageRow to db, model representation in LineageEvent
mzareba382 3e6674e
Fix problems in OpenLineageDao, add a list of ColumnLevelLineageRow t…
mzareba382 b1eab3f
Change wildcard imports to single class imports
mzareba382 03e2963
Change wildcard imports to single class imports
mzareba382 2ad3bc4
Change wildcard imports to single class imports
mzareba382 4a5ec90
Apply spotless
mzareba382 d547fd6
Merge branch 'main' into add-column-level-lineage
mzareba382 153a19a
Merge branch 'main' into add-column-level-lineage
mzareba382 bce38ff
Check for ds.getFacets not null
mzareba382 a0251a0
Format fix
mzareba382 f17cdda
Update testUpdateMarquezModelDatasetWithColumnLineageFacet
mzareba382 dbabd08
Merge branch 'main' into add-column-level-lineage
mzareba382 2dff0f6
Test for column_level_lineage upsert.
mzareba382 b42a2ad
Apply spotless
mzareba382 95ba2e2
Merge branch 'main' into add-column-level-lineage
mzareba382 8e3fc65
switch to data field references
pawel-big-lebowski 7ce339e
fix broken tests
pawel-big-lebowski bfd7555
test when dataset_field is missing
pawel-big-lebowski 08539dd
add input_dataset_version_uuid field
pawel-big-lebowski 6fce3df
Merge branch 'main' into add-column-level-lineage
pawel-big-lebowski bf8c84e
increase db file version
pawel-big-lebowski c816996
increase db file version
pawel-big-lebowski b6d37fe
Merge branch 'add-column-level-lineage' of github.com:MarquezProject/…
pawel-big-lebowski 1aefca2
Merge branch 'main' into add-column-level-lineage
pawel-big-lebowski 21dac22
rename ColumnLevelLineage -> ColumnLineage
pawel-big-lebowski File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| /* | ||
| * Copyright 2018-2022 contributors to the Marquez project | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package marquez.db; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
| import java.util.stream.Collectors; | ||
| import marquez.db.mappers.ColumnLineageRowMapper; | ||
| import marquez.db.models.ColumnLineageRow; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
| import org.jdbi.v3.sqlobject.config.RegisterRowMapper; | ||
| import org.jdbi.v3.sqlobject.customizer.BindBeanList; | ||
| import org.jdbi.v3.sqlobject.statement.SqlQuery; | ||
| import org.jdbi.v3.sqlobject.statement.SqlUpdate; | ||
|
|
||
| @RegisterRowMapper(ColumnLineageRowMapper.class) | ||
| public interface ColumnLineageDao extends BaseDao { | ||
|
|
||
| default List<ColumnLineageRow> upsertColumnLineageRow( | ||
| UUID outputDatasetVersionUuid, | ||
| UUID outputDatasetFieldUuid, | ||
| List<Pair<UUID, UUID>> inputs, | ||
| String transformationDescription, | ||
| String transformationType, | ||
| Instant now) { | ||
|
|
||
| if (inputs.isEmpty()) { | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| doUpsertColumnLineageRow( | ||
| inputs.stream() | ||
| .map( | ||
| input -> | ||
| new ColumnLineageRow( | ||
| outputDatasetVersionUuid, | ||
| outputDatasetFieldUuid, | ||
| input.getLeft(), // input_dataset_version_uuid | ||
| input.getRight(), // input_dataset_field_uuid | ||
| transformationDescription, | ||
| transformationType, | ||
| now, | ||
| now)) | ||
| .collect(Collectors.toList())); | ||
| return findColumnLineageByDatasetVersionColumnAndOutputDatasetField( | ||
| outputDatasetVersionUuid, outputDatasetFieldUuid); | ||
| } | ||
|
|
||
| @SqlQuery( | ||
| "SELECT * FROM column_lineage WHERE output_dataset_version_uuid = :datasetVersionUuid AND output_dataset_field_uuid = :outputDatasetFieldUuid") | ||
| List<ColumnLineageRow> findColumnLineageByDatasetVersionColumnAndOutputDatasetField( | ||
| UUID datasetVersionUuid, UUID outputDatasetFieldUuid); | ||
|
|
||
| @SqlUpdate( | ||
| """ | ||
| INSERT INTO column_lineage ( | ||
| output_dataset_version_uuid, | ||
| output_dataset_field_uuid, | ||
| input_dataset_version_uuid, | ||
| input_dataset_field_uuid, | ||
| transformation_description, | ||
| transformation_type, | ||
| created_at, | ||
| updated_at | ||
| ) VALUES <values> | ||
| ON CONFLICT (output_dataset_version_uuid, output_dataset_field_uuid, input_dataset_version_uuid, input_dataset_field_uuid) | ||
| DO UPDATE SET | ||
| transformation_description = EXCLUDED.transformation_description, | ||
| transformation_type = EXCLUDED.transformation_type, | ||
| updated_at = EXCLUDED.updated_at | ||
| """) | ||
| void doUpsertColumnLineageRow( | ||
| @BindBeanList( | ||
| propertyNames = { | ||
| "outputDatasetVersionUuid", | ||
| "outputDatasetFieldUuid", | ||
| "inputDatasetVersionUuid", | ||
| "inputDatasetFieldUuid", | ||
| "transformationDescription", | ||
| "transformationType", | ||
| "createdAt", | ||
| "updatedAt" | ||
| }, | ||
| value = "values") | ||
| List<ColumnLineageRow> rows); | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
37 changes: 37 additions & 0 deletions
37
api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| /* | ||
| * Copyright 2018-2022 contributors to the Marquez project | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package marquez.db.mappers; | ||
|
|
||
| import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION; | ||
| import static marquez.db.Columns.TRANSFORMATION_TYPE; | ||
| import static marquez.db.Columns.stringOrThrow; | ||
| import static marquez.db.Columns.timestampOrThrow; | ||
| import static marquez.db.Columns.uuidOrThrow; | ||
|
|
||
| import java.sql.ResultSet; | ||
| import java.sql.SQLException; | ||
| import lombok.NonNull; | ||
| import marquez.db.Columns; | ||
| import marquez.db.models.ColumnLineageRow; | ||
| import org.jdbi.v3.core.mapper.RowMapper; | ||
| import org.jdbi.v3.core.statement.StatementContext; | ||
|
|
||
| public class ColumnLineageRowMapper implements RowMapper<ColumnLineageRow> { | ||
|
|
||
| @Override | ||
| public ColumnLineageRow map(@NonNull ResultSet results, @NonNull StatementContext context) | ||
| throws SQLException { | ||
| return new ColumnLineageRow( | ||
| uuidOrThrow(results, Columns.OUTPUT_DATASET_VERSION_UUID), | ||
| uuidOrThrow(results, Columns.OUTPUT_DATASET_FIELD_UUID), | ||
| uuidOrThrow(results, Columns.INPUT_DATASET_VERSION_UUID), | ||
| uuidOrThrow(results, Columns.INPUT_DATASET_FIELD_UUID), | ||
| stringOrThrow(results, TRANSFORMATION_DESCRIPTION), | ||
| stringOrThrow(results, TRANSFORMATION_TYPE), | ||
| timestampOrThrow(results, Columns.CREATED_AT), | ||
| timestampOrThrow(results, Columns.UPDATED_AT)); | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.