Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.29.0...HEAD)

### Added

* Split `lineage_events` table to `dataset_facets`, `run_facets`, and `job_facets` tables. [`2350`](https://github.com/MarquezProject/marquez/pull/2350), [`2355`](https://github.com/MarquezProject/marquez/pull/2355), [`2359`](https://github.com/MarquezProject/marquez/pull/2359)
[@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Performance improvement storing and querying facets.
* Migration procedure requires manual steps if database has more than 100K lineage events.
* We highly encourage users to review our [migration plan](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md).

## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19

### Added
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.api.filter.JobRedirectFilter;
import marquez.cli.DbMigrationCommand;
import marquez.cli.MetadataCommand;
import marquez.cli.SeedCommand;
import marquez.common.Utils;
Expand Down Expand Up @@ -149,6 +150,12 @@ public void registerResources(
}
}

/**
 * Registers the {@link DbMigrationCommand} on the bootstrap and then delegates to the superclass
 * so that its default commands are registered as well.
 *
 * @param bootstrap the application bootstrap that collects the available CLI commands
 */
@Override
protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
  final DbMigrationCommand<MarquezConfig> dbMigrationCommand = new DbMigrationCommand<>(this);
  bootstrap.addCommand(dbMigrationCommand);
  super.addDefaultCommands(bootstrap);
}

