Skip to content

Commit de6fed8

Browse files
wslulciucHarel Shein
andauthored
Add db retention support (#2486)
* Add db migration to add cascade deletion on `fk`s Signed-off-by: wslulciuc <willy@datakin.com> * Add `DbDataRetention` and `dataRetentionInDays` config Signed-off-by: wslulciuc <willy@datakin.com> * Add `DbRetentionJob` Signed-off-by: wslulciuc <willy@datakin.com> * Add `DbRetentionCommand` Signed-off-by: wslulciuc <willy@datakin.com> * Add `frequencyMins` config for runs and rename `dbRetentionInDays` Signed-off-by: wslulciuc <willy@datakin.com> * Add docs to `DbRetentionJob` and minor renaming Signed-off-by: wslulciuc <willy@datakin.com> * Wrap `DbRetention.retentionOnDbOrError()` in `try/catch` Signed-off-by: wslulciuc <willy@datakin.com> * Add docs to DbRetention Signed-off-by: wslulciuc <willy@datakin.com> * continued: Add docs to `DbRetention` Signed-off-by: wslulciuc <willy@datakin.com> * Add handling of `errorOnDbRetention` Signed-off-by: wslulciuc <willy@datakin.com> * Add docs to `DbException` and `DbRetentionException` Signed-off-by: wslulciuc <willy@datakin.com> * `info` -> `debug` when inserting column lineage Signed-off-by: wslulciuc <willy@datakin.com> * Remove `dbRetention.enabled` Signed-off-by: wslulciuc <willy@datakin.com> * Update handling of `StatementException` Signed-off-by: wslulciuc <willy@datakin.com> * Minor changes Signed-off-by: wslulciuc <willy@datakin.com> * Add `docs/faq.md` Signed-off-by: wslulciuc <willy@datakin.com> * continued: `Add docs/faq.md` Signed-off-by: wslulciuc <willy@datakin.com> * continued: Add `docs/faq.md` Signed-off-by: wslulciuc <willy@datakin.com> * continued: Add `docs/faq.md` Signed-off-by: wslulciuc <willy@datakin.com> * Define `DEFAULT_RETENTION_DAYS` constant in `DbRetention` Signed-off-by: wslulciuc <willy@datakin.com> * Make chunk size in retention query configurable Signed-off-by: wslulciuc <willy@datakin.com> * Remove `DATA_RETENTION_IN_DAYS` from `MarquezConfig` Signed-off-by: wslulciuc <willy@datakin.com> * Update docs for chunk size config Signed-off-by: wslulciuc <willy@datakin.com> * Remove error log from `DbRetention.retentionOnDbOrError()` Signed-off-by: wslulciuc <willy@datakin.com> * Use `LOOP` for retention Signed-off-by: wslulciuc <willy@datakin.com> * continued: Use `LOOP` for retention Signed-off-by: wslulciuc <willy@datakin.com> * Use `numberOfRowsPerBatch` Signed-off-by: wslulciuc <willy@datakin.com> * Use `--number-of-rows-per-batch` Signed-off-by: wslulciuc <willy@datakin.com> * Add pause to prevent lock timeouts Signed-off-by: wslulciuc <willy@datakin.com> * Add `FOR UPDATE SKIP LOCKED` Signed-off-by: wslulciuc <willy@datakin.com> * Add `sql()` Signed-off-by: wslulciuc <willy@datakin.com> * Add `--dry-run` Signed-off-by: wslulciuc <willy@datakin.com> * Add `jdbi3-testcontainers` Signed-off-by: wslulciuc <willy@datakin.com> * Remove shortened flag args Signed-off-by: wslulciuc <willy@datakin.com> * Use `marquez.db.DbRetention.DEFAULT_DRY_RUN` Signed-off-by: wslulciuc <willy@datakin.com> * Add DbRetention.retentionOnRuns() Signed-off-by: wslulciuc <willy@datakin.com> * Add `DbMigration.migrateDbOrError(DataSource)` Signed-off-by: wslulciuc <willy@datakin.com> * Add `TestingDb` Signed-off-by: wslulciuc <willy@datakin.com> * Add `DbTest` Signed-off-by: wslulciuc <willy@datakin.com> * Add `testRetentionOnDbOrError_withDatasetsOlderThanXDays()` Signed-off-by: wslulciuc <willy@datakin.com> * Remove `jobs.DbRetentionConfig.dryRun` Signed-off-by: wslulciuc <willy@datakin.com> * Add `--dry-run` option to `faq.md` Signed-off-by: wslulciuc <willy@datakin.com> * continued: Add --dry-run option to faq.md Signed-off-by: wslulciuc <willy@datakin.com> * continued: `Add testRetentionOnDbOrError_withDatasetsOlderThanXDays` Signed-off-by: wslulciuc <willy@datakin.com> * Fix retention query for datasets and dataset versions Signed-off-by: wslulciuc <willy@datakin.com> * Add test for retention on dataset versions Signed-off-by: wslulciuc <willy@datakin.com> * Add comments to tests Signed-off-by: wslulciuc <willy@datakin.com> * Add `testRetentionOnDbOrErrorWithDatasetVersionsOlderThanXDays_skipIfVersionAsInputForRun()` Signed-off-by: wslulciuc <willy@datakin.com> * Add `testRetentionOnDbOrErrorWithJobsOlderThanXDays()` Signed-off-by: wslulciuc <willy@datakin.com> * Add `testRetentionOnDbOrErrorWithJobVersionsOlderThanXDays()` Signed-off-by: wslulciuc <willy@datakin.com> * Add tests for dry run Signed-off-by: wslulciuc <willy@datakin.com> * Add testRetentionOnDbOrErrorWithRunsOlderThanXDays() Signed-off-by: wslulciuc <willy@datakin.com> * Add `testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()` Signed-off-by: wslulciuc <willy@datakin.com> * continued: `Add testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()` Signed-off-by: wslulciuc <willy@datakin.com> * Add `javadocs` to `DbRetention` Signed-off-by: wslulciuc <willy@datakin.com> * Run tests in order of retention Signed-off-by: wslulciuc <willy@datakin.com> --------- Signed-off-by: wslulciuc <willy@datakin.com> Co-authored-by: Harel Shein <harel.shein@astronomer.io>
1 parent 50e1963 commit de6fed8

37 files changed

Lines changed: 3173 additions & 45 deletions

api/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ dependencies {
5353

5454
testImplementation "io.dropwizard:dropwizard-testing:${dropwizardVersion}"
5555
testImplementation "org.jdbi:jdbi3-testing:${jdbi3Version}"
56+
testImplementation "org.jdbi:jdbi3-testcontainers:${jdbi3Version}"
5657
testImplementation "org.junit.vintage:junit-vintage-engine:${junit5Version}"
5758
testImplementation "org.testcontainers:postgresql:${testcontainersVersion}"
5859
testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}"

api/src/main/java/marquez/MarquezApp.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@
2323
import io.sentry.Sentry;
2424
import java.util.EnumSet;
2525
import javax.servlet.DispatcherType;
26-
import javax.sql.DataSource;
2726
import lombok.NonNull;
2827
import lombok.extern.slf4j.Slf4j;
2928
import marquez.api.filter.JobRedirectFilter;
3029
import marquez.cli.DbMigrationCommand;
30+
import marquez.cli.DbRetentionCommand;
3131
import marquez.cli.MetadataCommand;
3232
import marquez.cli.SeedCommand;
3333
import marquez.common.Utils;
3434
import marquez.db.DbMigration;
35+
import marquez.jobs.DbRetentionJob;
3536
import marquez.logging.LoggingMdcFilter;
3637
import marquez.tracing.SentryConfig;
3738
import marquez.tracing.TracingContainerResponseFilter;
@@ -79,6 +80,7 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
7980
new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED)));
8081

