Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
Expand All @@ -37,6 +39,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -50,6 +53,7 @@
import marquez.service.models.DatasetMeta;
import marquez.service.models.DbTableMeta;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.StreamMeta;
import org.apache.commons.lang3.tuple.Triple;

Expand All @@ -61,6 +65,16 @@ private Utils() {}
public static final String VERSION_DELIM = ":";
public static final Joiner VERSION_JOINER = Joiner.on(VERSION_DELIM).skipNulls();

/**
 * The pre-defined URL namespace UUID ({@code NameSpace_URL}) listed in RFC 4122, Appendix C. This
 * is the namespace used by the OpenLineage Airflow integration for constructing some run IDs as
 * UUIDs. We use the same namespace to construct the same UUIDs when absolutely necessary (e.g.,
 * backfills, backward compatibility).
 *
 * @see "https://datatracker.ietf.org/doc/html/rfc4122#appendix-C"
 */
public static final UUID NAMESPACE_URL_UUID =
UUID.fromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8");

private static final ObjectMapper MAPPER = newObjectMapper();

private static final int UUID_LENGTH = 36;
Expand Down Expand Up @@ -141,6 +155,70 @@ public static UUID toUuid(@NonNull final String uuidString) {
return UUID.fromString(uuidString);
}

/**
 * Builds a name-based {@link UUID} inside the {@link #NAMESPACE_URL_UUID} namespace. The name is
 * formed by joining the given parts with a dot (.) character and encoding the result as UTF-8;
 * the 16 namespace bytes followed by the name bytes are then handed to {@link
 * UUID#nameUUIDFromBytes(byte[])}.
 *
 * @see "https://datatracker.ietf.org/doc/html/rfc4122#page-13"
 * @param nameParts the pieces of the name, joined with "." in the order given
 * @return the name-based UUID derived from the namespace and the joined name
 */
public static UUID toNameBasedUuid(String... nameParts) {
  final byte[] encodedName = String.join(".", nameParts).getBytes(StandardCharsets.UTF_8);

  // Layout: 8 bytes namespace MSB, 8 bytes namespace LSB, then the UTF-8 name.
  final ByteBuffer seed =
      ByteBuffer.allocate(2 * Long.BYTES + encodedName.length)
          .putLong(NAMESPACE_URL_UUID.getMostSignificantBits())
          .putLong(NAMESPACE_URL_UUID.getLeastSignificantBits())
          .put(encodedName);

  return UUID.nameUUIDFromBytes(seed.array());
}

/**
 * Resolves the run {@link UUID} for a {@link ParentRunFacet}. If the facet's {@link
 * marquez.service.models.LineageEvent.RunLink#runId} is already a valid {@link UUID}, that value
 * is used. Otherwise a {@link UUID} is computed from the parent job name and the reported runId.
 * When the job name contains a dot (.), only the portion before the last dot is used for that
 * computation; this attempts to address airflow tasks, which always report the job name as
 * {@code <dag_name>.<task_name>}.
 *
 * @param parent the parent run facet carrying the job name and run id
 * @return the parent run's UUID, either parsed from the run id or derived from job name and run id
 */
public static UUID findParentRunUuid(ParentRunFacet parent) {
  return findParentRunUuid(parent.getJob().getName(), parent.getRun().getRunId());
}

/**
 * Resolves a parent run {@link UUID} from a raw job name and run id. The job name is first
 * reduced with {@link #parseParentJobName(String)} and the result is passed, together with the
 * run id, to {@link #toUuid(String, String)}.
 *
 * @param parentJobName the job name reported for the parent
 * @param parentRunId the run id reported for the parent
 * @return the resolved parent run UUID
 */
public static UUID findParentRunUuid(String parentJobName, String parentRunId) {
  return toUuid(parentRunId, parseParentJobName(parentJobName));
}

/**
 * Strips the final dot-separated segment from a job name. A name without any dot is returned
 * unchanged; otherwise everything before the last dot is returned.
 *
 * @param parentJobName the job name to trim
 * @return the text before the last dot, or the whole name when it contains no dot
 */
public static String parseParentJobName(String parentJobName) {
  final int lastDot = parentJobName.lastIndexOf('.');
  if (lastDot < 0) {
    return parentJobName;
  }
  return parentJobName.substring(0, lastDot);
}

/**
 * Turns a run id into a {@link UUID}. The run id is parsed directly when {@link
 * UUID#fromString(String)} accepts it; if parsing is rejected with an {@link
 * IllegalArgumentException}, a name-based UUID is computed from the job name and the run id
 * instead.
 *
 * @see Utils#toNameBasedUuid(String...) for details on the UUID construction.
 * @param runId the reported run id
 * @param jobName the job name used as the first name part when a UUID has to be computed
 * @return the parsed UUID, or the name-based UUID built from {@code jobName} and {@code runId}
 */
