Skip to content

Commit 33d390a

Browse files
authored
Merge branch 'main' into feature/db-retention
2 parents d8b31aa + 3d023e4 commit 33d390a

39 files changed

Lines changed: 771 additions & 221 deletions

.circleci/api-load-test.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
set -e
1515

1616
# Build version of Marquez
17-
readonly MARQUEZ_VERSION=0.37.0-SNAPSHOT
17+
readonly MARQUEZ_VERSION=0.38.0-SNAPSHOT
1818
# Fully qualified path to marquez.jar
1919
readonly MARQUEZ_JAR="api/build/libs/marquez-api-${MARQUEZ_VERSION}.jar"
2020

.circleci/db-migration.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# Version of PostgreSQL
1414
readonly POSTGRES_VERSION="12.1"
1515
# Version of Marquez
16-
readonly MARQUEZ_VERSION=0.36.0
16+
readonly MARQUEZ_VERSION=0.37.0
1717
# Build version of Marquez
1818
readonly MARQUEZ_BUILD_VERSION="$(git log --pretty=format:'%h' -n 1)" # SHA1
1919

.env.example

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
API_PORT=5000
22
API_ADMIN_PORT=5001
33
WEB_PORT=3000
4-
TAG=0.36.0
4+
TAG=0.37.0

CHANGELOG.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
# Changelog
22