private MarquezContext buildMarquezContext(
MarquezConfig config, Environment env, ManagedDataSource source) {
final JdbiFactory factory = new JdbiFactory();
Expand Down
113 changes: 113 additions & 0 deletions api/src/main/java/marquez/cli/DbMigrationCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.cli;

import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.db.ManagedDataSource;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import marquez.db.migrations.V57_1__BackfillFacets;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

/**
 * A command ({@code db-migrate}) to manually run database migrations when needed. Some migrations
 * require a heavy DB operation; exposing them as a separate command allows running them
 * asynchronously (with limited API downtime) instead of during application startup.
 *
 * <p>The migration to run is selected with the required {@code --version} argument; the optional
 * {@code --chunkSize} argument controls how many {@code lineage_events} rows are processed per SQL
 * query.
 *
 * <p>NOTE(review): the class type parameter is named {@code MarquezConfig} and therefore shadows
 * {@link marquez.MarquezConfig} inside this class, which is why the fully qualified name is used
 * below. It is kept as-is because callers instantiate this class with the diamond operator.
 */
@Slf4j
public class DbMigrationCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> {

  private static final String DB_MIGRATE = "db-migrate";
  private static final String MIGRATION_V57_DESCRIPTION =
      """
      A command to manually run V57 database migration.
      Please refer to https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md for more details.
      """;

  private static final String COMMAND_DESCRIPTION =
      """
      A command to manually run database migrations.
      Extra parameters are required to specify the migration to run.
      """;

  /**
   * Creates a new environment command.
   *
   * @param application the application providing this command
   */
  public DbMigrationCommand(Application<marquez.MarquezConfig> application) {
    super(application, DB_MIGRATE, COMMAND_DESCRIPTION);
  }

  /**
   * Declares the command line arguments: optional {@code --chunkSize}, required {@code --version}
   * and the configuration file argument added by {@code addFileArgument}.
   *
   * @param subparser the parser of this sub-command
   */
  @Override
  public void configure(Subparser subparser) {
    subparser
        .addArgument("--chunkSize")
        .dest("chunkSize")
        .type(Integer.class)
        .required(false)
        .setDefault(V57_1__BackfillFacets.DEFAULT_CHUNK_SIZE)
        .help("amount of lineage_events rows processed in a single SQL query and transaction.");

    subparser
        .addArgument("--version")
        .dest("version")
        .type(String.class)
        .required(true)
        .help("migration version to apply like 'v57'");

    addFileArgument(subparser);
  }

  /**
   * Resolves the requested migration, builds a {@link Jdbi} instance from the configured data
   * source and runs the migration with it.
   *
   * @param environment the Dropwizard environment
   * @param namespace the parsed command line arguments
   * @param configuration the application configuration providing the data source factory
   * @throws IllegalArgumentException if {@code --version} does not name a supported migration
   * @throws Exception if the migration itself fails
   */
  @Override
  protected void run(
      Environment environment, Namespace namespace, marquez.MarquezConfig configuration)
      throws Exception {

    // Validate the requested version first, so that a typo fails fast and with a helpful
    // message before any database connection pool is created.
    final MarquezMigrations migration =
        MarquezMigrations.fromVersion(namespace.getString("version"));

    final DataSourceFactory sourceFactory = configuration.getDataSourceFactory();
    final DataSource source = sourceFactory.build(environment.metrics(), "MarquezApp-source");
    final JdbiFactory factory = new JdbiFactory();

    final Jdbi jdbi =
        factory
            .build(
                environment,
                configuration.getDataSourceFactory(),
                (ManagedDataSource) source,
                "postgresql-command")
            .installPlugin(new SqlObjectPlugin())
            .installPlugin(new PostgresPlugin())
            .installPlugin(new Jackson2Plugin());

    migration.run(jdbi, namespace);
  }

  /** Migrations that can be triggered manually; the constant name is the {@code --version}. */
  enum MarquezMigrations {
    v57 {
      @Override
      public void run(Jdbi jdbi, Namespace namespace) throws Exception {
        log.info("Running V57_1__BackfillFacets migration");
        V57_1__BackfillFacets migration = new V57_1__BackfillFacets();
        migration.setManual(true);
        migration.setJdbi(jdbi);
        migration.setChunkSize(namespace.getInt("chunkSize"));
        // No Flyway context is available here; the migration uses the Jdbi instance set above.
        migration.migrate(null);
      }
    };

    /**
     * Runs the migration.
     *
     * @param jdbi the Jdbi instance used to access the database
     * @param namespace the parsed command line arguments
     * @throws Exception if the migration fails
     */
    public abstract void run(Jdbi jdbi, Namespace namespace) throws Exception;

    /**
     * Looks up a migration by the value passed to {@code --version}.
     *
     * @param version the requested version, e.g. {@code v57}
     * @return the matching migration
     * @throws IllegalArgumentException if no migration with that name exists
     */
    static MarquezMigrations fromVersion(String version) {
      try {
        return valueOf(version);
      } catch (IllegalArgumentException | NullPointerException e) {
        throw new IllegalArgumentException(
            String.format(
                "Unsupported migration version '%s', supported versions are: %s",
                version, java.util.Arrays.toString(values())),
            e);
      }
    }
  }
}
245 changes: 245 additions & 0 deletions api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.migrations;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import marquez.db.Columns;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;
import org.jdbi.v3.core.Jdbi;

/**
 * Java-based Flyway migration that back-fills the {@code dataset_facets}, {@code run_facets} and
 * {@code job_facets} tables from {@code lineage_events}.
 *
 * <p>Events are processed in chunks, walking from the newest event towards the oldest one. After
 * each chunk the oldest processed event is stored in {@code facet_migration_lock}, so an
 * interrupted migration continues from the stored position on the next run.
 *
 * <p>When run by Flyway (a non-null {@link Context} is passed and {@code manual} is false) the
 * back-fill is skipped for databases with at least {@code BASIC_MIGRATION_LIMIT} lineage events; a
 * warning is logged instead and the migration has to be triggered manually.
 */
@Slf4j
public class V57_1__BackfillFacets implements JavaMigration {

  // NOTE(review): both limits below are used as constants but are not final; confirm nothing
  // reassigns them (e.g. tests) before tightening the declarations.
  public static int DEFAULT_CHUNK_SIZE = 10000;

  private static int BASIC_MIGRATION_LIMIT = 100000;

  /** Oldest stored lock, i.e. the position reached by the migration so far. */
  private static final String GET_CURRENT_LOCK_SQL =
      """
      SELECT * FROM facet_migration_lock
      ORDER BY created_at ASC, run_uuid ASC
      LIMIT 1
      """;

  /** Oldest lineage event; the migration is finished once the lock reaches it. */
  private static final String GET_FINISHING_LOCK_SQL =
      """
      SELECT run_uuid, created_at FROM lineage_events
      ORDER BY
      COALESCE(created_at, event_time) ASC,
      run_uuid ASC
      LIMIT 1
      """;

  /** Newest lineage event shifted by one millisecond, so that the event itself gets processed. */
  private static final String GET_INITIAL_LOCK_SQL =
      """
      SELECT
      run_uuid,
      COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at
      FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1
      """;

  private static final String COUNT_LINEAGE_EVENTS_SQL =
      """
      SELECT count(*) as cnt FROM lineage_events
      """;

  private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL =
      """
      SELECT count(*) as cnt FROM lineage_events e
      WHERE
      COALESCE(e.created_at, e.event_time) < :createdAt
      OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
      """;

  /**
   * Builds the statement that migrates a single chunk: it selects up to {@code :chunkSize} events
   * older than the given lock, inserts their facets into the three facet tables and stores the
   * oldest event of the chunk as the new lock, which is returned by the statement.
   */
  private String getBackFillFacetsSQL() {
    return String.format(
        """
        WITH events_chunk AS (
        SELECT e.* FROM lineage_events e
        WHERE
        COALESCE(e.created_at, e.event_time) < :createdAt
        OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
        ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC
        LIMIT :chunkSize
        ),
        insert_datasets AS (
        INSERT INTO dataset_facets %s
        ),
        insert_runs AS (
        INSERT INTO run_facets %s
        ),
        insert_jobs AS (
        INSERT INTO job_facets %s
        )
        INSERT INTO facet_migration_lock
        SELECT events_chunk.created_at, events_chunk.run_uuid
        FROM events_chunk
        ORDER BY
        COALESCE(events_chunk.created_at, events_chunk.event_time) ASC,
        events_chunk.run_uuid ASC
        LIMIT 1
        RETURNING created_at, run_uuid;
        """,
        V56_1__FacetViews.getDatasetFacetsDefinitionSQL("events_chunk"),
        V56_1__FacetViews.getRunFacetsDefinitionSQL("events_chunk"),
        V56_1__FacetViews.getJobFacetsDefinitionSQL("events_chunk"));
  }

  /** Number of events per chunk; {@code null} means {@link #DEFAULT_CHUNK_SIZE}. */
  @Setter private Integer chunkSize = null;

  /** When true, the size limit for automatic migration is not applied. */
  @Setter private boolean manual = false;

  /** Database access; replaced by a Jdbi built from the Flyway connection when one is given. */
  @Setter private Jdbi jdbi;

  /**
   * @return the configured chunk size, or {@link #DEFAULT_CHUNK_SIZE} if none was set
   */
  public int getChunkSize() {
    return chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
  }

  // NOTE(review): the reported version is 57.2 although the class name says V57_1 -- confirm
  // that this is intentional before changing either of them.
  @Override
  public MigrationVersion getVersion() {
    return MigrationVersion.fromVersion("57.2");
  }

  @Override
  public String getDescription() {
    return "BackFillFacets";
  }

  @Override
  public Integer getChecksum() {
    return null;
  }

  @Override
  public boolean isUndo() {
    return false;
  }

  @Override
  public boolean isBaselineMigration() {
    return false;
  }

  @Override
  public boolean canExecuteInTransaction() {
    return false;
  }

  /**
   * Runs the back-fill.
   *
   * @param context the Flyway context, or {@code null} when the migration is triggered manually
   *     and a {@link Jdbi} instance has been provided through {@link #setJdbi(Jdbi)}
   * @throws IllegalStateException if neither a context nor a Jdbi instance is available
   * @throws Exception if the migration fails
   */
  @Override
  public void migrate(Context context) throws Exception {
    if (context != null) {
      jdbi = Jdbi.create(context.getConnection());
    }
    if (jdbi == null) {
      throw new IllegalStateException(
          "Neither a Flyway context nor a Jdbi instance was provided to run the migration");
    }

    if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) {
      // lineage_events table is empty -> no need to run migration
      // anyway. we need to create lock to mark that no data requires migration
      execute("INSERT INTO facet_migration_lock VALUES (NOW(), null)");

      createTargetViews();
      return;
    }
    final MigrationLock lastExpectedLock =
        getLock(GET_FINISHING_LOCK_SQL)
            .orElseThrow(
                () -> new IllegalStateException("Unable to find the oldest lineage event"));

    if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
      log.warn(
          """
          ==================================================
          ==================================================
          ==================================================
          MARQUEZ INSTANCE TOO BIG TO RUN AUTO UPGRADE.
          YOU NEED TO RUN 'db-migrate --version v57' COMMAND MANUALLY.
          FOR MORE DETAILS, PLEASE REFER TO:
          https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md
          ==================================================
          ==================================================
          ==================================================
          """);
      // We end migration successfully although no data has been migrated to facet tables
      return;
    }

    log.info("Configured chunkSize is {}", getChunkSize());
    // Continue from the stored lock if there is one; the initial lock is only queried when the
    // migration starts from scratch.
    MigrationLock lock =
        getLock(GET_CURRENT_LOCK_SQL)
            .orElseGet(
                () ->
                    getLock(GET_INITIAL_LOCK_SQL)
                        .orElseThrow(
                            () ->
                                new IllegalStateException(
                                    "Unable to find the newest lineage event")));
    while (!lock.equals(lastExpectedLock)) {
      lock = backFillChunk(lock);
      log.info(
          "Migrating chunk finished. Still having {} records to migrate.",
          countLineageEventsToProcess(lock));
    }

    createTargetViews();
    log.info("All records migrated");
  }

  /** Recreates the facet views as plain selects from the facet tables. */
  private void createTargetViews() {
    // replace facet views with tables
    execute("DROP VIEW IF EXISTS run_facets_view");
    execute("DROP VIEW IF EXISTS job_facets_view");
    execute("DROP VIEW IF EXISTS dataset_facets_view");
    execute("CREATE OR REPLACE VIEW run_facets_view AS SELECT * FROM run_facets");
    execute("CREATE OR REPLACE VIEW job_facets_view AS SELECT * FROM job_facets");
    execute("CREATE OR REPLACE VIEW dataset_facets_view AS SELECT * FROM dataset_facets");
  }

  /** Executes a single statement in its own transaction. */
  private void execute(String sql) {
    jdbi.inTransaction(handle -> handle.execute(sql));
  }

  /**
   * Migrates one chunk of events older than the given lock.
   *
   * @param lock the position reached so far
   * @return the new position, i.e. the oldest event of the migrated chunk
   */
  private MigrationLock backFillChunk(MigrationLock lock) {
    String backFillQuery = getBackFillFacetsSQL();
    return jdbi.withHandle(
        h ->
            h.createQuery(backFillQuery)
                .bind("chunkSize", getChunkSize())
                .bind("createdAt", lock.createdAt())
                .bind("runUuid", lock.runUuid())
                .map(
                    rs ->
                        new MigrationLock(
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
                .one());
  }

  /**
   * Runs a query returning {@code run_uuid} and {@code created_at} columns.
   *
   * @return the first row as a lock, or empty if the query returned no rows
   */
  private Optional<MigrationLock> getLock(String sql) {
    return jdbi.withHandle(
        h ->
            h.createQuery(sql)
                .map(
                    rs ->
                        new MigrationLock(
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
                .findFirst());
  }

  /** Counts all rows of {@code lineage_events}; read as long because count(*) is a bigint. */
  private long countLineageEvents() {
    return jdbi.withHandle(
        h ->
            h.createQuery(COUNT_LINEAGE_EVENTS_SQL)
                .map(rs -> rs.getColumn("cnt", Long.class))
                .one());
  }

  /** Counts the lineage events older than the given lock, i.e. those still to be migrated. */
  private long countLineageEventsToProcess(MigrationLock lock) {
    return jdbi.withHandle(
        h ->
            h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL)
                .bind("createdAt", lock.createdAt())
                .bind("runUuid", lock.runUuid())
                .map(rs -> rs.getColumn("cnt", Long.class))
                .one());
  }

  /** Position of the migration: the run and creation time of the last processed event. */
  private record MigrationLock(UUID runUuid, Instant createdAt) {}
}
Loading