Skip to content

Commit 90a2f65

Browse files
authored
Add col current_run_uuid to jobs (#2929)
* Add col current_run_uuid to jobs

  Signed-off-by: Willy Lulciuc <willy.lulciuc@gmail.com>

* Apply formatting

  Signed-off-by: Willy Lulciuc <willy.lulciuc@gmail.com>

---------

Signed-off-by: Willy Lulciuc <willy.lulciuc@gmail.com>
1 parent 05d16aa commit 90a2f65

11 files changed

Lines changed: 111 additions & 39 deletions

File tree

api/src/main/java/marquez/db/Columns.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ private Columns() {}
9595
public static final String PARENT_JOB_UUID = "parent_job_uuid";
9696
public static final String SIMPLE_NAME = "simple_name";
9797
public static final String SYMLINK_TARGET_UUID = "symlink_target_uuid";
98+
public static final String CURRENT_RUN_UUID = "current_run_uuid";
9899

99100
/* JOB VERSION I/O ROW COLUMNS */
100101
public static final String INPUT_UUIDS = "input_uuids";

api/src/main/java/marquez/db/JobDao.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import marquez.common.models.JobName;
2121
import marquez.common.models.JobType;
2222
import marquez.common.models.NamespaceName;
23+
import marquez.common.models.RunId;
2324
import marquez.common.models.RunState;
2425
import marquez.db.JobVersionDao.IoType;
2526
import marquez.db.JobVersionDao.JobDataset;
@@ -239,7 +240,7 @@ job_tags as (
239240
LEFT OUTER JOIN job_tags jt
240241
ON j.uuid = jt.uuid
241242
LEFT JOIN runs r
242-
ON r.uuid = jv.latest_run_uuid
243+
ON r.uuid = j.current_run_uuid
243244
WHERE
244245
(r.current_run_state IN (<lastRunStates>) OR r.uuid IS NULL)
245246
ORDER BY
@@ -361,7 +362,8 @@ default JobRow upsertJobMeta(
361362
jobMeta.getDescription().orElse(null),
362363
toUrlString(jobMeta.getLocation().orElse(null)),
363364
symlinkTargetUuid,
364-
toJson(jobMeta.getInputs(), mapper));
365+
toJson(jobMeta.getInputs(), mapper),
366+
jobMeta.getRunId().map(RunId::getValue).orElse(null));
365367
}
366368

367369
default String toUrlString(URL url) {
@@ -382,6 +384,31 @@ default PGobject toJson(Set<DatasetId> dataset, ObjectMapper mapper) {
382384
}
383385
}
384386

387+
/**
 * Upserts a job without specifying a current run.
 *
 * <p>Forwards every argument unchanged to the {@code upsertJob} overload that takes an
 * additional trailing {@code currentRunUuid} argument, passing {@code null} for it.
 *
 * @return the {@link JobRow} returned by the delegated {@code upsertJob} call
 */
