Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.21.0...HEAD)

### Added

* Add support for `LifecycleStateChangeFacet`, including the ability to soft-delete datasets.

## [0.21.0](https://github.com/MarquezProject/marquez/compare/0.20.0...0.21.0) - 2022-03-03

### Added
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public static Version newJobVersionFor(
* @param sourceName The source name of the dataset.
* @param physicalName The physical name of the dataset.
* @param datasetName The dataset name.
* @param lifecycleState The lifecycle state change of the dataset, e.g. CREATE, DROP or TRUNCATE.
* @param fields The fields of the dataset.
* @param runId The UUID of the run linked to the dataset.
* @return A {@link Version} object based on the specified dataset meta.
Expand All @@ -213,6 +214,7 @@ public static Version newDatasetVersionFor(
String sourceName,
String physicalName,
String datasetName,
String lifecycleState,
List<LineageEvent.SchemaField> fields,
UUID runId) {
DatasetVersionData data =
Expand All @@ -221,6 +223,7 @@ public static Version newDatasetVersionFor(
.sourceName(sourceName)
.physicalName(physicalName)
.datasetName(datasetName)
.lifecycleState(lifecycleState)
.schemaFields(fields)
.runId(runId)
.build();
Expand Down Expand Up @@ -259,6 +262,7 @@ private static Version newDatasetVersionFor(DatasetVersionData data) {
data.getPhysicalName(),
data.getSchemaLocation(),
data.getFields().stream().map(Utils::joinField).collect(joining(VERSION_DELIM)),
data.getLifecycleState(),
data.getRunId())
.getBytes(UTF_8);
return Version.of(UUID.nameUUIDFromBytes(bytes));
Expand All @@ -275,6 +279,7 @@ private static class DatasetVersionData {
private String sourceName;
private String physicalName;
private String datasetName;
private String lifecycleState;
private String schemaLocation;
private Set<Triple<String, String, String>> fields;
private UUID runId;
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ private Columns() {}
public static final String TAG_UUIDS = "tag_uuids";
public static final String TAGGED_AT = "tagged_at";
public static final String LAST_MODIFIED_AT = "last_modified_at";
public static final String IS_DELETED = "is_deleted";

/* DATASET VERSION ROW COLUMNS */
public static final String FIELD_UUIDS = "field_uuids";
public static final String LIFECYCLE_STATE = "lifecycle_state";

/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";
Expand Down Expand Up @@ -160,6 +162,15 @@ public static String stringOrThrow(final ResultSet results, final String column)
return results.getString(column);
}

/**
 * Reads a boolean column from the current row, substituting a fallback when the column holds
 * no value.
 *
 * @param results The result set positioned on the row to read.
 * @param column The label of the column to read.
 * @param defaultValue The value to return when {@code results.getObject(column)} is {@code null}.
 * @return The column's boolean value, or {@code defaultValue} when the column is {@code null}.
 * @throws SQLException If reading the column from the result set fails.
 */
public static boolean booleanOrDefault(
    final ResultSet results, final String column, final boolean defaultValue)
    throws SQLException {
  // Probe with getObject() first: getBoolean() alone cannot distinguish NULL from false.
  final boolean columnIsNull = results.getObject(column) == null;
  return columnIsNull ? defaultValue : results.getBoolean(column);
}

public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
Expand Down
28 changes: 17 additions & 11 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ void updateLastModifiedAt(
+ " WHERE d.namespace_name = :namespaceName\n"
+ " AND d.name = :datasetName\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
Expand All @@ -89,7 +89,7 @@ void updateLastModifiedAt(
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
Expand Down Expand Up @@ -142,15 +142,15 @@ default void setFields(Dataset ds) {
+ " ORDER BY d.name\n"
+ " LIMIT :limit OFFSET :offset\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
Expand All @@ -159,7 +159,7 @@ default void setFields(Dataset ds) {
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
Expand Down Expand Up @@ -205,7 +205,8 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
+ "source_name, "
+ "name, "
+ "physical_name, "
+ "description "
+ "description, "
+ "is_deleted "
+ ") VALUES ( "
+ ":uuid, "
+ ":type, "
Expand All @@ -217,13 +218,15 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
+ ":sourceName, "
+ ":name, "
+ ":physicalName, "
+ ":description) "
+ ":description, "
+ ":isDeleted) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
+ "physical_name = EXCLUDED.physical_name, "
+ "description = EXCLUDED.description "
+ "description = EXCLUDED.description, "
+ "is_deleted = EXCLUDED.is_deleted "
+ "RETURNING *")
DatasetRow upsert(
UUID uuid,
Expand All @@ -235,7 +238,8 @@ DatasetRow upsert(
String sourceName,
String name,
String physicalName,
String description);
String description,
boolean isDeleted);

@SqlQuery(
"INSERT INTO datasets ("
Expand Down Expand Up @@ -308,7 +312,8 @@ default Dataset upsertDatasetMeta(
sourceRow.getName(),
datasetName.getValue(),
datasetMeta.getPhysicalName().getValue(),
datasetMeta.getDescription().orElse(null));
datasetMeta.getDescription().orElse(null),
false);
} else {
datasetRow =
upsert(
Expand Down Expand Up @@ -340,6 +345,7 @@ default Dataset upsertDatasetMeta(
now,
namespaceName.getValue(),
datasetName.getValue(),
null,
datasetMeta);

return findWithTags(namespaceName.getValue(), datasetName.getValue()).get();
Expand Down
17 changes: 10 additions & 7 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ default DatasetVersionRow upsertDatasetVersion(
Instant now,
String namespaceName,
String datasetName,
String lifecycleState,
DatasetMeta datasetMeta) {
TagDao tagDao = createTagDao();
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
Expand All @@ -63,7 +64,8 @@ default DatasetVersionRow upsertDatasetVersion(
datasetMeta.getRunId().map(RunId::getValue).orElse(null),
toPgObjectFields(datasetMeta.getFields()),
namespaceName,
datasetName);
datasetName,
lifecycleState);
updateDatasetVersionMetric(
namespaceName,
datasetMeta.getType().toString(),
Expand Down Expand Up @@ -167,7 +169,7 @@ default void updateDatasetVersionMetric(
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
Expand Down Expand Up @@ -209,7 +211,7 @@ default void updateDatasetVersionMetric(
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
Expand Down Expand Up @@ -280,7 +282,7 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
Expand Down Expand Up @@ -324,9 +326,9 @@ default List<DatasetVersion> findAllWithRun(

@SqlQuery(
"INSERT INTO dataset_versions "
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name) "
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name, lifecycle_state) "
+ "VALUES "
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName) "
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleState) "
+ "ON CONFLICT(version) "
+ "DO UPDATE SET "
+ "run_uuid = EXCLUDED.run_uuid "
Expand All @@ -339,7 +341,8 @@ DatasetVersionRow upsert(
UUID runUuid,
PGobject fields,
String namespaceName,
String datasetName);
String datasetName,
String lifecycleState);

/**
 * Overwrites the {@code fields} column of the {@code dataset_versions} row with the given
 * {@code uuid}.
 *
 * @param uuid The {@code uuid} of the dataset version row to update.
 * @param fields The new value for the {@code fields} column. NOTE(review): presumably the
 *     serialized field list produced by {@code toPgObjectFields} - confirm against callers.
 */
@SqlUpdate("UPDATE dataset_versions SET fields = :fields WHERE uuid = :uuid")
void updateFields(UUID uuid, PGobject fields);
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface LineageDao {
Optional<UUID> getJobUuid(String jobName, String namespace);

@SqlQuery(
"SELECT ds.*, dv.fields\n"
"SELECT ds.*, dv.fields, dv.lifecycle_state\n"
+ "FROM datasets ds\n"
+ "LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid\n"
+ "WHERE ds.uuid IN (<dsUuids>);")
Expand Down
15 changes: 12 additions & 3 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.DatasetFacets;
import marquez.service.models.LineageEvent.Job;
import marquez.service.models.LineageEvent.LifecycleStateChangeFacet;
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
Expand Down Expand Up @@ -350,6 +351,12 @@ default DatasetRecord upsertLineageDataset(
formatNamespaceName(ds.getNamespace()),
DEFAULT_NAMESPACE_OWNER);

String dslifecycleState =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getLifecycleStateChange)
.map(LifecycleStateChangeFacet::getLifecycleStateChange)
.orElse("");

DatasetRow datasetRow =
datasetDao.upsert(
UUID.randomUUID(),
Expand All @@ -361,7 +368,8 @@ default DatasetRecord upsertLineageDataset(
source.getName(),
formatDatasetName(ds.getName()),
ds.getName(),
dsDescription);
dsDescription,
dslifecycleState.equalsIgnoreCase("DROP"));

List<SchemaField> fields =
Optional.ofNullable(ds.getFacets())
Expand All @@ -385,6 +393,7 @@ default DatasetRecord upsertLineageDataset(
source.getName(),
dsRow.getPhysicalName(),
dsRow.getName(),
dslifecycleState,
fields,
runUuid)
.getValue();
Expand All @@ -397,8 +406,8 @@ default DatasetRecord upsertLineageDataset(
isInput ? null : runUuid,
datasetVersionDao.toPgObjectSchemaFields(fields),
dsNamespace.getName(),
ds.getName());

ds.getName(),
dslifecycleState);
return row;
});
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();
Expand Down
4 changes: 3 additions & 1 deletion api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
d.getSourceName().getValue(),
d.getPhysicalName().getValue(),
d.getName().getValue(),
null,
toSchemaFields(d.getFields()),
runUuid)
.getValue();
Expand All @@ -288,7 +289,8 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
runUuid,
datasetVersionDao.toPgObjectFields(d.getFields()),
d.getNamespace().getValue(),
d.getName().getValue());
d.getName().getValue(),
null);
});
}
}
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/db/mappers/DatasetDataMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public DatasetData map(@NonNull ResultSet results, @NonNull StatementContext con
toFields(results, "fields"),
ImmutableSet.of(),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.DESCRIPTION));
stringOrNull(results, Columns.DESCRIPTION),
stringOrNull(results, Columns.LIFECYCLE_STATE));
}

