Skip to content

Commit ab7641f

Browse files
committed
deletes: add views instead of filtering
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
1 parent 6e3198e commit ab7641f

23 files changed

Lines changed: 605 additions & 389 deletions

api/src/main/java/marquez/api/DatasetResource.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -157,18 +157,19 @@ public Response list(
157157
@DELETE
158158
@Path("{dataset}")
159159
@Produces(APPLICATION_JSON)
160-
public Response delete(
160+
public Response hide(
161161
@PathParam("namespace") NamespaceName namespaceName,
162162
@PathParam("dataset") DatasetName datasetName) {
163163
throwIfNotExists(namespaceName);
164164

165-
datasetService
166-
.softDelete(namespaceName.getValue(), datasetName.getValue())
167-
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
168165
Dataset dataset =
169166
datasetService
170167
.findDatasetByName(namespaceName.getValue(), datasetName.getValue())
171168
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
169+
170+
datasetService
171+
.hide(namespaceName.getValue(), datasetName.getValue())
172+
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
172173
return Response.ok(dataset).build();
173174
}
174175

api/src/main/java/marquez/api/JobResource.java

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
import javax.validation.Valid;
1717
import javax.validation.constraints.Min;
1818
import javax.ws.rs.Consumes;
19+
import javax.ws.rs.DELETE;
1920
import javax.ws.rs.DefaultValue;
2021
import javax.ws.rs.GET;
2122
import javax.ws.rs.POST;
@@ -160,6 +161,25 @@ public Response list(
160161
return Response.ok(new ResultsPage<>("jobs", jobs, totalCount)).build();
161162
}
162163

164+
@Timed
165+
@ResponseMetered
166+
@ExceptionMetered
167+
@DELETE
168+
@Path("/namespaces/{namespace}/jobs/{job}")
169+
@Produces(APPLICATION_JSON)
170+
public Response hideJob(
171+
@PathParam("namespace") NamespaceName namespaceName, @PathParam("job") JobName jobName) {
172+
throwIfNotExists(namespaceName);
173+
174+
Job job =
175+
jobService
176+
.findJobByName(namespaceName.getValue(), jobName.getValue())
177+
.orElseThrow(() -> new JobNotFoundException(jobName));
178+
179+
jobService.hide(namespaceName.getValue(), jobName.getValue());
180+
return Response.ok(job).build();
181+
}
182+
163183
@Timed
164184
@ResponseMetered
165185
@ExceptionMetered

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 92 additions & 86 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@
4343
public interface DatasetDao extends BaseDao {
4444
@SqlQuery(
4545
"SELECT EXISTS ("
46-
+ "SELECT 1 FROM datasets AS d "
46+
+ "SELECT 1 FROM datasets_view AS d "
4747
+ "WHERE d.name = :datasetName AND d.namespace_name = :namespaceName)")
4848
boolean exists(String namespaceName, String datasetName);
4949

@@ -69,49 +69,50 @@ void updateLastModifiedAt(
6969
void updateVersion(UUID rowUuid, Instant updatedAt, UUID currentVersionUuid);
7070

7171
@SqlQuery(
72-
"WITH selected_datasets AS (\n"
73-
+ " SELECT d.*\n"
74-
+ " FROM datasets d\n"
75-
+ " WHERE d.namespace_name = :namespaceName\n"
76-
+ " AND d.name = :datasetName\n"
77-
+ "), dataset_runs AS (\n"
78-
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
79-
+ " FROM selected_datasets d\n"
80-
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
81-
+ " LEFT JOIN LATERAL (\n"
82-
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
83-
+ " WHERE run_uuid = dv.run_uuid\n"
84-
+ " ) e ON e.run_uuid = dv.run_uuid\n"
85-
+ " UNION\n"
86-
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
87-
+ " FROM selected_datasets d\n"
88-
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
89-
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
90-
+ " LEFT JOIN LATERAL (\n"
91-
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
92-
+ " WHERE run_uuid = rim.run_uuid\n"
93-
+ " ) e ON e.run_uuid = rim.run_uuid\n"
94-
+ ")\n"
95-
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
96-
+ "FROM selected_datasets d\n"
97-
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
98-
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
99-
+ "LEFT JOIN (\n"
100-
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
101-
+ " FROM tags AS t\n"
102-
+ " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
103-
+ " GROUP BY m.dataset_uuid\n"
104-
+ ") t ON t.dataset_uuid = d.uuid\n"
105-
+ "LEFT JOIN (\n"
106-
+ " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n"
107-
+ " FROM dataset_runs d2,\n"
108-
+ " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n"
109-
+ " WHERE d2.run_uuid = d2.run_uuid\n"
110-
+ " AND ds -> 'facets' IS NOT NULL\n"
111-
+ " AND ds ->> 'name' = d2.name\n"
112-
+ " AND ds ->> 'namespace' = d2.namespace_name\n"
113-
+ " GROUP BY d2.uuid\n"
114-
+ ") f ON f.dataset_uuid = d.uuid")
72+
"""
73+
WITH selected_datasets AS (
74+
SELECT d.*
75+
FROM datasets_view d
76+
WHERE d.namespace_name = :namespaceName
77+
AND d.name = :datasetName
78+
), dataset_runs AS (
79+
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
80+
FROM selected_datasets d
81+
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
82+
LEFT JOIN LATERAL (
83+
SELECT run_uuid, event_time, event FROM lineage_events
84+
WHERE run_uuid = dv.run_uuid
85+
) e ON e.run_uuid = dv.run_uuid
86+
UNION
87+
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
88+
FROM selected_datasets d
89+
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
90+
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
91+
LEFT JOIN LATERAL (
92+
SELECT run_uuid, event_time, event FROM lineage_events
93+
WHERE run_uuid = rim.run_uuid
94+
) e ON e.run_uuid = rim.run_uuid
95+
)
96+
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
97+
FROM selected_datasets d
98+
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
99+
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
100+
LEFT JOIN (
101+
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
102+
FROM tags AS t
103+
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
104+
GROUP BY m.dataset_uuid
105+
) t ON t.dataset_uuid = d.uuid
106+
LEFT JOIN (
107+
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
108+
FROM dataset_runs d2,
109+
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
110+
WHERE d2.run_uuid = d2.run_uuid
111+
AND ds -> 'facets' IS NOT NULL
112+
AND ds ->> 'name' = d2.name
113+
AND ds ->> 'namespace' = d2.namespace_name
114+
GROUP BY d2.uuid
115+
) f ON f.dataset_uuid = d.uuid""")
115116
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
116117

117118
default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
@@ -131,19 +132,19 @@ default void setFields(Dataset ds) {
131132
}
132133

133134
@SqlQuery(
134-
"SELECT d.* FROM datasets AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
135+
"SELECT d.* FROM datasets_view AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
135136
Optional<DatasetRow> findDatasetAsRow(String namespaceName, String datasetName);
136137

137-
@SqlQuery("SELECT * FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName")
138+
@SqlQuery(
139+
"SELECT * FROM datasets_view WHERE name = :datasetName AND namespace_name = :namespaceName")
138140
Optional<DatasetRow> getUuid(String namespaceName, String datasetName);
139141

140142
@SqlQuery(
141143
"""
142144
WITH selected_datasets AS (
143145
SELECT d.*
144-
FROM datasets d
146+
FROM datasets_view d
145147
WHERE d.namespace_name = :namespaceName
146-
AND d.is_deleted is false
147148
ORDER BY d.name
148149
LIMIT :limit OFFSET :offset
149150
), dataset_runs AS (
@@ -187,10 +188,10 @@ SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS f
187188
ORDER BY d.name""")
188189
List<Dataset> findAll(String namespaceName, int limit, int offset);
189190

190-
@SqlQuery("SELECT count(*) FROM datasets")
191+
@SqlQuery("SELECT count(*) FROM datasets_view")
191192
int count();
192193

193-
@SqlQuery("SELECT count(*) FROM datasets AS j WHERE j.namespace_name = :namespaceName")
194+
@SqlQuery("SELECT count(*) FROM datasets_view AS j WHERE j.namespace_name = :namespaceName")
194195
int countFor(String namespaceName);
195196

196197
default List<Dataset> findAllWithTags(String namespaceName, int limit, int offset) {
@@ -199,40 +200,45 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
199200
}
200201

201202
@SqlQuery(
202-
"INSERT INTO datasets ("
203-
+ "uuid, "
204-
+ "type, "
205-
+ "created_at, "
206-
+ "updated_at, "
207-
+ "namespace_uuid, "
208-
+ "namespace_name, "
209-
+ "source_uuid, "
210-
+ "source_name, "
211-
+ "name, "
212-
+ "physical_name, "
213-
+ "description, "
214-
+ "is_deleted "
215-
+ ") VALUES ( "
216-
+ ":uuid, "
217-
+ ":type, "
218-
+ ":now, "
219-
+ ":now, "
220-
+ ":namespaceUuid, "
221-
+ ":namespaceName, "
222-
+ ":sourceUuid, "
223-
+ ":sourceName, "
224-
+ ":name, "
225-
+ ":physicalName, "
226-
+ ":description, "
227-
+ ":isDeleted) "
228-
+ "ON CONFLICT (namespace_uuid, name) "
229-
+ "DO UPDATE SET "
230-
+ "type = EXCLUDED.type, "
231-
+ "updated_at = EXCLUDED.updated_at, "
232-
+ "physical_name = EXCLUDED.physical_name, "
233-
+ "description = EXCLUDED.description, "
234-
+ "is_deleted = EXCLUDED.is_deleted "
235-
+ "RETURNING *")
203+
"""
204+
INSERT INTO datasets (
205+
uuid,
206+
type,
207+
created_at,
208+
updated_at,
209+
namespace_uuid,
210+
namespace_name,
211+
source_uuid,
212+
source_name,
213+
name,
214+
physical_name,
215+
description,
216+
is_deleted,
217+
is_hidden
218+
) VALUES (
219+
:uuid,
220+
:type,
221+
:now,
222+
:now,
223+
:namespaceUuid,
224+
:namespaceName,
225+
:sourceUuid,
226+
:sourceName,
227+
:name,
228+
:physicalName,
229+
:description,
230+
:isDeleted,
231+
false
232+
) ON CONFLICT (namespace_uuid, name)
233+
DO UPDATE SET
234+
type = EXCLUDED.type,
235+
updated_at = EXCLUDED.updated_at,
236+
physical_name = EXCLUDED.physical_name,
237+
description = EXCLUDED.description,
238+
is_deleted = EXCLUDED.is_deleted,
239+
is_hidden = EXCLUDED.is_hidden
240+
RETURNING *
241+
""")
236242
DatasetRow upsert(
237243
UUID uuid,
238244
DatasetType type,
@@ -289,12 +295,12 @@ DatasetRow upsert(
289295
@SqlQuery(
290296
"""
291297
UPDATE datasets
292-
SET is_deleted = true
298+
SET is_hidden = true
293299
WHERE namespace_name = :namespaceName
294300
AND name = :name
295301
RETURNING *
296302
""")
297-
Optional<DatasetRow> softDelete(String namespaceName, String name);
303+
Optional<DatasetRow> hide(String namespaceName, String name);
298304

299305
@Transaction
300306
default Dataset upsertDatasetMeta(

api/src/main/java/marquez/db/DatasetFieldDao.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ public interface DatasetFieldDao extends BaseDao {
3333
@SqlQuery(
3434
"SELECT EXISTS ("
3535
+ "SELECT 1 FROM dataset_fields AS df "
36-
+ "INNER JOIN datasets AS d "
36+
+ "INNER JOIN datasets_view AS d "
3737
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName "
3838
+ "WHERE df.name = :name)")
3939
boolean exists(String namespaceName, String datasetName, String name);

0 commit comments

Comments
 (0)