55
66package marquez .db ;
77
8+ import static org .jdbi .v3 .sqlobject .customizer .BindList .EmptyHandling .NULL_STRING ;
9+
810import java .time .Instant ;
911import java .util .Collections ;
1012import java .util .List ;
13+ import java .util .Set ;
1114import java .util .UUID ;
1215import java .util .stream .Collectors ;
16+ import marquez .db .mappers .ColumnLineageNodeDataMapper ;
1317import marquez .db .mappers .ColumnLineageRowMapper ;
18+ import marquez .db .models .ColumnLineageNodeData ;
1419import marquez .db .models .ColumnLineageRow ;
1520import org .apache .commons .lang3 .tuple .Pair ;
1621import org .jdbi .v3 .sqlobject .config .RegisterRowMapper ;
1722import org .jdbi .v3 .sqlobject .customizer .BindBeanList ;
23+ import org .jdbi .v3 .sqlobject .customizer .BindList ;
1824import org .jdbi .v3 .sqlobject .statement .SqlQuery ;
1925import org .jdbi .v3 .sqlobject .statement .SqlUpdate ;
2026
2127@ RegisterRowMapper (ColumnLineageRowMapper .class )
28+ @ RegisterRowMapper (ColumnLineageNodeDataMapper .class )
2229public interface ColumnLineageDao extends BaseDao {
2330
2431 default List <ColumnLineageRow > upsertColumnLineageRow (
@@ -74,6 +81,7 @@ ON CONFLICT (output_dataset_version_uuid, output_dataset_field_uuid, input_datas
7481 transformation_description = EXCLUDED.transformation_description,
7582 transformation_type = EXCLUDED.transformation_type,
7683 updated_at = EXCLUDED.updated_at
84+ RETURNING *
7785 """ )
7886 void doUpsertColumnLineageRow (
7987 @ BindBeanList (
@@ -89,4 +97,59 @@ void doUpsertColumnLineageRow(
8997 },
9098 value = "values" )
9199 ColumnLineageRow ... rows );
100+
/**
 * Fetches column-level lineage for the given dataset fields by walking the {@code column_lineage}
 * table upstream with a recursive CTE.
 *
 * <p>How the query is built:
 *
 * <ul>
 *   <li>{@code dataset_fields_view} joins {@code dataset_fields} to {@code datasets_view} so that
 *       each field uuid can be resolved to (namespace name, dataset name, field name, type).
 *   <li>The seed of {@code column_lineage_recursive} selects the {@code column_lineage} rows whose
 *       {@code output_dataset_field_uuid} is one of {@code datasetFieldUuids} and whose {@code
 *       created_at} is not later than {@code createdAtUntil}; these rows get depth 0.
 *   <li>The recursive step adds every {@code column_lineage} row whose output field equals the
 *       input field of an already collected row, with depth incremented by one, for as long as the
 *       collected row's depth is below {@code depth}. The {@code created_at} bound appears only in
 *       the seed, not in the recursive step.
 *   <li>The final select resolves output fields with an INNER JOIN and input fields with a LEFT
 *       JOIN against {@code dataset_fields_view}, then groups by output field, transformation and
 *       timestamps, collecting the inputs into {@code inputFields} as an array of [namespace name,
 *       dataset name, field name] triples.
 * </ul>
 *
 * <p>NOTE(review): the CTE combines rows with UNION and {@code depth} is part of each row, while
 * the final GROUP BY does not include {@code depth}; an edge reached at several depths may
 * therefore contribute repeated entries to {@code inputFields} - confirm the row mapper tolerates
 * this.
 *
 * <p>NOTE(review): {@code depth} and {@code createdAtUntil} carry no explicit binding annotation
 * although the SQL references {@code :depth} and {@code :createdAtUntil}; presumably the build
 * retains parameter names so they can be bound by name - confirm.
 *
 * @param depth upper bound used by the recursive step ({@code node.depth < :depth}); seed rows
 *     start at depth 0
 * @param datasetFieldUuids uuids of the dataset fields whose lineage is requested; expanded into
 *     the {@code IN (...)} list. With {@code onEmpty = NULL_STRING} an empty list is presumably
 *     rendered as the SQL keyword {@code null} (see the Jdbi BindList documentation), so the seed
 *     would match no rows - confirm
 * @param createdAtUntil inclusive upper bound on {@code created_at} for the seed rows
 * @return one {@link ColumnLineageNodeData} per result row of the grouped query, as a set
 */
101+ @ SqlQuery (
102+ """
103+ WITH RECURSIVE
104+ dataset_fields_view AS (
105+ SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
106+ FROM dataset_fields df
107+ INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
108+ ),
109+ column_lineage_recursive AS (
110+ SELECT *, 0 as depth
111+ FROM column_lineage
112+ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
113+ UNION
114+ SELECT
115+ upstream_node.output_dataset_version_uuid,
116+ upstream_node.output_dataset_field_uuid,
117+ upstream_node.input_dataset_version_uuid,
118+ upstream_node.input_dataset_field_uuid,
119+ upstream_node.transformation_description,
120+ upstream_node.transformation_type,
121+ upstream_node.created_at,
122+ upstream_node.updated_at,
123+ node.depth + 1 as depth
124+ FROM column_lineage upstream_node, column_lineage_recursive node
125+ WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
126+ AND node.depth < :depth
127+ )
128+ SELECT
129+ output_fields.namespace_name,
130+ output_fields.dataset_name,
131+ output_fields.field_name,
132+ output_fields.type,
133+ ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
134+ clr.transformation_description,
135+ clr.transformation_type,
136+ clr.created_at,
137+ clr.updated_at
138+ FROM column_lineage_recursive clr
139+ INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
140+ LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
141+ GROUP BY
142+ output_fields.namespace_name,
143+ output_fields.dataset_name,
144+ output_fields.field_name,
145+ output_fields.type,
146+ clr.transformation_description,
147+ clr.transformation_type,
148+ clr.created_at,
149+ clr.updated_at
150+ """ )
151+ Set <ColumnLineageNodeData > getLineage (
152+ int depth ,
153+ @ BindList (onEmpty = NULL_STRING ) List <UUID > datasetFieldUuids ,
154+ Instant createdAtUntil );
92155}
0 commit comments