Skip to content

Commit 39ddd65

Browse files
authored
Fixing latestRun on /lineage endpoint (#2933)
* Fixing lineage latest run. Signed-off-by: phixMe <peter.hicks@pdtechsolutions.com> * Test fixes. Signed-off-by: phixMe <peter.hicks@pdtechsolutions.com> * Fixing minor sorting error. Signed-off-by: phixMe <peter.hicks@pdtechsolutions.com> * Removing assertions on facets that are no longer included. Signed-off-by: phixMe <peter.hicks@pdtechsolutions.com> --------- Signed-off-by: phixMe <peter.hicks@pdtechsolutions.com>
1 parent da39558 commit 39ddd65

6 files changed

Lines changed: 64 additions & 87 deletions

File tree

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public Response getLineage(
119119
@QueryParam("nodeId") @NotNull NodeId nodeId,
120120
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
121121
throwIfNotExists(nodeId);
122-
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
122+
return Response.ok(lineageService.lineage(nodeId, depth)).build();
123123
}
124124

125125
@Timed

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -138,52 +138,56 @@ AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlink
138138
Optional<UUID> getJobFromInputOrOutput(String datasetName, String namespaceName);
139139

140140
@SqlQuery(
141-
"WITH latest_runs AS (\n"
142-
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
143-
+ " FROM runs_view r\n"
144-
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
145-
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
146-
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
147-
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
148-
+ ")\n"
149-
+ "SELECT r.*, ra.args, f.facets,\n"
150-
+ " r.version AS job_version, ri.input_versions, ro.output_versions\n"
151-
+ " from latest_runs AS r\n"
152-
+ "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
153-
+ "LEFT JOIN LATERAL (\n"
154-
+ " SELECT im.run_uuid,\n"
155-
+ " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
156-
+ " 'name', dv.dataset_name,\n"
157-
+ " 'version', dv.version)) AS input_versions\n"
158-
+ " FROM runs_input_mapping im\n"
159-
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
160-
+ " WHERE im.run_uuid=r.uuid\n"
161-
+ " GROUP BY im.run_uuid\n"
162-
+ ") ri ON ri.run_uuid=r.uuid\n"
163-
+ "LEFT JOIN LATERAL (\n"
164-
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
165-
+ " FROM run_facets_view AS rf\n"
166-
+ " WHERE rf.run_uuid=r.uuid\n"
167-
+ " GROUP BY rf.run_uuid\n"
168-
+ ") AS f ON r.uuid=f.run_uuid\n"
169-
+ "LEFT JOIN LATERAL (\n"
170-
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
171-
+ " 'name', dataset_name,\n"
172-
+ " 'version', version)) AS output_versions\n"
173-
+ " FROM dataset_versions\n"
174-
+ " WHERE run_uuid=r.uuid\n"
175-
+ " GROUP BY run_uuid\n"
176-
+ ") ro ON ro.run_uuid=r.uuid")
141+
"""
142+
WITH latest_runs AS (
143+
SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version
144+
FROM runs_view r
145+
INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
146+
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
147+
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
148+
ORDER BY r.job_name, r.namespace_name, created_at DESC
149+
)
150+
SELECT r.*, ra.args, f.facets,
151+
r.version AS job_version, ri.input_versions, ro.output_versions
152+
from latest_runs AS r
153+
LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
154+
LEFT JOIN LATERAL (
155+
SELECT im.run_uuid,
156+
JSON_AGG(json_build_object('namespace', dv.namespace_name,
157+
'name', dv.dataset_name,
158+
'version', dv.version)) AS input_versions
159+
FROM runs_input_mapping im
160+
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
161+
WHERE im.run_uuid=r.uuid
162+
GROUP BY im.run_uuid
163+
) ri ON ri.run_uuid=r.uuid
164+
LEFT JOIN LATERAL (
165+
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
166+
FROM run_facets_view AS rf
167+
WHERE rf.run_uuid=r.uuid
168+
GROUP BY rf.run_uuid
169+
) AS f ON r.uuid=f.run_uuid
170+
LEFT JOIN LATERAL (
171+
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
172+
'name', dataset_name,
173+
'version', version)) AS output_versions
174+
FROM dataset_versions
175+
WHERE run_uuid=r.uuid
176+
GROUP BY run_uuid
177+
) ro ON ro.run_uuid=r.uuid
178+
""")
177179
List<Run> getCurrentRunsWithFacets(@BindList Collection<UUID> jobUuid);
178180

