Skip to content

Commit 799ba65

Browse files
sophiely and wslulciuc authored
fix symlink display on marquez (#2736)
* fix symlink display on marquez Signed-off-by: sophiely <ly.sophie200@gmail.com> * fix code formatting Signed-off-by: sophiely <ly.sophie200@gmail.com> * update changelog Signed-off-by: sophiely <ly.sophie200@gmail.com> * change dataset_views query Signed-off-by: sophiely <ly.sophie200@gmail.com> * update changelog Signed-off-by: sophiely <ly.sophie200@gmail.com> * rename migration file Signed-off-by: sophiely <ly.sophie200@gmail.com> * rename migration file Signed-off-by: sophiely <ly.sophie200@gmail.com> * fix formatting and add migration file Signed-off-by: sophiely <ly.sophie200@gmail.com> * fix formatting Signed-off-by: sophiely <ly.sophie200@gmail.com> * resolve comments Signed-off-by: sophiely <ly.sophie200@gmail.com> * resolve tests Signed-off-by: sophiely <ly.sophie200@gmail.com> --------- Signed-off-by: sophiely <ly.sophie200@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
1 parent 0b167f8 commit 799ba65

5 files changed

Lines changed: 119 additions & 32 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
* Web: minor UI enhancements [`#2727`](https://github.com/MarquezProject/marquez/pull/2727) [@phixMe](https://github.com/phixMe)
1717
*Hygienic cleanup of project as a follow-up to [`#2725`](https://github.com/MarquezProject/marquez/pull/2725), including a fix for [`#2747`](https://github.com/MarquezProject/marquez/issues/2747).*
1818

19+
### Fixed
20+
21+
* bug: Marquez dataset symlinks facet creates an empty namespace [`#2645`](https://github.com/MarquezProject/marquez/pull/2645) [@sophiely](https://github.com/sophiely)
22+
Display symlink dataset in the previously empty namespace and link the symlink dataset lineage to the main dataset.
23+
1924
## [0.44.0](https://github.com/MarquezProject/marquez/compare/0.43.1...0.44.0) - 2024-01-22
2025

2126
### Added

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,21 @@ INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uu
102102
"""
103103
SELECT ds.*, dv.fields, dv.lifecycle_state
104104
FROM datasets_view ds
105-
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
106-
WHERE ds.uuid IN (<dsUuids>)""")
105+
LEFT JOIN dataset_versions dv ON dv.uuid = ds.current_version_uuid
106+
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
107+
WHERE dsym.is_primary = true
108+
AND ds.uuid IN (<dsUuids>)""")
107109
Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);
108110

109111
@SqlQuery(
110112
"""
111113
SELECT ds.*, dv.fields, dv.lifecycle_state
112114
FROM datasets_view ds
113115
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
114-
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""")
116+
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
117+
INNER JOIN datasets_view AS d ON d.uuid = ds.uuid
118+
WHERE dsym.is_primary is true
119+
AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)""")
115120
DatasetData getDatasetData(String namespaceName, String datasetName);
116121

117122
@SqlQuery(

api/src/main/java/marquez/service/LineageService.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,20 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
114114
if (!datasetIds.isEmpty()) {
115115
datasets.addAll(this.getDatasetData(datasetIds));
116116
}
117-
if (nodeId.isDatasetType()
118-
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
119-
log.warn(
120-
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
121-
jobData.stream().map(JobData::getId).toList(),
122-
nodeId.getValue());
123-
return toLineageWithOrphanDataset(nodeId.asDatasetId());
124-
}
125117

118+
if (nodeId.isDatasetType()) {
119+
DatasetId datasetId = nodeId.asDatasetId();
120+
DatasetData datasetData =
121+
this.getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
122+
123+
if (!datasetIds.contains(datasetData.getUuid())) {
124+
log.warn(
125+
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
126+
jobData.stream().map(JobData::getId).toList(),
127+
nodeId.getValue());
128+
return toLineageWithOrphanDataset(nodeId.asDatasetId());
129+
}
130+
}
126131
return toLineage(jobData, datasets);
127132
}
128133

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,33 @@
11
DROP VIEW IF EXISTS datasets_view;
2-
CREATE VIEW datasets_view
3-
AS
2+
CREATE VIEW datasets_view AS
43
SELECT d.uuid,
5-
d.type,
6-
d.created_at,
7-
d.updated_at,
8-
d.namespace_uuid,
9-
d.source_uuid,
10-
d.name,
11-
array_agg(CAST((namespaces.name, symlinks.name) AS DATASET_NAME)) AS dataset_symlinks,
12-
d.physical_name,
13-
d.description,
14-
d.current_version_uuid,
15-
d.last_modified_at,
16-
d.namespace_name,
17-
d.source_name,
18-
d.is_deleted
19-
FROM datasets d
20-
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
21-
INNER JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
22-
WHERE d.is_hidden IS FALSE
23-
GROUP BY d.uuid;
4+
d.type,
5+
d.created_at,
6+
d.updated_at,
7+
CASE
8+
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_uuid
9+
ELSE namespaces.uuid
10+
END
11+
AS namespace_uuid ,
12+
d.source_uuid,
13+
CASE
14+
WHEN (d.namespace_name = namespaces.name and d.name = symlinks.name) THEN d.name
15+
ELSE symlinks.name
16+
END
17+
AS name,
18+
array(SELECT ROW(namespaces.name::character varying(255), symlinks.name::character varying(255))::dataset_name) AS dataset_symlinks,
19+
d.physical_name,
20+
d.description,
21+
d.current_version_uuid,
22+
d.last_modified_at,
23+
CASE
24+
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_name
25+
ELSE namespaces.name
26+
END
27+
AS namespace_name,
28+
d.source_name,
29+
d.is_deleted
30+
FROM datasets d
31+
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
32+
JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
33+
WHERE d.is_hidden is false;

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313

1414
import java.util.Arrays;
1515
import java.util.Collections;
16+
import java.util.HashMap;
1617
import java.util.LinkedList;
1718
import java.util.List;
19+
import java.util.Map;
1820
import java.util.Optional;
1921
import java.util.UUID;
2022
import java.util.stream.Collectors;
@@ -588,4 +590,64 @@ public void testLineageForOrphanedDataset() {
588590
private boolean jobNameEquals(Node node, String writeJob) {
589591
return node.getId().asJobId().getName().getValue().equals(writeJob);
590592
}
593+
594+
@Test
595+
public void testSymlinkDatasetLineage() {
596+
// (1) Create symlink facet for our main dataset
597+
Map<String, Object> symlink = new HashMap<>();
598+
Map<String, Object> symlinkInfo = new HashMap<>();
599+
Map<String, Object> symlinkIdentifiers = new HashMap<>();
600+
symlinkIdentifiers.put("name", "symlinkDataset");
601+
symlinkIdentifiers.put("namespace", NAMESPACE);
602+
symlinkIdentifiers.put("type", "DB_TABLE");
603+
symlinkInfo.put("producer", "https://github.com/OpenLineage/producer/");
604+
symlinkInfo.put("schemaURL", "https://openlineage.io/schema/url/");
605+
symlinkInfo.put("identifiers", symlinkIdentifiers);
606+
symlink.put("symlinks", symlinkInfo);
607+
608+
// (2) Create main dataset with a symlink
609+
Dataset mainDataset =
610+
new Dataset(
611+
NAMESPACE,
612+
"mainDataset",
613+
newDatasetFacet(symlink, new SchemaField("firstname", "string", "the first name")));
614+
615+
// (3) Create the symlink dataset
616+
Dataset symlinkDataset =
617+
new Dataset(
618+
NAMESPACE,
619+
"symlinkDataset",
620+
newDatasetFacet(new SchemaField("firstname", "string", "the first name")));
621+
622+
// (4) Create a job with the main dataset
623+
UpdateLineageRow firstJob =
624+
LineageTestUtils.createLineageRow(
625+
openLineageDao,
626+
"firstJob",
627+
"COMPLETE",
628+
jobFacet,
629+
Arrays.asList(mainDataset),
630+
Arrays.asList());
631+
632+
// (5) Create a job with the symlink dataset
633+
UpdateLineageRow secondJob =
634+
LineageTestUtils.createLineageRow(
635+
openLineageDao,
636+
"secondJob",
637+
"COMPLETE",
638+
jobFacet,
639+
Arrays.asList(symlinkDataset),
640+
Arrays.asList());
641+
642+
// (6) We expect the first and second jobs to be linked together because the main
643+
// and symlink datasets are in fact the same dataset
644+
Lineage lineage =
645+
lineageService.lineage(
646+
NodeId.of(
647+
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("symlinkDataset"))),
648+
5,
649+
true);
650+
651+
assertThat(lineage.getGraph()).hasSize(2);
652+
}
591653
}

0 commit comments

Comments
 (0)