Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
throwIfNotExists(nodeId);
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
return Response.ok(lineageService.lineage(nodeId, depth)).build();
}

@Timed
Expand Down
88 changes: 46 additions & 42 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,52 +138,56 @@ AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlink
Optional<UUID> getJobFromInputOrOutput(String datasetName, String namespaceName);

@SqlQuery(
"WITH latest_runs AS (\n"
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
+ ")\n"
+ "SELECT r.*, ra.args, f.facets,\n"
+ " r.version AS job_version, ri.input_versions, ro.output_versions\n"
+ " from latest_runs AS r\n"
+ "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT im.run_uuid,\n"
+ " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
+ " 'name', dv.dataset_name,\n"
+ " 'version', dv.version)) AS input_versions\n"
+ " FROM runs_input_mapping im\n"
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
+ " WHERE im.run_uuid=r.uuid\n"
+ " GROUP BY im.run_uuid\n"
+ ") ri ON ri.run_uuid=r.uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
+ " FROM run_facets_view AS rf\n"
+ " WHERE rf.run_uuid=r.uuid\n"
+ " GROUP BY rf.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
+ " 'name', dataset_name,\n"
+ " 'version', version)) AS output_versions\n"
+ " FROM dataset_versions\n"
+ " WHERE run_uuid=r.uuid\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid")
"""
WITH latest_runs AS (
SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version
FROM runs_view r
INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
ORDER BY r.job_name, r.namespace_name, created_at DESC
)
SELECT r.*, ra.args, f.facets,
r.version AS job_version, ri.input_versions, ro.output_versions
from latest_runs AS r
LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT JOIN LATERAL (
SELECT im.run_uuid,
JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
WHERE im.run_uuid=r.uuid
GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT JOIN LATERAL (
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
FROM run_facets_view AS rf
WHERE rf.run_uuid=r.uuid
GROUP BY rf.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version)) AS output_versions
FROM dataset_versions
WHERE run_uuid=r.uuid
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
""")
List<Run> getCurrentRunsWithFacets(@BindList Collection<UUID> jobUuid);

@SqlQuery(
"""
SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
FROM runs_view r
INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
ORDER BY r.job_name, r.namespace_name, created_at DESC""")
WITH latest_runs AS (SELECT current_run_uuid, current_version_uuid AS job_version
FROM jobs j
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>))
SELECT *
FROM runs
inner join latest_runs ON runs.uuid = latest_runs.current_run_uuid
ORDER BY runs.created_at desc;
""")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);

@SqlQuery(
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ WHERE r.uuid IN (
INNER JOIN jobs_view j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR j.name=ANY(j.aliases))
)
ORDER BY transitioned_at DESC
ORDER BY transitioned_at DESC, started_at DESC
LIMIT :limit OFFSET :offset
""")
List<Run> findByLatestJob(String namespace, String jobName, int limit, int offset);
Expand Down
7 changes: 2 additions & 5 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
}

// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
public Lineage lineage(NodeId nodeId, int depth) {
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
Optional<UUID> optionalUUID = getJobUuid(nodeId);
if (optionalUUID.isEmpty()) {
Expand All @@ -90,10 +90,7 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
}

List<Run> runs =
withRunFacets
? getCurrentRunsWithFacets(
jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()))
: getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));
getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));

