Skip to content

Commit 40bfe6b

Browse files
Runless events - run upsert builder
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 6d38a7d commit 40bfe6b

5 files changed

Lines changed: 132 additions & 87 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://git
2727

2828
### Added
2929
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
30-
* Save into Marquez model datasets sent via `DatasetEvent` event type
30+
*Save into Marquez model datasets sent via `DatasetEvent` event type
3131

3232
## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
3333
### Added

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 67 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import marquez.common.models.SourceType;
3434
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
3535
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
36+
import marquez.db.RunDao.RunUpsert;
37+
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
3638
import marquez.db.mappers.LineageEventMapper;
3739
import marquez.db.models.ColumnLineageRow;
3840
import marquez.db.models.DatasetFieldRow;
@@ -149,7 +151,6 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
149151

150152
default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) {
151153
daos.initBaseDao(this);
152-
153154
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();
154155

155156
UpdateLineageRow bag = new UpdateLineageRow();
@@ -164,9 +165,9 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map
164165

165166
Dataset dataset = event.getDataset();
166167
List<DatasetRecord> datasetOutputs = new ArrayList<>();
167-
DatasetRecord record = upsertLineageDataset(dataset, now, null, false, daos);
168+
DatasetRecord record = upsertLineageDataset(dataset, now, null, false);
168169
datasetOutputs.add(record);
169-
insertOutputFacets(dataset, record, null, null, now, daos);
170+
insertOutputFacets(dataset, record, null, null, now);
170171

171172
daos.getDatasetDao()
172173
.updateVersion(
@@ -192,18 +193,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
192193
DEFAULT_NAMESPACE_OWNER);
193194
bag.setNamespace(namespace);
194195

195-
Instant nominalStartTime =
196-
Optional.ofNullable(event.getRun().getFacets())
197-
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
198-
.map(NominalTimeRunFacet::getNominalStartTime)
199-
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
200-
.orElse(null);
201-
Instant nominalEndTime =
202-
Optional.ofNullable(event.getRun().getFacets())
203-
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
204-
.map(NominalTimeRunFacet::getNominalEndTime)
205-
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
206-
.orElse(null);
196+
Instant nominalStartTime = getNominalStartTime(event);
197+
Instant nominalEndTime = getNominalEndTime(event);
207198

208199
Optional<ParentRunFacet> parentRun =
209200
Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent);
@@ -230,51 +221,25 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
230221
bag.setRunArgs(runArgs);
231222

232223
final UUID runUuid = runToUuid(event.getRun().getRunId());
233-
234224
RunRow run;
225+
RunUpsertBuilder runUpsertBuilder =
226+
RunUpsert.builder()
227+
.runUuid(runUuid)
228+
.parentRunUuid(parentUuid.orElse(null))
229+
.externalId(event.getRun().getRunId())
230+
.now(now)
231+
.jobUuid(job.getUuid())
232+
.jobVersionUuid(null)
233+
.runArgsUuid(runArgs.getUuid())
234+
.namespaceName(namespace.getName())
235+
.jobName(job.getName())
236+
.location(job.getLocation());
237+
235238
if (event.getEventType() != null) {
236-
RunState runStateType = getRunState(event.getEventType());
237-
run =
238-
daos.getRunDao()
239-
.upsert(
240-
runUuid,
241-
parentUuid.orElse(null),
242-
event.getRun().getRunId(),
243-
now,
244-
job.getUuid(),
245-
null,
246-
runArgs.getUuid(),
247-
nominalStartTime,
248-
nominalEndTime,
249-
runStateType,
250-
now,
251-
namespace.getName(),
252-
job.getName(),
253-
job.getLocation());
254-
// Add ...
255-
Optional.ofNullable(event.getRun().getFacets())
256-
.ifPresent(
257-
runFacet ->
258-
daos.getRunFacetsDao()
259-
.insertRunFacetsFor(
260-
runUuid, now, event.getEventType(), event.getRun().getFacets()));
261-
} else {
262-
run =
263-
daos.getRunDao()
264-
.upsert(
265-
runUuid,
266-
parentUuid.orElse(null),
267-
event.getRun().getRunId(),
268-
now,
269-
job.getUuid(),
270-
null,
271-
runArgs.getUuid(),
272-
nominalStartTime,
273-
nominalEndTime,
274-
namespace.getName(),
275-
job.getName(),
276-
job.getLocation());
239+
runUpsertBuilder.runStateType(getRunState(event.getEventType())).runStateTime(now);
277240
}
241+
run = daos.getRunDao().upsert(runUpsertBuilder.build());
242+
insertRunFacets(event, runUuid, now);
278243
bag.setRun(run);
279244

