Skip to content
41 changes: 30 additions & 11 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
Expand Down Expand Up @@ -49,14 +50,27 @@ public LineageService(LineageDao delegate, JobDao jobDao) {

// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
Optional<UUID> optionalUUID = getJobUuid(nodeId);
if (optionalUUID.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
throw new NodeIdNotFoundException(String.format("Node '%s' not found!", nodeId.getValue()));
}
Comment thread
wslulciuc marked this conversation as resolved.
UUID job = optionalUUID.get();

log.debug("Attempting to get lineage for job '{}'", job);
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);

// Ensure job data is not empty, an empty set cannot be passed to LineageDao.getCurrentRuns() or
// LineageDao.getCurrentRunsWithFacets().
if (jobData.isEmpty()) {
// Log warning, then return an orphan lineage graph; a graph should contain at most one
// job->dataset relationship.
log.warn(
"Failed to get lineage for job '{}' associated with node '{}', returning orphan graph...",
job,
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

List<Run> runs =
withRunFacets
? getCurrentRunsWithFacets(
Expand Down Expand Up @@ -85,19 +99,23 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
if (nodeId.isDatasetType()
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
log.warn(
"Found jobs {} which no longer share lineage with dataset {} - discarding",
jobData.stream().map(JobData::getId).toList());
DatasetId datasetId = nodeId.asDatasetId();
DatasetData datasetData =
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
return new Lineage(
ImmutableSortedSet.of(
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
"Found jobs '{}' which no longer share lineage with dataset '{}' - discarding",
Comment thread
wslulciuc marked this conversation as resolved.
Outdated
jobData.stream().map(JobData::getId).toList(),
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

return toLineage(jobData, datasets);
}

/**
 * Builds a lineage graph that consists of a single dataset node and nothing else.
 *
 * <p>The dataset is looked up through {@code getDatasetData} using the namespace and name carried
 * by the given ID; the resulting node is keyed by the ID reported by the fetched dataset data.
 *
 * @param datasetId identifies the dataset to wrap; annotated {@code @NonNull}
 * @return a {@link Lineage} whose graph holds only the node for that dataset
 */
private Lineage toLineageWithOrphanDataset(@NonNull DatasetId datasetId) {
  final String namespaceName = datasetId.getNamespace().getValue();
  final String datasetName = datasetId.getName().getValue();
  final DatasetData orphan = getDatasetData(namespaceName, datasetName);

  // The graph is a one-element set: the dataset node itself, with no job edges attached here.
  final var orphanGraph =
      ImmutableSortedSet.of(Node.dataset().data(orphan).id(NodeId.of(orphan.getId())).build());
  return new Lineage(orphanGraph);
}

private Lineage toLineage(Set<JobData> jobData, Set<DatasetData> datasets) {
Set<Node> nodes = new LinkedHashSet<>();
// build mapping for later
Expand Down Expand Up @@ -227,7 +245,8 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
return getJobFromInputOrOutput(
datasetId.getName().getValue(), datasetId.getNamespace().getValue());
} else {
throw new NodeIdNotFoundException("Node must be a dataset node or job node");
throw new NodeIdNotFoundException(
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
}
}
}