Skip to content

Commit 56424c3

Browse files
add operation column
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent ddfcd37 commit 56424c3

37 files changed

Lines changed: 314 additions & 38 deletions

api/src/main/java/marquez/common/Utils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ public static Version newJobVersionFor(
204204
* @param sourceName The source name of the dataset.
205205
* @param physicalName The physical name of the dataset.
206206
* @param datasetName The dataset name.
207+
* @param lifecycleStateChange The dataset change like CREATE, DROP, TRUNCATE.
207208
* @param fields The fields of the dataset.
208209
* @param runId The UUID of the run linked to the dataset.
209210
* @return A {@link Version} object based on the specified job meta.
@@ -213,6 +214,7 @@ public static Version newDatasetVersionFor(
213214
String sourceName,
214215
String physicalName,
215216
String datasetName,
217+
String lifecycleStateChange,
216218
List<LineageEvent.SchemaField> fields,
217219
UUID runId) {
218220
DatasetVersionData data =
@@ -221,6 +223,7 @@ public static Version newDatasetVersionFor(
221223
.sourceName(sourceName)
222224
.physicalName(physicalName)
223225
.datasetName(datasetName)
226+
.lifecycleStateChange(lifecycleStateChange)
224227
.schemaFields(fields)
225228
.runId(runId)
226229
.build();
@@ -259,6 +262,7 @@ private static Version newDatasetVersionFor(DatasetVersionData data) {
259262
data.getPhysicalName(),
260263
data.getSchemaLocation(),
261264
data.getFields().stream().map(Utils::joinField).collect(joining(VERSION_DELIM)),
265+
data.getLifecycleStateChange(),
262266
data.getRunId())
263267
.getBytes(UTF_8);
264268
return Version.of(UUID.nameUUIDFromBytes(bytes));
@@ -275,6 +279,7 @@ private static class DatasetVersionData {
275279
private String sourceName;
276280
private String physicalName;
277281
private String datasetName;
282+
private String lifecycleStateChange;
278283
private String schemaLocation;
279284
private Set<Triple<String, String, String>> fields;
280285
private UUID runId;

api/src/main/java/marquez/db/Columns.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,11 @@ private Columns() {}
6969
public static final String TAG_UUIDS = "tag_uuids";
7070
public static final String TAGGED_AT = "tagged_at";
7171
public static final String LAST_MODIFIED_AT = "last_modified_at";
72+
public static final String IS_DELETED = "is_deleted";
7273

7374
/* DATASET VERSION ROW COLUMNS */
7475
public static final String FIELD_UUIDS = "field_uuids";
76+
public static final String LIFECYCLE_STATE_CHANGE = "lifecycle_state_change";
7577

7678
/* STREAM VERSION ROW COLUMNS */
7779
public static final String SCHEMA_LOCATION = "schema_location";
@@ -160,6 +162,15 @@ public static String stringOrThrow(final ResultSet results, final String column)
160162
return results.getString(column);
161163
}
162164

165+
public static boolean booleanOrDefault(
166+
final ResultSet results, final String column, final boolean defaultValue)
167+
throws SQLException {
168+
if (results.getObject(column) == null) {
169+
return defaultValue;
170+
}
171+
return results.getBoolean(column);
172+
}
173+
163174
public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
164175
if (results.getObject(column) == null) {
165176
throw new IllegalArgumentException();

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,15 @@ void updateLastModifiedAt(
7272
+ " WHERE d.namespace_name = :namespaceName\n"
7373
+ " AND d.name = :datasetName\n"
7474
+ "), dataset_runs AS (\n"
75-
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
75+
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state_change, event_time, event\n"
7676
+ " FROM selected_datasets d\n"
7777
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
7878
+ " LEFT JOIN LATERAL (\n"
7979
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
8080
+ " WHERE run_uuid = dv.run_uuid\n"
8181
+ " ) e ON e.run_uuid = dv.run_uuid\n"
8282
+ " UNION\n"
83-
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
83+
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state_change, event_time, event\n"
8484
+ " FROM selected_datasets d\n"
8585
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
8686
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
@@ -89,7 +89,7 @@ void updateLastModifiedAt(
8989
+ " WHERE run_uuid = rim.run_uuid\n"
9090
+ " ) e ON e.run_uuid = rim.run_uuid\n"
9191
+ ")\n"
92-
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
92+
+ "SELECT d.*, dv.fields, dv.lifecycle_state_change, sv.schema_location, t.tags, facets\n"
9393
+ "FROM selected_datasets d\n"
9494
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
9595
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
@@ -142,15 +142,15 @@ default void setFields(Dataset ds) {
142142
+ " ORDER BY d.name\n"
143143
+ " LIMIT :limit OFFSET :offset\n"
144144
+ "), dataset_runs AS (\n"
145-
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
145+
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state_change, event_time, event\n"
146146
+ " FROM selected_datasets d\n"
147147
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
148148
+ " LEFT JOIN LATERAL (\n"
149149
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
150150
+ " WHERE run_uuid = dv.run_uuid\n"
151151
+ " ) e ON e.run_uuid = dv.run_uuid\n"
152152
+ " UNION\n"
153-
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
153+
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state_change, event_time, event\n"
154154
+ " FROM selected_datasets d\n"
155155
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
156156
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
@@ -159,7 +159,7 @@ default void setFields(Dataset ds) {
159159
+ " WHERE run_uuid = rim.run_uuid\n"
160160
+ " ) e ON e.run_uuid = rim.run_uuid\n"
161161
+ ")\n"
162-
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
162+
+ "SELECT d.*, dv.fields, dv.lifecycle_state_change, sv.schema_location, t.tags, facets\n"
163163
+ "FROM selected_datasets d\n"
164164
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
165165
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
@@ -205,7 +205,8 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
205205
+ "source_name, "
206206
+ "name, "
207207
+ "physical_name, "
208-
+ "description "
208+
+ "description, "
209+
+ "is_deleted "
209210
+ ") VALUES ( "
210211
+ ":uuid, "
211212
+ ":type, "
@@ -217,13 +218,15 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
217218
+ ":sourceName, "
218219
+ ":name, "
219220
+ ":physicalName, "
220-
+ ":description) "
221+
+ ":description, "
222+
+ ":isDeleted) "
221223
+ "ON CONFLICT (namespace_uuid, name) "
222224
+ "DO UPDATE SET "
223225
+ "type = EXCLUDED.type, "
224226
+ "updated_at = EXCLUDED.updated_at, "
225227
+ "physical_name = EXCLUDED.physical_name, "
226-
+ "description = EXCLUDED.description "
228+
+ "description = EXCLUDED.description, "
229+
+ "is_deleted = EXCLUDED.is_deleted "
227230
+ "RETURNING *")
228231
DatasetRow upsert(
229232
UUID uuid,
@@ -235,7 +238,8 @@ DatasetRow upsert(
235238
String sourceName,
236239
String name,
237240
String physicalName,
238-
String description);
241+
String description,
242+
boolean isDeleted);
239243

240244
@SqlQuery(
241245
"INSERT INTO datasets ("
@@ -308,7 +312,8 @@ default Dataset upsertDatasetMeta(
308312
sourceRow.getName(),
309313
datasetName.getValue(),
310314
datasetMeta.getPhysicalName().getValue(),
311-
datasetMeta.getDescription().orElse(null));
315+
datasetMeta.getDescription().orElse(null),
316+
false);
312317
} else {
313318
datasetRow =
314319
upsert(
@@ -340,6 +345,7 @@ default Dataset upsertDatasetMeta(
340345
now,
341346
namespaceName.getValue(),
342347
datasetName.getValue(),
348+
null,
343349
datasetMeta);
344350

345351
return findWithTags(namespaceName.getValue(), datasetName.getValue()).get();

api/src/main/java/marquez/db/DatasetVersionDao.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ default DatasetVersionRow upsertDatasetVersion(
4848
Instant now,
4949
String namespaceName,
5050
String datasetName,
51+
String lifecycleStateChange,
5152
DatasetMeta datasetMeta) {
5253
TagDao tagDao = createTagDao();
5354
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
@@ -63,7 +64,8 @@ default DatasetVersionRow upsertDatasetVersion(
6364
datasetMeta.getRunId().map(RunId::getValue).orElse(null),
6465
toPgObjectFields(datasetMeta.getFields()),
6566
namespaceName,
66-
datasetName);
67+
datasetName,
68+
lifecycleStateChange);
6769
updateDatasetVersionMetric(
6870
namespaceName,
6971
datasetMeta.getType().toString(),
@@ -167,7 +169,7 @@ default void updateDatasetVersionMetric(
167169
+ " FROM selected_dataset_version_runs dv\n"
168170
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
169171
+ ")\n"
170-
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
172+
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change, \n"
171173
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
172174
+ " t.tags, f.facets\n"
173175
+ "FROM selected_dataset_versions dv\n"
@@ -209,7 +211,7 @@ default void updateDatasetVersionMetric(
209211
+ " FROM selected_dataset_version_runs dv\n"
210212
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
211213
+ ")\n"
212-
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
214+
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change, \n"
213215
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
214216
+ " t.tags, f.facets\n"
215217
+ "FROM selected_dataset_versions dv\n"
@@ -280,7 +282,7 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
280282
+ " FROM selected_dataset_version_runs dv\n"
281283
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
282284
+ ")\n"
283-
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
285+
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state_change,\n"
284286
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
285287
+ " t.tags, f.facets\n"
286288
+ "FROM selected_dataset_versions dv\n"
@@ -324,9 +326,9 @@ default List<DatasetVersion> findAllWithRun(
324326

325327
@SqlQuery(
326328
"INSERT INTO dataset_versions "
327-
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name) "
329+
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name, lifecycle_state_change) "
328330
+ "VALUES "
329-
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName) "
331+
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleStateChange) "
330332
+ "ON CONFLICT(version) "
331333
+ "DO UPDATE SET "
332334
+ "run_uuid = EXCLUDED.run_uuid "
@@ -339,7 +341,8 @@ DatasetVersionRow upsert(
339341
UUID runUuid,
340342
PGobject fields,
341343
String namespaceName,
342-
String datasetName);
344+
String datasetName,
345+
String lifecycleStateChange);
343346

344347
@SqlUpdate("UPDATE dataset_versions SET fields = :fields WHERE uuid = :uuid")
345348
void updateFields(UUID uuid, PGobject fields);

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public interface LineageDao {
6767
Optional<UUID> getJobUuid(String jobName, String namespace);
6868

6969
@SqlQuery(
70-
"SELECT ds.*, dv.fields\n"
70+
"SELECT ds.*, dv.fields, dv.lifecycle_state_change\n"
7171
+ "FROM datasets ds\n"
7272
+ "LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid\n"
7373
+ "WHERE ds.uuid IN (<dsUuids>);")

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import marquez.service.models.LineageEvent.Dataset;
4242
import marquez.service.models.LineageEvent.DatasetFacets;
4343
import marquez.service.models.LineageEvent.Job;
44+
import marquez.service.models.LineageEvent.LifecycleStateChangeFacet;
4445
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
4546
import marquez.service.models.LineageEvent.SchemaField;
4647
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
@@ -350,6 +351,12 @@ default DatasetRecord upsertLineageDataset(
350351
formatNamespaceName(ds.getNamespace()),
351352
DEFAULT_NAMESPACE_OWNER);
352353

354+
String dsLifecycleStateChange =
355+
Optional.ofNullable(ds.getFacets())
356+
.map(DatasetFacets::getLifecycleStateChange)
357+
.map(LifecycleStateChangeFacet::getLifecycleStateChange)
358+
.orElse("");
359+
353360
DatasetRow datasetRow =
354361
datasetDao.upsert(
355362
UUID.randomUUID(),
@@ -361,7 +368,8 @@ default DatasetRecord upsertLineageDataset(
361368
source.getName(),
362369
formatDatasetName(ds.getName()),
363370
ds.getName(),
364-
dsDescription);
371+
dsDescription,
372+
dsLifecycleStateChange.equalsIgnoreCase("DROP"));
365373

366374
List<SchemaField> fields =
367375
Optional.ofNullable(ds.getFacets())
@@ -385,6 +393,7 @@ default DatasetRecord upsertLineageDataset(
385393
source.getName(),
386394
dsRow.getPhysicalName(),
387395
dsRow.getName(),
396+
dsLifecycleStateChange,
388397
fields,
389398
runUuid)
390399
.getValue();
@@ -397,8 +406,8 @@ default DatasetRecord upsertLineageDataset(
397406
isInput ? null : runUuid,
398407
datasetVersionDao.toPgObjectSchemaFields(fields),
399408
dsNamespace.getName(),
400-
ds.getName());
401-
409+
ds.getName(),
410+
dsLifecycleStateChange);
402411
return row;
403412
});
404413
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();

api/src/main/java/marquez/db/RunDao.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
277277
d.getSourceName().getValue(),
278278
d.getPhysicalName().getValue(),
279279
d.getName().getValue(),
280+
null,
280281
toSchemaFields(d.getFields()),
281282
runUuid)
282283
.getValue();
@@ -288,7 +289,8 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
288289
runUuid,
289290
datasetVersionDao.toPgObjectFields(d.getFields()),
290291
d.getNamespace().getValue(),
291-
d.getName().getValue());
292+
d.getName().getValue(),
293+
null);
292294
});
293295
}
294296
}