280245
if (event.getEventType() != null) {
@@ -290,51 +255,73 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
290255
}
291256
}
292257

293-
// Add ...
294-
Optional.ofNullable(event.getJob().getFacets())
295-
.ifPresent(
296-
jobFacet ->
297-
daos.getJobFacetsDao()
298-
.insertJobFacetsFor(
299-
job.getUuid(),
300-
runUuid,
301-
now,
302-
event.getEventType(),
303-
event.getJob().getFacets()));
258+
insertJobFacets(event, job.getUuid(), runUuid, now);
304259

305260
// RunInput list uses null as a sentinel value
306261
List<DatasetRecord> datasetInputs = null;
307262
if (event.getInputs() != null) {
308263
datasetInputs = new ArrayList<>();
309264
for (Dataset dataset : event.getInputs()) {
310-
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true, daos);
265+
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, true);
311266
datasetInputs.add(record);
312-
insertInputFacets(dataset, record, runUuid, event.getEventType(), now, daos);
267+
insertInputFacets(dataset, record, runUuid, event.getEventType(), now);
313268
}
314269
}
315270
bag.setInputs(Optional.ofNullable(datasetInputs));
271+
316272
// RunInput list uses null as a sentinel value
317273
List<DatasetRecord> datasetOutputs = null;
318274
if (event.getOutputs() != null) {
319275
datasetOutputs = new ArrayList<>();
320276
for (Dataset dataset : event.getOutputs()) {
321-
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false, daos);
277+
DatasetRecord record = upsertLineageDataset(dataset, now, runUuid, false);
322278
datasetOutputs.add(record);
323-
insertOutputFacets(dataset, record, runUuid, event.getEventType(), now, daos);
279+
insertOutputFacets(dataset, record, runUuid, event.getEventType(), now);
324280
}
325281
}
326282

327283
bag.setOutputs(Optional.ofNullable(datasetOutputs));
328284
return bag;
329285
}
330286

