1313import java .time .ZoneId ;
1414import java .time .ZonedDateTime ;
1515import java .util .ArrayList ;
16+ import java .util .Collections ;
1617import java .util .HashSet ;
1718import java .util .LinkedHashMap ;
1819import java .util .List ;
1920import java .util .Map ;
2021import java .util .Optional ;
2122import java .util .Set ;
2223import java .util .UUID ;
24+ import java .util .stream .Collectors ;
25+ import java .util .stream .Stream ;
2326import marquez .common .Utils ;
2427import marquez .common .models .DatasetId ;
2528import marquez .common .models .DatasetName ;
3134import marquez .db .DatasetFieldDao .DatasetFieldMapping ;
3235import marquez .db .JobVersionDao .BagOfJobVersionInfo ;
3336import marquez .db .mappers .LineageEventMapper ;
37+ import marquez .db .models .ColumnLineageRow ;
3438import marquez .db .models .DatasetFieldRow ;
3539import marquez .db .models .DatasetRow ;
3640import marquez .db .models .DatasetSymlinkRow ;
3741import marquez .db .models .DatasetVersionRow ;
42+ import marquez .db .models .InputFieldData ;
3843import marquez .db .models .JobContextRow ;
3944import marquez .db .models .JobRow ;
4045import marquez .db .models .NamespaceRow ;
5661import marquez .service .models .LineageEvent .RunFacet ;
5762import marquez .service .models .LineageEvent .SchemaDatasetFacet ;
5863import marquez .service .models .LineageEvent .SchemaField ;
64+ import org .apache .commons .lang3 .tuple .Pair ;
5965import org .jdbi .v3 .sqlobject .config .RegisterRowMapper ;
6066import org .jdbi .v3 .sqlobject .statement .SqlQuery ;
6167import org .jdbi .v3 .sqlobject .statement .SqlUpdate ;
@@ -131,6 +137,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
131137 RunDao runDao = createRunDao ();
132138 RunArgsDao runArgsDao = createRunArgsDao ();
133139 RunStateDao runStateDao = createRunStateDao ();
140+ ColumnLineageDao columnLineageDao = createColumnLineageDao ();
134141
135142 Instant now = event .getEventTime ().withZoneSameInstant (ZoneId .of ("UTC" )).toInstant ();
136143
@@ -323,7 +330,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
323330 datasetDao ,
324331 datasetVersionDao ,
325332 datasetFieldDao ,
326- runDao );
333+ runDao ,
334+ columnLineageDao );
327335 datasetInputs .add (record );
328336 }
329337 }
@@ -345,7 +353,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
345353 datasetDao ,
346354 datasetVersionDao ,
347355 datasetFieldDao ,
348- runDao );
356+ runDao ,
357+ columnLineageDao );
349358 datasetOutputs .add (record );
350359 }
351360 }
@@ -541,7 +550,8 @@ default DatasetRecord upsertLineageDataset(
541550 DatasetDao datasetDao ,
542551 DatasetVersionDao datasetVersionDao ,
543552 DatasetFieldDao datasetFieldDao ,
544- RunDao runDao ) {
553+ RunDao runDao ,
554+ ColumnLineageDao columnLineageDao ) {
545555 NamespaceRow dsNamespace =
546556 namespaceDao .upsertNamespaceRow (
547557 UUID .randomUUID (), now , ds .getNamespace (), DEFAULT_NAMESPACE_OWNER );
@@ -662,6 +672,7 @@ default DatasetRecord upsertLineageDataset(
662672 return row ;
663673 });
664674 List <DatasetFieldMapping > datasetFieldMappings = new ArrayList <>();
675+ List <DatasetFieldRow > datasetFields = new ArrayList <>();
665676 if (fields != null ) {
666677 for (SchemaField field : fields ) {
667678 DatasetFieldRow datasetFieldRow =
@@ -672,6 +683,7 @@ default DatasetRecord upsertLineageDataset(
672683 field .getType (),
673684 field .getDescription (),
674685 datasetRow .getUuid ());
686+ datasetFields .add (datasetFieldRow );
675687 datasetFieldMappings .add (
676688 new DatasetFieldMapping (datasetVersionRow .getUuid (), datasetFieldRow .getUuid ()));
677689 }
@@ -690,7 +702,85 @@ default DatasetRecord upsertLineageDataset(
690702 }
691703 }
692704
693- return new DatasetRecord (datasetRow , datasetVersionRow , datasetNamespace );
705+ List <ColumnLineageRow > columnLineageRows = Collections .emptyList ();
706+ if (!isInput ) {
707+ columnLineageRows =
708+ upsertColumnLineage (
709+ runUuid ,
710+ ds ,
711+ now ,
712+ datasetFields ,
713+ columnLineageDao ,
714+ datasetFieldDao ,
715+ datasetVersionRow );
716+ }
717+
718+ return new DatasetRecord (datasetRow , datasetVersionRow , datasetNamespace , columnLineageRows );
719+ }
720+
721+ private List <ColumnLineageRow > upsertColumnLineage (
722+ UUID runUuid ,
723+ Dataset ds ,
724+ Instant now ,
725+ List <DatasetFieldRow > datasetFields ,
726+ ColumnLineageDao columnLineageDao ,
727+ DatasetFieldDao datasetFieldDao ,
728+ DatasetVersionRow datasetVersionRow ) {
729+ // get all the fields related to this particular run
730+ List <InputFieldData > runFields = datasetFieldDao .findInputFieldsDataAssociatedWithRun (runUuid );
731+
732+ return Optional .ofNullable (ds .getFacets ())
733+ .map (DatasetFacets ::getColumnLineage )
734+ .map (LineageEvent .ColumnLineageFacet ::getOutputColumnsList )
735+ .stream ()
736+ .flatMap (list -> list .stream ())
737+ .flatMap (
738+ outputColumn -> {
739+ Optional <DatasetFieldRow > outputField =
740+ datasetFields .stream ()
741+ .filter (dfr -> dfr .getName ().equals (outputColumn .getName ()))
742+ .findAny ();
743+
744+ if (outputField .isEmpty ()) {
745+ Logger log = LoggerFactory .getLogger (OpenLineageDao .class );
746+ log .error (
747+ "Cannot produce column lineage for missing output field in output dataset: {}" ,
748+ outputColumn .getName ());
749+ return Stream .empty ();
750+ }
751+
752+ // get field uuids of input columns related to this run
753+ List <Pair <UUID , UUID >> inputFields =
754+ runFields .stream ()
755+ .filter (
756+ fieldData ->
757+ outputColumn .getInputFields ().stream ()
758+ .filter (
759+ of ->
760+ of .getDatasetNamespace ().equals (fieldData .getNamespace ())
761+ && of .getDatasetName ()
762+ .equals (fieldData .getDatasetName ())
763+ && of .getFieldName ().equals (fieldData .getField ()))
764+ .findAny ()
765+ .isPresent ())
766+ .map (
767+ fieldData ->
768+ Pair .of (
769+ fieldData .getDatasetVersionUuid (),
770+ fieldData .getDatasetFieldUuid ()))
771+ .collect (Collectors .toList ());
772+
773+ return columnLineageDao
774+ .upsertColumnLineageRow (
775+ datasetVersionRow .getUuid (),
776+ outputField .get ().getUuid (),
777+ inputFields ,
778+ outputColumn .getTransformationDescription (),
779+ outputColumn .getTransformationType (),
780+ now )
781+ .stream ();
782+ })
783+ .collect (Collectors .toList ());
694784 }
695785
696786 default String formatDatasetName (String name ) {
0 commit comments