8182
// Add CLI commands
83+
bootstrap.addCommand(new DbRetentionCommand());
8284
bootstrap.addCommand(new MetadataCommand());
8385
bootstrap.addCommand(new SeedCommand());
8486

@@ -97,7 +99,7 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
9799
@Override
98100
public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
99101
final DataSourceFactory sourceFactory = config.getDataSourceFactory();
100-
final DataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME);
102+
final ManagedDataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME);
101103

102104
log.info("Running startup actions...");
103105

@@ -124,17 +126,51 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
124126
env.jersey().register(new TracingContainerResponseFilter());
125127
}
126128

127-
MarquezContext marquezContext = buildMarquezContext(config, env, (ManagedDataSource) source);
129+
final Jdbi jdbi = newJdbi(config, env, source);
130+
final MarquezContext marquezContext =
131+
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
132+
128133
registerResources(config, env, marquezContext);
129134
registerServlets(env);
130135
registerFilters(env, marquezContext);
136+
137+
// Add scheduled jobs to lifecycle.
138+
if (config.hasDbRetentionPolicy()) {
139+
// Add job to apply retention policy to database.
140+
env.lifecycle()
141+
.manage(
142+
new DbRetentionJob(
143+
jdbi,
144+
config.getDbRetention().getFrequencyMins(),
145+
config.getDbRetention().getNumberOfRowsPerBatch(),
146+
config.getDbRetention().getRetentionDays()));
147+
}
131148
}
132149

133150
private boolean isSentryEnabled(MarquezConfig config) {
134151
return config.getSentry() != null
135152
&& !config.getSentry().getDsn().equals(SentryConfig.DEFAULT_DSN);
136153
}
137154

