Skip to content

Commit 951d60a

Browse files
OL facets - PR3 - migrate data to facet tables
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent bde93bd commit 951d60a

9 files changed

Lines changed: 1097 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.29.0...HEAD)
44

5+
### Added
6+
7+
* Split `lineage_events` table to `dataset_facets`, `run_facets`, and `job_facets` tables. [`2350`](https://github.com/MarquezProject/marquez/pull/2350), [`2355`](https://github.com/MarquezProject/marquez/pull/2355), [`2359`](https://github.com/MarquezProject/marquez/pull/2359)
8+
[@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
9+
* Performance improvement with migration procedure that requires manual steps if database has more than 100K lineage events.
10+
* We highly encourage users to review our [migration plan](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md).
11+
512
## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19
613

714
### Added

api/src/main/java/marquez/MarquezApp.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import lombok.NonNull;
2828
import lombok.extern.slf4j.Slf4j;
2929
import marquez.api.filter.JobRedirectFilter;
30+
import marquez.cli.DbMigrationsCommand;
3031
import marquez.cli.MetadataCommand;
3132
import marquez.cli.SeedCommand;
3233
import marquez.common.Utils;
@@ -149,6 +150,12 @@ public void registerResources(
149150
}
150151
}
151152

153+
@Override
154+
protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
155+
bootstrap.addCommand(new DbMigrationsCommand<>(this));
156+
super.addDefaultCommands(bootstrap);
157+
}
158+
152159
private MarquezContext buildMarquezContext(
153160
MarquezConfig config, Environment env, ManagedDataSource source) {
154161
final JdbiFactory factory = new JdbiFactory();
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.cli;
7+
8+
import io.dropwizard.Application;
9+
import io.dropwizard.cli.EnvironmentCommand;
10+
import io.dropwizard.db.DataSourceFactory;
11+
import io.dropwizard.db.ManagedDataSource;
12+
import io.dropwizard.jdbi3.JdbiFactory;
13+
import io.dropwizard.setup.Environment;
14+
import javax.sql.DataSource;
15+
import lombok.extern.slf4j.Slf4j;
16+
import marquez.db.migrations.V57_1__BackfillFacets;
17+
import net.sourceforge.argparse4j.inf.Namespace;
18+
import net.sourceforge.argparse4j.inf.Subparser;
19+
import org.jdbi.v3.core.Jdbi;
20+
import org.jdbi.v3.jackson2.Jackson2Plugin;
21+
import org.jdbi.v3.postgres.PostgresPlugin;
22+
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
23+
24+
/**
25+
* A command to manually run database migrations when needed. This migration requires a heavy DB
26+
* operation which can be done asynchronously (with limited API downtime) due to separate migration
27+
* command.
28+
*/
29+
@Slf4j
30+
public class DbMigrationsCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> {
31+
32+
private static final String DB_MIGRATE = "db_migrate";
33+
private static final String MIGRATION_V57_DESCRIPTION =
34+
"""
35+
A command to manually run V57 database migration.
36+
Please refer to https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md for more details.
37+
""";
38+
39+
private static final String COMMAND_DESCRIPTION =
40+
"""
41+
A command to manually run database migrations.
42+
Extra parameters are required to specify the migration to run.
43+
""";
44+
45+
/**
46+
* Creates a new environment command.
47+
*
48+
* @param application the application providing this command
49+
*/
50+
public DbMigrationsCommand(Application<marquez.MarquezConfig> application) {
51+
super(application, DB_MIGRATE, COMMAND_DESCRIPTION);
52+
}
53+
54+
@Override
55+
public void configure(Subparser subparser) {
56+
subparser
57+
.addArgument("--chunkSize")
58+
.dest("chunkSize")
59+
.type(Integer.class)
60+
.required(false)
61+
.setDefault(V57_1__BackfillFacets.DEFAULT_CHUNK_SIZE)
62+
.help("amount of lineage_events rows processed in a single SQL query and transaction.");
63+
64+
subparser
65+
.addArgument("--version")
66+
.dest("version")
67+
.type(String.class)
68+
.required(true)
69+
.help("migration version to apply like 'v57'");
70+
71+
addFileArgument(subparser);
72+
}
73+
74+
@Override
75+
protected void run(
76+
Environment environment, Namespace namespace, marquez.MarquezConfig configuration)
77+
throws Exception {
78+
79+
final DataSourceFactory sourceFactory = configuration.getDataSourceFactory();
80+
final DataSource source = sourceFactory.build(environment.metrics(), "MarquezApp-source");
81+
final JdbiFactory factory = new JdbiFactory();
82+
83+
Jdbi jdbi =
84+
factory
85+
.build(
86+
environment,
87+
configuration.getDataSourceFactory(),
88+
(ManagedDataSource) source,
89+
"postgresql-command")
90+
.installPlugin(new SqlObjectPlugin())
91+
.installPlugin(new PostgresPlugin())
92+
.installPlugin(new Jackson2Plugin());
93+
94+
MarquezMigrations.valueOf(namespace.getString("version")).run(jdbi, namespace);
95+
}
96+
97+
enum MarquezMigrations {
98+
v57 {
99+
public void run(Jdbi jdbi, Namespace namespace) throws Exception {
100+
log.info("Running V57_1__BackfillFacets migration");
101+
V57_1__BackfillFacets migration = new V57_1__BackfillFacets();
102+
migration.setManual(true);
103+
migration.setJdbi(jdbi);
104+
migration.setChunkSize(namespace.getInt("chunkSize"));
105+
migration.migrate(null);
106+
}
107+
};
108+
109+
public void run(Jdbi jdbi, Namespace namespace) throws Exception {
110+
throw new UnsupportedOperationException();
111+
}
112+
}
113+
}
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db.migrations;
7+
8+
import java.time.Instant;
9+
import java.util.Optional;
10+
import java.util.UUID;
11+
import lombok.Setter;
12+
import lombok.extern.slf4j.Slf4j;
13+
import marquez.db.Columns;
14+
import org.flywaydb.core.api.MigrationVersion;
15+
import org.flywaydb.core.api.migration.Context;
16+
import org.flywaydb.core.api.migration.JavaMigration;
17+
import org.jdbi.v3.core.Jdbi;
18+
19+
@Slf4j
20+
public class V57_1__BackfillFacets implements JavaMigration {
21+
22+
public static int DEFAULT_CHUNK_SIZE = 10000;
23+
24+
private static int BASIC_MIGRATION_LIMIT = 100000;
25+
26+
private static final String GET_CURRENT_LOCK_SQL =
27+
"""
28+
SELECT * FROM facet_migration_lock
29+
ORDER BY created_at ASC, run_uuid ASC
30+
LIMIT 1
31+
""";
32+
33+
private static final String GET_FINISHING_LOCK_SQL =
34+
"""
35+
SELECT run_uuid, created_at FROM lineage_events
36+
ORDER BY
37+
COALESCE(created_at, event_time) ASC,
38+
run_uuid ASC
39+
LIMIT 1
40+
""";
41+
42+
private static final String GET_INITIAL_LOCK_SQL =
43+
"""
44+
SELECT
45+
run_uuid,
46+
COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at
47+
FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1
48+
""";
49+
50+
private static final String COUNT_LINEAGE_EVENTS_SQL =
51+
"""
52+
SELECT count(*) as cnt FROM lineage_events
53+
""";
54+
55+
private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL =
56+
"""
57+
SELECT count(*) as cnt FROM lineage_events e
58+
WHERE
59+
COALESCE(e.created_at, e.event_time) < :createdAt
60+
OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
61+
""";
62+
63+
private String getBackFillFacetsSQL() {
64+
return String.format(
65+
"""
66+
WITH events_chunk AS (
67+
SELECT e.* FROM lineage_events e
68+
WHERE
69+
COALESCE(e.created_at, e.event_time) < :createdAt
70+
OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
71+
ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC
72+
LIMIT :chunkSize
73+
),
74+
insert_datasets AS (
75+
INSERT INTO dataset_facets %s
76+
),
77+
insert_runs AS (
78+
INSERT INTO run_facets %s
79+
),
80+
insert_jobs AS (
81+
INSERT INTO job_facets %s
82+
)
83+
INSERT INTO facet_migration_lock
84+
SELECT events_chunk.created_at, events_chunk.run_uuid
85+
FROM events_chunk
86+
ORDER BY
87+
COALESCE(events_chunk.created_at, events_chunk.event_time) ASC,
88+
events_chunk.run_uuid ASC
89+
LIMIT 1
90+
RETURNING created_at, run_uuid;
91+
""",
92+
V56_1__FacetViews.getDatasetFacetsDefinitionSQL(jdbi, "events_chunk"),
93+
V56_1__FacetViews.getRunFacetsDefinitionSQL(jdbi, "events_chunk"),
94+
V56_1__FacetViews.getJobFacetsDefinitionSQL(jdbi, "events_chunk"));
95+
}
96+
97+
@Setter private Integer chunkSize = null;
98+
99+
@Setter private boolean manual = false;
100+
101+
@Setter private Jdbi jdbi;
102+
103+
public int getChunkSize() {
104+
return chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
105+
}
106+
107+
@Override
108+
public MigrationVersion getVersion() {
109+
return MigrationVersion.fromVersion("57.2");
110+
}
111+
112+
@Override
113+
public String getDescription() {
114+
return "BackFillFacets";
115+
}
116+
117+
@Override
118+
public Integer getChecksum() {
119+
return null;
120+
}
121+
122+
@Override
123+
public boolean isUndo() {
124+
return false;
125+
}
126+
127+
@Override
128+
public boolean isBaselineMigration() {
129+
return false;
130+
}
131+
132+
@Override
133+
public boolean canExecuteInTransaction() {
134+
return false;
135+
}
136+
137+
@Override
138+
public void migrate(Context context) throws Exception {
139+
if (context != null) {
140+
jdbi = Jdbi.create(context.getConnection());
141+
}
142+
143+
if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) {
144+
// lineage_events table is empty -> no need to run migration
145+
// anyway. we need to create lock to mark that no data requires migration
146+
execute("INSERT INTO facet_migration_lock VALUES (NOW(), uuid_generate_v4())");
147+
148+
createTargetViews();
149+
return;
150+
}
151+
Optional<MigrationLock> lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL);
152+
153+
if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
154+
log.warn(
155+
"""
156+
==================================================
157+
==================================================
158+
==================================================
159+
MARQUEZ INSTANCE TOO BIG TO RUN AUTO UPGRADE.
160+
YOU NEED TO RUN v55_migrate COMMAND MANUALLY.
161+
FOR MORE DETAILS, PLEASE REFER TO:
162+
https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md
163+
==================================================
164+
==================================================
165+
==================================================
166+
""");
167+
// We end migration successfully although no data has been migrated to facet tables
168+
return;
169+
}
170+
171+
log.info("Configured chunkSize is {}", getChunkSize());
172+
MigrationLock lock = getLock(GET_CURRENT_LOCK_SQL).orElse(getLock(GET_INITIAL_LOCK_SQL).get());
173+
while (!lock.equals(lastExpectedLock.get())) {
174+
lock = backFillChunk(lock);
175+
log.info(
176+
"Migrating chunk finished. Still having {} records to migrate.",
177+
countLineageEventsToProcess(lock));
178+
}
179+
180+
createTargetViews();
181+
log.info("All records migrated");
182+
}
183+
184+
private void createTargetViews() {
185+
// replace facet views with tables
186+
execute("DROP VIEW IF EXISTS run_facets_view");
187+
execute("DROP VIEW IF EXISTS job_facets_view");
188+
execute("DROP VIEW IF EXISTS dataset_facets_view");
189+
execute("CREATE OR REPLACE VIEW run_facets_view AS SELECT * FROM run_facets");
190+
execute("CREATE OR REPLACE VIEW job_facets_view AS SELECT * FROM job_facets");
191+
execute("CREATE OR REPLACE VIEW dataset_facets_view AS SELECT * FROM dataset_facets");
192+
}
193+
194+
private void execute(String sql) {
195+
jdbi.inTransaction(handle -> handle.execute(sql));
196+
}
197+
198+
private MigrationLock backFillChunk(MigrationLock lock) {
199+
String backFillQuery = getBackFillFacetsSQL();
200+
return jdbi.withHandle(
201+
h ->
202+
h.createQuery(backFillQuery)
203+
.bind("chunkSize", getChunkSize())
204+
.bind("createdAt", lock.created_at)
205+
.bind("runUuid", lock.run_uuid)
206+
.map(
207+
rs ->
208+
new MigrationLock(
209+
rs.getColumn(Columns.RUN_UUID, UUID.class),
210+
rs.getColumn(Columns.CREATED_AT, Instant.class)))
211+
.one());
212+
}
213+
214+
private Optional<MigrationLock> getLock(String sql) {
215+
return jdbi.withHandle(
216+
h ->
217+
h.createQuery(sql)
218+
.map(
219+
rs ->
220+
new MigrationLock(
221+
rs.getColumn(Columns.RUN_UUID, UUID.class),
222+
rs.getColumn(Columns.CREATED_AT, Instant.class)))
223+
.findFirst());
224+
}
225+
226+
private int countLineageEvents() {
227+
return jdbi.withHandle(
228+
h ->
229+
h.createQuery(COUNT_LINEAGE_EVENTS_SQL)
230+
.map(rs -> rs.getColumn("cnt", Integer.class))
231+
.one());
232+
}
233+
234+
private int countLineageEventsToProcess(MigrationLock lock) {
235+
return jdbi.withHandle(
236+
h ->
237+
h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL)
238+
.bind("createdAt", lock.created_at)
239+
.bind("runUuid", lock.run_uuid)
240+
.map(rs -> rs.getColumn("cnt", Integer.class))
241+
.one());
242+
}
243+
244+
private record MigrationLock(UUID run_uuid, Instant created_at) {}
245+
}

0 commit comments

Comments
 (0)