Skip to content

Commit 89717e2

Browse files
add operation column
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 68cce15 commit 89717e2

33 files changed

Lines changed: 209 additions & 22 deletions

api/src/main/java/marquez/common/Utils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ public static Version newJobVersionFor(
216216
* @param sourceName The source name of the dataset.
217217
* @param physicalName The physical name of the dataset.
218218
* @param datasetName The dataset name.
219+
* @param stateChange The state change applied to the dataset (e.g. CREATE, DROP, TRUNCATE).
219220
* @param fields The fields of the dataset.
220221
* @param runId The UUID of the run linked to the dataset.
221222
* @return A {@link Version} object based on the specified dataset meta.
@@ -225,6 +226,7 @@ public static Version newDatasetVersionFor(
225226
String sourceName,
226227
String physicalName,
227228
String datasetName,
229+
String stateChange,
228230
List<LineageEvent.SchemaField> fields,
229231
UUID runId) {
230232
DatasetVersionData data =
@@ -233,6 +235,7 @@ public static Version newDatasetVersionFor(
233235
.sourceName(sourceName)
234236
.physicalName(physicalName)
235237
.datasetName(datasetName)
238+
.stateChange(stateChange)
236239
.schemaFields(fields)
237240
.runId(runId)
238241
.build();
@@ -271,6 +274,7 @@ private static Version newDatasetVersionFor(DatasetVersionData data) {
271274
data.getPhysicalName(),
272275
data.getSchemaLocation(),
273276
data.getFields().stream().map(Utils::joinField).collect(joining(VERSION_DELIM)),
277+
data.getStateChange(),
274278
data.getRunId())
275279
.getBytes(UTF_8);
276280
return Version.of(UUID.nameUUIDFromBytes(bytes));
@@ -287,6 +291,7 @@ private static class DatasetVersionData {
287291
private String sourceName;
288292
private String physicalName;
289293
private String datasetName;
294+
private String stateChange;
290295
private String schemaLocation;
291296
private Set<Triple<String, String, String>> fields;
292297
private UUID runId;

api/src/main/java/marquez/db/Columns.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ private Columns() {}
8484

8585
/* DATASET VERSION ROW COLUMNS */
8686
public static final String FIELD_UUIDS = "field_uuids";
87+
public static final String STATE_CHANGE = "state_change";
8788

8889
/* STREAM VERSION ROW COLUMNS */
8990
public static final String SCHEMA_LOCATION = "schema_location";

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,15 @@ void updateLastModifiedAt(
8484
+ " WHERE d.namespace_name = :namespaceName\n"
8585
+ " AND d.name = :datasetName\n"
8686
+ "), dataset_runs AS (\n"
87-
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
87+
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.state_change, event_time, event\n"
8888
+ " FROM selected_datasets d\n"
8989
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
9090
+ " LEFT JOIN LATERAL (\n"
9191
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
9292
+ " WHERE run_uuid = dv.run_uuid\n"
9393
+ " ) e ON e.run_uuid = dv.run_uuid\n"
9494
+ " UNION\n"
95-
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
95+
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, state_change, event_time, event\n"
9696
+ " FROM selected_datasets d\n"
9797
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
9898
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
@@ -101,7 +101,7 @@ void updateLastModifiedAt(
101101
+ " WHERE run_uuid = rim.run_uuid\n"
102102
+ " ) e ON e.run_uuid = rim.run_uuid\n"
103103
+ ")\n"
104-
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
104+
+ "SELECT d.*, dv.fields, dv.state_change, sv.schema_location, t.tags, facets\n"
105105
+ "FROM selected_datasets d\n"
106106
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
107107
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
@@ -154,15 +154,15 @@ default void setFields(Dataset ds) {
154154
+ " ORDER BY d.name\n"
155155
+ " LIMIT :limit OFFSET :offset\n"
156156
+ "), dataset_runs AS (\n"
157-
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, event_time, event\n"
157+
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.state_change, event_time, event\n"
158158
+ " FROM selected_datasets d\n"
159159
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
160160
+ " LEFT JOIN LATERAL (\n"
161161
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
162162
+ " WHERE run_uuid = dv.run_uuid\n"
163163
+ " ) e ON e.run_uuid = dv.run_uuid\n"
164164
+ " UNION\n"
165-
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, event_time, event\n"
165+
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, state_change, event_time, event\n"
166166
+ " FROM selected_datasets d\n"
167167
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
168168
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
@@ -171,7 +171,7 @@ default void setFields(Dataset ds) {
171171
+ " WHERE run_uuid = rim.run_uuid\n"
172172
+ " ) e ON e.run_uuid = rim.run_uuid\n"
173173
+ ")\n"
174-
+ "SELECT d.*, dv.fields, sv.schema_location, t.tags, facets\n"
174+
+ "SELECT d.*, dv.fields, dv.state_change, sv.schema_location, t.tags, facets\n"
175175
+ "FROM selected_datasets d\n"
176176
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
177177
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
@@ -352,6 +352,7 @@ default Dataset upsertDatasetMeta(
352352
now,
353353
namespaceName.getValue(),
354354
datasetName.getValue(),
355+
null,
355356
datasetMeta);
356357

357358
return findWithTags(namespaceName.getValue(), datasetName.getValue()).get();

api/src/main/java/marquez/db/DatasetVersionDao.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ default DatasetVersionRow upsertDatasetVersion(
6060
Instant now,
6161
String namespaceName,
6262
String datasetName,
63+
String stateChange,
6364
DatasetMeta datasetMeta) {
6465
TagDao tagDao = createTagDao();
6566
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
@@ -75,7 +76,8 @@ default DatasetVersionRow upsertDatasetVersion(
7576
datasetMeta.getRunId().map(RunId::getValue).orElse(null),
7677
toPgObjectFields(datasetMeta.getFields()),
7778
namespaceName,
78-
datasetName);
79+
datasetName,
80+
stateChange);
7981
updateDatasetVersionMetric(
8082
namespaceName,
8183
datasetMeta.getType().toString(),
@@ -179,7 +181,7 @@ default void updateDatasetVersionMetric(
179181
+ " FROM selected_dataset_version_runs dv\n"
180182
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
181183
+ ")\n"
182-
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
184+
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.state_change, \n"
183185
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
184186
+ " t.tags, f.facets\n"
185187
+ "FROM selected_dataset_versions dv\n"
@@ -221,7 +223,7 @@ default void updateDatasetVersionMetric(
221223
+ " FROM selected_dataset_version_runs dv\n"
222224
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
223225
+ ")\n"
224-
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
226+
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.state_change, \n"
225227
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
226228
+ " t.tags, f.facets\n"
227229
+ "FROM selected_dataset_versions dv\n"
@@ -292,7 +294,7 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
292294
+ " FROM selected_dataset_version_runs dv\n"
293295
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
294296
+ ")\n"
295-
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description,\n"
297+
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.state_change,\n"
296298
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
297299
+ " t.tags, f.facets\n"
298300
+ "FROM selected_dataset_versions dv\n"
@@ -336,9 +338,9 @@ default List<DatasetVersion> findAllWithRun(
336338

337339
@SqlQuery(
338340
"INSERT INTO dataset_versions "
339-
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name) "
341+
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name, state_change) "
340342
+ "VALUES "
341-
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName) "
343+
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName, :stateChange) "
342344
+ "ON CONFLICT(version) "
343345
+ "DO UPDATE SET "
344346
+ "run_uuid = EXCLUDED.run_uuid "
@@ -351,7 +353,8 @@ DatasetVersionRow upsert(
351353
UUID runUuid,
352354
PGobject fields,
353355
String namespaceName,
354-
String datasetName);
356+
String datasetName,
357+
String stateChange);
355358

356359
@SqlUpdate("UPDATE dataset_versions SET fields = :fields WHERE uuid = :uuid")
357360
void updateFields(UUID uuid, PGobject fields);

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public interface LineageDao {
7979
Optional<UUID> getJobUuid(String jobName, String namespace);
8080

8181
@SqlQuery(
82-
"SELECT ds.*, dv.fields\n"
82+
"SELECT ds.*, dv.fields, dv.state_change\n"
8383
+ "FROM datasets ds\n"
8484
+ "LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid\n"
8585
+ "WHERE ds.uuid IN (<dsUuids>);")

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,12 @@ default DatasetRecord upsertLineageDataset(
367367
.map(SchemaDatasetFacet::getFields)
368368
.orElse(null);
369369

370+
String dsStateChange =
371+
Optional.ofNullable(ds.getFacets())
372+
.map(DatasetFacets::getTableStateChange)
373+
.map(LineageEvent.TableStateChangeFacet::getStateChange)
374+
.orElse(null);
375+
370376
final DatasetRow dsRow = datasetRow;
371377
DatasetVersionRow datasetVersionRow =
372378
datasetRow
@@ -383,6 +389,7 @@ default DatasetRecord upsertLineageDataset(
383389
source.getName(),
384390
dsRow.getPhysicalName(),
385391
dsRow.getName(),
392+
dsStateChange,
386393
fields,
387394
runUuid)
388395
.getValue();
@@ -395,8 +402,8 @@ default DatasetRecord upsertLineageDataset(
395402
isInput ? null : runUuid,
396403
datasetVersionDao.toPgObjectSchemaFields(fields),
397404
dsNamespace.getName(),
398-
ds.getName());
399-
405+
ds.getName(),
406+
dsStateChange);
400407
return row;
401408
});
402409
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();

api/src/main/java/marquez/db/RunDao.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
289289
d.getSourceName().getValue(),
290290
d.getPhysicalName().getValue(),
291291
d.getName().getValue(),
292+
null,
292293
toSchemaFields(d.getFields()),
293294
runUuid)
294295
.getValue();
@@ -300,7 +301,8 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
300301
runUuid,
301302
datasetVersionDao.toPgObjectFields(d.getFields()),
302303
d.getNamespace().getValue(),
303-
d.getName().getValue());
304+
d.getName().getValue(),
305+
null);
304306
});
305307
}
306308
}

