Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/src/main/java/marquez/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -150,6 +151,28 @@ public Response list(
return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@DELETE
@Path("{dataset}")
@Produces(APPLICATION_JSON)
public Response delete(
@PathParam("namespace") NamespaceName namespaceName,
@PathParam("dataset") DatasetName datasetName) {
throwIfNotExists(namespaceName);

Dataset dataset =
Comment thread
pawel-big-lebowski marked this conversation as resolved.
datasetService
.findDatasetByName(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));

datasetService
.delete(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
return Response.ok(dataset).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
Expand Down
20 changes: 20 additions & 0 deletions api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -160,6 +161,25 @@ public Response list(
return Response.ok(new ResultsPage<>("jobs", jobs, totalCount)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@DELETE
@Path("/namespaces/{namespace}/jobs/{job}")
@Produces(APPLICATION_JSON)
public Response delete(
@PathParam("namespace") NamespaceName namespaceName, @PathParam("job") JobName jobName) {
throwIfNotExists(namespaceName);

Job job =
jobService
.findJobByName(namespaceName.getValue(), jobName.getValue())
.orElseThrow(() -> new JobNotFoundException(jobName));

jobService.delete(namespaceName.getValue(), jobName.getValue());
return Response.ok(job).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
Expand Down
272 changes: 145 additions & 127 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public interface DatasetDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
+ "SELECT 1 FROM datasets AS d "
+ "SELECT 1 FROM datasets_view AS d "
+ "WHERE d.name = :datasetName AND d.namespace_name = :namespaceName)")
boolean exists(String namespaceName, String datasetName);

Expand All @@ -69,49 +69,50 @@ void updateLastModifiedAt(
void updateVersion(UUID rowUuid, Instant updatedAt, UUID currentVersionUuid);

@SqlQuery(
"WITH selected_datasets AS (\n"
+ " SELECT d.*\n"
+ " FROM datasets d\n"
+ " WHERE d.namespace_name = :namespaceName\n"
+ " AND d.name = :datasetName\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
+ " FROM tags AS t\n"
+ " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
+ " GROUP BY m.dataset_uuid\n"
+ ") t ON t.dataset_uuid = d.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n"
+ " FROM dataset_runs d2,\n"
+ " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n"
+ " WHERE d2.run_uuid = d2.run_uuid\n"
+ " AND ds -> 'facets' IS NOT NULL\n"
+ " AND ds ->> 'name' = d2.name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n"
+ " GROUP BY d2.uuid\n"
+ ") f ON f.dataset_uuid = d.uuid")
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
AND d.name = :datasetName
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);

default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
Expand All @@ -131,64 +132,66 @@ default void setFields(Dataset ds) {
}

@SqlQuery(
"SELECT d.* FROM datasets AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
"SELECT d.* FROM datasets_view AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
Optional<DatasetRow> findDatasetAsRow(String namespaceName, String datasetName);

@SqlQuery("SELECT * FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName")
@SqlQuery(
"SELECT * FROM datasets_view WHERE name = :datasetName AND namespace_name = :namespaceName")
Optional<DatasetRow> getUuid(String namespaceName, String datasetName);

@SqlQuery(
"WITH selected_datasets AS (\n"
Comment thread
pawel-big-lebowski marked this conversation as resolved.
+ " SELECT d.*\n"
+ " FROM datasets d\n"
+ " WHERE d.namespace_name = :namespaceName\n"
+ " ORDER BY d.name\n"
+ " LIMIT :limit OFFSET :offset\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
+ " FROM tags AS t\n"
+ " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
+ " GROUP BY m.dataset_uuid\n"
+ ") t ON t.dataset_uuid = d.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets\n"
+ " FROM dataset_runs d2,\n"
+ " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n"
+ " WHERE d2.run_uuid = d2.run_uuid\n"
+ " AND ds -> 'facets' IS NOT NULL\n"
+ " AND ds ->> 'name' = d2.name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n"
+ " GROUP BY d2.uuid\n"
+ ") f ON f.dataset_uuid = d.uuid\n"
+ "ORDER BY d.name")
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid
ORDER BY d.name""")
List<Dataset> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM datasets")
@SqlQuery("SELECT count(*) FROM datasets_view")
int count();

@SqlQuery("SELECT count(*) FROM datasets AS j WHERE j.namespace_name = :namespaceName")
@SqlQuery("SELECT count(*) FROM datasets_view AS j WHERE j.namespace_name = :namespaceName")
int countFor(String namespaceName);

default List<Dataset> findAllWithTags(String namespaceName, int limit, int offset) {
Expand All @@ -197,40 +200,45 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
}

@SqlQuery(
"INSERT INTO datasets ("
+ "uuid, "
+ "type, "
+ "created_at, "
+ "updated_at, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "source_uuid, "
+ "source_name, "
+ "name, "
+ "physical_name, "
+ "description, "
+ "is_deleted "
+ ") VALUES ( "
+ ":uuid, "
+ ":type, "
+ ":now, "
+ ":now, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":sourceUuid, "
+ ":sourceName, "
+ ":name, "
+ ":physicalName, "
+ ":description, "
+ ":isDeleted) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
+ "physical_name = EXCLUDED.physical_name, "
+ "description = EXCLUDED.description, "
+ "is_deleted = EXCLUDED.is_deleted "
+ "RETURNING *")
"""
INSERT INTO datasets (
uuid,
type,
created_at,
updated_at,
namespace_uuid,
namespace_name,
source_uuid,
source_name,
name,
physical_name,
description,
is_deleted,
is_hidden
) VALUES (
:uuid,
:type,
:now,
:now,
:namespaceUuid,
:namespaceName,
:sourceUuid,
:sourceName,
:name,
:physicalName,
:description,
:isDeleted,
false
) ON CONFLICT (namespace_uuid, name)
DO UPDATE SET
type = EXCLUDED.type,
updated_at = EXCLUDED.updated_at,
physical_name = EXCLUDED.physical_name,
description = EXCLUDED.description,
is_deleted = EXCLUDED.is_deleted,
is_hidden = EXCLUDED.is_hidden
RETURNING *
""")
DatasetRow upsert(
UUID uuid,
DatasetType type,
Expand Down Expand Up @@ -284,6 +292,16 @@ DatasetRow upsert(
String name,
String physicalName);

@SqlQuery(
"""
UPDATE datasets
SET is_hidden = true
WHERE namespace_name = :namespaceName
AND name = :name
RETURNING *
""")
Optional<DatasetRow> delete(String namespaceName, String name);

@Transaction
default Dataset upsertDatasetMeta(
NamespaceName namespaceName, DatasetName datasetName, DatasetMeta datasetMeta) {
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
+ "SELECT 1 FROM dataset_fields AS df "
+ "INNER JOIN datasets AS d "
+ "INNER JOIN datasets_view AS d "
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName "
+ "WHERE df.name = :name)")
boolean exists(String namespaceName, String datasetName, String name);
Expand Down
Loading