287+
private static Instant getNominalStartTime(LineageEvent event) {
288+
return Optional.ofNullable(event.getRun().getFacets())
289+
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
290+
.map(NominalTimeRunFacet::getNominalStartTime)
291+
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
292+
.orElse(null);
293+
}
294+
295+
private static Instant getNominalEndTime(LineageEvent event) {
296+
return Optional.ofNullable(event.getRun().getFacets())
297+
.flatMap(f -> Optional.ofNullable(f.getNominalTime()))
298+
.map(NominalTimeRunFacet::getNominalEndTime)
299+
.map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
300+
.orElse(null);
301+
}
302+
303+
private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) {
304+
// Add ...
305+
Optional.ofNullable(event.getRun().getFacets())
306+
.ifPresent(
307+
runFacet ->
308+
daos.getRunFacetsDao()
309+
.insertRunFacetsFor(
310+
runUuid, now, event.getEventType(), event.getRun().getFacets()));
311+
}
312+
313+
private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) {
314+
// Add ...
315+
Optional.ofNullable(event.getJob().getFacets())
316+
.ifPresent(
317+
jobFacet ->
318+
daos.getJobFacetsDao()
319+
.insertJobFacetsFor(
320+
jobUuid, runUuid, now, event.getEventType(), event.getJob().getFacets()));
321+
}
322+
331323
private void insertInputFacets(
332-
Dataset dataset,
333-
DatasetRecord record,
334-
UUID runUuid,
335-
String eventType,
336-
Instant now,
337-
ModelDaos daos) {
324+
Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) {
338325
// Facets ...
339326
Optional.ofNullable(dataset.getFacets())
340327
.ifPresent(
@@ -363,12 +350,7 @@ private void insertInputFacets(
363350
}
364351

365352
private void insertOutputFacets(
366-
Dataset dataset,
367-
DatasetRecord record,
368-
UUID runUuid,
369-
String eventType,
370-
Instant now,
371-
ModelDaos daos) {
353+
Dataset dataset, DatasetRecord record, UUID runUuid, String eventType, Instant now) {
372354
// Facets ...
373355
Optional.ofNullable(dataset.getFacets())
374356
.ifPresent(
@@ -665,7 +647,7 @@ default JobType getJobType(Job job) {
665647
}
666648

667649
default DatasetRecord upsertLineageDataset(
668-
Dataset ds, Instant now, UUID runUuid, boolean isInput, ModelDaos daos) {
650+
Dataset ds, Instant now, UUID runUuid, boolean isInput) {
669651
NamespaceRow dsNamespace =
670652
daos.getNamespaceDao()
671653
.upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER);

api/src/main/java/marquez/db/RunDao.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.Set;
1515
import java.util.UUID;
1616
import java.util.stream.Collectors;
17+
import lombok.Builder;
1718
import lombok.NonNull;
1819
import marquez.common.Utils;
1920
import marquez.common.models.DatasetId;
@@ -301,6 +302,40 @@ RunRow upsert(
301302
String jobName,
302303
String location);
303304

305+
default RunRow upsert(RunUpsert runUpsert) {
306+
if (runUpsert.runStateType == null) {
307+
return upsert(
308+
runUpsert.runUuid(),
309+
runUpsert.parentRunUuid(),
310+
runUpsert.externalId(),
311+
runUpsert.now(),
312+
runUpsert.jobUuid(),
313+
runUpsert.jobVersionUuid(),
314+
runUpsert.runArgsUuid(),
315+
runUpsert.nominalStartTime(),
316+
runUpsert.nominalEndTime(),
317+
runUpsert.namespaceName(),
318+
runUpsert.jobName(),
319+
runUpsert.location());
320+
} else {
321+
return upsert(
322+
runUpsert.runUuid(),
323+
runUpsert.parentRunUuid(),
324+
runUpsert.externalId(),
325+
runUpsert.now(),
326+
runUpsert.jobUuid(),
327+
runUpsert.jobVersionUuid(),
328+
runUpsert.runArgsUuid(),
329+
runUpsert.nominalStartTime(),
330+
runUpsert.nominalEndTime(),
331+
runUpsert.runStateType(),
332+
runUpsert.runStateTime(),
333+
runUpsert.namespaceName(),
334+
runUpsert.jobName(),
335+
runUpsert.location());
336+
}
337+
}
338+
304339
@SqlUpdate(
305340
"INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) "
306341
+ "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING")
@@ -452,4 +487,21 @@ default RunRow upsertRunMeta(
452487
)
453488
""")
454489
Optional<Run> findByLatestJob(String namespace, String jobName);
490+
491+
@Builder
492+
record RunUpsert(
493+
UUID runUuid,
494+
UUID parentRunUuid,
495+
String externalId,
496+
Instant now,
497+
UUID jobUuid,
498+
UUID jobVersionUuid,
499+
UUID runArgsUuid,
500+
Instant nominalStartTime,
501+
Instant nominalEndTime,
502+
RunState runStateType,
503+
Instant runStateTime,
504+
String namespaceName,
505+
String jobName,
506+
String location) {}
455507
}

api/src/main/java/marquez/db/models/ModelDaos.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
16
package marquez.db.models;
27

38
import marquez.db.BaseDao;

build.gradle

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ buildscript {
2222
classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0'
2323
classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2'
2424
classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.20.0'
25+
classpath "io.freefair.gradle:lombok-plugin:8.4"
2526
}
2627
}
2728

@@ -39,6 +40,7 @@ subprojects {
3940
apply plugin: 'com.github.johnrengelman.shadow'
4041
apply plugin: "com.diffplug.spotless"
4142
apply plugin: "pmd"
43+
apply plugin: "io.freefair.lombok"
4244

4345
project(':api') {
4446
apply plugin: 'application'
@@ -95,7 +97,11 @@ subprojects {
9597
archiveClassifier.set("sources")
9698
}
9799

98-
task javadocJar(type: Jar, dependsOn: javadoc) {
100+
task delombokJavadocs(type: Javadoc) {
101+
source = delombok
102+
}
103+
104+
task javadocJar(type: Jar, dependsOn: delombokJavadocs) {
99105
from javadoc.destinationDir
100106
archiveClassifier.set("javadoc")
101107
}

0 commit comments

Comments
 (0)