api/src/main/java/marquez/db/mappers/DatasetDataMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public DatasetData map(@NonNull ResultSet results, @NonNull StatementContext con
5050
toFields(results, "fields"),
5151
ImmutableSet.of(),
5252
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
53-
stringOrNull(results, Columns.DESCRIPTION));
53+
stringOrNull(results, Columns.DESCRIPTION),
54+
stringOrNull(results, Columns.STATE_CHANGE));
5455
}
5556

5657
public static ImmutableList<Field> toFields(ResultSet results, String column)

api/src/main/java/marquez/db/mappers/DatasetMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
7474
toFields(results, "fields"),
7575
toTags(results, "tags"),
7676
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
77+
stringOrNull(results, Columns.STATE_CHANGE),
7778
stringOrNull(results, Columns.DESCRIPTION),
7879
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
7980
toFacetsOrNull(results, Columns.FACETS));
@@ -91,6 +92,7 @@ public Dataset map(@NonNull ResultSet results, @NonNull StatementContext context
9192
toFields(results, "fields"),
9293
toTags(results, "tags"),
9394
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
95+
stringOrNull(results, Columns.STATE_CHANGE),
9496
stringOrNull(results, Columns.DESCRIPTION),
9597
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
9698
toFacetsOrNull(results, Columns.FACETS));

api/src/main/java/marquez/db/mappers/DatasetVersionMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public DatasetVersion map(@NonNull ResultSet results, @NonNull StatementContext
6666
toFields(results, "fields"),
6767
columnNames.contains("tags") ? toTags(results, "tags") : null,
6868
stringOrNull(results, Columns.DESCRIPTION),
69+
stringOrNull(results, Columns.STATE_CHANGE),
6970
null,
7071
toFacetsOrNull(results, Columns.FACETS));
7172
} else {
@@ -83,6 +84,7 @@ public DatasetVersion map(@NonNull ResultSet results, @NonNull StatementContext
8384
toFields(results, "fields"),
8485
columnNames.contains("tags") ? toTags(results, "tags") : null,
8586
stringOrNull(results, Columns.DESCRIPTION),
87+
stringOrNull(results, Columns.STATE_CHANGE),
8688
null,
8789
toFacetsOrNull(results, Columns.FACETS));
8890
}

0 commit comments

Comments
 (0)