3-
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.36.0...HEAD)
3+
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.37.0...HEAD)
4+
5+
## [0.37.0](https://github.com/MarquezProject/marquez/compare/0.36.0...0.37.0) - 2023-07-17
6+
### Added
7+
* API: add ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
8+
*Introduces an `EventTypeResolver` for using the `schemaURL` field to decode `POST` requests to `/lineage` with `LineageEvent`s, `DatasetEvent`s or `JobEvent`s, as the first step in implementing static lineage support.*
9+
410
### Fixed
5-
* API: remove unnecessary DB updates [`#2531`](https://github.com/MarquezProject/marquez/pull/2531)[@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
6-
*Prevent updates that are not needed and are deadlock prone.*
11+
* API: remove unnecessary DB updates [`#2531`](https://github.com/MarquezProject/marquez/pull/2531) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
12+
*Prevent updates that are not needed and are deadlock-prone.*
13+
* Web: revert URL encoding when fetching lineage [`#2529`](https://github.com/MarquezProject/marquez/pull/2529) [@jlukenoff](https://github.com/jlukenoff)
14+
*Reverts the node ID from being URL-encoded and allows the backend to return lineage details successfully even when a node ID contains special characters.*
715

816
## [0.36.0](https://github.com/MarquezProject/marquez/compare/0.35.0...0.36.0) - 2023-06-27
917
### Added
@@ -28,6 +36,12 @@
2836
* Web: handle lineage graph cycles on the client [`#2506`](https://github.com/MarquezProject/marquez/pull/2506) [@jlukenoff](https://github.com/jlukenoff)
2937
*Fixes a bug where we blow the stack on the client-side if the user selects a node that is part of a cycle in the graph.*
3038

39+
### Added
40+
41+
* Ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
42+
*Adds the ability to distinguish on a bakend static metadata events introduced based on the [proposal](https://github.com/OpenLineage/OpenLineage/blob/main/proposals/1837/static_lineage.md).*
43+
44+
3145
## [0.34.0](https://github.com/MarquezProject/marquez/compare/0.33.0...0.34.0) - 2023-05-18
3246

3347
### Fixed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ Versions of Marquez are compatible with OpenLineage unless noted otherwise. We e
7979
| **Marquez** | **OpenLineage** | **Status** |
8080
|--------------------------------------------------------------------------------------------------|---------------------------------------------------------------|---------------|
8181
| [`UNRELEASED`](https://github.com/MarquezProject/marquez/blob/main/CHANGELOG.md#unreleased) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `CURRENT` |
82-
| [`0.35.0`](https://github.com/MarquezProject/marquez/blob/0.35.0/CHANGELOG.md#0350---2023-06-13) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `RECOMMENDED` |
83-
| [`0.34.0`](https://github.com/MarquezProject/marquez/blob/0.34.0/CHANGELOG.md#0340---2023-05-18) | [`1-0-5`](https://openlineage.io/spec/1-0-0/OpenLineage.json) | `MAINTENANCE` |
82+
| [`0.37.0`](https://github.com/MarquezProject/marquez/blob/0.37.0/CHANGELOG.md#0370---2023-07-17) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `RECOMMENDED` |
83+
| [`0.36.0`](https://github.com/MarquezProject/marquez/blob/0.36.0/CHANGELOG.md#0360---2023-06-27) | [`1-0-5`](https://openlineage.io/spec/1-0-0/OpenLineage.json) | `MAINTENANCE` |
8484

8585
> **Note:** The [`openlineage-python`](https://pypi.org/project/openlineage-python) and [`openlineage-java`](https://central.sonatype.com/artifact/io.openlineage/openlineage-java) libraries will a higher version than the OpenLineage [specification](https://github.com/OpenLineage/OpenLineage/tree/main/spec) as they have different version requirements.
8686
@@ -160,7 +160,7 @@ Marquez listens on port `8080` for all API calls and port `8081` for the admin i
160160

161161
* Website: https://marquezproject.ai
162162
* Source: https://github.com/MarquezProject/marquez
163-
* Chat: [https://marquezproject.slack.com](https://bit.ly/MqzSlack)
163+
* Chat: [MarquezProject Slack](https://bit.ly/MqzSlackInvite)
164164
* Twitter: [@MarquezProject](https://twitter.com/MarquezProject)
165165

166166
## Contributing

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import marquez.api.models.SortDirection;
3939
import marquez.db.OpenLineageDao;
4040
import marquez.service.ServiceFactory;
41+
import marquez.service.models.BaseEvent;
4142
import marquez.service.models.LineageEvent;
4243
import marquez.service.models.NodeId;
4344

@@ -61,20 +62,26 @@ public OpenLineageResource(
6162
@Consumes(APPLICATION_JSON)
6263
@Produces(APPLICATION_JSON)
6364
@Path("/lineage")
64-
public void create(
65-
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
65+
public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse)
6666
throws JsonProcessingException, SQLException {
67-
openLineageService
68-
.createAsync(event)
69-
.whenComplete(
70-
(result, err) -> {
71-
if (err != null) {
72-
log.error("Unexpected error while processing request", err);
73-
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
74-
} else {
75-
asyncResponse.resume(Response.status(201).build());
76-
}
77-
});
67+
if (event instanceof LineageEvent) {
68+
openLineageService
69+
.createAsync((LineageEvent) event)
70+
.whenComplete(
71+
(result, err) -> {
72+
if (err != null) {
73+
log.error("Unexpected error while processing request", err);
74+
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
75+
} else {
76+
asyncResponse.resume(Response.status(201).build());
77+
}
78+
});
79+
} else {
80+
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());
81+
82+
// return serialized event
83+
asyncResponse.resume(Response.status(200).entity(event).build());
84+
}
7885
}
7986

8087
private int determineStatusCode(Throwable e) {

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 59 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -71,26 +71,28 @@ void updateLastModifiedAt(
7171

7272
@SqlQuery(
7373
"""
74-
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
75-
FROM datasets_view d
76-
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
77-
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
78-
LEFT JOIN (
79-
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
80-
FROM tags AS t
81-
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
82-
GROUP BY m.dataset_uuid
83-
) t ON t.dataset_uuid = d.uuid
84-
LEFT JOIN (
85-
SELECT
86-
df.dataset_version_uuid,
87-
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
88-
FROM dataset_facets_view AS df
89-
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
90-
GROUP BY df.dataset_version_uuid
91-
) f ON f.dataset_version_uuid = d.current_version_uuid
92-
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
93-
""")
74+
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
75+
FROM datasets_view d
76+
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
77+
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
78+
LEFT JOIN (
79+
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
80+
FROM tags AS t
81+
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
82+
GROUP BY m.dataset_uuid
83+
) t ON t.dataset_uuid = d.uuid
84+
LEFT JOIN (
85+
SELECT
86+
df.dataset_version_uuid,
87+
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
88+
FROM dataset_facets AS df
89+
WHERE df.facet IS NOT NULL AND
90+
(df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') AND
91+
df.dataset_uuid = (SELECT uuid FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName)
92+
GROUP BY df.dataset_version_uuid
93+
) f ON f.dataset_version_uuid = d.current_version_uuid
94+
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
95+
""")
9496
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
9597

9698
default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
@@ -119,28 +121,30 @@ default void setFields(Dataset ds) {
119121

120122
@SqlQuery(
121123
"""
122-
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
123-
FROM datasets_view d
124-
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
125-
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
126-
LEFT JOIN (
127-
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
128-
FROM tags AS t
129-
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
130-
GROUP BY m.dataset_uuid
131-
) t ON t.dataset_uuid = d.uuid
132-
LEFT JOIN (
133-
SELECT
134-
df.dataset_version_uuid,
135-
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
136-
FROM dataset_facets_view AS df
137-
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
138-
GROUP BY df.dataset_version_uuid
139-
) f ON f.dataset_version_uuid = d.current_version_uuid
140-
WHERE d.namespace_name = :namespaceName
141-
ORDER BY d.name
142-
LIMIT :limit OFFSET :offset
143-
""")
124+
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
125+
FROM datasets_view d
126+
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
127+
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
128+
LEFT JOIN (
129+
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
130+
FROM tags AS t
131+
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
132+
GROUP BY m.dataset_uuid
133+
) t ON t.dataset_uuid = d.uuid
134+
LEFT JOIN (
135+
SELECT
136+
df.dataset_version_uuid,
137+
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
138+
FROM dataset_facets AS df
139+
WHERE df.facet IS NOT NULL AND
140+
(df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') AND
141+
df.dataset_uuid IN (SELECT uuid FROM datasets_view WHERE namespace_name = :namespaceName ORDER BY name LIMIT :limit OFFSET :offset)
142+
GROUP BY df.dataset_version_uuid
143+
) f ON f.dataset_version_uuid = d.current_version_uuid
144+
WHERE d.namespace_name = :namespaceName
145+
ORDER BY d.name
146+
LIMIT :limit OFFSET :offset
147+
""")
144148
List<Dataset> findAll(String namespaceName, int limit, int offset);
145149

146150
@SqlQuery("SELECT count(*) FROM datasets_view")
@@ -249,23 +253,23 @@ DatasetRow upsert(
249253

250254
@SqlUpdate(
251255
"""
252-
UPDATE datasets d
253-
SET is_hidden = true
254-
FROM namespaces n
255-
WHERE n.uuid=d.namespace_uuid
256-
AND n.name=:namespaceName
257-
""")
256+
UPDATE datasets d
257+
SET is_hidden = true
258+
FROM namespaces n
259+
WHERE n.uuid=d.namespace_uuid
260+
AND n.name=:namespaceName
261+
""")
258262
void deleteByNamespaceName(String namespaceName);
259263

260264
@SqlQuery(
261265
"""
262-
UPDATE datasets d
263-
SET is_hidden = true
264-
FROM namespaces n
265-
WHERE n.uuid = d.namespace_uuid
266-
AND n.name=:namespaceName AND d.name=:name
267-
RETURNING *
268-
""")
266+
UPDATE datasets d
267+
SET is_hidden = true
268+
FROM namespaces n
269+
WHERE n.uuid = d.namespace_uuid
270+
AND n.name=:namespaceName AND d.name=:name
271+
RETURNING *
272+
""")
269273
Optional<DatasetRow> delete(String namespaceName, String name);
270274

271275
@Transaction
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
9+
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
10+
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
11+
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
12+
13+
@JsonTypeIdResolver(EventTypeResolver.class)
14+
@JsonTypeInfo(
15+
use = Id.CUSTOM,
16+
include = As.EXISTING_PROPERTY,
17+
property = "schemaURL",
18+
defaultImpl = LineageEvent.class,
19+
visible = true)
20+
public class BaseEvent extends BaseJsonModel {}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import java.net.URI;
9+
import java.time.ZonedDateTime;
10+
import javax.validation.Valid;
11+
import javax.validation.constraints.NotNull;
12+
import lombok.AllArgsConstructor;
13+
import lombok.Builder;
14+
import lombok.Getter;
15+
import lombok.NoArgsConstructor;
16+
import lombok.Setter;
17+
import lombok.ToString;
18+
19+
@Builder
20+
@AllArgsConstructor
21+
@NoArgsConstructor
22+
@Setter
23+
@Getter
24+
@Valid
25+
@ToString
26+
public class DatasetEvent extends BaseEvent {
27+
@NotNull private ZonedDateTime eventTime;
28+
@Valid private LineageEvent.Dataset dataset;
29+
@Valid @NotNull private String producer;
30+
@Valid @NotNull private URI schemaURL;
31+
}

0 commit comments

Comments
 (0)