Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private Columns() {}
public static final String PARENT_JOB_UUID = "parent_job_uuid";
public static final String SIMPLE_NAME = "simple_name";
public static final String SYMLINK_TARGET_UUID = "symlink_target_uuid";
public static final String CURRENT_RUN_UUID = "current_run_uuid";

/* JOB VERSION I/O ROW COLUMNS */
public static final String INPUT_UUIDS = "input_uuids";
Expand Down
49 changes: 41 additions & 8 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import marquez.common.models.JobName;
import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.RunState;
import marquez.db.JobVersionDao.IoType;
import marquez.db.JobVersionDao.JobDataset;
Expand Down Expand Up @@ -239,7 +240,7 @@ job_tags as (
LEFT OUTER JOIN job_tags jt
ON j.uuid = jt.uuid
LEFT JOIN runs r
ON r.uuid = jv.latest_run_uuid
ON r.uuid = j.current_run_uuid
WHERE
(r.current_run_state IN (<lastRunStates>) OR r.uuid IS NULL)
ORDER BY
Expand Down Expand Up @@ -361,7 +362,8 @@ default JobRow upsertJobMeta(
jobMeta.getDescription().orElse(null),
toUrlString(jobMeta.getLocation().orElse(null)),
symlinkTargetUuid,
toJson(jobMeta.getInputs(), mapper));
toJson(jobMeta.getInputs(), mapper),
jobMeta.getRunId().map(RunId::getValue).orElse(null));
}

default String toUrlString(URL url) {
Expand All @@ -382,6 +384,31 @@ default PGobject toJson(Set<DatasetId> dataset, ObjectMapper mapper) {
}
}

default JobRow upsertJob(
UUID uuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
String location,
UUID symlinkTargetId,
PGobject inputs) {
return upsertJob(
uuid,
type,
now,
namespaceUuid,
namespaceName,
name,
description,
location,
symlinkTargetId,
inputs,
null);
}