public static UUID toUuid(@NotNull String runId, String jobName) {
  UUID resolved;
  try {
    resolved = UUID.fromString(runId);
  } catch (IllegalArgumentException notAUuid) {
    // runId is not in UUID string form: fall back to the deterministic name-based UUID
    resolved = Utils.toNameBasedUuid(jobName, runId);
  }
  return resolved;
}

/**
 * Parses a string with {@code ISO_INSTANT} into an {@link Instant}.
 *
 * @param asIso the text to parse, or {@code null}
 * @return the parsed instant, or {@code null} when {@code asIso} is {@code null}
 */
public static Instant toInstant(@Nullable final String asIso) {
  if (asIso == null) {
    return null;
  }
  return Instant.from(ISO_INSTANT.parse(asIso));
}
Expand Down
4 changes: 3 additions & 1 deletion api/src/main/java/marquez/db/FlywayFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public final class FlywayFactory {
private static final boolean DEFAULT_CLEAN_DISABLED = false;
private static final boolean DEFAULT_OUT_OF_ORDER = false;
private static final String DEFAULT_LOCATION = "marquez/db/migration";
private static final List<String> DEFAULT_LOCATIONS = ImmutableList.of(DEFAULT_LOCATION);
private static final String DEFAULT_MIGRATION_CLASSPATH = "classpath:marquez/db/migrations";
private static final List<String> DEFAULT_LOCATIONS =
ImmutableList.of(DEFAULT_LOCATION, DEFAULT_MIGRATION_CLASSPATH);
private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
private static final String DEFAULT_TABLE = "flyway_schema_history";
private static final boolean DEFAULT_PLACEHOLDER_REPLACEMENT = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package marquez.db.migrations;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;

/**
 * This migration is dependent on the migration found in the SQL script for V43. This updates the
 * runs table to include the <code>job_uuid</code> value for each record. One <code>UPDATE</code>
 * is issued per row of the jobs table rather than a single statement over the whole runs table, so
 * that concurrent reads and writes can continue to take place. Auto-commit is enabled, so it is
 * entirely possible that this migration will fail partway through and some records will retain the
 * <code>job_uuid</code> value while others will not. This is intentional as no harm will come from
 * leaving these values in place in case of rollback.
 */
@Slf4j
public class V43_1__UpdateRunsWithJobUUID implements JavaMigration {

  @Override
  public MigrationVersion getVersion() {
    return MigrationVersion.fromVersion("43.1");
  }

  // don't execute in a transaction so each per-job update can be committed immediately
  @Override
  public boolean canExecuteInTransaction() {
    return false;
  }

  /**
   * Reads every job and sets <code>job_uuid</code> on the runs matching that job's name and
   * namespace.
   *
   * @param context the Flyway migration context supplying the JDBC connection
   * @throws Exception if reading the jobs or updating the runs fails
   */
  @Override
  public void migrate(Context context) throws Exception {
    Connection conn = context.getConnection();
    boolean isAutoCommit = conn.getAutoCommit();
    // Switch to auto-commit before the query is opened so the commit mode is not changed while
    // the result set is being iterated.
    conn.setAutoCommit(true);
    try (PreparedStatement queryPs =
            conn.prepareStatement("SELECT uuid, name, namespace_name FROM jobs");
        PreparedStatement updatePs =
            conn.prepareStatement(
                "UPDATE runs SET job_uuid=? WHERE job_name=? AND namespace_name=?");
        ResultSet resultSet = queryPs.executeQuery()) {
      while (resultSet.next()) {
        String uuid = resultSet.getString("uuid");
        String jobName = resultSet.getString("name");
        String namespace = resultSet.getString("namespace_name");
        updatePs.setObject(1, UUID.fromString(uuid));
        updatePs.setString(2, jobName);
        updatePs.setString(3, namespace);
        // executeUpdate() reports the affected row count. execute() returns false for every
        // UPDATE (it only signals whether a ResultSet was produced), so it cannot be used to
        // detect failure; real failures surface as SQLException.
        int updatedRuns = updatePs.executeUpdate();
        log.debug(
            "Updated {} runs for job {} in namespace {}", updatedRuns, jobName, namespace);
      }
    } finally {
      // always hand the connection back in the commit mode it arrived with
      conn.setAutoCommit(isAutoCommit);
    }
  }

  @Override
  public String getDescription() {
    return "UpdateRunsWithJobUUID";
  }

  // no checksum is reported for this Java-based migration
  @Override
  public Integer getChecksum() {
    return null;
  }

  @Override
  public boolean isUndo() {
    return false;
  }

  @Override
  public boolean isBaselineMigration() {
    return false;
  }
}
Loading