Skip to content

Commit 439044b

Browse files
Add fix and tests for handling Airflow dags with dots and task groups (#2126)
Signed-off-by: Michael Collado <collado.mike@gmail.com> Signed-off-by: Michael Collado <collado.mike@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
1 parent 590da35 commit 439044b

2 files changed

Lines changed: 172 additions & 17 deletions

File tree

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@
6464

6565
@RegisterRowMapper(LineageEventMapper.class)
6666
public interface OpenLineageDao extends BaseDao {
67-
public String DEFAULT_SOURCE_NAME = "default";
68-
public String DEFAULT_NAMESPACE_OWNER = "anonymous";
67+
String DEFAULT_SOURCE_NAME = "default";
68+
String DEFAULT_NAMESPACE_OWNER = "anonymous";
6969

7070
@SqlUpdate(
7171
"INSERT INTO lineage_events ("
@@ -370,7 +370,10 @@ private JobRow findParentJobRow(
370370
.findJobRowByRunUuid(uuid)
371371
.map(
372372
j -> {
373-
String parentJobName = Utils.parseParentJobName(facet.getJob().getName());
373+
String parentJobName =
374+
facet.getJob().getName().equals(event.getJob().getName())
375+
? Utils.parseParentJobName(facet.getJob().getName())
376+
: facet.getJob().getName();
374377
if (j.getNamespaceName().equals(facet.getJob().getNamespace())
375378
&& j.getName().equals(parentJobName)) {
376379
return j;
@@ -432,6 +435,10 @@ private JobRow createParentJobRunRecord(
432435
PGobject inputs) {
433436
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();
434437
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
438+
String parentJobName =
439+
facet.getJob().getName().equals(event.getJob().getName())
440+
? Utils.parseParentJobName(facet.getJob().getName())
441+
: facet.getJob().getName();
435442
JobRow newParentJobRow =
436443
createJobDao()
437444
.upsertJob(
@@ -440,7 +447,7 @@ private JobRow createParentJobRunRecord(
440447
now,
441448
namespace.getUuid(),
442449
namespace.getName(),
443-
Utils.parseParentJobName(facet.getJob().getName()),
450+
parentJobName,
444451
null,
445452
jobContext.getUuid(),
446453
location,

api/src/test/java/marquez/OpenLineageIntegrationTest.java

Lines changed: 161 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,127 @@ public void testOpenLineageJobHierarchyAirflowIntegration()
199199
String dagName = "the_dag";
200200
RunEvent airflowTask1 =
201201
createAirflowRunEvent(
202-
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
202+
ol,
203+
startOfHour,
204+
endOfHour,
205+
airflowParentRunId,
206+
dagName,
207+
dagName + "." + task1Name,
208+
NAMESPACE_NAME);
209+
210+
RunEvent airflowTask2 =
211+
createAirflowRunEvent(
212+
ol,
213+
startOfHour,
214+
endOfHour,
215+
airflowParentRunId,
216+
dagName,
217+
dagName + "." + task2Name,
218+
NAMESPACE_NAME);
219+
220+
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
221+
future.get(5, TimeUnit.SECONDS);
222+
223+
Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
224+
assertThat(job)
225+
.isNotNull()
226+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
227+
.hasFieldOrPropertyWithValue("parentJobName", dagName);
228+
229+
Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
230+
assertThat(parentJob)
231+
.isNotNull()
232+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
233+
.hasFieldOrPropertyWithValue("parentJobName", null);
234+
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
235+
assertThat(runsList).isNotEmpty().hasSize(1);
236+
}
237+
238+
@Test
239+
public void testOpenLineageJobHierarchyAirflowIntegrationWithDagNameWithDot()
240+
throws ExecutionException, InterruptedException, TimeoutException {
241+
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
242+
ZonedDateTime startOfHour =
243+
Instant.now()
244+
.atZone(LineageTestUtils.LOCAL_ZONE)
245+
.with(ChronoField.MINUTE_OF_HOUR, 0)
246+
.with(ChronoField.SECOND_OF_MINUTE, 0);
247+
ZonedDateTime endOfHour = startOfHour.plusHours(1);
248+
String airflowParentRunId = UUID.randomUUID().toString();
249+
String task1Name = "task1";
250+
String task2Name = "task2";
251+
String dagName = "the.dag";
252+
RunEvent airflowTask1 =
253+
createAirflowRunEvent(
254+
ol,
255+
startOfHour,
256+
endOfHour,
257+
airflowParentRunId,
258+
dagName,
259+
dagName + "." + task1Name,
260+
NAMESPACE_NAME);
261+
262+
RunEvent airflowTask2 =
263+
createAirflowRunEvent(
264+
ol,
265+
startOfHour,
266+
endOfHour,
267+
airflowParentRunId,
268+
dagName,
269+
dagName + "." + task2Name,
270+
NAMESPACE_NAME);
271+
272+
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
273+
future.get(5, TimeUnit.SECONDS);
274+
275+
Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
276+
assertThat(job)
277+
.isNotNull()
278+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
279+
.hasFieldOrPropertyWithValue("parentJobName", dagName);
280+
281+
Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
282+
assertThat(parentJob)
283+
.isNotNull()
284+
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
285+
.hasFieldOrPropertyWithValue("parentJobName", null);
286+
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
287+
assertThat(runsList).isNotEmpty().hasSize(1);
288+
}
289+
290+
@Test
291+
public void testOpenLineageJobHierarchyAirflowIntegrationWithTaskGroup()
292+
throws ExecutionException, InterruptedException, TimeoutException {
293+
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
294+
ZonedDateTime startOfHour =
295+
Instant.now()
296+
.atZone(LineageTestUtils.LOCAL_ZONE)
297+
.with(ChronoField.MINUTE_OF_HOUR, 0)
298+
.with(ChronoField.SECOND_OF_MINUTE, 0);
299+
ZonedDateTime endOfHour = startOfHour.plusHours(1);
300+
String airflowParentRunId = UUID.randomUUID().toString();
301+
String task1Name = "task_group.task1";
302+
String task2Name = "task_group.task2";
303+
String dagName = "dag_with_task_group";
304+
RunEvent airflowTask1 =
305+
createAirflowRunEvent(
306+
ol,
307+
startOfHour,
308+
endOfHour,
309+
airflowParentRunId,
310+
dagName,
311+
dagName + "." + task1Name,
312+
NAMESPACE_NAME);
203313

204314
RunEvent airflowTask2 =
205315
createAirflowRunEvent(
206-
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);
316+
ol,
317+
startOfHour,
318+
endOfHour,
319+
airflowParentRunId,
320+
dagName,
321+
dagName + "." + task2Name,
322+
NAMESPACE_NAME);
207323

208324
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
209325
future.get(5, TimeUnit.SECONDS);
@@ -242,13 +358,27 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration()
242358
String task1Name = "task1";
243359
String task2Name = "task2";
244360
String dagName = "the_dag";
361+
362+
// the old integration also used the fully qualified task name as the parent job name
245363
RunEvent airflowTask1 =
246364
createAirflowRunEvent(
247-
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
365+
ol,
366+
startOfHour,
367+
endOfHour,
368+
airflowParentRunId,
369+
dagName + "." + task1Name,
370+
dagName + "." + task1Name,
371+
NAMESPACE_NAME);
248372

249373
RunEvent airflowTask2 =
250374
createAirflowRunEvent(
251-
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);
375+
ol,
376+
startOfHour,
377+
endOfHour,
378+
airflowParentRunId,
379+
dagName + "." + task2Name,
380+
dagName + "." + task2Name,
381+
NAMESPACE_NAME);
252382

253383
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
254384
future.get(5, TimeUnit.SECONDS);
@@ -291,12 +421,24 @@ public void testOpenLineageJobHierarchyAirflowIntegrationConflictingRunUuid()
291421
// two dag runs with different namespaces - should result in two distinct jobs
292422
RunEvent airflowTask1 =
293423
createAirflowRunEvent(
294-
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
424+
ol,
425+
startOfHour,
426+
endOfHour,
427+
airflowParentRunId,
428+
dagName,
429+
dagName + "." + task1Name,
430+
NAMESPACE_NAME);
295431

296432
String secondNamespace = "another_namespace";
297433
RunEvent airflowTask2 =
298434
createAirflowRunEvent(
299-
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, secondNamespace);
435+
ol,
436+
startOfHour,
437+
endOfHour,
438+
airflowParentRunId,
439+
dagName,
440+
dagName + "." + task1Name,
441+
secondNamespace);
300442

301443
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
302444
future.get(5, TimeUnit.SECONDS);
@@ -332,16 +474,22 @@ public void testOpenLineageJobHierarchySparkAndAirflow()
332474
String dagName = "the_dag";
333475
RunEvent airflowTask1 =
334476
createAirflowRunEvent(
335-
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
477+
ol,
478+
startOfHour,
479+
endOfHour,
480+
airflowParentRunId,
481+
dagName,
482+
dagName + "." + task1Name,
483+
NAMESPACE_NAME);
336484

337485
RunEvent sparkTask =
338486
createRunEvent(
339487
ol,
340488
startOfHour,
341489
endOfHour,
342490
airflowTask1.getRun().getRunId().toString(),
343-
sparkTaskName,
344491
dagName + "." + task1Name,
492+
dagName + "." + task1Name + "." + sparkTaskName,
345493
Optional.empty(),
346494
NAMESPACE_NAME);
347495

@@ -609,8 +757,8 @@ private RunEvent createAirflowRunEvent(
609757
ZonedDateTime startOfHour,
610758
ZonedDateTime endOfHour,
611759
String airflowParentRunId,
612-
String taskName,
613760
String dagName,
761+
String taskName,
614762
String namespace) {
615763
RunFacet airflowVersionFacet = ol.newRunFacet();
616764
airflowVersionFacet
@@ -622,8 +770,8 @@ private RunEvent createAirflowRunEvent(
622770
startOfHour,
623771
endOfHour,
624772
airflowParentRunId,
625-
taskName,
626773
dagName,
774+
taskName,
627775
Optional.of(airflowVersionFacet),
628776
namespace);
629777
}
@@ -634,8 +782,8 @@ private RunEvent createRunEvent(
634782
ZonedDateTime startOfHour,
635783
ZonedDateTime endOfHour,
636784
String airflowParentRunId,
637-
String taskName,
638785
String dagName,
786+
String taskName,
639787
Optional<RunFacet> airflowVersionFacet,
640788
String namespace) {
641789
// The Java SDK requires parent run ids to be a UUID, but the python SDK doesn't. In order to
@@ -650,7 +798,7 @@ private RunEvent createRunEvent(
650798
"run",
651799
ImmutableMap.of("runId", airflowParentRunId),
652800
"job",
653-
ImmutableMap.of("namespace", namespace, "name", dagName + "." + taskName)));
801+
ImmutableMap.of("namespace", namespace, "name", dagName)));
654802
RunFacetsBuilder runFacetBuilder =
655803
ol.newRunFacetsBuilder()
656804
.nominalTime(ol.newNominalTimeRunFacet(startOfHour, endOfHour))
@@ -663,7 +811,7 @@ private RunEvent createRunEvent(
663811
.job(
664812
ol.newJob(
665813
namespace,
666-
dagName + "." + taskName,
814+
taskName,
667815
ol.newJobFacetsBuilder()
668816
.documentation(ol.newDocumentationJobFacet("the job docs"))
669817
.sql(ol.newSQLJobFacet("SELECT * FROM the_table"))

0 commit comments

Comments
 (0)