public static ImmutableList<Field> toFields(ResultSet results, String column)
Expand Down
9 changes: 7 additions & 2 deletions api/src/main/java/marquez/db/mappers/DatasetMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringArrayOrThrow;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
Expand Down Expand Up @@ -62,9 +63,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
toFields(results, "fields"),
toTags(results, "tags"),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.LIFECYCLE_STATE),
stringOrNull(results, Columns.DESCRIPTION),
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
toFacetsOrNull(results, Columns.FACETS));
toFacetsOrNull(results, Columns.FACETS),
booleanOrDefault(results, Columns.IS_DELETED, false));
} else {
return new Stream(
new DatasetId(
Expand All @@ -79,9 +82,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
toFields(results, "fields"),
toTags(results, "tags"),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.LIFECYCLE_STATE),
stringOrNull(results, Columns.DESCRIPTION),
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
toFacetsOrNull(results, Columns.FACETS));
toFacetsOrNull(results, Columns.FACETS),
booleanOrDefault(results, Columns.IS_DELETED, false));
}
}

Expand Down
4 changes: 3 additions & 1 deletion api/src/main/java/marquez/db/mappers/DatasetRowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrNull;
Expand Down Expand Up @@ -32,6 +33,7 @@ public DatasetRow map(@NonNull ResultSet results, @NonNull StatementContext cont
stringOrThrow(results, Columns.PHYSICAL_NAME),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.DESCRIPTION),
uuidOrNull(results, Columns.CURRENT_VERSION_UUID));
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
booleanOrDefault(results, Columns.IS_DELETED, false));
}
}
Loading