155+
/** Returns a new {@link Jdbi} object. */
156+
private Jdbi newJdbi(
157+
@NonNull MarquezConfig config, @NonNull Environment env, @NonNull ManagedDataSource source) {
158+
final JdbiFactory factory = new JdbiFactory();
159+
final Jdbi jdbi =
160+
factory
161+
.build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
162+
.installPlugin(new SqlObjectPlugin())
163+
.installPlugin(new PostgresPlugin())
164+
.installPlugin(new Jackson2Plugin());
165+
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
166+
if (isSentryEnabled(config)) {
167+
sqlLogger = new TracingSQLLogger(sqlLogger);
168+
}
169+
jdbi.setSqlLogger(sqlLogger);
170+
jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());
171+
return jdbi;
172+
}
173+
138174
public void registerResources(
139175
@NonNull MarquezConfig config, @NonNull Environment env, MarquezContext context) {
140176

@@ -156,27 +192,6 @@ protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
156192
super.addDefaultCommands(bootstrap);
157193
}
158194

159-
private MarquezContext buildMarquezContext(
160-
MarquezConfig config, Environment env, ManagedDataSource source) {
161-
final JdbiFactory factory = new JdbiFactory();
162-
final Jdbi jdbi =
163-
factory
164-
.build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
165-
.installPlugin(new SqlObjectPlugin())
166-
.installPlugin(new PostgresPlugin())
167-
.installPlugin(new Jackson2Plugin());
168-
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
169-
if (isSentryEnabled(config)) {
170-
sqlLogger = new TracingSQLLogger(sqlLogger);
171-
}
172-
jdbi.setSqlLogger(sqlLogger);
173-
jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());
174-
175-
final MarquezContext context =
176-
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
177-
return context;
178-
}
179-
180195
private void registerServlets(@NonNull Environment env) {
181196
log.debug("Registering servlets...");
182197

api/src/main/java/marquez/MarquezConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import io.dropwizard.db.DataSourceFactory;
1212
import lombok.Getter;
1313
import lombok.NoArgsConstructor;
14+
import lombok.Setter;
1415
import marquez.db.FlywayFactory;
1516
import marquez.graphql.GraphqlConfig;
17+
import marquez.jobs.DbRetentionConfig;
1618
import marquez.service.models.Tag;
1719
import marquez.tracing.SentryConfig;
1820

@@ -40,4 +42,14 @@ public class MarquezConfig extends Configuration {
4042
@Getter
4143
@JsonProperty("sentry")
4244
private final SentryConfig sentry = new SentryConfig();
45+
46+
@Getter
47+
@Setter
48+
@JsonProperty("dbRetention")
49+
private DbRetentionConfig dbRetention; // OPTIONAL
50+
51+
/** Returns {@code true} if a data retention policy has been configured. */
52+
public boolean hasDbRetentionPolicy() {
53+
return (dbRetention != null);
54+
}
4355
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.cli;
7+
8+
import static marquez.db.DbRetention.DEFAULT_DRY_RUN;
9+
import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
10+
import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS;
11+
12+
import io.dropwizard.cli.ConfiguredCommand;
13+
import io.dropwizard.db.DataSourceFactory;
14+
import io.dropwizard.db.ManagedDataSource;
15+
import io.dropwizard.setup.Bootstrap;
16+
import lombok.NonNull;
17+
import lombok.extern.slf4j.Slf4j;
18+
import marquez.MarquezConfig;
19+
import marquez.db.DbRetention;
20+
import marquez.db.exceptions.DbRetentionException;
21+
import net.sourceforge.argparse4j.impl.Arguments;
22+
import net.sourceforge.argparse4j.inf.Namespace;
23+
import org.jdbi.v3.core.Jdbi;
24+
import org.jdbi.v3.postgres.PostgresPlugin;
25+
26+
/**
27+
* A command to apply a one-off ad-hoc retention policy directly to source, dataset, and job
28+
* metadata collected by Marquez.
29+
*
30+
* <h2>Usage</h2>
31+
*
32+
* For example, to override the {@code retention-days}:
33+
*
34+
* <pre>{@code
35+
* java -jar marquez-api.jar db-retention --retention-days 30 marquez.yml
36+
* }</pre>
37+
*/
38+
@Slf4j
39+
public class DbRetentionCommand extends ConfiguredCommand<MarquezConfig> {
40+
private static final String DB_SOURCE_NAME = "ad-hoc-db-retention-source";
41+
42+
/* Args for 'db-retention' command. */
43+
private static final String CMD_ARG_NUMBER_OF_ROWS_PER_BATCH = "numberOfRowsPerBatch";
44+
private static final String CMD_ARG_RETENTION_DAYS = "retentionDays";
45+
private static final String CMD_ARG_DRY_RUN = "dryRun";
46+
47+
/* Define 'db-retention' command. */
48+
public DbRetentionCommand() {
49+
super("db-retention", "apply one-off ad-hoc retention policy directly to database");
50+
}
51+
52+
@Override
53+
public void configure(@NonNull net.sourceforge.argparse4j.inf.Subparser subparser) {
54+
super.configure(subparser);
55+
// Arg '--number-of-rows-per-batch'
56+
subparser
57+
.addArgument("--number-of-rows-per-batch")
58+
.dest(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH)
59+
.type(Integer.class)
60+
.required(false)
61+
.setDefault(DEFAULT_NUMBER_OF_ROWS_PER_BATCH)
62+
.help("the number of rows deleted per batch");
63+
// Arg '--retention-days'
64+
subparser
65+
.addArgument("--retention-days")
66+
.dest(CMD_ARG_RETENTION_DAYS)
67+
.type(Integer.class)
68+
.required(false)
69+
.setDefault(DEFAULT_RETENTION_DAYS)
70+
.help("the number of days to retain metadata");
71+
// Arg '--dry-run'
72+
subparser
73+
.addArgument("--dry-run")
74+
.dest(CMD_ARG_DRY_RUN)
75+
.type(Boolean.class)
76+
.required(false)
77+
.setDefault(DEFAULT_DRY_RUN)
78+
.action(Arguments.storeTrue())
79+
.help(
80+
"only output an estimate of metadata deleted by the retention policy, "
81+
+ "without applying the policy on database");
82+
}
83+
84+
@Override
85+
protected void run(
86+
@NonNull Bootstrap<MarquezConfig> bootstrap,
87+
@NonNull Namespace namespace,
88+
@NonNull MarquezConfig config)
89+
throws Exception {
90+
final int numberOfRowsPerBatch = namespace.getInt(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH);
91+
final int retentionDays = namespace.getInt(CMD_ARG_RETENTION_DAYS);
92+
final boolean dryRun = namespace.getBoolean(CMD_ARG_DRY_RUN);
93+
94+
// Configure connection.
95+
final DataSourceFactory sourceFactory = config.getDataSourceFactory();
96+
final ManagedDataSource source =
97+
sourceFactory.build(bootstrap.getMetricRegistry(), DB_SOURCE_NAME);
98+
99+
// Open connection.
100+
final Jdbi jdbi = Jdbi.create(source);
101+
jdbi.installPlugin(new PostgresPlugin()); // Add postgres support.
102+
103+
try {
104+
// Attempt to apply a database retention policy. An exception is thrown on failed retention
105+
// policy attempts requiring we handle the throwable and log the error.
106+
DbRetention.retentionOnDbOrError(jdbi, numberOfRowsPerBatch, retentionDays, dryRun);
107+
} catch (DbRetentionException errorOnDbRetention) {
108+
log.error(
109+
"Failed to apply retention policy of '{}' days to database!",
110+
retentionDays,
111+
errorOnDbRetention);
112+
}
113+
}
114+
}

api/src/main/java/marquez/db/Columns.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ private Columns() {}
140140

141141
/* LINEAGE EVENT ROW COLUMNS */
142142
public static final String EVENT = "event";
143+
public static final String EVENT_TIME = "event_time";
143144

144145
public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
145146
if (results.getObject(column) == null) {

api/src/main/java/marquez/db/DbMigration.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,24 @@
1616
public final class DbMigration {
1717
private DbMigration() {}
1818

19+
private static final boolean DEFAULT_MIGRATE_DB_ON_STARTUP = false;
20+
21+
public static void migrateDbOrError(@NonNull final DataSource source) {
22+
migrateDbOrError(new FlywayFactory(), source, DEFAULT_MIGRATE_DB_ON_STARTUP);
23+
}
24+
1925
public static void migrateDbOrError(
2026
@NonNull final FlywayFactory flywayFactory,
2127
@NonNull final DataSource source,
22-
final boolean migrateOnStartup) {
28+
final boolean migrateDbOnStartup) {
2329
final Flyway flyway = flywayFactory.build(source);
2430
// Only attempt a database migration if there are pending changes to be applied,
2531
// or on the initialization of a new database. Otherwise, error on pending changes
2632
// when the flag 'migrateOnStartup' is set to 'false'.
2733
if (!hasPendingDbMigrations(flyway)) {
2834
log.info("No pending migrations found, skipping...");
2935
return;
30-
} else if (!migrateOnStartup && hasDbMigrationsApplied(flyway)) {
36+
} else if (!migrateDbOnStartup && hasDbMigrationsApplied(flyway)) {
3137
errorOnPendingDbMigrations(flyway);
3238
}
3339
// Attempt to perform a database migration. An exception is thrown on failed migration attempts

0 commit comments

Comments
 (0)