2626import java .io .UncheckedIOException ;
2727import java .net .MalformedURLException ;
2828import java .net .URL ;
29+ import java .nio .ByteBuffer ;
30+ import java .nio .charset .StandardCharsets ;
2931import java .time .Instant ;
3032import java .time .ZonedDateTime ;
3133import java .util .List ;
3739import java .util .stream .Collectors ;
3840import java .util .stream .Stream ;
3941import javax .annotation .Nullable ;
42+ import javax .validation .constraints .NotNull ;
4043import lombok .Builder ;
4144import lombok .Getter ;
4245import lombok .NonNull ;
5053import marquez .service .models .DatasetMeta ;
5154import marquez .service .models .DbTableMeta ;
5255import marquez .service .models .LineageEvent ;
56+ import marquez .service .models .LineageEvent .ParentRunFacet ;
5357import marquez .service .models .StreamMeta ;
5458import org .apache .commons .lang3 .tuple .Triple ;
5559
@@ -61,6 +65,16 @@ private Utils() {}
6165 public static final String VERSION_DELIM = ":" ;
6266 public static final Joiner VERSION_JOINER = Joiner .on (VERSION_DELIM ).skipNulls ();
6367
68+ /**
69+ * pre-defined NAMESPACE_URL defined in RFC4122. This is the namespace used by the OpenLineage
70+ * Airflow integration for constructing some run IDs as UUIDs. We use the same namespace to
71+ * construct the same UUIDs when absolutely necessary (e.g., backfills, backward compatibility)
72+ *
73+ * @see https://datatracker.ietf.org/doc/html/rfc4122#appendix-C
74+ */
75+ public static final UUID NAMESPACE_URL_UUID =
76+ UUID .fromString ("6ba7b811-9dad-11d1-80b4-00c04fd430c8" );
77+
6478 private static final ObjectMapper MAPPER = newObjectMapper ();
6579
6680 private static final int UUID_LENGTH = 36 ;
@@ -141,6 +155,70 @@ public static UUID toUuid(@NonNull final String uuidString) {
141155 return UUID .fromString (uuidString );
142156 }
143157
158+ /**
159+ * Construct a name-based {@link UUID} based on the {@link #NAMESPACE_URL_UUID} namespace. Name
160+ * parts are separated by a dot (.) character.
161+ *
162+ * @see https://datatracker.ietf.org/doc/html/rfc4122#page-13
163+ * @param nameParts
164+ * @return
165+ */
166+ public static UUID toNameBasedUuid (String ... nameParts ) {
167+ String constructedName = String .join ("." , nameParts );
168+
169+ final byte [] nameBytes = constructedName .getBytes (StandardCharsets .UTF_8 );
170+
171+ ByteBuffer buffer = ByteBuffer .allocate (nameBytes .length + 16 );
172+ buffer .putLong (NAMESPACE_URL_UUID .getMostSignificantBits ());
173+ buffer .putLong (NAMESPACE_URL_UUID .getLeastSignificantBits ());
174+ buffer .put (nameBytes );
175+
176+ return UUID .nameUUIDFromBytes (buffer .array ());
177+ }
178+
179+ /**
180+ * Construct a UUID from a {@link ParentRunFacet} - if the {@link
181+ * marquez.service.models.LineageEvent.RunLink#runId} field is a valid {@link UUID}, use it.
182+ * Otherwise, compute a {@link UUID} from the job name and the reported runId. If the job name
183+ * contains a dot (.), only return the portion up to the last dot in the name (this attempts to
184+ * address airflow tasks, which always report the job name as <dag_name>.<task_name>
185+ *
186+ * @param parent
187+ * @return
188+ */
189+ public static UUID findParentRunUuid (ParentRunFacet parent ) {
190+ String jobName = parent .getJob ().getName ();
191+ String parentRunId = parent .getRun ().getRunId ();
192+ return findParentRunUuid (jobName , parentRunId );
193+ }
194+
195+ public static UUID findParentRunUuid (String parentJobName , String parentRunId ) {
196+ String dagName = parseParentJobName (parentJobName );
197+ return toUuid (parentRunId , dagName );
198+ }
199+
200+ public static String parseParentJobName (String parentJobName ) {
201+ return parentJobName .contains ("." )
202+ ? parentJobName .substring (0 , parentJobName .lastIndexOf ('.' ))
203+ : parentJobName ;
204+ }
205+
206+ /**
207+ * Compute a UUID from a RunId and a jobName
208+ *
209+ * @see Utils#toNameBasedUuid(String...) for details on the UUID construction.
210+ * @param runId
211+ * @param jobName
212+ * @return
213+ */
214+ public static UUID toUuid (@ NotNull String runId , String jobName ) {
215+ try {
216+ return UUID .fromString (runId );
217+ } catch (IllegalArgumentException e ) {
218+ return Utils .toNameBasedUuid (jobName , runId );
219+ }
220+ }
221+
144222 public static Instant toInstant (@ Nullable final String asIso ) {
145223 return (asIso == null ) ? null : Instant .from (ISO_INSTANT .parse (asIso ));
146224 }
0 commit comments