api/src/main/java/marquez/db/mappers/DatasetDataMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ public DatasetData map(@NonNull ResultSet results, @NonNull StatementContext con
5252
toFields(results, "fields"),
5353
ImmutableSet.of(),
5454
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
55-
stringOrNull(results, Columns.DESCRIPTION));
55+
stringOrNull(results, Columns.DESCRIPTION),
56+
stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE));
5657
}
5758

5859
public static ImmutableList<Field> toFields(ResultSet results, String column)

api/src/main/java/marquez/db/mappers/DatasetMapper.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
package marquez.db.mappers;
44

5+
import static marquez.db.Columns.booleanOrDefault;
56
import static marquez.db.Columns.stringArrayOrThrow;
67
import static marquez.db.Columns.stringOrNull;
78
import static marquez.db.Columns.stringOrThrow;
@@ -62,9 +63,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
6263
toFields(results, "fields"),
6364
toTags(results, "tags"),
6465
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
66+
stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE),
6567
stringOrNull(results, Columns.DESCRIPTION),
6668
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
67-
toFacetsOrNull(results, Columns.FACETS));
69+
toFacetsOrNull(results, Columns.FACETS),
70+
booleanOrDefault(results, Columns.IS_DELETED, false));
6871
} else {
6972
return new Stream(
7073
new DatasetId(
@@ -79,9 +82,11 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
7982
toFields(results, "fields"),
8083
toTags(results, "tags"),
8184
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
85+
stringOrNull(results, Columns.LIFECYCLE_STATE_CHANGE),
8286
stringOrNull(results, Columns.DESCRIPTION),
8387
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
84-
toFacetsOrNull(results, Columns.FACETS));
88+
toFacetsOrNull(results, Columns.FACETS),
89+
booleanOrDefault(results, Columns.IS_DELETED, false));
8590
}
8691
}
8792

api/src/main/java/marquez/db/mappers/DatasetRowMapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
package marquez.db.mappers;
44

5+
import static marquez.db.Columns.booleanOrDefault;
56
import static marquez.db.Columns.stringOrNull;
67
import static marquez.db.Columns.stringOrThrow;
78
import static marquez.db.Columns.timestampOrNull;
@@ -32,6 +33,7 @@ public DatasetRow map(@NonNull ResultSet results, @NonNull StatementContext cont
3233
stringOrThrow(results, Columns.PHYSICAL_NAME),
3334
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
3435
stringOrNull(results, Columns.DESCRIPTION),
35-
uuidOrNull(results, Columns.CURRENT_VERSION_UUID));
36+
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
37+
booleanOrDefault(results, Columns.IS_DELETED, false));
3638
}
3739
}

0 commit comments

Comments
 (0)