default JobRow upsertJob(
388+
UUID uuid,
389+
JobType type,
390+
Instant now,
391+
UUID namespaceUuid,
392+
String namespaceName,
393+
String name,
394+
String description,
395+
String location,
396+
UUID symlinkTargetId,
397+
PGobject inputs) {
398+
return upsertJob(
399+
uuid,
400+
type,
401+
now,
402+
namespaceUuid,
403+
namespaceName,
404+
name,
405+
description,
406+
location,
407+
symlinkTargetId,
408+
inputs,
409+
null);
410+
}
411+
385412
/*
386413
* Note: following SQL never executes. There is database trigger on `jobs_view`
387414
* that replaces following SQL
@@ -402,7 +429,8 @@ INSERT INTO jobs_view AS j (
402429
current_location,
403430
current_inputs,
404431
symlink_target_uuid,
405-
parent_job_uuid_string
432+
parent_job_uuid_string,
433+
current_run_uuid
406434
) VALUES (
407435
:uuid,
408436
:type,
@@ -415,7 +443,8 @@ INSERT INTO jobs_view AS j (
415443
:location,
416444
:inputs,
417445
:symlinkTargetId,
418-
''
446+
'',
447+
:currentRunUuid
419448
) RETURNING *
420449
""")
421450
JobRow upsertJob(
@@ -428,7 +457,8 @@ JobRow upsertJob(
428457
String description,
429458
String location,
430459
UUID symlinkTargetId,
431-
PGobject inputs);
460+
PGobject inputs,
461+
UUID currentRunUuid);
432462

433463
/*
434464
* Note: following SQL never executes. There is database trigger on `jobs_view`
@@ -450,7 +480,8 @@ INSERT INTO jobs_view AS j (
450480
description,
451481
current_location,
452482
current_inputs,
453-
symlink_target_uuid
483+
symlink_target_uuid,
484+
current_run_uuid
454485
) VALUES (
455486
:uuid,
456487
:parentJobUuid,
@@ -463,7 +494,8 @@ INSERT INTO jobs_view AS j (
463494
:description,
464495
:location,
465496
:inputs,
466-
:symlinkTargetId
497+
:symlinkTargetId,
498+
:currentRunUuid
467499
)
468500
RETURNING *
469501
""")
@@ -478,7 +510,8 @@ JobRow upsertJob(
478510
String description,
479511
String location,
480512
UUID symlinkTargetId,
481-
PGobject inputs);
513+
PGobject inputs,
514+
UUID currentRunUuid);
482515

483516
@SqlUpdate(
484517
"""

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.UUID;
2424
import java.util.stream.Collectors;
2525
import java.util.stream.Stream;
26+
import javax.annotation.Nullable;
2627
import marquez.common.Utils;
2728
import marquez.common.models.DatasetId;
2829
import marquez.common.models.DatasetName;
@@ -235,7 +236,8 @@ default UpdateLineageRow updateMarquezModel(JobEvent event, ObjectMapper mapper)
235236
namespace,
236237
null,
237238
null,
238-
Optional.empty());
239+
Optional.empty(),
240+
null);
239241

240242
bag.setJob(job);
241243

@@ -304,6 +306,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
304306
Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent);
305307
Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
306308

309+
final UUID runUuid = runToUuid(event.getRun().getRunId());
310+
307311
JobRow job =
308312
buildJobFromEvent(
309313
event.getJob(),
@@ -316,7 +320,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
316320
namespace,
317321
nominalStartTime,
318322
nominalEndTime,
319-
parentRun);
323+
parentRun,
324+
runUuid);
320325

321326
bag.setJob(job);
322327

@@ -327,7 +332,6 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
327332
UUID.randomUUID(), now, Utils.toJson(runArgsMap), Utils.checksumFor(runArgsMap));
328333
bag.setRunArgs(runArgs);
329334

330-
final UUID runUuid = runToUuid(event.getRun().getRunId());
331335
RunRow run;
332336
RunUpsert.RunUpsertBuilder runUpsertBuilder =
333337
RunUpsert.builder()
@@ -509,7 +513,8 @@ private JobRow buildJobFromEvent(
509513
NamespaceRow namespace,
510514
Instant nominalStartTime,
511515
Instant nominalEndTime,
512-
Optional<ParentRunFacet> parentRun) {
516+
Optional<ParentRunFacet> parentRun,
517+
@Nullable UUID runUuid) {
513518
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
514519
String description =
515520
Optional.ofNullable(job.getFacets())
@@ -523,10 +528,10 @@ private JobRow buildJobFromEvent(
523528
.flatMap(s -> Optional.ofNullable(s.getUrl()))
524529
.orElse(null);
525530

526-
Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
531+
Optional<UUID> parentRunUuid = parentRun.map(Utils::findParentRunUuid);
527532
Optional<JobRow> parentJob =
528-
parentUuid.map(
529-
uuid ->
533+
parentRunUuid.map(
534+
parentRunUuidFound ->
530535
findParentJobRow(
531536
job,
532537
eventTime,
@@ -537,7 +542,7 @@ private JobRow buildJobFromEvent(
537542
nominalEndTime,
538543
log,
539544
parentRun.get(),
540-
uuid));
545+
parentRunUuidFound));
541546

542547
// construct the simple name of the job by removing the parent prefix plus the dot '.' separator
543548
String jobName =
@@ -570,7 +575,8 @@ private JobRow buildJobFromEvent(
570575
description,
571576
location,
572577
null,
573-
jobDao.toJson(toDatasetId(inputs), mapper)))
578+
jobDao.toJson(toDatasetId(inputs), mapper),
579+
parent.getCurrentRunUuid().orElse(null)))
574580
.orElseGet(
575581
() ->
576582
jobDao.upsertJob(
@@ -583,7 +589,8 @@ private JobRow buildJobFromEvent(
583589
description,
584590
location,
585591
null,
586-
jobDao.toJson(toDatasetId(inputs), mapper)));
592+
jobDao.toJson(toDatasetId(inputs), mapper),
593+
runUuid));
587594
}
588595

589596
private JobRow findParentJobRow(
@@ -596,15 +603,15 @@ private JobRow findParentJobRow(
596603
Instant nominalEndTime,
597604
Logger log,
598605
ParentRunFacet facet,
599-
UUID uuid) {
606+
UUID parentRunUuid) {
600607
try {
601608
log.debug("Found parent run event {}", facet);
602609
PGobject inputs = new PGobject();
603610
inputs.setType("json");
604611
inputs.setValue("[]");
605612
JobRow parentJobRow =
606613
createRunDao()
607-
.findJobRowByRunUuid(uuid)
614+
.findJobRowByRunUuid(parentRunUuid)
608615
.map(
609616
j -> {
610617
String parentJobName =
@@ -617,18 +624,20 @@ private JobRow findParentJobRow(
617624
} else {
618625
// Addresses an Airflow integration bug that generated conflicting run UUIDs
619626
// for DAGs that had the same name, but ran in different namespaces.
620-
UUID parentRunUuid =
627+
UUID parentRunUuidNoConflict =
621628
Utils.toNameBasedUuid(
622-
facet.getJob().getNamespace(), parentJobName, uuid.toString());
629+
facet.getJob().getNamespace(),
630+
parentJobName,
631+
parentRunUuid.toString());
623632
log.warn(
624633
"Parent Run id {} has a different job name '{}.{}' from facet '{}.{}'. "
625634
+ "Assuming Run UUID conflict and generating a new UUID {}",
626-
uuid,
635+
parentRunUuid,
627636
j.getNamespaceName(),
628637
j.getName(),
629638
facet.getJob().getNamespace(),
630639
facet.getJob().getName(),
631-
parentRunUuid);
640+
parentRunUuidNoConflict);
632641
return createParentJobRunRecord(
633642
job,
634643
eventTime,
@@ -637,7 +646,7 @@ private JobRow findParentJobRow(
637646
location,
638647
nominalStartTime,
639648
nominalEndTime,
640-
parentRunUuid,
649+
parentRunUuidNoConflict,
641650
facet,
642651
inputs);
643652
}
@@ -652,7 +661,7 @@ private JobRow findParentJobRow(
652661
location,
653662
nominalStartTime,
654663
nominalEndTime,
655-
uuid,
664+
parentRunUuid,
656665
facet,
657666
inputs));
658667
log.debug("Found parent job record {}", parentJobRow);
@@ -670,7 +679,7 @@ private JobRow createParentJobRunRecord(
670679
String location,
671680
Instant nominalStartTime,
672681
Instant nominalEndTime,
673-
UUID uuid,
682+
UUID parentRunUuid,
674683
ParentRunFacet facet,
675684
PGobject inputs) {
676685
Instant now = eventTime.withZoneSameInstant(ZoneId.of("UTC")).toInstant();
@@ -691,7 +700,8 @@ private JobRow createParentJobRunRecord(
691700
null,
692701
location,
693702
null,
694-
inputs);
703+
inputs,
704+
parentRunUuid);
695705
log.info("Created new parent job record {}", newParentJobRow);
696706

697707
RunArgsRow argsRow =
@@ -702,7 +712,7 @@ private JobRow createParentJobRunRecord(
702712
RunDao runDao = createRunDao();
703713
RunRow newRow =
704714
runDao.upsert(
705-
uuid,
715+
parentRunUuid,
706716
null,
707717
facet.getRun().getRunId(),
708718
now,
@@ -719,14 +729,14 @@ private JobRow createParentJobRunRecord(
719729
log.info("Created new parent run record {}", newRow);
720730

721731
runState
722-
.map(rs -> createRunStateDao().upsert(UUID.randomUUID(), now, uuid, rs))
732+
.map(rs -> createRunStateDao().upsert(UUID.randomUUID(), now, parentRunUuid, rs))
723733
.ifPresent(
724734
runStateRow -> {
725735
UUID runStateUuid = runStateRow.getUuid();
726736
if (RunState.valueOf(runStateRow.getState()).isDone()) {
727-
runDao.updateEndState(uuid, now, runStateUuid);
737+
runDao.updateEndState(parentRunUuid, now, runStateUuid);
728738
} else {
729-
runDao.updateStartState(uuid, now, runStateUuid);
739+
runDao.updateStartState(parentRunUuid, now, runStateUuid);
730740
}
731741
});
732742

api/src/main/java/marquez/db/mappers/JobRowMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public JobRow map(@NonNull ResultSet results, @NonNull StatementContext context)
5050
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
5151
stringOrNull(results, "current_location"),
5252
getDatasetFromJsonOrNull(results, "current_inputs"),
53-
uuidOrNull(results, Columns.SYMLINK_TARGET_UUID));
53+
uuidOrNull(results, Columns.SYMLINK_TARGET_UUID),
54+
uuidOrNull(results, Columns.CURRENT_RUN_UUID));
5455
}
5556

5657
Set<DatasetId> getDatasetFromJsonOrNull(@NonNull ResultSet results, String column)

api/src/main/java/marquez/db/models/JobRow.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class JobRow {
3131
@Nullable String location;
3232
@Nullable Set<DatasetId> inputs;
3333
@Nullable UUID symlinkTargetId;
34+
@Nullable UUID currentRunUuid;
3435

3536
public Optional<String> getDescription() {
3637
return Optional.ofNullable(description);
@@ -39,4 +40,8 @@ public Optional<String> getDescription() {
3940
public Optional<UUID> getCurrentVersionUuid() {
4041
return Optional.ofNullable(currentVersionUuid);
4142
}
43+
44+
/**
 * Returns the nullable {@code currentRunUuid} field wrapped in an {@link Optional}.
 *
 * @return the current run UUID, or {@link Optional#empty()} when the field is {@code null}
 */
