Skip to content

Commit fea1168

Browse files
optimizing current runs query for lineage api
Signed-off-by: Prachi Mishra <prachi.mishra@astronomer.io>
1 parent 03f60cf commit fea1168

3 files changed

Lines changed: 13 additions & 43 deletions

File tree

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -92,42 +92,11 @@ WHERE ds.uuid IN (<dsUuids>)""")
9292
Optional<UUID> getJobFromInputOrOutput(String datasetName, String namespaceName);
9393

9494
@SqlQuery(
95-
"WITH latest_runs AS (\n"
96-
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
95+
"SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version\n"
9796
+ " FROM runs_view r\n"
9897
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
9998
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
10099
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
101-
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
102-
+ ")\n"
103-
+ "SELECT r.*, ra.args, ctx.context, f.facets,\n"
104-
+ " r.version AS job_version, ri.input_versions, ro.output_versions\n"
105-
+ " from latest_runs AS r\n"
106-
+ "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
107-
+ "LEFT JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
108-
+ "LEFT JOIN LATERAL (\n"
109-
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
110-
+ " FROM lineage_events le\n"
111-
+ " WHERE le.run_uuid=r.uuid\n"
112-
+ " GROUP BY le.run_uuid\n"
113-
+ ") AS f ON r.uuid=f.run_uuid\n"
114-
+ "LEFT JOIN LATERAL (\n"
115-
+ " SELECT im.run_uuid,\n"
116-
+ " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
117-
+ " 'name', dv.dataset_name,\n"
118-
+ " 'version', dv.version)) AS input_versions\n"
119-
+ " FROM runs_input_mapping im\n"
120-
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
121-
+ " WHERE im.run_uuid=r.uuid\n"
122-
+ " GROUP BY im.run_uuid\n"
123-
+ ") ri ON ri.run_uuid=r.uuid\n"
124-
+ "LEFT JOIN LATERAL (\n"
125-
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
126-
+ " 'name', dataset_name,\n"
127-
+ " 'version', version)) AS output_versions\n"
128-
+ " FROM dataset_versions\n"
129-
+ " WHERE run_uuid=r.uuid\n"
130-
+ " GROUP BY run_uuid\n"
131-
+ ") ro ON ro.run_uuid=r.uuid")
100+
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC")
132101
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
133102
}

api/src/main/java/marquez/db/mappers/RunMapper.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.fasterxml.jackson.core.type.TypeReference;
1919
import com.google.common.collect.ImmutableList;
20+
import com.google.common.collect.ImmutableMap;
2021
import java.sql.ResultSet;
2122
import java.sql.SQLException;
2223
import java.time.Instant;
@@ -71,7 +72,7 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
7172
? timestampOrNull(results, columnPrefix + Columns.ENDED_AT)
7273
: null,
7374
durationMs.orElse(null),
74-
toArgs(results, columnPrefix + Columns.ARGS),
75+
toArgsOrNull(results, columnPrefix + Columns.ARGS),
7576
stringOrThrow(results, columnPrefix + Columns.NAMESPACE_NAME),
7677
stringOrThrow(results, columnPrefix + Columns.JOB_NAME),
7778
uuidOrNull(results, columnPrefix + Columns.JOB_VERSION),
@@ -82,7 +83,9 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
8283
columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS)
8384
? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS)
8485
: ImmutableList.of(),
85-
JobMapper.toContext(results, columnPrefix + Columns.CONTEXT),
86+
columnNames.contains(columnPrefix + Columns.CONTEXT)
87+
? JobMapper.toContext(results, columnPrefix + Columns.CONTEXT)
88+
: null,
8689
toFacetsOrNull(results, columnPrefix + Columns.FACETS));
8790
}
8891

@@ -94,8 +97,12 @@ private List<DatasetVersionId> toDatasetVersion(ResultSet rs, String column) thr
9497
return Utils.fromJson(dsString, new TypeReference<List<DatasetVersionId>>() {});
9598
}
9699

97-
private Map<String, String> toArgs(ResultSet results, String column) throws SQLException {
98-
String args = stringOrNull(results, column);
100+
private Map<String, String> toArgsOrNull(ResultSet results, String argsColumn)
101+
throws SQLException {
102+
if (!Columns.exists(results, argsColumn)) {
103+
return ImmutableMap.of();
104+
}
105+
String args = stringOrNull(results, argsColumn);
99106
if (args == null) {
100107
return null;
101108
}

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@ public void testLineage() {
158158
runAssert
159159
.extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
160160
.hasSize(0);
161-
runAssert
162-
.extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
163-
.hasSize(1);
164161

165162
// check the output edges for the commonDataset node
166163
assertThat(lineage.getGraph())
@@ -266,9 +263,6 @@ public void testLineageWithDeletedDataset() {
266263
runAssert
267264
.extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
268265
.hasSize(0);
269-
runAssert
270-
.extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
271-
.hasSize(1);
272266

273267
// check the output edges for the commonDataset node
274268
assertThat(lineage.getGraph())

0 commit comments

Comments
 (0)