Skip to content

Commit 14e588a

Browse files
authored
Add java migrations for backfilling runs with job uuids and parents (#1980)
* Add migrations to support job parent relationship storage Signed-off-by: Michael Collado <collado.mike@gmail.com> * Update all job and run queries to reference jobs_view and runs_view Signed-off-by: Michael Collado <collado.mike@gmail.com> * Remove references to simple_name as job redirects handle redirecting simple name to fqn added unit test to verify Signed-off-by: Michael Collado <collado.mike@gmail.com> * Fix runs migration script Signed-off-by: Michael Collado <collado.mike@gmail.com> * Add java migrations for backfilling runs with job uuids and backfill Airflow runs Signed-off-by: Michael Collado <collado.mike@gmail.com> Signed-off-by: Michael Collado <collado.mike@gmail.com>
1 parent 9cbd9bf commit 14e588a

10 files changed

Lines changed: 875 additions & 3 deletions

api/src/main/java/marquez/common/Utils.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.io.UncheckedIOException;
2727
import java.net.MalformedURLException;
2828
import java.net.URL;
29+
import java.nio.ByteBuffer;
30+
import java.nio.charset.StandardCharsets;
2931
import java.time.Instant;
3032
import java.time.ZonedDateTime;
3133
import java.util.List;
@@ -37,6 +39,7 @@
3739
import java.util.stream.Collectors;
3840
import java.util.stream.Stream;
3941
import javax.annotation.Nullable;
42+
import javax.validation.constraints.NotNull;
4043
import lombok.Builder;
4144
import lombok.Getter;
4245
import lombok.NonNull;
@@ -50,6 +53,7 @@
5053
import marquez.service.models.DatasetMeta;
5154
import marquez.service.models.DbTableMeta;
5255
import marquez.service.models.LineageEvent;
56+
import marquez.service.models.LineageEvent.ParentRunFacet;
5357
import marquez.service.models.StreamMeta;
5458
import org.apache.commons.lang3.tuple.Triple;
5559

@@ -61,6 +65,16 @@ private Utils() {}
6165
public static final String VERSION_DELIM = ":";
6266
public static final Joiner VERSION_JOINER = Joiner.on(VERSION_DELIM).skipNulls();
6367

68+
/**
69+
* pre-defined NAMESPACE_URL defined in RFC4122. This is the namespace used by the OpenLineage
70+
* Airflow integration for constructing some run IDs as UUIDs. We use the same namespace to
71+
* construct the same UUIDs when absolutely necessary (e.g., backfills, backward compatibility)
72+
*
73+
* @see "https://datatracker.ietf.org/doc/html/rfc4122#appendix-C"
74+
*/
75+
public static final UUID NAMESPACE_URL_UUID =
76+
UUID.fromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8");
77+
6478
private static final ObjectMapper MAPPER = newObjectMapper();
6579

6680
private static final int UUID_LENGTH = 36;
@@ -141,6 +155,70 @@ public static UUID toUuid(@NonNull final String uuidString) {
141155
return UUID.fromString(uuidString);
142156
}
143157

158+
/**
159+
* Construct a name-based {@link UUID} based on the {@link #NAMESPACE_URL_UUID} namespace. Name
160+
* parts are separated by a dot (.) character.
161+
*
162+
* @see "https://datatracker.ietf.org/doc/html/rfc4122#page-13"
163+
* @param nameParts
164+
* @return
165+
*/
166+
public static UUID toNameBasedUuid(String... nameParts) {
167+
String constructedName = String.join(".", nameParts);
168+
169+
final byte[] nameBytes = constructedName.getBytes(StandardCharsets.UTF_8);
170+
171+
ByteBuffer buffer = ByteBuffer.allocate(nameBytes.length + 16);
172+
buffer.putLong(NAMESPACE_URL_UUID.getMostSignificantBits());
173+
buffer.putLong(NAMESPACE_URL_UUID.getLeastSignificantBits());
174+
buffer.put(nameBytes);
175+
176+
return UUID.nameUUIDFromBytes(buffer.array());
177+
}
178+
179+
/**
180+
* Construct a UUID from a {@link ParentRunFacet} - if the {@link
181+
* marquez.service.models.LineageEvent.RunLink#runId} field is a valid {@link UUID}, use it.
182+
* Otherwise, compute a {@link UUID} from the job name and the reported runId. If the job name
183+
* contains a dot (.), only return the portion up to the last dot in the name (this attempts to
184+
* address airflow tasks, which always report the job name as &lt;dag_name&gt;.&lt;task_name&lt;
185+
*
186+
* @param parent
187+
* @return
188+
*/
189+
public static UUID findParentRunUuid(ParentRunFacet parent) {
190+
String jobName = parent.getJob().getName();
191+
String parentRunId = parent.getRun().getRunId();
192+
return findParentRunUuid(jobName, parentRunId);
193+
}
194+
195+
public static UUID findParentRunUuid(String parentJobName, String parentRunId) {
196+
String dagName = parseParentJobName(parentJobName);
197+
return toUuid(parentRunId, dagName);
198+
}
199+
200+
public static String parseParentJobName(String parentJobName) {
201+
return parentJobName.contains(".")
202+
? parentJobName.substring(0, parentJobName.lastIndexOf('.'))
203+
: parentJobName;
204+
}
205+
206+
/**
207+
* Compute a UUID from a RunId and a jobName
208+
*
209+
* @see Utils#toNameBasedUuid(String...) for details on the UUID construction.
210+
* @param runId
211+
* @param jobName
212+
* @return
213+
*/
214+
public static UUID toUuid(@NotNull String runId, String jobName) {
215+
try {
216+
return UUID.fromString(runId);
217+
} catch (IllegalArgumentException e) {
218+
return Utils.toNameBasedUuid(jobName, runId);
219+
}
220+
}
221+
144222
public static Instant toInstant(@Nullable final String asIso) {
145223
return (asIso == null) ? null : Instant.from(ISO_INSTANT.parse(asIso));
146224
}

api/src/main/java/marquez/db/FlywayFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ public final class FlywayFactory {
3131
private static final boolean DEFAULT_CLEAN_DISABLED = false;
3232
private static final boolean DEFAULT_OUT_OF_ORDER = false;
3333
private static final String DEFAULT_LOCATION = "marquez/db/migration";
34-
private static final List<String> DEFAULT_LOCATIONS = ImmutableList.of(DEFAULT_LOCATION);
34+
private static final String DEFAULT_MIGRATION_CLASSPATH = "classpath:marquez/db/migrations";
35+
private static final List<String> DEFAULT_LOCATIONS =
36+
ImmutableList.of(DEFAULT_LOCATION, DEFAULT_MIGRATION_CLASSPATH);
3537
private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
3638
private static final String DEFAULT_TABLE = "flyway_schema_history";
3739
private static final boolean DEFAULT_PLACEHOLDER_REPLACEMENT = false;
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package marquez.db.migrations;
2+
3+
import java.sql.Connection;
4+
import java.sql.PreparedStatement;
5+
import java.sql.ResultSet;
6+
import java.util.UUID;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.flywaydb.core.api.MigrationVersion;
9+
import org.flywaydb.core.api.migration.Context;
10+
import org.flywaydb.core.api.migration.JavaMigration;
11+
12+
/**
13+
* This migration is dependent on the migration found in the SQL script for V43. This updates the
14+
* runs table to include the <code>job_uuid</code> value for each record. We update the table in
15+
* batches to avoid table-level locks so that concurrent reads and writes can continue to take
16+
* place. Auto-commit is enabled, so it is entirely possible that this migration will fail partway
17+
* through and some records will retain the <code>job_uuid</code> value while others will not. This
18+
* is intentional as no harm will come from leaving these values in place in case of rollback.
19+
*/
20+
@Slf4j
21+
public class V43_1__UpdateRunsWithJobUUID implements JavaMigration {
22+
23+
@Override
24+
public MigrationVersion getVersion() {
25+
return MigrationVersion.fromVersion("43.1");
26+
}
27+
28+
// don't execute in a transaction so each batch can be committed immediately
29+
@Override
30+
public boolean canExecuteInTransaction() {
31+
return false;
32+
}
33+
34+
@Override
35+
public void migrate(Context context) throws Exception {
36+
Connection conn = context.getConnection();
37+
try (PreparedStatement queryPs =
38+
conn.prepareStatement("SELECT uuid, name, namespace_name FROM jobs");
39+
PreparedStatement updatePs =
40+
conn.prepareStatement(
41+
"UPDATE runs SET job_uuid=? WHERE job_name=? AND namespace_name=?")) {
42+
43+
ResultSet resultSet = queryPs.executeQuery();
44+
boolean isAutoCommit = conn.getAutoCommit();
45+
conn.setAutoCommit(true);
46+
try {
47+
while (resultSet.next()) {
48+
String uuid = resultSet.getString("uuid");
49+
String jobName = resultSet.getString("name");
50+
String namespace = resultSet.getString("namespace_name");
51+
updatePs.setObject(1, UUID.fromString(uuid));
52+
updatePs.setString(2, jobName);
53+
updatePs.setString(3, namespace);
54+
if (!updatePs.execute()) {
55+
log.error("Unable to execute update of runs for {}.{}", jobName, namespace);
56+
}
57+
}
58+
} finally {
59+
conn.setAutoCommit(isAutoCommit);
60+
}
61+
}
62+
}
63+
64+
@Override
65+
public String getDescription() {
66+
return "UpdateRunsWithJobUUID";
67+
}
68+
69+
@Override
70+
public Integer getChecksum() {
71+
return null;
72+
}
73+
74+
@Override
75+
public boolean isUndo() {
76+
return false;
77+
}
78+
79+
@Override
80+
public boolean isBaselineMigration() {
81+
return false;
82+
}
83+
}

0 commit comments

Comments
 (0)