for (JobData j : jobData) {
if (j.getLatestRun().isEmpty()) {
Expand Down
3 changes: 1 addition & 2 deletions api/src/test/java/marquez/api/OpenLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -46,7 +45,7 @@ class OpenLineageResourceTest {
OpenLineageResourceTest.class.getResourceAsStream("/lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);
when(lineageService.lineage(any(NodeId.class), anyInt())).thenReturn(LINEAGE);

ServiceFactory serviceFactory =
ApiTestUtils.mockServiceFactory(
Expand Down
49 changes: 13 additions & 36 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.OutputDatasetVersion;
import marquez.common.models.RunState;
import marquez.db.DatasetDao;
import marquez.db.JobDao;
Expand Down Expand Up @@ -137,8 +136,7 @@ public void testLineage() {
dataset);
String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);

// 1 writeJob + 1 commonDataset
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -173,11 +171,6 @@ public void testLineage() {
.extracting(
Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class))
.hasSize(0);
runAssert
.extracting(
Run::getOutputDatasetVersions,
InstanceOfAssertFactories.list(OutputDatasetVersion.class))
.hasSize(1);

// check the output edges for the commonDataset node
assertThat(lineage.getGraph())
Expand Down Expand Up @@ -273,8 +266,7 @@ public void testLineageWithDeletedDataset() {

String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -309,11 +301,6 @@ public void testLineageWithDeletedDataset() {
.extracting(
Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class))
.hasSize(0);
runAssert
.extracting(
Run::getOutputDatasetVersions,
InstanceOfAssertFactories.list(InputDatasetVersion.class))
.hasSize(1);

// check the output edges for the commonDataset node
assertThat(lineage.getGraph())
Expand All @@ -326,8 +313,7 @@ public void testLineageWithDeletedDataset() {
jobDao.delete(NAMESPACE, "downstreamJob0<-outputData<-readJob0<-commonDataset");

lineage =
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -357,9 +343,7 @@ public void testLineageWithNoDatasets() {
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
Lineage lineage =
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())),
5,
true);
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())), 5);
assertThat(lineage.getGraph())
.hasSize(1)
.first()
Expand Down Expand Up @@ -410,8 +394,7 @@ public void testLineageWithWithCycle() {
lineageService.lineage(
NodeId.of(
new NamespaceName(NAMESPACE), new JobName(intermediateJob.getJob().getName())),
5,
true);
5);
assertThat(lineage.getGraph()).extracting(Node::getId).hasSize(6);
ObjectAssert<Node> datasetNode =
assertThat(lineage.getGraph())
Expand Down Expand Up @@ -493,15 +476,13 @@ public void testGetLineageJobRunTwice() {
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))),
5,
true);
5);

Lineage lineageFromOutput =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
5,
true);
5);

assertThat(lineageFromInput.getGraph()).hasSize(3); // 2 datasets + 1 job
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
Expand Down Expand Up @@ -546,15 +527,13 @@ public void testGetLineageForRunningStreamingJob() {
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))),
5,
true);
5);

Lineage lineageFromOutput =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
5,
true);
5);

assertThat(lineageFromInput.getGraph()).hasSize(5); // 2 datasets + 3 jobs
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
Expand Down Expand Up @@ -589,8 +568,7 @@ public void testGetLineageForCompleteStreamingJob() {
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
5,
true);
5);

assertThat(lineage.getGraph()).hasSize(3); // 1 job + 2 datasets
}
Expand All @@ -608,7 +586,7 @@ public void testLineageForOrphanedDataset() {

NodeId datasetNodeId =
NodeId.of(new NamespaceName(dataset.getNamespace()), new DatasetName(dataset.getName()));
Lineage lineage = lineageService.lineage(datasetNodeId, 2, false);
Lineage lineage = lineageService.lineage(datasetNodeId, 2);
assertThat(lineage.getGraph())
.hasSize(2)
.extracting(Node::getId)
Expand All @@ -620,7 +598,7 @@ public void testLineageForOrphanedDataset() {
LineageTestUtils.createLineageRow(
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());

lineage = lineageService.lineage(datasetNodeId, 2, false);
lineage = lineageService.lineage(datasetNodeId, 2);
assertThat(lineage.getGraph())
.hasSize(1)
.extracting(Node::getId)
Expand Down Expand Up @@ -685,8 +663,7 @@ public void testSymlinkDatasetLineage() {
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("symlinkDataset"))),
5,
true);
5);

assertThat(lineage.getGraph()).hasSize(2);
}
Expand Down