Skip to content

Commit 3b21698

Browse files
authored
Merge branch 'main' into helm-chart-fix-for-web
2 parents e3cfbc7 + 79a9c4f commit 3b21698

93 files changed

Lines changed: 976 additions & 616 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGELOG.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
### Added
88

99
* UI: add facet view enhancements [`#2336`](https://github.com/MarquezProject/marquez/pull/2336) [@tito12](https://github.com/tito12)
10-
*Creates a dynamic component with the ability to navigate and search the JSON, expand sections and click on links.*
10+
*Creates a dynamic component offering the ability to navigate and search the JSON, expand sections and click on links.*
1111
* UI: highlight selected path on graph and display status of jobs and datasets based on last 14 runs or latest quality facets [`#2384`](https://github.com/MarquezProject/marquez/pull/2384) [@tito12](https://github.com/tito12)
1212
*Adds highlighting of the visual graph based on upstream and downstream dependencies of selected nodes, makes displayed status reflect last 14 runs the case of jobs and latest quality facets in the case of datasets.*
1313
* UI: enable auto-accessibility feature on graph nodes [`#2388`](https://github.com/MarquezProject/marquez/pull/2400) [@merobi-hub](https://github.com/merobi-hub)
@@ -20,11 +20,14 @@
2020
* API: add missing indices to `column_lineage`, `dataset_facets`, `job_facets` tables [`#2419`](https://github.com/MarquezProject/marquez/pull/2419) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
2121
*Creates missing indices on reference columns in a number of database tables.*
2222
* Spec: make data version and dataset types the same [`#2400`](https://github.com/MarquezProject/marquez/pull/2400) [@phixme](https://github.com/phixMe)
23-
*Makes the `fields` property the same for datasets and dataset versions, allowing type-denerating systems to treat them the same way.*
23+
*Makes the `fields` property the same for datasets and dataset versions, allowing type-generating systems to treat them the same way.*
2424
* UI: show location button only when link to code exists [`#2409`](https://github.com/MarquezProject/marquez/pull/2409) [@tito12](https://github.com/tito12)
2525
*Makes the button visible only if the link is not empty.*
2626

2727

28+
* Improve dataset facets access [`#2407`](https://github.com/MarquezProject/marquez/pull/2407) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
29+
* Improves database query performance for accessing datasets and datasets' versions.*
30+
2831
## [0.30.0](https://github.com/MarquezProject/marquez/compare/0.29.0...0.30.0) - 2023-01-31
2932

3033
### Added

api/src/main/java/marquez/MarquezContext.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@
2828
import marquez.db.DatasetDao;
2929
import marquez.db.DatasetFieldDao;
3030
import marquez.db.DatasetVersionDao;
31-
import marquez.db.JobContextDao;
3231
import marquez.db.JobDao;
32+
import marquez.db.JobFacetsDao;
3333
import marquez.db.JobVersionDao;
3434
import marquez.db.LineageDao;
3535
import marquez.db.NamespaceDao;
3636
import marquez.db.OpenLineageDao;
3737
import marquez.db.RunArgsDao;
3838
import marquez.db.RunDao;
39+
import marquez.db.RunFacetsDao;
3940
import marquez.db.RunStateDao;
4041
import marquez.db.SearchDao;
4142
import marquez.db.SourceDao;
@@ -67,9 +68,10 @@ public final class MarquezContext {
6768
@Getter private final DatasetVersionDao datasetVersionDao;
6869
@Getter private final JobDao jobDao;
6970
@Getter private final JobVersionDao jobVersionDao;
70-
@Getter private final JobContextDao jobContextDao;
71+
@Getter private final JobFacetsDao jobFacetsDao;
7172
@Getter private final RunDao runDao;
7273
@Getter private final RunArgsDao runArgsDao;
74+
@Getter private final RunFacetsDao runFacetsDao;
7375
@Getter private final RunStateDao runStateDao;
7476
@Getter private final TagDao tagDao;
7577
@Getter private final OpenLineageDao openLineageDao;
@@ -116,9 +118,10 @@ private MarquezContext(
116118
this.datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class);
117119
this.jobDao = jdbi.onDemand(JobDao.class);
118120
this.jobVersionDao = jdbi.onDemand(JobVersionDao.class);
119-
this.jobContextDao = jdbi.onDemand(JobContextDao.class);
121+
this.jobFacetsDao = jdbi.onDemand(JobFacetsDao.class);
120122
this.runDao = jdbi.onDemand(RunDao.class);
121123
this.runArgsDao = jdbi.onDemand(RunArgsDao.class);
124+
this.runFacetsDao = jdbi.onDemand(RunFacetsDao.class);
122125
this.runStateDao = jdbi.onDemand(RunStateDao.class);
123126
this.tagDao = jdbi.onDemand(TagDao.class);
124127
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
@@ -158,7 +161,7 @@ private MarquezContext(
158161
this.sourceResource = new SourceResource(serviceFactory);
159162
this.datasetResource = new DatasetResource(serviceFactory);
160163
this.columnLineageResource = new ColumnLineageResource(serviceFactory);
161-
this.jobResource = new JobResource(serviceFactory, jobVersionDao);
164+
this.jobResource = new JobResource(serviceFactory, jobVersionDao, jobFacetsDao, runFacetsDao);
162165
this.tagResource = new TagResource(serviceFactory);
163166
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
164167
this.searchResource = new SearchResource(searchDao);

api/src/main/java/marquez/api/JobResource.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.List;
1616
import javax.validation.Valid;
1717
import javax.validation.constraints.Min;
18+
import javax.validation.constraints.NotNull;
1819
import javax.ws.rs.Consumes;
1920
import javax.ws.rs.DELETE;
2021
import javax.ws.rs.DefaultValue;
@@ -37,11 +38,14 @@
3738
import marquez.api.exceptions.JobVersionNotFoundException;
3839
import marquez.api.models.JobVersion;
3940
import marquez.api.models.ResultsPage;
41+
import marquez.common.models.FacetType;
4042
import marquez.common.models.JobName;
4143
import marquez.common.models.NamespaceName;
4244
import marquez.common.models.RunId;
4345
import marquez.common.models.Version;
46+
import marquez.db.JobFacetsDao;
4447
import marquez.db.JobVersionDao;
48+
import marquez.db.RunFacetsDao;
4549
import marquez.db.models.JobRow;
4650
import marquez.service.ServiceFactory;
4751
import marquez.service.models.Job;
@@ -52,11 +56,18 @@
5256
@Path("/api/v1")
5357
public class JobResource extends BaseResource {
5458
private final JobVersionDao jobVersionDao;
59+
private final JobFacetsDao jobFacetsDao;
60+
private final RunFacetsDao runFacetsDao;
5561

5662
public JobResource(
57-
@NonNull final ServiceFactory serviceFactory, @NonNull final JobVersionDao jobVersionDao) {
63+
@NonNull final ServiceFactory serviceFactory,
64+
@NonNull final JobVersionDao jobVersionDao,
65+
@NonNull JobFacetsDao jobFacetsDao,
66+
@NonNull RunFacetsDao runFacetsDao) {
5867
super(serviceFactory);
5968
this.jobVersionDao = jobVersionDao;
69+
this.jobFacetsDao = jobFacetsDao;
70+
this.runFacetsDao = runFacetsDao;
6071
}
6172

6273
/**
@@ -236,6 +247,33 @@ public RunResource runResourceRoot(@PathParam("id") RunId runId) {
236247
return new RunResource(runId, runService);
237248
}
238249

250+
@Timed
251+
@ResponseMetered
252+
@ExceptionMetered
253+
@GET
254+
@Produces(APPLICATION_JSON)
255+
@Path("/jobs/runs/{id}/facets")
256+
public Response getRunFacets(
257+
@PathParam("id") RunId runId, @QueryParam("type") @NotNull FacetType type) {
258+
throwIfNotExists(runId);
259+
Object facets = null;
260+
switch (type) {
261+
case JOB:
262+
facets = jobFacetsDao.findJobFacetsByRunUuid(runId.getValue());
263+
break;
264+
case RUN:
265+
facets = runFacetsDao.findRunFacetsByRunUuid(runId.getValue());
266+
break;
267+
case DATASET:
268+
// for future case if there's a need to add dataset facets to the endpoint
269+
break;
270+
default:
271+
break;
272+
}
273+
274+
return Response.ok(facets).build();
275+
}
276+
239277
@Value
240278
static class JobVersions {
241279
@NonNull

api/src/main/java/marquez/api/models/JobVersion.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package marquez.api.models;
77

8-
import com.google.common.collect.ImmutableMap;
98
import java.net.URL;
109
import java.time.Instant;
1110
import java.util.List;
@@ -35,7 +34,6 @@ public final class JobVersion {
3534
@Getter private final Version version;
3635
@Getter private final NamespaceName namespace;
3736
@Nullable private final URL location;
38-
@Getter private final ImmutableMap<String, String> context;
3937
@Getter private final List<DatasetId> inputs;
4038
@Getter private final List<DatasetId> outputs;
4139
@Getter @Nullable private final Run latestRun;
@@ -46,7 +44,6 @@ public JobVersion(
4644
@NonNull final Instant createdAt,
4745
@NonNull final Version version,
4846
@Nullable final URL location,
49-
@Nullable final ImmutableMap<String, String> context,
5047
List<DatasetId> inputs,
5148
List<DatasetId> outputs,
5249
@Nullable Run latestRun) {
@@ -56,7 +53,6 @@ public JobVersion(
5653
this.version = version;
5754
this.namespace = id.getNamespace();
5855
this.location = location;
59-
this.context = (context == null) ? ImmutableMap.of() : context;
6056
this.inputs = inputs;
6157
this.outputs = outputs;
6258
this.latestRun = latestRun;

api/src/main/java/marquez/common/Utils.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.fasterxml.jackson.databind.SerializationFeature;
2121
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
2222
import com.google.common.base.Joiner;
23-
import com.google.common.collect.ImmutableMap;
2423
import com.google.common.collect.ImmutableSet;
2524
import com.google.common.hash.Hashing;
2625
import io.dropwizard.jackson.Jackson;
@@ -236,7 +235,6 @@ public static Instant toInstant(@Nullable final String asIso) {
236235
* @param jobName The name of the job.
237236
* @param jobInputIds The input dataset IDs for the job.
238237
* @param jobOutputIds The output dataset IDs for the job.
239-
* @param jobContext The context of the job.
240238
* @param jobLocation The source code location for the job.
241239
* @return A {@link Version} object based on the specified job meta.
242240
*/
@@ -245,7 +243,6 @@ public static Version newJobVersionFor(
245243
@NonNull final JobName jobName,
246244
@NonNull final ImmutableSet<DatasetId> jobInputIds,
247245
@NonNull final ImmutableSet<DatasetId> jobOutputIds,
248-
@NonNull final ImmutableMap<String, String> jobContext,
249246
@Nullable final String jobLocation) {
250247
final byte[] bytes =
251248
VERSION_JOINER
@@ -268,8 +265,7 @@ public static Version newJobVersionFor(
268265
jobOutputId.getNamespace().getValue(),
269266
jobOutputId.getName().getValue()))
270267
.collect(joining(VERSION_DELIM)),
271-
jobLocation,
272-
KV_JOINER.join(jobContext))
268+
jobLocation)
273269
.getBytes(UTF_8);
274270
return Version.of(UUID.nameUUIDFromBytes(bytes));
275271
}

api/src/main/java/marquez/common/models/DatasetId.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import com.fasterxml.jackson.annotation.JsonProperty;
99
import com.google.common.collect.ComparisonChain;
10-
import com.google.common.collect.ImmutableMap;
1110
import com.google.common.collect.ImmutableSet;
1211
import lombok.EqualsAndHashCode;
1312
import lombok.NonNull;
@@ -16,8 +15,8 @@
1615

1716
/**
1817
* ID for {@code Dataset}. The class implements {@link Comparable} to ensure job versions generated
19-
* with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet,
20-
* ImmutableMap, String)} are consistent as jobs may contain inputs and outputs out of order.
18+
* with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet, String)}
19+
* are consistent as jobs may contain inputs and outputs out of order.
2120
*/
2221
@EqualsAndHashCode
2322
@ToString
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.common.models;
7+
8+
public enum FacetType {
9+
RUN,
10+
JOB,
11+
DATASET;
12+
}

api/src/main/java/marquez/db/BaseDao.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ public interface BaseDao extends SqlObject {
1818
@CreateSqlObject
1919
DatasetVersionDao createDatasetVersionDao();
2020

21-
@CreateSqlObject
22-
JobContextDao createJobContextDao();
23-
2421
@CreateSqlObject
2522
JobDao createJobDao();
2623

api/src/main/java/marquez/db/Columns.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,9 @@ private Columns() {}
103103

104104
/* JOB VERSION ROW COLUMNS */
105105
public static final String JOB_UUID = "job_uuid";
106-
public static final String JOB_CONTEXT_UUID = "job_context_uuid";
107106
public static final String LOCATION = "location";
108107
public static final String LATEST_RUN_UUID = "latest_run_uuid";
109108

110-
/* JOB CONTEXT ROW COLUMNS */
111-
public static final String CONTEXT = "context";
112-
113109
/* RUN ROW COLUMNS */
114110
public static final String EXTERNAL_ID = "external_id";
115111
public static final String RUN_ARGS_UUID = "run_args_uuid";

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 22 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -71,30 +71,8 @@ void updateLastModifiedAt(
7171

7272
@SqlQuery(
7373
"""
74-
WITH selected_datasets AS (
75-
SELECT d.*
76-
FROM datasets_view d
77-
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
78-
), dataset_runs AS (
79-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
80-
FROM selected_datasets d
81-
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
82-
LEFT JOIN LATERAL (
83-
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
84-
WHERE dataset_uuid = dv.dataset_uuid
85-
) df ON df.run_uuid = dv.run_uuid
86-
UNION
87-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
88-
FROM selected_datasets d
89-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
90-
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
91-
LEFT JOIN LATERAL (
92-
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
93-
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
94-
) df ON df.run_uuid = rim.run_uuid
95-
)
9674
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
97-
FROM selected_datasets d
75+
FROM datasets_view d
9876
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
9977
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
10078
LEFT JOIN (
@@ -104,11 +82,15 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
10482
GROUP BY m.dataset_uuid
10583
) t ON t.dataset_uuid = d.uuid
10684
LEFT JOIN (
107-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
108-
FROM dataset_runs AS d2
109-
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
110-
GROUP BY d2.uuid
111-
) f ON f.dataset_uuid = d.uuid""")
85+
SELECT
86+
df.dataset_version_uuid,
87+
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
88+
FROM dataset_facets_view AS df
89+
WHERE df.facet IS NOT NULL
90+
GROUP BY df.dataset_version_uuid
91+
) f ON f.dataset_version_uuid = d.current_version_uuid
92+
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
93+
""")
11294
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
11395

11496
default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
@@ -137,32 +119,8 @@ default void setFields(Dataset ds) {
137119

138120
@SqlQuery(
139121
"""
140-
WITH selected_datasets AS (
141-
SELECT d.*
142-
FROM datasets_view d
143-
WHERE d.namespace_name = :namespaceName
144-
ORDER BY d.name
145-
LIMIT :limit OFFSET :offset
146-
), dataset_runs AS (
147-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
148-
FROM selected_datasets d
149-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
150-
LEFT JOIN LATERAL (
151-
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
152-
WHERE dataset_uuid = dv.dataset_uuid
153-
) df ON df.run_uuid = dv.run_uuid
154-
UNION
155-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
156-
FROM selected_datasets d
157-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
158-
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
159-
LEFT JOIN LATERAL (
160-
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
161-
WHERE dataset_uuid = dv.dataset_uuid
162-
) df ON df.run_uuid = rim.run_uuid
163-
)
164122
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
165-
FROM selected_datasets d
123+
FROM datasets_view d
166124
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
167125
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
168126
LEFT JOIN (
@@ -172,13 +130,17 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
172130
GROUP BY m.dataset_uuid
173131
) t ON t.dataset_uuid = d.uuid
174132
LEFT JOIN (
175-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
176-
FROM dataset_runs AS d2
177-
WHERE d2.run_uuid = d2.run_uuid
178-
AND d2.facet IS NOT NULL
179-
GROUP BY d2.uuid
180-
) f ON f.dataset_uuid = d.uuid
181-
ORDER BY d.name""")
133+
SELECT
134+
df.dataset_version_uuid,
135+
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
136+
FROM dataset_facets_view AS df
137+
WHERE df.facet IS NOT NULL
138+
GROUP BY df.dataset_version_uuid
139+
) f ON f.dataset_version_uuid = d.current_version_uuid
140+
WHERE d.namespace_name = :namespaceName
141+
ORDER BY d.name
142+
LIMIT :limit OFFSET :offset
143+
""")
182144
List<Dataset> findAll(String namespaceName, int limit, int offset);
183145

184146
@SqlQuery("SELECT count(*) FROM datasets_view")

0 commit comments

Comments
 (0)