179181
@SqlQuery(
180182
"""
181-
SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
182-
FROM runs_view r
183-
INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
184-
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
185-
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
186-
ORDER BY r.job_name, r.namespace_name, created_at DESC""")
183+
WITH latest_runs AS (SELECT current_run_uuid, current_version_uuid AS job_version
184+
FROM jobs j
185+
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>))
186+
SELECT *
187+
FROM runs
188+
inner join latest_runs ON runs.uuid = latest_runs.current_run_uuid
189+
ORDER BY runs.created_at desc;
190+
""")
187191
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
188192

189193
@SqlQuery(

api/src/main/java/marquez/db/RunDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ WHERE r.uuid IN (
515515
INNER JOIN jobs_view j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
516516
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR j.name=ANY(j.aliases))
517517
)
518-
ORDER BY transitioned_at DESC
518+
ORDER BY transitioned_at DESC, started_at DESC
519519
LIMIT :limit OFFSET :offset
520520
""")
521521
List<Run> findByLatestJob(String namespace, String jobName, int limit, int offset);

api/src/main/java/marquez/service/LineageService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
6464
}
6565

6666
// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
67-
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
67+
public Lineage lineage(NodeId nodeId, int depth) {
6868
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
6969
Optional<UUID> optionalUUID = getJobUuid(nodeId);
7070
if (optionalUUID.isEmpty()) {
@@ -90,10 +90,7 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
9090
}
9191

9292
List<Run> runs =
93-
withRunFacets
94-
? getCurrentRunsWithFacets(
95-
jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()))
96-
: getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));
93+
getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));
9794

9895
for (JobData j : jobData) {
9996
if (j.getLatestRun().isEmpty()) {

api/src/test/java/marquez/api/OpenLineageResourceTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.mockito.ArgumentMatchers.any;
10-
import static org.mockito.ArgumentMatchers.anyBoolean;
1110
import static org.mockito.ArgumentMatchers.anyInt;
1211
import static org.mockito.ArgumentMatchers.anyString;
1312
import static org.mockito.Mockito.mock;
@@ -46,7 +45,7 @@ class OpenLineageResourceTest {
4645
OpenLineageResourceTest.class.getResourceAsStream("/lineage/node.json"),
4746
new TypeReference<>() {});
4847
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
49-
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);
48+
when(lineageService.lineage(any(NodeId.class), anyInt())).thenReturn(LINEAGE);
5049

5150
ServiceFactory serviceFactory =
5251
ApiTestUtils.mockServiceFactory(

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import marquez.common.models.JobId;
2929
import marquez.common.models.JobName;
3030
import marquez.common.models.NamespaceName;
31-
import marquez.common.models.OutputDatasetVersion;
3231
import marquez.common.models.RunState;
3332
import marquez.db.DatasetDao;
3433
import marquez.db.JobDao;
@@ -137,8 +136,7 @@ public void testLineage() {
137136
dataset);
138137
String jobName = writeJob.getJob().getName();
139138
Lineage lineage =
140-
lineageService.lineage(
141-
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);
139+
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
142140

143141
// 1 writeJob + 1 commonDataset
144142
// 20 readJob + 20 outputData
@@ -173,11 +171,6 @@ public void testLineage() {
173171
.extracting(
174172
Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class))
175173
.hasSize(0);
176-
runAssert
177-
.extracting(
178-
Run::getOutputDatasetVersions,
179-
InstanceOfAssertFactories.list(OutputDatasetVersion.class))
180-
.hasSize(1);
181174

182175
// check the output edges for the commonDataset node
183176
assertThat(lineage.getGraph())
@@ -273,8 +266,7 @@ public void testLineageWithDeletedDataset() {
273266

274267
String jobName = writeJob.getJob().getName();
275268
Lineage lineage =
276-
lineageService.lineage(
277-
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);
269+
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
278270

279271
// 1 writeJob + 0 commonDataset is hidden
280272
// 20 readJob + 20 outputData
@@ -309,11 +301,6 @@ public void testLineageWithDeletedDataset() {
309301
.extracting(
310302
Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class))
311303
.hasSize(0);
312-
runAssert
313-
.extracting(
314-
Run::getOutputDatasetVersions,
315-
InstanceOfAssertFactories.list(InputDatasetVersion.class))
316-
.hasSize(1);
317304

318305
// check the output edges for the commonDataset node
319306
assertThat(lineage.getGraph())
@@ -326,8 +313,7 @@ public void testLineageWithDeletedDataset() {
326313
jobDao.delete(NAMESPACE, "downstreamJob0<-outputData<-readJob0<-commonDataset");
327314

328315
lineage =
329-
lineageService.lineage(
330-
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);
316+
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
331317

332318
// 1 writeJob + 0 commonDataset is hidden
333319
// 20 readJob + 20 outputData
@@ -357,9 +343,7 @@ public void testLineageWithNoDatasets() {
357343
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
358344
Lineage lineage =
359345
lineageService.lineage(
360-
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())),
361-
5,
362-
true);
346+
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())), 5);
363347
assertThat(lineage.getGraph())
364348
.hasSize(1)
365349
.first()
@@ -410,8 +394,7 @@ public void testLineageWithWithCycle() {
410394
lineageService.lineage(
411395
NodeId.of(
412396
new NamespaceName(NAMESPACE), new JobName(intermediateJob.getJob().getName())),
413-
5,
414-
true);
397+
5);
415398
assertThat(lineage.getGraph()).extracting(Node::getId).hasSize(6);
416399
ObjectAssert<Node> datasetNode =
417400
assertThat(lineage.getGraph())
@@ -493,15 +476,13 @@ public void testGetLineageJobRunTwice() {
493476
lineageService.lineage(
494477
NodeId.of(
495478
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))),
496-
5,
497-
true);
479+
5);
498480

