Skip to content
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
c61b581
Add db migration to add cascade deletion on `fk`s
wslulciuc May 4, 2023
4612850
Add `DbDataRetention` and `dataRetentionInDays` config
wslulciuc May 5, 2023
a9372bf
Add `DbRetentionJob`
wslulciuc May 9, 2023
437a23c
Add `DbRetentionCommand`
wslulciuc May 11, 2023
77db993
Add `frequencyMins` config for runs and rename `dbRetentionInDays`
wslulciuc May 11, 2023
6f3014b
Add docs to `DbRetentionJob` and minor renaming
wslulciuc May 12, 2023
7943a52
Wrap `DbRetention.retentionOnDbOrError()` in `try/catch`
wslulciuc May 12, 2023
8dad10a
Add docs to DbRetention
wslulciuc May 12, 2023
7be8ea1
continued: Add docs to `DbRetention`
wslulciuc May 12, 2023
bdf8e39
Add handling of `errorOnDbRetention`
wslulciuc May 12, 2023
944c948
Add docs to `DbException` and `DbRetentionException`
wslulciuc May 12, 2023
9fde898
`info` -> `debug` when inserting column lineage
wslulciuc May 12, 2023
d8e1194
Remove `dbRetention.enabled`
wslulciuc May 15, 2023
f6e4c46
Update handling of `StatementException`
wslulciuc May 15, 2023
63a85e0
Minor changes
wslulciuc May 15, 2023
cec3982
Add `docs/faq.md`
wslulciuc May 16, 2023
f52dea8
continued: `Add docs/faq.md`
wslulciuc May 16, 2023
71aa297
continued: Add `docs/faq.md`
wslulciuc May 16, 2023
8e51759
continued: Add `docs/faq.md`
wslulciuc May 16, 2023
01dc8b1
Define `DEFAULT_RETENTION_DAYS` constant in `DbRetention`
wslulciuc May 16, 2023
4933749
Make chunk size in retention query configurable
wslulciuc May 16, 2023
cd07192
Remove `DATA_RETENTION_IN_DAYS` from `MarquezConfig`
wslulciuc May 16, 2023
0f6f58d
Update docs for chunk size config
wslulciuc May 16, 2023
5ad928a
Remove error log from `DbRetention.retentionOnDbOrError()`
wslulciuc May 16, 2023
3aa24e9
Use `LOOP` for retention
wslulciuc May 31, 2023
5136789
continued: Use `LOOP` for retention
wslulciuc May 31, 2023
e074606
Use `numberOfRowsPerBatch`
wslulciuc May 31, 2023
f217710
Use `--number-of-rows-per-batch`
wslulciuc May 31, 2023
a290e05
Add pause to prevent lock timeouts
wslulciuc Jun 2, 2023
1785b27
Add `FOR UPDATE SKIP LOCKED`
wslulciuc Jun 2, 2023
127e1bd
Add `sql()`
wslulciuc Jun 6, 2023
bd53f80
Merge branch 'main' into feature/db-retention
Jun 30, 2023
fd7aa35
Add `--dry-run`
wslulciuc Jun 30, 2023
a596291
Add `jdbi3-testcontainers`
wslulciuc Jun 30, 2023
1bf38ef
Merge branch 'feature/db-retention' of github.com:MarquezProject/marq…
wslulciuc Jun 30, 2023
85ff9bf
Remove shortened flag args
wslulciuc Jul 4, 2023
4c5c407
Use `marquez.db.DbRetention.DEFAULT_DRY_RUN`
wslulciuc Jul 4, 2023
84be539
Add DbRetention.retentionOnRuns()
wslulciuc Jul 4, 2023
0289277
Add `DbMigration.migrateDbOrError(DataSource)`
wslulciuc Jul 4, 2023
5a7cc75
Add `TestingDb`
wslulciuc Jul 5, 2023
4882570
Add `DbTest`
wslulciuc Jul 5, 2023
92e3e63
Add `testRetentionOnDbOrError_withDatasetsOlderThanXDays()`
wslulciuc Jul 5, 2023
4237853
Remove `jobs.DbRetentionConfig.dryRun`
wslulciuc Jul 5, 2023
8a83639
Add `--dry-run` option to `faq.md`
wslulciuc Jul 5, 2023
fe55341
continued: Add --dry-run option to faq.md
wslulciuc Jul 5, 2023
2aee600
continued: `Add testRetentionOnDbOrError_withDatasetsOlderThanXDays`
wslulciuc Jul 6, 2023
e58a8fa
Fix retention query for datasets and dataset versions
wslulciuc Jul 16, 2023
53e3783
Add test for retention on dataset versions
wslulciuc Jul 16, 2023
b4c9c2d
Add comments to tests
wslulciuc Jul 16, 2023
d9c7fc0
Add `testRetentionOnDbOrErrorWithDatasetVersionsOlderThanXDays_skipIf…
wslulciuc Jul 16, 2023
96735f8
Add `testRetentionOnDbOrErrorWithJobsOlderThanXDays()`
wslulciuc Jul 17, 2023
3ad251e
Add `testRetentionOnDbOrErrorWithJobVersionsOlderThanXDays()`
wslulciuc Jul 17, 2023
f554e7e
Add tests for dry run
wslulciuc Jul 17, 2023
eabdee9
Add testRetentionOnDbOrErrorWithRunsOlderThanXDays()
wslulciuc Jul 17, 2023
476dd2a
Add `testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()`
wslulciuc Jul 17, 2023
c2eda14
continued: `Add testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()`
wslulciuc Jul 18, 2023
9dd1db3
Add `javadocs` to `DbRetention`
wslulciuc Jul 18, 2023
d8b31aa
Run tests in order of retention
wslulciuc Jul 18, 2023
33d390a
Merge branch 'main' into feature/db-retention
wslulciuc Jul 18, 2023
fd42414
Use `V63` for cascade delete migration
wslulciuc Jul 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {

testImplementation "io.dropwizard:dropwizard-testing:${dropwizardVersion}"
testImplementation "org.jdbi:jdbi3-testing:${jdbi3Version}"
testImplementation "org.jdbi:jdbi3-testcontainers:${jdbi3Version}"
testImplementation "org.junit.vintage:junit-vintage-engine:${junit5Version}"
testImplementation "org.testcontainers:postgresql:${testcontainersVersion}"
testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}"
Expand Down
63 changes: 39 additions & 24 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import io.sentry.Sentry;
import java.util.EnumSet;
import javax.servlet.DispatcherType;
import javax.sql.DataSource;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.api.filter.JobRedirectFilter;
import marquez.cli.DbMigrationCommand;
import marquez.cli.DbRetentionCommand;
import marquez.cli.MetadataCommand;
import marquez.cli.SeedCommand;
import marquez.common.Utils;
import marquez.db.DbMigration;
import marquez.jobs.DbRetentionJob;
import marquez.logging.LoggingMdcFilter;
import marquez.tracing.SentryConfig;
import marquez.tracing.TracingContainerResponseFilter;
Expand Down Expand Up @@ -79,6 +80,7 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED)));

// Add CLI commands
bootstrap.addCommand(new DbRetentionCommand());
bootstrap.addCommand(new MetadataCommand());
bootstrap.addCommand(new SeedCommand());

Expand All @@ -97,7 +99,7 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
@Override
public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
final DataSourceFactory sourceFactory = config.getDataSourceFactory();
final DataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME);
final ManagedDataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME);

log.info("Running startup actions...");

Expand All @@ -124,17 +126,51 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
env.jersey().register(new TracingContainerResponseFilter());
}

MarquezContext marquezContext = buildMarquezContext(config, env, (ManagedDataSource) source);
final Jdbi jdbi = newJdbi(config, env, source);
final MarquezContext marquezContext =
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();

registerResources(config, env, marquezContext);
registerServlets(env);
registerFilters(env, marquezContext);

// Add scheduled jobs to lifecycle.
if (config.hasDbRetentionPolicy()) {
// Add job to apply retention policy to database.
env.lifecycle()
.manage(
new DbRetentionJob(
jdbi,
config.getDbRetention().getFrequencyMins(),
config.getDbRetention().getNumberOfRowsPerBatch(),
config.getDbRetention().getRetentionDays()));
}
}

private boolean isSentryEnabled(MarquezConfig config) {
return config.getSentry() != null
&& !config.getSentry().getDsn().equals(SentryConfig.DEFAULT_DSN);
}

/** Returns a new {@link Jdbi} object. */
private Jdbi newJdbi(
@NonNull MarquezConfig config, @NonNull Environment env, @NonNull ManagedDataSource source) {
final JdbiFactory factory = new JdbiFactory();
final Jdbi jdbi =
factory
.build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
if (isSentryEnabled(config)) {
sqlLogger = new TracingSQLLogger(sqlLogger);
}
jdbi.setSqlLogger(sqlLogger);
jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());
return jdbi;
}

public void registerResources(
@NonNull MarquezConfig config, @NonNull Environment env, MarquezContext context) {

Expand All @@ -156,27 +192,6 @@ protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
super.addDefaultCommands(bootstrap);
}

private MarquezContext buildMarquezContext(
MarquezConfig config, Environment env, ManagedDataSource source) {
final JdbiFactory factory = new JdbiFactory();
final Jdbi jdbi =
factory
.build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
if (isSentryEnabled(config)) {
sqlLogger = new TracingSQLLogger(sqlLogger);
}
jdbi.setSqlLogger(sqlLogger);
jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());

final MarquezContext context =
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
return context;
}

private void registerServlets(@NonNull Environment env) {
log.debug("Registering servlets...");

Expand Down
12 changes: 12 additions & 0 deletions api/src/main/java/marquez/MarquezConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import io.dropwizard.db.DataSourceFactory;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import marquez.db.FlywayFactory;
import marquez.graphql.GraphqlConfig;
import marquez.jobs.DbRetentionConfig;
import marquez.service.models.Tag;
import marquez.tracing.SentryConfig;

Expand Down Expand Up @@ -40,4 +42,14 @@ public class MarquezConfig extends Configuration {
@Getter
@JsonProperty("sentry")
private final SentryConfig sentry = new SentryConfig();

@Getter
@Setter
@JsonProperty("dbRetention")
private DbRetentionConfig dbRetention; // OPTIONAL

/** Returns {@code true} if a data retention policy has been configured. */
public boolean hasDbRetentionPolicy() {
return (dbRetention != null);
}
}
114 changes: 114 additions & 0 deletions api/src/main/java/marquez/cli/DbRetentionCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.cli;

import static marquez.db.DbRetention.DEFAULT_DRY_RUN;
import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS;

import io.dropwizard.cli.ConfiguredCommand;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.db.ManagedDataSource;
import io.dropwizard.setup.Bootstrap;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.MarquezConfig;
import marquez.db.DbRetention;
import marquez.db.exceptions.DbRetentionException;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.Namespace;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.postgres.PostgresPlugin;

/**
* A command to apply a one-off ad-hoc retention policy directly to source, dataset, and job
* metadata collected by Marquez.
*
* <h2>Usage</h2>
*
* For example, to override the {@code retention-days}:
*
* <pre>{@code
* java -jar marquez-api.jar db-retention --retention-days 30 marquez.yml
* }</pre>
*/
@Slf4j
public class DbRetentionCommand extends ConfiguredCommand<MarquezConfig> {
private static final String DB_SOURCE_NAME = "ad-hoc-db-retention-source";

/* Args for 'db-retention' command. */
private static final String CMD_ARG_NUMBER_OF_ROWS_PER_BATCH = "numberOfRowsPerBatch";
private static final String CMD_ARG_RETENTION_DAYS = "retentionDays";
private static final String CMD_ARG_DRY_RUN = "dryRun";

/* Define 'db-retention' command. */
public DbRetentionCommand() {
super("db-retention", "apply one-off ad-hoc retention policy directly to database");
}

@Override
public void configure(@NonNull net.sourceforge.argparse4j.inf.Subparser subparser) {
super.configure(subparser);
// Arg '--number-of-rows-per-batch'
subparser
.addArgument("--number-of-rows-per-batch")
.dest(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH)
.type(Integer.class)
.required(false)
.setDefault(DEFAULT_NUMBER_OF_ROWS_PER_BATCH)
.help("the number of rows deleted per batch");
// Arg '--retention-days'
subparser
.addArgument("--retention-days")
.dest(CMD_ARG_RETENTION_DAYS)
.type(Integer.class)
.required(false)
.setDefault(DEFAULT_RETENTION_DAYS)
.help("the number of days to retain metadata");
Comment thread
wslulciuc marked this conversation as resolved.
// Arg '--dry-run'
subparser
.addArgument("--dry-run")
.dest(CMD_ARG_DRY_RUN)
.type(Boolean.class)
.required(false)
.setDefault(DEFAULT_DRY_RUN)
.action(Arguments.storeTrue())
.help(
"only output an estimate of metadata deleted by the retention policy, "
+ "without applying the policy on database");
}

@Override
protected void run(
@NonNull Bootstrap<MarquezConfig> bootstrap,
@NonNull Namespace namespace,
@NonNull MarquezConfig config)
throws Exception {
final int numberOfRowsPerBatch = namespace.getInt(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH);
final int retentionDays = namespace.getInt(CMD_ARG_RETENTION_DAYS);
final boolean dryRun = namespace.getBoolean(CMD_ARG_DRY_RUN);

// Configure connection.
final DataSourceFactory sourceFactory = config.getDataSourceFactory();
final ManagedDataSource source =
sourceFactory.build(bootstrap.getMetricRegistry(), DB_SOURCE_NAME);

// Open connection.
final Jdbi jdbi = Jdbi.create(source);
jdbi.installPlugin(new PostgresPlugin()); // Add postgres support.

try {
// Attempt to apply a database retention policy. An exception is thrown on failed retention
// policy attempts requiring we handle the throwable and log the error.
DbRetention.retentionOnDbOrError(jdbi, numberOfRowsPerBatch, retentionDays, dryRun);
} catch (DbRetentionException errorOnDbRetention) {
log.error(
"Failed to apply retention policy of '{}' days to database!",
retentionDays,
errorOnDbRetention);
}
}
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ private Columns() {}

/* LINEAGE EVENT ROW COLUMNS */
public static final String EVENT = "event";
public static final String EVENT_TIME = "event_time";

public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
Expand Down
10 changes: 8 additions & 2 deletions api/src/main/java/marquez/db/DbMigration.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@
public final class DbMigration {
private DbMigration() {}

private static final boolean DEFAULT_MIGRATE_DB_ON_STARTUP = false;

public static void migrateDbOrError(@NonNull final DataSource source) {
migrateDbOrError(new FlywayFactory(), source, DEFAULT_MIGRATE_DB_ON_STARTUP);
}

public static void migrateDbOrError(
@NonNull final FlywayFactory flywayFactory,
@NonNull final DataSource source,
final boolean migrateOnStartup) {
final boolean migrateDbOnStartup) {
final Flyway flyway = flywayFactory.build(source);
// Only attempt a database migration if there are pending changes to be applied,
// or on the initialization of a new database. Otherwise, error on pending changes
// when the flag 'migrateOnStartup' is set to 'false'.
if (!hasPendingDbMigrations(flyway)) {
log.info("No pending migrations found, skipping...");
return;
} else if (!migrateOnStartup && hasDbMigrationsApplied(flyway)) {
} else if (!migrateDbOnStartup && hasDbMigrationsApplied(flyway)) {
errorOnPendingDbMigrations(flyway);
}
// Attempt to perform a database migration. An exception is thrown on failed migration attempts
Expand Down
Loading