/*
* Note: following SQL never executes. There is database trigger on `jobs_view`
* that replaces following SQL
Expand All @@ -402,7 +429,8 @@ INSERT INTO jobs_view AS j (
current_location,
current_inputs,
symlink_target_uuid,
parent_job_uuid_string
parent_job_uuid_string,
current_run_uuid
) VALUES (
:uuid,
:type,
Expand All @@ -415,7 +443,8 @@ INSERT INTO jobs_view AS j (
:location,
:inputs,
:symlinkTargetId,
''
'',
:currentRunUuid
) RETURNING *
""")
JobRow upsertJob(
Expand All @@ -428,7 +457,8 @@ JobRow upsertJob(
String description,
String location,
UUID symlinkTargetId,
PGobject inputs);
PGobject inputs,
UUID currentRunUuid);

/*
* Note: following SQL never executes. There is database trigger on `jobs_view`
Expand All @@ -450,7 +480,8 @@ INSERT INTO jobs_view AS j (
description,
current_location,
current_inputs,
symlink_target_uuid
symlink_target_uuid,
current_run_uuid
) VALUES (
:uuid,
:parentJobUuid,
Expand All @@ -463,7 +494,8 @@ INSERT INTO jobs_view AS j (
:description,
:location,
:inputs,
:symlinkTargetId
:symlinkTargetId,
:currentRunUuid
)
RETURNING *
""")
Expand All @@ -478,7 +510,8 @@ JobRow upsertJob(
String description,
String location,
UUID symlinkTargetId,
PGobject inputs);
PGobject inputs,
UUID currentRunUuid);

@SqlUpdate(
"""
Expand Down
58 changes: 34 additions & 24 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
Expand Down Expand Up @@ -235,7 +236,8 @@ default UpdateLineageRow updateMarquezModel(JobEvent event, ObjectMapper mapper)
namespace,
null,
null,
Optional.empty());
Optional.empty(),
null);

bag.setJob(job);

Expand Down Expand Up @@ -304,6 +306,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent);
Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);

final UUID runUuid = runToUuid(event.getRun().getRunId());

JobRow job =
buildJobFromEvent(
event.getJob(),
Expand All @@ -316,7 +320,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
namespace,
nominalStartTime,
nominalEndTime,
parentRun);
parentRun,
runUuid);

bag.setJob(job);

Expand All @@ -327,7 +332,6 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
UUID.randomUUID(), now, Utils.toJson(runArgsMap), Utils.checksumFor(runArgsMap));
bag.setRunArgs(runArgs);

final UUID runUuid = runToUuid(event.getRun().getRunId());
RunRow run;
RunUpsert.RunUpsertBuilder runUpsertBuilder =
RunUpsert.builder()
Expand Down Expand Up @@ -509,7 +513,8 @@ private JobRow buildJobFromEvent(
NamespaceRow namespace,
Instant nominalStartTime,
Instant nominalEndTime,
Optional<ParentRunFacet> parentRun) {
Optional<ParentRunFacet> parentRun,
@Nullable UUID runUuid) {
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
String description =
Optional.ofNullable(job.getFacets())
Expand All @@ -523,10 +528,10 @@ private JobRow buildJobFromEvent(
.flatMap(s -> Optional.ofNullable(s.getUrl()))
.orElse(null);

Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
Optional<UUID> parentRunUuid = parentRun.map(Utils::findParentRunUuid);
Optional<JobRow> parentJob =
parentUuid.map(
uuid ->
parentRunUuid.map(
parentRunUuidFound ->
findParentJobRow(
job,
eventTime,
Expand All @@ -537,7 +542,7 @@ private JobRow buildJobFromEvent(
nominalEndTime,
log,
parentRun.get(),
uuid));
parentRunUuidFound));

// construct the simple name of the job by removing the parent prefix plus the dot '.' separator
String jobName =
Expand Down Expand Up @@ -570,7 +575,8 @@ private JobRow buildJobFromEvent(
description,
location,
null,
jobDao.toJson(toDatasetId(inputs), mapper)))
jobDao.toJson(toDatasetId(inputs), mapper),
parent.getCurrentRunUuid().orElse(null)))
.orElseGet(
() ->
jobDao.upsertJob(
Expand All @@ -583,7 +589,8 @@ private JobRow buildJobFromEvent(
description,
location,
null,
jobDao.toJson(toDatasetId(inputs), mapper)));
jobDao.toJson(toDatasetId(inputs), mapper),
runUuid));
}

private JobRow findParentJobRow(
Expand All @@ -596,15 +603,15 @@ private JobRow findParentJobRow(
Instant nominalEndTime,
Logger log,
ParentRunFacet facet,
UUID uuid) {
UUID parentRunUuid) {
try {
log.debug("Found parent run event {}", facet);
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
JobRow parentJobRow =
createRunDao()
.findJobRowByRunUuid(uuid)
.findJobRowByRunUuid(parentRunUuid)
.map(
j -> {
String parentJobName =
Expand All @@ -617,18 +624,20 @@ private JobRow findParentJobRow(
} else {
// Addresses an Airflow integration bug that generated conflicting run UUIDs
// for DAGs that had the same name, but ran in different namespaces.
UUID parentRunUuid =
UUID parentRunUuidNoConflict =
Utils.toNameBasedUuid(
facet.getJob().getNamespace(), parentJobName, uuid.toString());
facet.getJob().getNamespace(),
parentJobName,
parentRunUuid.toString());
log.warn(
"Parent Run id {} has a different job name '{}.{}' from facet '{}.{}'. "
+ "Assuming Run UUID conflict and generating a new UUID {}",
uuid,
parentRunUuid,
j.getNamespaceName(),
j.getName(),
facet.getJob().getNamespace(),
facet.getJob().getName(),
parentRunUuid);
parentRunUuidNoConflict);
return createParentJobRunRecord(
job,
eventTime,
Expand All @@ -637,7 +646,7 @@ private JobRow findParentJobRow(
location,
nominalStartTime,
nominalEndTime,
parentRunUuid,
parentRunUuidNoConflict,
facet,
inputs);
}
Expand All @@ -652,7 +661,7 @@ private JobRow findParentJobRow(
location,
nominalStartTime,
nominalEndTime,
uuid,
parentRunUuid,
facet,
inputs));
log.debug("Found parent job record {}", parentJobRow);
Expand All @@ -670,7 +679,7 @@ private JobRow createParentJobRunRecord(
String location,
Instant nominalStartTime,
Instant nominalEndTime,
UUID uuid,
UUID parentRunUuid,
ParentRunFacet facet,
PGobject inputs) {
Instant now = eventTime.withZoneSameInstant(ZoneId.of("UTC")).toInstant();
Expand All @@ -691,7 +700,8 @@ private JobRow createParentJobRunRecord(
null,
location,
null,
inputs);
inputs,
parentRunUuid);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
Expand All @@ -702,7 +712,7 @@ private JobRow createParentJobRunRecord(
RunDao runDao = createRunDao();
RunRow newRow =
runDao.upsert(
uuid,
parentRunUuid,
null,
facet.getRun().getRunId(),
now,
Expand All @@ -719,14 +729,14 @@ private JobRow createParentJobRunRecord(
log.info("Created new parent run record {}", newRow);

runState
.map(rs -> createRunStateDao().upsert(UUID.randomUUID(), now, uuid, rs))
.map(rs -> createRunStateDao().upsert(UUID.randomUUID(), now, parentRunUuid, rs))
.ifPresent(
runStateRow -> {
UUID runStateUuid = runStateRow.getUuid();
if (RunState.valueOf(runStateRow.getState()).isDone()) {
runDao.updateEndState(uuid, now, runStateUuid);
runDao.updateEndState(parentRunUuid, now, runStateUuid);
} else {
runDao.updateStartState(uuid, now, runStateUuid);
runDao.updateStartState(parentRunUuid, now, runStateUuid);
}
});

Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/db/mappers/JobRowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public JobRow map(@NonNull ResultSet results, @NonNull StatementContext context)
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
stringOrNull(results, "current_location"),
getDatasetFromJsonOrNull(results, "current_inputs"),
uuidOrNull(results, Columns.SYMLINK_TARGET_UUID));
uuidOrNull(results, Columns.SYMLINK_TARGET_UUID),
uuidOrNull(results, Columns.CURRENT_RUN_UUID));
}

Set<DatasetId> getDatasetFromJsonOrNull(@NonNull ResultSet results, String column)
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/db/models/JobRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class JobRow {
@Nullable String location;
@Nullable Set<DatasetId> inputs;
@Nullable UUID symlinkTargetId;
@Nullable UUID currentRunUuid;

public Optional<String> getDescription() {
return Optional.ofNullable(description);
Expand All @@ -39,4 +40,8 @@ public Optional<String> getDescription() {
public Optional<UUID> getCurrentVersionUuid() {
return Optional.ofNullable(currentVersionUuid);
}

public Optional<UUID> getCurrentRunUuid() {
return Optional.ofNullable(currentRunUuid);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ SELECT j.uuid,
j.current_inputs,
j.symlink_target_uuid,
j.parent_job_uuid::char(36) AS parent_job_uuid_string,
j.aliases
j.aliases,
j.current_run_uuid
FROM jobs j
LEFT JOIN jobs p ON j.parent_job_uuid=p.uuid
WHERE j.is_hidden IS FALSE AND j.symlink_target_uuid IS NULL;
Expand All @@ -40,7 +41,7 @@ BEGIN
END IF;
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, simple_name, description,
current_version_uuid, namespace_name, current_job_context_uuid,
current_location, current_inputs, symlink_target_uuid, parent_job_uuid,
current_location, current_inputs, symlink_target_uuid, parent_job_uuid, current_run_uuid,
is_hidden)
SELECT NEW.uuid,
NEW.type,
Expand All @@ -57,6 +58,7 @@ BEGIN
NEW.current_inputs,
NEW.symlink_target_uuid,
NEW.parent_job_uuid,
NEW.current_run_uuid,
false
ON CONFLICT (namespace_uuid, name)
DO UPDATE SET updated_at = now(),
Expand All @@ -72,6 +74,7 @@ BEGIN
-- update the symlink target if null. otherwise, keep the old value
symlink_target_uuid = COALESCE(jobs.symlink_target_uuid,
EXCLUDED.symlink_target_uuid),
current_run_uuid = EXCLUDED.current_run_uuid,
is_hidden = false
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
-- version in case of insert
Expand Down
Loading