@@ -773,7 +773,7 @@ public void testGetCurrentRuns() {
773773 Stream .of (writeJob .getJob ().getUuid ()), newRows .stream ().map (JobLineage ::getId ))
774774 .collect (Collectors .toSet ());
775775
776- List <Run > currentRuns = lineageDao .getCurrentRuns (jobids );
776+ List <Run > currentRuns = lineageDao .getCurrentRunsWithFacets (jobids );
777777
778778 // assert the job does exist
779779 assertThat (currentRuns )
@@ -790,7 +790,7 @@ public void testGetCurrentRunsWithFailedJob() {
790790
791791 Set <UUID > jobids = Collections .singleton (writeJob .getJob ().getUuid ());
792792
793- List <Run > currentRuns = lineageDao .getCurrentRuns (jobids );
793+ List <Run > currentRuns = lineageDao .getCurrentRunsWithFacets (jobids );
794794
795795 // assert the job does exist
796796 assertThat (currentRuns )
@@ -799,6 +799,56 @@ public void testGetCurrentRunsWithFailedJob() {
799799 .contains (writeJob .getRun ().getUuid ());
800800 }
801801
802+ @ Test
803+ public void testGetCurrentRunsWithFacetsGetsLatestRun () {
804+ for (int i = 0 ; i < 5 ; i ++) {
805+ LineageTestUtils .createLineageRow (
806+ openLineageDao ,
807+ "writeJob" ,
808+ "COMPLETE" ,
809+ jobFacet ,
810+ Arrays .asList (),
811+ Arrays .asList (dataset ));
812+ }
813+
814+ List <JobLineage > newRows =
815+ writeDownstreamLineage (
816+ openLineageDao ,
817+ new LinkedList <>(
818+ Arrays .asList (
819+ new DatasetConsumerJob ("readJob" , 3 , Optional .of ("outputData2" )),
820+ new DatasetConsumerJob ("downstreamJob" , 1 , Optional .empty ()))),
821+ jobFacet ,
822+ dataset );
823+ UpdateLineageRow writeJob =
824+ LineageTestUtils .createLineageRow (
825+ openLineageDao , "writeJob" , "FAIL" , jobFacet , Arrays .asList (), Arrays .asList (dataset ));
826+
827+ Set <UUID > expectedRunIds =
828+ Stream .concat (
829+ Stream .of (writeJob .getRun ().getUuid ()), newRows .stream ().map (JobLineage ::getRunId ))
830+ .collect (Collectors .toSet ());
831+ Set <UUID > jobids =
832+ Stream .concat (
833+ Stream .of (writeJob .getJob ().getUuid ()), newRows .stream ().map (JobLineage ::getId ))
834+ .collect (Collectors .toSet ());
835+
836+ List <Run > currentRuns = lineageDao .getCurrentRunsWithFacets (jobids );
837+
838+ // assert the job does exist
839+ assertThat (currentRuns )
840+ .hasSize (expectedRunIds .size ())
841+ .extracting (r -> r .getId ().getValue ())
842+ .containsAll (expectedRunIds );
843+
844+ // assert that run_args, input/output versions, and run facets are fetched from the dao.
845+ for (Run run : currentRuns ) {
846+ assertThat (run .getArgs ()).hasSize (2 );
847+ assertThat (run .getOutputVersions ()).hasSize (1 );
848+ assertThat (run .getFacets ()).hasSize (1 );
849+ }
850+ }
851+
802852 @ Test
803853 public void testGetCurrentRunsGetsLatestRun () {
804854 for (int i = 0 ; i < 5 ; i ++) {
@@ -840,5 +890,14 @@ public void testGetCurrentRunsGetsLatestRun() {
840890 .hasSize (expectedRunIds .size ())
841891 .extracting (r -> r .getId ().getValue ())
842892 .containsAll (expectedRunIds );
893+
894+ // assert that run_args, input/output versions, and run facets are NOT fetched from the dao.
895+ for (Run run : currentRuns ) {
896+ assertThat (run .getArgs ()).hasSize (0 );
897+ assertThat (run .getOutputVersions ()).hasSize (0 );
898+ assertThat (run .getInputVersions ()).hasSize (0 );
899+ assertThat (run .getFacets ()).hasSize (0 );
900+ assertThat (run .getContext ()).isNull ();
901+ }
843902 }
844903}
0 commit comments