499481
Lineage lineageFromOutput =
500482
lineageService.lineage(
501483
NodeId.of(
502484
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
503-
5,
504-
true);
485+
5);
505486

506487
assertThat(lineageFromInput.getGraph()).hasSize(3); // 2 datasets + 1 job
507488
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
@@ -546,15 +527,13 @@ public void testGetLineageForRunningStreamingJob() {
546527
lineageService.lineage(
547528
NodeId.of(
548529
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))),
549-
5,
550-
true);
530+
5);
551531

552532
Lineage lineageFromOutput =
553533
lineageService.lineage(
554534
NodeId.of(
555535
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
556-
5,
557-
true);
536+
5);
558537

559538
assertThat(lineageFromInput.getGraph()).hasSize(5); // 2 datasets + 3 jobs
560539
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
@@ -589,8 +568,7 @@ public void testGetLineageForCompleteStreamingJob() {
589568
lineageService.lineage(
590569
NodeId.of(
591570
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
592-
5,
593-
true);
571+
5);
594572

595573
assertThat(lineage.getGraph()).hasSize(3); // 1 job + 2 datasets
596574
}
@@ -608,7 +586,7 @@ public void testLineageForOrphanedDataset() {
608586

609587
NodeId datasetNodeId =
610588
NodeId.of(new NamespaceName(dataset.getNamespace()), new DatasetName(dataset.getName()));
611-
Lineage lineage = lineageService.lineage(datasetNodeId, 2, false);
589+
Lineage lineage = lineageService.lineage(datasetNodeId, 2);
612590
assertThat(lineage.getGraph())
613591
.hasSize(2)
614592
.extracting(Node::getId)
@@ -620,7 +598,7 @@ public void testLineageForOrphanedDataset() {
620598
LineageTestUtils.createLineageRow(
621599
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
622600

623-
lineage = lineageService.lineage(datasetNodeId, 2, false);
601+
lineage = lineageService.lineage(datasetNodeId, 2);
624602
assertThat(lineage.getGraph())
625603
.hasSize(1)
626604
.extracting(Node::getId)
@@ -685,8 +663,7 @@ public void testSymlinkDatasetLineage() {
685663
lineageService.lineage(
686664
NodeId.of(
687665
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("symlinkDataset"))),
688-
5,
689-
true);
666+
5);
690667

691668
assertThat(lineage.getGraph()).hasSize(2);
692669
}

0 commit comments

Comments
 (0)