public Optional<UUID> getCurrentRunUuid() {
45+
return Optional.ofNullable(currentRunUuid);
46+
}
4247
}

api/src/main/resources/marquez/db/migration/R__Jobs_view_and_rewrite_function.sql

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ SELECT j.uuid,
1717
j.current_inputs,
1818
j.symlink_target_uuid,
1919
j.parent_job_uuid::char(36) AS parent_job_uuid_string,
20-
j.aliases
20+
j.aliases,
21+
j.current_run_uuid
2122
FROM jobs j
2223
LEFT JOIN jobs p ON j.parent_job_uuid=p.uuid
2324
WHERE j.is_hidden IS FALSE AND j.symlink_target_uuid IS NULL;
@@ -40,7 +41,7 @@ BEGIN
4041
END IF;
4142
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, simple_name, description,
4243
current_version_uuid, namespace_name, current_job_context_uuid,
43-
current_location, current_inputs, symlink_target_uuid, parent_job_uuid,
44+
current_location, current_inputs, symlink_target_uuid, parent_job_uuid, current_run_uuid,
4445
is_hidden)
4546
SELECT NEW.uuid,
4647
NEW.type,
@@ -57,6 +58,7 @@ BEGIN
5758
NEW.current_inputs,
5859
NEW.symlink_target_uuid,
5960
NEW.parent_job_uuid,
61+
NEW.current_run_uuid,
6062
false
6163
ON CONFLICT (namespace_uuid, name)
6264
DO UPDATE SET updated_at = now(),
@@ -72,6 +74,7 @@ BEGIN
7274
-- update the symlink target if null. otherwise, keep the old value
7375
symlink_target_uuid = COALESCE(jobs.symlink_target_uuid,
7476
EXCLUDED.symlink_target_uuid),
77+
current_run_uuid = EXCLUDED.current_run_uuid,
7578
is_hidden = false
7679
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
7780
-- version in case of insert

0 commit comments

Comments
 (0)