Skip to content

Commit 0aaa489

Browse files
authored
Add parentRun alias to LineageEvent RunFacet to support older OpenLin… (#2130)
* Add parentRun alias to LineageEvent RunFacet to support older OpenLineage events Signed-off-by: Michael Collado <collado.mike@gmail.com> * Add changelog update Signed-off-by: Michael Collado <collado.mike@gmail.com> Signed-off-by: Michael Collado <collado.mike@gmail.com>
1 parent b608fae commit 0aaa489

3 files changed

Lines changed: 142 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Changelog
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.26.0...HEAD)
4+
### Fixed
5+
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
46
## [0.26.0](https://github.com/MarquezProject/marquez/compare/0.25.0...0.26.0) - 2022-09-15
57

68
### Added

api/src/main/java/marquez/service/models/LineageEvent.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package marquez.service.models;
77

8+
import com.fasterxml.jackson.annotation.JsonAlias;
89
import com.fasterxml.jackson.annotation.JsonAnyGetter;
910
import com.fasterxml.jackson.annotation.JsonAnySetter;
1011
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -71,7 +72,11 @@ public static class Run extends BaseJsonModel {
7172
public static class RunFacet {
7273

7374
@Valid private NominalTimeRunFacet nominalTime;
74-
@Valid private ParentRunFacet parent;
75+
76+
@JsonAlias(
77+
"parentRun") // the Airflow integration previously reported parentRun instead of parent
78+
@Valid
79+
private ParentRunFacet parent;
7580

7681
@Builder.Default @JsonIgnore private Map<String, Object> additional = new LinkedHashMap<>();
7782

api/src/test/java/marquez/OpenLineageIntegrationTest.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Arrays;
3232
import java.util.Collections;
3333
import java.util.List;
34+
import java.util.Map;
3435
import java.util.Optional;
3536
import java.util.UUID;
3637
import java.util.concurrent.CompletableFuture;
@@ -235,6 +236,139 @@ public void testOpenLineageJobHierarchyAirflowIntegration()
235236
assertThat(runsList).isNotEmpty().hasSize(1);
236237
}
237238

239+
@Test
240+
public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet()
241+
throws ExecutionException, InterruptedException, TimeoutException {
242+
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
243+
ZonedDateTime startOfHour =
244+
Instant.now()
245+
.atZone(LineageTestUtils.LOCAL_ZONE)
246+
.with(ChronoField.MINUTE_OF_HOUR, 0)
247+
.with(ChronoField.SECOND_OF_MINUTE, 0);
248+
ZonedDateTime endOfHour = startOfHour.plusHours(1);
249+
String airflowParentRunId = UUID.randomUUID().toString();
250+
String task1Name = "task1";
251+
String task2Name = "task2";
252+
String dagName = "the_dag";
253+
RunEvent airflowTask1 =
254+
createAirflowRunEvent(
255+
ol,
256+
startOfHour,
257+
endOfHour,
258+
airflowParentRunId,
259+
dagName,
260+
dagName + "." + task1Name,
261+
NAMESPACE_NAME);
262+
263+
// the older airflow integration reported parentRun instead of parent. We support this as an
264+
// alias for compatibility
265+
RunFacet parent = airflowTask1.getRun().getFacets().getAdditionalProperties().remove("parent");
266+
airflowTask1.getRun().getFacets().getAdditionalProperties().put("parentRun", parent);
267+
268+
RunEvent airflowTask2 =
269+
createAirflowRunEvent(
270+
ol,
271+
startOfHour,
272+
endOfHour,
273+
airflowParentRunId,
274+
dagName,
275+
dagName + "." + task2Name,
276+
NAMESPACE_NAME);
277+
parent = airflowTask2.getRun().getFacets().getAdditionalProperties().remove("parent");
278+
airflowTask2.getRun().getFacets().getAdditionalProperties().put("parentRun", parent);
279+
280+
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
281+
future.get(5, TimeUnit.SECONDS);
282+
283+
Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
284+
assertThat(job)
285+
.isNotNull()
286+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
287+
.hasFieldOrPropertyWithValue("parentJobName", dagName);
288+
289+
Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
290+
assertThat(parentJob)
291+
.isNotNull()
292+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
293+
.hasFieldOrPropertyWithValue("parentJobName", null);
294+
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
295+
assertThat(runsList).isNotEmpty().hasSize(1);
296+
}
297+
298+
@Test
299+
public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunFacet()
300+
throws ExecutionException, InterruptedException, TimeoutException {
301+
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
302+
ZonedDateTime startOfHour =
303+
Instant.now()
304+
.atZone(LineageTestUtils.LOCAL_ZONE)
305+
.with(ChronoField.MINUTE_OF_HOUR, 0)
306+
.with(ChronoField.SECOND_OF_MINUTE, 0);
307+
ZonedDateTime endOfHour = startOfHour.plusHours(1);
308+
String airflowParentRunId = UUID.randomUUID().toString();
309+
String task1Name = "task1";
310+
String task2Name = "task2";
311+
String dagName = "the_dag";
312+
RunEvent airflowTask1 =
313+
createAirflowRunEvent(
314+
ol,
315+
startOfHour,
316+
endOfHour,
317+
airflowParentRunId,
318+
dagName,
319+
dagName + "." + task1Name,
320+
NAMESPACE_NAME);
321+
322+
// the older airflow integration reported parentRun instead of parent. The new integration
323+
// reports both. They are the same in the airflow integration, but this test verifies we handle
324+
// the "parentRun" field first.
325+
// It would be preferable to prioritize the "parent" field, but it seems Jackson prefers the
326+
// alias first.
327+
RunFacet parent = airflowTask1.getRun().getFacets().getAdditionalProperties().get("parent");
328+
RunFacet newParent = ol.newRunFacet();
329+
Map<String, Object> runFacetProps = newParent.getAdditionalProperties();
330+
runFacetProps.put("run", parent.getAdditionalProperties().get("run"));
331+
runFacetProps.put(
332+
"job", ImmutableMap.of("name", "a_new_dag", "namespace", "incorrect_namespace"));
333+
airflowTask1.getRun().getFacets().getAdditionalProperties().put("parentRun", parent);
334+
airflowTask1.getRun().getFacets().getAdditionalProperties().put("parent", newParent);
335+
336+
RunEvent airflowTask2 =
337+
createAirflowRunEvent(
338+
ol,
339+
startOfHour,
340+
endOfHour,
341+
airflowParentRunId,
342+
dagName,
343+
dagName + "." + task2Name,
344+
NAMESPACE_NAME);
345+
parent = airflowTask2.getRun().getFacets().getAdditionalProperties().get("parent");
346+
newParent = ol.newRunFacet();
347+
runFacetProps = newParent.getAdditionalProperties();
348+
runFacetProps.put("run", parent.getAdditionalProperties().get("run"));
349+
runFacetProps.put(
350+
"job", ImmutableMap.of("name", "a_new_dag", "namespace", "incorrect_namespace"));
351+
airflowTask2.getRun().getFacets().getAdditionalProperties().put("parentRun", parent);
352+
airflowTask2.getRun().getFacets().getAdditionalProperties().put("parent", newParent);
353+
354+
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
355+
future.get(5, TimeUnit.SECONDS);
356+
357+
Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
358+
assertThat(job)
359+
.isNotNull()
360+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
361+
.hasFieldOrPropertyWithValue("parentJobName", dagName);
362+
363+
Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
364+
assertThat(parentJob)
365+
.isNotNull()
366+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
367+
.hasFieldOrPropertyWithValue("parentJobName", null);
368+
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
369+
assertThat(runsList).isNotEmpty().hasSize(1);
370+
}
371+
238372
@Test
239373
public void testOpenLineageJobHierarchyAirflowIntegrationWithDagNameWithDot()
240374
throws ExecutionException, InterruptedException, TimeoutException {

0 commit comments

Comments
 (0)