4343public interface DatasetDao extends BaseDao {
4444 @ SqlQuery (
4545 "SELECT EXISTS ("
46- + "SELECT 1 FROM datasets AS d "
46+ + "SELECT 1 FROM datasets_view AS d "
4747 + "WHERE d.name = :datasetName AND d.namespace_name = :namespaceName)" )
4848 boolean exists (String namespaceName , String datasetName );
4949
@@ -69,49 +69,50 @@ void updateLastModifiedAt(
6969 void updateVersion (UUID rowUuid , Instant updatedAt , UUID currentVersionUuid );
7070
7171 @ SqlQuery (
72- "WITH selected_datasets AS (\n "
73- + " SELECT d.*\n "
74- + " FROM datasets d\n "
75- + " WHERE d.namespace_name = :namespaceName\n "
76- + " AND d.name = :datasetName\n "
77- + "), dataset_runs AS (\n "
78- + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n "
79- + " FROM selected_datasets d\n "
80- + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n "
81- + " LEFT JOIN LATERAL (\n "
82- + " SELECT run_uuid, event_time, event FROM lineage_events\n "
83- + " WHERE run_uuid = dv.run_uuid\n "
84- + " ) e ON e.run_uuid = dv.run_uuid\n "
85- + " UNION\n "
86- + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n "
87- + " FROM selected_datasets d\n "
88- + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n "
89- + " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n "
90- + " LEFT JOIN LATERAL (\n "
91- + " SELECT run_uuid, event_time, event FROM lineage_events\n "
92- + " WHERE run_uuid = rim.run_uuid\n "
93- + " ) e ON e.run_uuid = rim.run_uuid\n "
94- + ")\n "
95- + "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n "
96- + "FROM selected_datasets d\n "
97- + "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n "
98- + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n "
99- + "LEFT JOIN (\n "
100- + " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n "
101- + " FROM tags AS t\n "
102- + " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n "
103- + " GROUP BY m.dataset_uuid\n "
104- + ") t ON t.dataset_uuid = d.uuid\n "
105- + "LEFT JOIN (\n "
106- + " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n "
107- + " FROM dataset_runs d2,\n "
108- + " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n "
109- + " WHERE d2.run_uuid = d2.run_uuid\n "
110- + " AND ds -> 'facets' IS NOT NULL\n "
111- + " AND ds ->> 'name' = d2.name\n "
112- + " AND ds ->> 'namespace' = d2.namespace_name\n "
113- + " GROUP BY d2.uuid\n "
114- + ") f ON f.dataset_uuid = d.uuid" )
72+ """
73+ WITH selected_datasets AS (
74+ SELECT d.*
75+ FROM datasets_view d
76+ WHERE d.namespace_name = :namespaceName
77+ AND d.name = :datasetName
78+ ), dataset_runs AS (
79+ SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
80+ FROM selected_datasets d
81+ INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
82+ LEFT JOIN LATERAL (
83+ SELECT run_uuid, event_time, event FROM lineage_events
84+ WHERE run_uuid = dv.run_uuid
85+ ) e ON e.run_uuid = dv.run_uuid
86+ UNION
87+ SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
88+ FROM selected_datasets d
89+ INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
90+ LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
91+ LEFT JOIN LATERAL (
92+ SELECT run_uuid, event_time, event FROM lineage_events
93+ WHERE run_uuid = rim.run_uuid
94+ ) e ON e.run_uuid = rim.run_uuid
95+ )
96+ SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
97+ FROM selected_datasets d
98+ LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
99+ LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
100+ LEFT JOIN (
101+ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
102+ FROM tags AS t
103+ INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
104+ GROUP BY m.dataset_uuid
105+ ) t ON t.dataset_uuid = d.uuid
106+ LEFT JOIN (
107+ SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
108+ FROM dataset_runs d2,
109+ jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
110+ WHERE d2.run_uuid = d2.run_uuid
111+ AND ds -> 'facets' IS NOT NULL
112+ AND ds ->> 'name' = d2.name
113+ AND ds ->> 'namespace' = d2.namespace_name
114+ GROUP BY d2.uuid
115+ ) f ON f.dataset_uuid = d.uuid""" )
115116 Optional <Dataset > findDatasetByName (String namespaceName , String datasetName );
116117
117118 default Optional <Dataset > findWithTags (String namespaceName , String datasetName ) {
@@ -131,64 +132,66 @@ default void setFields(Dataset ds) {
131132 }
132133
133134 @ SqlQuery (
134- "SELECT d.* FROM datasets AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName" )
135+ "SELECT d.* FROM datasets_view AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName" )
135136 Optional <DatasetRow > findDatasetAsRow (String namespaceName , String datasetName );
136137
137- @ SqlQuery ("SELECT * FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName" )
138+ @ SqlQuery (
139+ "SELECT * FROM datasets_view WHERE name = :datasetName AND namespace_name = :namespaceName" )
138140 Optional <DatasetRow > getUuid (String namespaceName , String datasetName );
139141
140142 @ SqlQuery (
141- "WITH selected_datasets AS (\n "
142- + " SELECT d.*\n "
143- + " FROM datasets d\n "
144- + " WHERE d.namespace_name = :namespaceName\n "
145- + " ORDER BY d.name\n "
146- + " LIMIT :limit OFFSET :offset\n "
147- + "), dataset_runs AS (\n "
148- + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n "
149- + " FROM selected_datasets d\n "
150- + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n "
151- + " LEFT JOIN LATERAL (\n "
152- + " SELECT run_uuid, event_time, event FROM lineage_events\n "
153- + " WHERE run_uuid = dv.run_uuid\n "
154- + " ) e ON e.run_uuid = dv.run_uuid\n "
155- + " UNION\n "
156- + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n "
157- + " FROM selected_datasets d\n "
158- + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n "
159- + " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n "
160- + " LEFT JOIN LATERAL (\n "
161- + " SELECT run_uuid, event_time, event FROM lineage_events\n "
162- + " WHERE run_uuid = rim.run_uuid\n "
163- + " ) e ON e.run_uuid = rim.run_uuid\n "
164- + ")\n "
165- + "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n "
166- + "FROM selected_datasets d\n "
167- + "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n "
168- + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n "
169- + "LEFT JOIN (\n "
170- + " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n "
171- + " FROM tags AS t\n "
172- + " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n "
173- + " GROUP BY m.dataset_uuid\n "
174- + ") t ON t.dataset_uuid = d.uuid\n "
175- + "LEFT JOIN (\n "
176- + " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets\n "
177- + " FROM dataset_runs d2,\n "
178- + " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n "
179- + " WHERE d2.run_uuid = d2.run_uuid\n "
180- + " AND ds -> 'facets' IS NOT NULL\n "
181- + " AND ds ->> 'name' = d2.name\n "
182- + " AND ds ->> 'namespace' = d2.namespace_name\n "
183- + " GROUP BY d2.uuid\n "
184- + ") f ON f.dataset_uuid = d.uuid\n "
185- + "ORDER BY d.name" )
143+ """
144+ WITH selected_datasets AS (
145+ SELECT d.*
146+ FROM datasets_view d
147+ WHERE d.namespace_name = :namespaceName
148+ ORDER BY d.name
149+ LIMIT :limit OFFSET :offset
150+ ), dataset_runs AS (
151+ SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
152+ FROM selected_datasets d
153+ INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
154+ LEFT JOIN LATERAL (
155+ SELECT run_uuid, event_time, event FROM lineage_events
156+ WHERE run_uuid = dv.run_uuid
157+ ) e ON e.run_uuid = dv.run_uuid
158+ UNION
159+ SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
160+ FROM selected_datasets d
161+ INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
162+ LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
163+ LEFT JOIN LATERAL (
164+ SELECT run_uuid, event_time, event FROM lineage_events
165+ WHERE run_uuid = rim.run_uuid
166+ ) e ON e.run_uuid = rim.run_uuid
167+ )
168+ SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
169+ FROM selected_datasets d
170+ LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
171+ LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
172+ LEFT JOIN (
173+ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
174+ FROM tags AS t
175+ INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
176+ GROUP BY m.dataset_uuid
177+ ) t ON t.dataset_uuid = d.uuid
178+ LEFT JOIN (
179+ SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets
180+ FROM dataset_runs d2,
181+ jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
182+ WHERE d2.run_uuid = d2.run_uuid
183+ AND ds -> 'facets' IS NOT NULL
184+ AND ds ->> 'name' = d2.name
185+ AND ds ->> 'namespace' = d2.namespace_name
186+ GROUP BY d2.uuid
187+ ) f ON f.dataset_uuid = d.uuid
188+ ORDER BY d.name""" )
186189 List <Dataset > findAll (String namespaceName , int limit , int offset );
187190
188- @ SqlQuery ("SELECT count(*) FROM datasets " )
191+ @ SqlQuery ("SELECT count(*) FROM datasets_view " )
189192 int count ();
190193
191- @ SqlQuery ("SELECT count(*) FROM datasets AS j WHERE j.namespace_name = :namespaceName" )
194+ @ SqlQuery ("SELECT count(*) FROM datasets_view AS j WHERE j.namespace_name = :namespaceName" )
192195 int countFor (String namespaceName );
193196
194197 default List <Dataset > findAllWithTags (String namespaceName , int limit , int offset ) {
@@ -197,40 +200,45 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
197200 }
198201
199202 @ SqlQuery (
200- "INSERT INTO datasets ("
201- + "uuid, "
202- + "type, "
203- + "created_at, "
204- + "updated_at, "
205- + "namespace_uuid, "
206- + "namespace_name, "
207- + "source_uuid, "
208- + "source_name, "
209- + "name, "
210- + "physical_name, "
211- + "description, "
212- + "is_deleted "
213- + ") VALUES ( "
214- + ":uuid, "
215- + ":type, "
216- + ":now, "
217- + ":now, "
218- + ":namespaceUuid, "
219- + ":namespaceName, "
220- + ":sourceUuid, "
221- + ":sourceName, "
222- + ":name, "
223- + ":physicalName, "
224- + ":description, "
225- + ":isDeleted) "
226- + "ON CONFLICT (namespace_uuid, name) "
227- + "DO UPDATE SET "
228- + "type = EXCLUDED.type, "
229- + "updated_at = EXCLUDED.updated_at, "
230- + "physical_name = EXCLUDED.physical_name, "
231- + "description = EXCLUDED.description, "
232- + "is_deleted = EXCLUDED.is_deleted "
233- + "RETURNING *" )
203+ """
204+ INSERT INTO datasets (
205+ uuid,
206+ type,
207+ created_at,
208+ updated_at,
209+ namespace_uuid,
210+ namespace_name,
211+ source_uuid,
212+ source_name,
213+ name,
214+ physical_name,
215+ description,
216+ is_deleted,
217+ is_hidden
218+ ) VALUES (
219+ :uuid,
220+ :type,
221+ :now,
222+ :now,
223+ :namespaceUuid,
224+ :namespaceName,
225+ :sourceUuid,
226+ :sourceName,
227+ :name,
228+ :physicalName,
229+ :description,
230+ :isDeleted,
231+ false
232+ ) ON CONFLICT (namespace_uuid, name)
233+ DO UPDATE SET
234+ type = EXCLUDED.type,
235+ updated_at = EXCLUDED.updated_at,
236+ physical_name = EXCLUDED.physical_name,
237+ description = EXCLUDED.description,
238+ is_deleted = EXCLUDED.is_deleted,
239+ is_hidden = EXCLUDED.is_hidden
240+ RETURNING *
241+ """ )
234242 DatasetRow upsert (
235243 UUID uuid ,
236244 DatasetType type ,
@@ -284,6 +292,16 @@ DatasetRow upsert(
284292 String name ,
285293 String physicalName );
286294
295+ @ SqlQuery (
296+ """
297+ UPDATE datasets
298+ SET is_hidden = true
299+ WHERE namespace_name = :namespaceName
300+ AND name = :name
301+ RETURNING *
302+ """ )
303+ Optional <DatasetRow > delete (String namespaceName , String name );
304+
287305 @ Transaction
288306 default Dataset upsertDatasetMeta (
289307 NamespaceName namespaceName , DatasetName datasetName , DatasetMeta datasetMeta ) {
0 commit comments