Skip to content

Commit e11a792

Browse files
authored
Ensure job data in lineage query is not null or empty (#2253)
* Ensure job data in lineage query is not null or empty
  Signed-off-by: wslulciuc <willy@datakin.com>
* continued: Ensure job data in lineage query is not null or empty
  Signed-off-by: wslulciuc <willy@datakin.com>
* Add toLineageWithOrphanDataset() to build orphan graph
  Signed-off-by: wslulciuc <willy@datakin.com>
* continued: Add toLineageWithOrphanDataset() to build orphan graph
  Signed-off-by: wslulciuc <willy@datakin.com>
* continued: Add toLineageWithOrphanDataset() to build orphan graph
  Signed-off-by: wslulciuc <willy@datakin.com>
* Return orphan graph on failed lookup for job when dataset nodeID provided
  Signed-off-by: wslulciuc <willy@datakin.com>

Signed-off-by: wslulciuc <willy@datakin.com>
1 parent 3212c8f commit e11a792

4 files changed

Lines changed: 73 additions & 13 deletions

File tree

api/src/main/java/marquez/api/BaseResource.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import marquez.api.exceptions.RunAlreadyExistsException;
1919
import marquez.api.exceptions.RunNotFoundException;
2020
import marquez.api.exceptions.SourceNotFoundException;
21+
import marquez.common.models.DatasetFieldId;
2122
import marquez.common.models.DatasetId;
2223
import marquez.common.models.DatasetName;
2324
import marquez.common.models.FieldName;
25+
import marquez.common.models.JobId;
2426
import marquez.common.models.JobName;
2527
import marquez.common.models.NamespaceName;
2628
import marquez.common.models.RunId;
@@ -37,6 +39,7 @@
3739
import marquez.service.ServiceFactory;
3840
import marquez.service.SourceService;
3941
import marquez.service.TagService;
42+
import marquez.service.models.NodeId;
4043
import marquez.service.models.Run;
4144

4245
public class BaseResource {
@@ -74,6 +77,10 @@ void throwIfNotExists(@NonNull NamespaceName namespaceName) {
7477
}
7578
}
7679

80+
/**
 * Verifies that the dataset identified by {@code datasetId} exists.
 *
 * <p>Convenience overload: unpacks the ID and delegates to {@code throwIfNotExists(NamespaceName,
 * DatasetName)}, which throws {@code DatasetNotFoundException} when the dataset service reports
 * that the dataset does not exist.
 *
 * @param datasetId ID (namespace + name) of the dataset to check; must not be {@code null}
 */
void throwIfNotExists(@NonNull DatasetId datasetId) {
81+
throwIfNotExists(datasetId.getNamespace(), datasetId.getName());
82+
}
83+
7784
void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull DatasetName datasetName) {
7885
if (!datasetService.exists(namespaceName.getValue(), datasetName.getValue())) {
7986
throw new DatasetNotFoundException(datasetName);
@@ -86,6 +93,13 @@ void throwIfSourceNotExists(SourceName sourceName) {
8693
}
8794
}
8895

96+
/**
 * Verifies that the dataset field identified by {@code datasetFieldId} exists.
 *
 * <p>Convenience overload: unpacks the ID into its dataset namespace, dataset name and field name
 * and delegates to the three-argument {@code throwIfNotExists} overload.
 * NOTE(review): the exception raised on a missing field is decided by that overload, whose body is
 * not visible here - confirm before documenting a concrete type.
 *
 * @param datasetFieldId ID of the dataset field to check; must not be {@code null}
 */
void throwIfNotExists(@NonNull DatasetFieldId datasetFieldId) {
97+
throwIfNotExists(
98+
datasetFieldId.getDatasetId().getNamespace(),
99+
datasetFieldId.getDatasetId().getName(),
100+
datasetFieldId.getFieldName());
101+
}
102+
89103
void throwIfNotExists(
90104
@NonNull NamespaceName namespaceName,
91105
@NonNull DatasetName datasetName,
@@ -96,6 +110,10 @@ void throwIfNotExists(
96110
}
97111
}
98112

113+
/**
 * Verifies that the job identified by {@code jobId} exists.
 *
 * <p>Convenience overload: unpacks the ID and delegates to {@code throwIfNotExists(NamespaceName,
 * JobName)}, which throws {@code JobNotFoundException} when the job service reports that the job
 * does not exist.
 *
 * @param jobId ID (namespace + name) of the job to check; must not be {@code null}
 */
void throwIfNotExists(@NonNull JobId jobId) {
114+
throwIfNotExists(jobId.getNamespace(), jobId.getName());
115+
}
116+
99117
void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull JobName jobName) {
100118
if (!jobService.exists(namespaceName.getValue(), jobName.getValue())) {
101119
throw new JobNotFoundException(jobName);
@@ -137,6 +155,20 @@ void throwIfDatasetsNotExist(ImmutableSet<DatasetId> datasets) {
137155
}
138156
}
139157

158+
/**
 * Verifies that the entity referenced by {@code nodeId} exists, dispatching on the node's type to
 * the matching {@code throwIfNotExists} overload (dataset, dataset field, job or run).
 *
 * <p>No check is performed when the node ID carries a version, or when it matches none of the four
 * types handled below; in both cases the method returns normally.
 *
 * @param nodeId node ID to validate; must not be {@code null}
 */
void throwIfNotExists(@NonNull NodeId nodeId) {
159+
// Only unversioned node IDs are validated; versioned IDs fall through without a lookup.
if (!nodeId.hasVersion()) {
160+
if (nodeId.isDatasetType()) {
161+
throwIfNotExists(nodeId.asDatasetId());
162+
} else if (nodeId.isDatasetFieldType()) {
163+
throwIfNotExists(nodeId.asDatasetFieldId());
164+
} else if (nodeId.isJobType()) {
165+
throwIfNotExists(nodeId.asJobId());
166+
} else if (nodeId.isRunType()) {
167+
throwIfNotExists(nodeId.asRunId());
168+
}
169+
}
170+
}
171+
140172
URI locationFor(@NonNull UriInfo uriInfo, @NonNull Run run) {
141173
return uriInfo
142174
.getBaseUriBuilder()

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ private int determineStatusCode(Throwable e) {
9696
public Response getLineage(
9797
@QueryParam("nodeId") @NotNull NodeId nodeId,
9898
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
99+
// Validate the requested node up front so a missing entity is reported by throwIfNotExists()
// before the lineage query runs.
throwIfNotExists(nodeId);
99100
// The third argument is LineageService.lineage()'s 'withRunFacets' flag; this endpoint always
// requests run facets.
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
100101
}
101102

api/src/main/java/marquez/service/LineageService.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.UUID;
2222
import java.util.stream.Collectors;
2323
import java.util.stream.Stream;
24+
import lombok.NonNull;
2425
import lombok.extern.slf4j.Slf4j;
2526
import marquez.common.models.DatasetId;
2627
import marquez.common.models.JobId;
@@ -49,14 +50,30 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
4950

5051
// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
5152
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
53+
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
5254
Optional<UUID> optionalUUID = getJobUuid(nodeId);
5355
if (optionalUUID.isEmpty()) {
54-
throw new NodeIdNotFoundException("Could not find node");
56+
log.warn(
57+
"Failed to get job associated with node '{}', returning orphan graph...",
58+
nodeId.getValue());
59+
return toLineageWithOrphanDataset(nodeId.asDatasetId());
5560
}
5661
UUID job = optionalUUID.get();
57-
62+
log.debug("Attempting to get lineage for job '{}'", job);
5863
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);
5964

65+
// Ensure job data is not empty, an empty set cannot be passed to LineageDao.getCurrentRuns() or
66+
// LineageDao.getCurrentRunsWithFacets().
67+
if (jobData.isEmpty()) {
68+
// Log warning, then return an orphan lineage graph; a graph should contain at most one
69+
// job->dataset relationship.
70+
log.warn(
71+
"Failed to get lineage for job '{}' associated with node '{}', returning orphan graph...",
72+
job,
73+
nodeId.getValue());
74+
return toLineageWithOrphanDataset(nodeId.asDatasetId());
75+
}
76+
6077
List<Run> runs =
6178
withRunFacets
6279
? getCurrentRunsWithFacets(
@@ -85,19 +102,23 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
85102
if (nodeId.isDatasetType()
86103
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
87104
log.warn(
88-
"Found jobs {} which no longer share lineage with dataset {} - discarding",
89-
jobData.stream().map(JobData::getId).toList());
90-
DatasetId datasetId = nodeId.asDatasetId();
91-
DatasetData datasetData =
92-
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
93-
return new Lineage(
94-
ImmutableSortedSet.of(
95-
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
105+
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
106+
jobData.stream().map(JobData::getId).toList(),
107+
nodeId.getValue());
108+
return toLineageWithOrphanDataset(nodeId.asDatasetId());
96109
}
97110

98111
return toLineage(jobData, datasets);
99112
}
100113

114+
/**
 * Builds a lineage graph whose only node is the dataset identified by {@code datasetId}.
 *
 * <p>The dataset is looked up via {@code getDatasetData(namespace, name)} and wrapped in a single
 * dataset node; no job nodes are added. {@code lineage()} returns this graph as its fallback when
 * no job, or no job lineage, can be resolved for the requested node.
 *
 * @param datasetId ID of the dataset to place in the graph; must not be {@code null}
 * @return a {@code Lineage} containing exactly one dataset node
 */
private Lineage toLineageWithOrphanDataset(@NonNull DatasetId datasetId) {
115+
final DatasetData datasetData =
116+
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
117+
return new Lineage(
118+
ImmutableSortedSet.of(
119+
// The node ID is derived from the looked-up data rather than from the datasetId argument.
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
120+
}
121+
101122
private Lineage toLineage(Set<JobData> jobData, Set<DatasetData> datasets) {
102123
Set<Node> nodes = new LinkedHashSet<>();
103124
// build mapping for later
@@ -227,7 +248,8 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
227248
return getJobFromInputOrOutput(
228249
datasetId.getName().getValue(), datasetId.getNamespace().getValue());
229250
} else {
230-
throw new NodeIdNotFoundException("Node must be a dataset node or job node");
251+
throw new NodeIdNotFoundException(
252+
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
231253
}
232254
}
233255
}

api/src/test/java/marquez/api/OpenLineageResourceTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static org.mockito.ArgumentMatchers.any;
1010
import static org.mockito.ArgumentMatchers.anyBoolean;
1111
import static org.mockito.ArgumentMatchers.anyInt;
12+
import static org.mockito.ArgumentMatchers.anyString;
1213
import static org.mockito.Mockito.mock;
1314
import static org.mockito.Mockito.when;
1415

@@ -20,6 +21,7 @@
2021
import javax.ws.rs.core.Response;
2122
import marquez.common.Utils;
2223
import marquez.db.OpenLineageDao;
24+
import marquez.service.JobService;
2325
import marquez.service.LineageService;
2426
import marquez.service.ServiceFactory;
2527
import marquez.service.models.Lineage;
@@ -36,6 +38,8 @@ class OpenLineageResourceTest {
3638
static {
3739
LineageService lineageService = mock(LineageService.class);
3840
OpenLineageDao openLineageDao = mock(OpenLineageDao.class);
41+
JobService jobService = mock(JobService.class);
42+
when(jobService.exists(anyString(), anyString())).thenReturn(true);
3943

4044
Node testNode =
4145
Utils.fromJson(
@@ -45,7 +49,8 @@ class OpenLineageResourceTest {
4549
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);
4650

4751
ServiceFactory serviceFactory =
48-
ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService));
52+
ApiTestUtils.mockServiceFactory(
53+
Map.of(LineageService.class, lineageService, JobService.class, jobService));
4954

5055
UNDER_TEST =
5156
ResourceExtension.builder()
@@ -58,7 +63,7 @@ public void testGetLineage() {
5863
final Lineage lineage =
5964
UNDER_TEST
6065
.target("/api/v1/lineage")
61-
.queryParam("nodeId", "job:test")
66+
.queryParam("nodeId", "job:test-namespace:test-job")
6267
.request()
6368
.get()
6469
.readEntity(Lineage.class);

0 commit comments

Comments
 (0)