Skip to content

Commit d17668a

Browse files
OL facets - PR2 - read facets from views based on lineage_events table (#2355)
* OL facets - PR2 - read facets from views pointing to lineage_events table Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> * OL facets - PR3 - migrate data to facet tables (#2359) Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> --------- Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
1 parent d7298a9 commit d17668a

30 files changed

Lines changed: 1367 additions & 159 deletions

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ jobs:
166166
working_directory: ~/marquez
167167
machine:
168168
image: ubuntu-2004:current
169+
resource_class: large
169170
steps:
170171
- checkout
171172
- run: ./.circleci/get-docker-compose.sh

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44

55
### Added
66

7+
* Split `lineage_events` table into `dataset_facets`, `run_facets`, and `job_facets` tables. [`#2350`](https://github.com/MarquezProject/marquez/pull/2350), [`#2355`](https://github.com/MarquezProject/marquez/pull/2355), [`#2359`](https://github.com/MarquezProject/marquez/pull/2359)
8+
[@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
9+
* Performance improvements for storing and querying facets.
10+
* The migration procedure requires manual steps if the database has more than 100K lineage events.
11+
* We highly encourage users to review our [migration plan](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md).
712
* Additions to seed data for column-lineage [`#2381`](https://github.com/MarquezProject/marquez/pull/2381) [@rossturk](https://github.com/rossturk)
813
* Added new `docker/down.sh` script that makes it easier to stop local deployment when run detached [`#2380`](https://github.com/MarquezProject/marquez/pull/2380) [@rossturk](https://github.com/rossturk)
914

10-
1115
## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19
1216

1317
### Added

api/src/main/java/marquez/MarquezApp.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import lombok.NonNull;
2828
import lombok.extern.slf4j.Slf4j;
2929
import marquez.api.filter.JobRedirectFilter;
30+
import marquez.cli.DbMigrationCommand;
3031
import marquez.cli.MetadataCommand;
3132
import marquez.cli.SeedCommand;
3233
import marquez.common.Utils;
@@ -149,6 +150,12 @@ public void registerResources(
149150
}
150151
}
151152

153+
/**
 * Registers the {@link DbMigrationCommand} with the bootstrap and then delegates to the
 * superclass so that its default commands are registered as well.
 *
 * @param bootstrap the application bootstrap to which commands are added
 */
@Override
154+
protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
155+
bootstrap.addCommand(new DbMigrationCommand<>(this));
156+
super.addDefaultCommands(bootstrap);
157+
}
158+
152159
private MarquezContext buildMarquezContext(
153160
MarquezConfig config, Environment env, ManagedDataSource source) {
154161
final JdbiFactory factory = new JdbiFactory();
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.cli;
7+
8+
import io.dropwizard.Application;
9+
import io.dropwizard.cli.EnvironmentCommand;
10+
import io.dropwizard.db.DataSourceFactory;
11+
import io.dropwizard.db.ManagedDataSource;
12+
import io.dropwizard.jdbi3.JdbiFactory;
13+
import io.dropwizard.setup.Environment;
14+
import javax.sql.DataSource;
15+
import lombok.extern.slf4j.Slf4j;
16+
import marquez.db.migrations.V57_1__BackfillFacets;
17+
import net.sourceforge.argparse4j.inf.Namespace;
18+
import net.sourceforge.argparse4j.inf.Subparser;
19+
import org.jdbi.v3.core.Jdbi;
20+
import org.jdbi.v3.jackson2.Jackson2Plugin;
21+
import org.jdbi.v3.postgres.PostgresPlugin;
22+
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
23+
24+
/**
25+
* A command to manually run database migrations when needed. This migration requires a heavy DB
26+
* operation which can be done asynchronously (with limited API downtime) due to separate migration
27+
* command.
28+
*/
29+
@Slf4j
30+
public class DbMigrationCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> {
31+
32+
private static final String DB_MIGRATE = "db-migrate";
33+
private static final String MIGRATION_V57_DESCRIPTION =
34+
"""
35+
A command to manually run V57 database migration.
36+
Please refer to https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md for more details.
37+
""";
38+
39+
private static final String COMMAND_DESCRIPTION =
40+
"""
41+
A command to manually run database migrations.
42+
Extra parameters are required to specify the migration to run.
43+
""";
44+
45+
/**
46+
* Creates a new environment command.
47+
*
48+
* @param application the application providing this command
49+
*/
50+
public DbMigrationCommand(Application<marquez.MarquezConfig> application) {
51+
super(application, DB_MIGRATE, COMMAND_DESCRIPTION);
52+
}
53+
54+
@Override
55+
public void configure(Subparser subparser) {
56+
subparser
57+
.addArgument("--chunkSize")
58+
.dest("chunkSize")
59+
.type(Integer.class)
60+
.required(false)
61+
.setDefault(V57_1__BackfillFacets.DEFAULT_CHUNK_SIZE)
62+
.help("amount of lineage_events rows processed in a single SQL query and transaction.");
63+
64+
subparser
65+
.addArgument("--version")
66+
.dest("version")
67+
.type(String.class)
68+
.required(true)
69+
.help("migration version to apply like 'v57'");
70+
71+
addFileArgument(subparser);
72+
}
73+
74+
@Override
75+
protected void run(
76+
Environment environment, Namespace namespace, marquez.MarquezConfig configuration)
77+
throws Exception {
78+
79+
final DataSourceFactory sourceFactory = configuration.getDataSourceFactory();
80+
final DataSource source = sourceFactory.build(environment.metrics(), "MarquezApp-source");
81+
final JdbiFactory factory = new JdbiFactory();
82+
83+
Jdbi jdbi =
84+
factory
85+
.build(
86+
environment,
87+
configuration.getDataSourceFactory(),
88+
(ManagedDataSource) source,
89+
"postgresql-command")
90+
.installPlugin(new SqlObjectPlugin())
91+
.installPlugin(new PostgresPlugin())
92+
.installPlugin(new Jackson2Plugin());
93+
94+
MarquezMigrations.valueOf(namespace.getString("version")).run(jdbi, namespace);
95+
}
96+
97+
enum MarquezMigrations {
98+
v57 {
99+
public void run(Jdbi jdbi, Namespace namespace) throws Exception {
100+
log.info("Running V57_1__BackfillFacets migration");
101+
V57_1__BackfillFacets migration = new V57_1__BackfillFacets();
102+
migration.setManual(true);
103+
migration.setJdbi(jdbi);
104+
migration.setChunkSize(namespace.getInt("chunkSize"));
105+
migration.migrate(null);
106+
}
107+
};
108+
109+
public void run(Jdbi jdbi, Namespace namespace) throws Exception {
110+
throw new UnsupportedOperationException();
111+
}
112+
}
113+
}

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -76,22 +76,22 @@ WITH selected_datasets AS (
7676
FROM datasets_view d
7777
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
7878
), dataset_runs AS (
79-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
79+
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
8080
FROM selected_datasets d
81-
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
81+
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
8282
LEFT JOIN LATERAL (
83-
SELECT run_uuid, event_time, event FROM lineage_events
84-
WHERE run_uuid = dv.run_uuid
85-
) e ON e.run_uuid = dv.run_uuid
83+
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
84+
WHERE dataset_uuid = dv.dataset_uuid
85+
) df ON df.run_uuid = dv.run_uuid
8686
UNION
87-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
87+
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
8888
FROM selected_datasets d
8989
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
9090
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
9191
LEFT JOIN LATERAL (
92-
SELECT run_uuid, event_time, event FROM lineage_events
93-
WHERE run_uuid = rim.run_uuid
94-
) e ON e.run_uuid = rim.run_uuid
92+
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
93+
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
94+
) df ON df.run_uuid = rim.run_uuid
9595
)
9696
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
9797
FROM selected_datasets d
@@ -104,13 +104,9 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
104104
GROUP BY m.dataset_uuid
105105
) t ON t.dataset_uuid = d.uuid
106106
LEFT JOIN (
107-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
108-
FROM dataset_runs d2,
109-
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
110-
WHERE d2.run_uuid = d2.run_uuid
111-
AND ds -> 'facets' IS NOT NULL
112-
AND ds ->> 'name' = d2.name
113-
AND ds ->> 'namespace' = d2.namespace_name
107+
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
108+
FROM dataset_runs AS d2
109+
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
114110
GROUP BY d2.uuid
115111
) f ON f.dataset_uuid = d.uuid""")
116112
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
@@ -148,22 +144,22 @@ WITH selected_datasets AS (
148144
ORDER BY d.name
149145
LIMIT :limit OFFSET :offset
150146
), dataset_runs AS (
151-
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
147+
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
152148
FROM selected_datasets d
153149
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
154150
LEFT JOIN LATERAL (
155-
SELECT run_uuid, event_time, event FROM lineage_events
156-
WHERE run_uuid = dv.run_uuid
157-
) e ON e.run_uuid = dv.run_uuid
151+
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
152+
WHERE dataset_uuid = dv.dataset_uuid
153+
) df ON df.run_uuid = dv.run_uuid
158154
UNION
159-
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
155+
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
160156
FROM selected_datasets d
161157
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
162158
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
163159
LEFT JOIN LATERAL (
164-
SELECT run_uuid, event_time, event FROM lineage_events
165-
WHERE run_uuid = rim.run_uuid
166-
) e ON e.run_uuid = rim.run_uuid
160+
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
161+
WHERE dataset_uuid = dv.dataset_uuid
162+
) df ON df.run_uuid = rim.run_uuid
167163
)
168164
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
169165
FROM selected_datasets d
@@ -176,13 +172,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
176172
GROUP BY m.dataset_uuid
177173
) t ON t.dataset_uuid = d.uuid
178174
LEFT JOIN (
179-
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets
180-
FROM dataset_runs d2,
181-
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
175+
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
176+
FROM dataset_runs AS d2
182177
WHERE d2.run_uuid = d2.run_uuid
183-
AND ds -> 'facets' IS NOT NULL
184-
AND ds ->> 'name' = d2.name
185-
AND ds ->> 'namespace' = d2.namespace_name
178+
AND d2.facet IS NOT NULL
186179
GROUP BY d2.uuid
187180
) f ON f.dataset_uuid = d.uuid
188181
ORDER BY d.name""")

api/src/main/java/marquez/db/DatasetFacetsDao.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 contributors to the Marquez project
2+
* Copyright 2018-2023 contributors to the Marquez project
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

@@ -70,9 +70,9 @@ public static Type typeFromName(@NonNull final String name) {
7070
}
7171

7272
/**
73-
* @param uuid
7473
* @param createdAt
7574
* @param datasetUuid
75+
* @param datasetVersionUuid
7676
* @param runUuid
7777
* @param lineageEventTime
7878
* @param lineageEventType
@@ -83,19 +83,19 @@ public static Type typeFromName(@NonNull final String name) {
8383
@SqlUpdate(
8484
"""
8585
INSERT INTO dataset_facets (
86-
uuid,
8786
created_at,
8887
dataset_uuid,
88+
dataset_version_uuid,
8989
run_uuid,
9090
lineage_event_time,
9191
lineage_event_type,
9292
type,
9393
name,
9494
facet
9595
) VALUES (
96-
:uuid,
9796
:createdAt,
9897
:datasetUuid,
98+
:datasetVersionUuid,
9999
:runUuid,
100100
:lineageEventTime,
101101
:lineageEventType,
@@ -105,9 +105,9 @@ INSERT INTO dataset_facets (
105105
)
106106
""")
107107
void insertDatasetFacet(
108-
UUID uuid,
109108
Instant createdAt,
110109
UUID datasetUuid,
110+
UUID datasetVersionUuid,
111111
UUID runUuid,
112112
Instant lineageEventTime,
113113
String lineageEventType,
@@ -125,6 +125,7 @@ void insertDatasetFacet(
125125
@Transaction
126126
default void insertDatasetFacetsFor(
127127
@NonNull UUID datasetUuid,
128+
@NonNull UUID datasetVersionUuid,
128129
@NonNull UUID runUuid,
129130
@NonNull Instant lineageEventTime,
130131
@NonNull String lineageEventType,
@@ -137,9 +138,9 @@ default void insertDatasetFacetsFor(
137138
.forEach(
138139
fieldName ->
139140
insertDatasetFacet(
140-
UUID.randomUUID(),
141141
now,
142142
datasetUuid,
143+
datasetVersionUuid,
143144
runUuid,
144145
lineageEventTime,
145146
lineageEventType,
@@ -149,9 +150,9 @@ default void insertDatasetFacetsFor(
149150
}
150151

151152
record DatasetFacetRow(
152-
UUID uuid,
153153
Instant createdAt,
154154
UUID datasetUuid,
155+
UUID datasetVersionUuid,
155156
UUID runUuid,
156157
Instant lineageEventTime,
157158
String lineageEventType,

0 commit comments

Comments
 (0)