Skip to content

Commit 9757e54

Browse files
authored
Merge branch 'main' into bug/spec-missing-required-fields-in-dataset
2 parents 6c799fa + 90b2eff commit 9757e54

28 files changed

Lines changed: 526 additions & 243 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
* Column-lineage endpoints support point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
88
*Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.*
9+
* Column lineage point-in-time Java client [`#2269`](https://github.com/MarquezProject/marquez/pull/2269) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
10+
*Java client methods to retrieve point-in-time `column-lineage`. Please note that the existing methods `getColumnLineageByDataset`, `getColumnLineageByDatasetField` and `getColumnLineageByJob` were replaced by a single `getColumnLineage` method taking `NodeId` as a parameter.*
911

1012
### Fixed
1113

api/src/main/java/marquez/api/BaseResource.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import marquez.api.exceptions.RunAlreadyExistsException;
1919
import marquez.api.exceptions.RunNotFoundException;
2020
import marquez.api.exceptions.SourceNotFoundException;
21+
import marquez.common.models.DatasetFieldId;
2122
import marquez.common.models.DatasetId;
2223
import marquez.common.models.DatasetName;
2324
import marquez.common.models.FieldName;
25+
import marquez.common.models.JobId;
2426
import marquez.common.models.JobName;
2527
import marquez.common.models.NamespaceName;
2628
import marquez.common.models.RunId;
@@ -37,6 +39,7 @@
3739
import marquez.service.ServiceFactory;
3840
import marquez.service.SourceService;
3941
import marquez.service.TagService;
42+
import marquez.service.models.NodeId;
4043
import marquez.service.models.Run;
4144

4245
public class BaseResource {
@@ -74,6 +77,10 @@ void throwIfNotExists(@NonNull NamespaceName namespaceName) {
7477
}
7578
}
7679

80+
void throwIfNotExists(@NonNull DatasetId datasetId) {
81+
throwIfNotExists(datasetId.getNamespace(), datasetId.getName());
82+
}
83+
7784
void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull DatasetName datasetName) {
7885
if (!datasetService.exists(namespaceName.getValue(), datasetName.getValue())) {
7986
throw new DatasetNotFoundException(datasetName);
@@ -86,6 +93,13 @@ void throwIfSourceNotExists(SourceName sourceName) {
8693
}
8794
}
8895

96+
void throwIfNotExists(@NonNull DatasetFieldId datasetFieldId) {
97+
throwIfNotExists(
98+
datasetFieldId.getDatasetId().getNamespace(),
99+
datasetFieldId.getDatasetId().getName(),
100+
datasetFieldId.getFieldName());
101+
}
102+
89103
void throwIfNotExists(
90104
@NonNull NamespaceName namespaceName,
91105
@NonNull DatasetName datasetName,
@@ -96,6 +110,10 @@ void throwIfNotExists(
96110
}
97111
}
98112

113+
void throwIfNotExists(@NonNull JobId jobId) {
114+
throwIfNotExists(jobId.getNamespace(), jobId.getName());
115+
}
116+
99117
void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull JobName jobName) {
100118
if (!jobService.exists(namespaceName.getValue(), jobName.getValue())) {
101119
throw new JobNotFoundException(jobName);
@@ -137,6 +155,20 @@ void throwIfDatasetsNotExist(ImmutableSet<DatasetId> datasets) {
137155
}
138156
}
139157

158+
void throwIfNotExists(@NonNull NodeId nodeId) {
159+
if (!nodeId.hasVersion()) {
160+
if (nodeId.isDatasetType()) {
161+
throwIfNotExists(nodeId.asDatasetId());
162+
} else if (nodeId.isDatasetFieldType()) {
163+
throwIfNotExists(nodeId.asDatasetFieldId());
164+
} else if (nodeId.isJobType()) {
165+
throwIfNotExists(nodeId.asJobId());
166+
} else if (nodeId.isRunType()) {
167+
throwIfNotExists(nodeId.asRunId());
168+
}
169+
}
170+
}
171+
140172
URI locationFor(@NonNull UriInfo uriInfo, @NonNull Run run) {
141173
return uriInfo
142174
.getBaseUriBuilder()

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ private int determineStatusCode(Throwable e) {
9696
public Response getLineage(
9797
@QueryParam("nodeId") @NotNull NodeId nodeId,
9898
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
99+
throwIfNotExists(nodeId);
99100
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
100101
}
101102

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids,
8080
WHERE ds.uuid IN (<dsUuids>)""")
8181
Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);
8282

83+
@SqlQuery(
84+
"""
85+
SELECT ds.*, dv.fields, dv.lifecycle_state
86+
FROM datasets_view ds
87+
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
88+
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""")
89+
DatasetData getDatasetData(String namespaceName, String datasetName);
90+
8391
@SqlQuery(
8492
"""
8593
SELECT j.uuid FROM jobs j

api/src/main/java/marquez/service/LineageService.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.google.common.base.Functions;
99
import com.google.common.collect.ImmutableSet;
10+
import com.google.common.collect.ImmutableSortedSet;
1011
import com.google.common.collect.Maps;
1112
import java.util.Collections;
1213
import java.util.HashMap;
@@ -20,6 +21,7 @@
2021
import java.util.UUID;
2122
import java.util.stream.Collectors;
2223
import java.util.stream.Stream;
24+
import lombok.NonNull;
2325
import lombok.extern.slf4j.Slf4j;
2426
import marquez.common.models.DatasetId;
2527
import marquez.common.models.JobId;
@@ -48,14 +50,30 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
4850

4951
// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
5052
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
53+
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
5154
Optional<UUID> optionalUUID = getJobUuid(nodeId);
5255
if (optionalUUID.isEmpty()) {
53-
throw new NodeIdNotFoundException("Could not find node");
56+
log.warn(
57+
"Failed to get job associated with node '{}', returning orphan graph...",
58+
nodeId.getValue());
59+
return toLineageWithOrphanDataset(nodeId.asDatasetId());
5460
}
5561
UUID job = optionalUUID.get();
56-
62+
log.debug("Attempting to get lineage for job '{}'", job);
5763
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);
5864

65+
// Ensure job data is not empty, an empty set cannot be passed to LineageDao.getCurrentRuns() or
66+
// LineageDao.getCurrentRunsWithFacets().
67+
if (jobData.isEmpty()) {
68+
// Log warning, then return an orphan lineage graph; a graph should contain at most one
69+
// job->dataset relationship.
70+
log.warn(
71+
"Failed to get lineage for job '{}' associated with node '{}', returning orphan graph...",
72+
job,
73+
nodeId.getValue());
74+
return toLineageWithOrphanDataset(nodeId.asDatasetId());
75+
}
76+
5977
List<Run> runs =
6078
withRunFacets
6179
? getCurrentRunsWithFacets(
@@ -81,10 +99,26 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
8199
if (!datasetIds.isEmpty()) {
82100
datasets.addAll(this.getDatasetData(datasetIds));
83101
}
102+
if (nodeId.isDatasetType()
103+
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
104+
log.warn(
105+
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
106+
jobData.stream().map(JobData::getId).toList(),
107+
nodeId.getValue());
108+
return toLineageWithOrphanDataset(nodeId.asDatasetId());
109+
}
84110

85111
return toLineage(jobData, datasets);
86112
}
87113

114+
private Lineage toLineageWithOrphanDataset(@NonNull DatasetId datasetId) {
115+
final DatasetData datasetData =
116+
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
117+
return new Lineage(
118+
ImmutableSortedSet.of(
119+
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
120+
}
121+
88122
private Lineage toLineage(Set<JobData> jobData, Set<DatasetData> datasets) {
89123
Set<Node> nodes = new LinkedHashSet<>();
90124
// build mapping for later
@@ -214,7 +248,8 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
214248
return getJobFromInputOrOutput(
215249
datasetId.getName().getValue(), datasetId.getNamespace().getValue());
216250
} else {
217-
throw new NodeIdNotFoundException("Node must be a dataset node or job node");
251+
throw new NodeIdNotFoundException(
252+
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
218253
}
219254
}
220255
}
Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,28 @@
11
/* SPDX-License-Identifier: Apache-2.0 */
2-
ALTER TABLE dataset_symlinks ALTER COLUMN name TYPE VARCHAR;
2+
3+
DO
4+
$$
5+
DECLARE
6+
datasets_view_exists boolean;
7+
datasets_view_definition text;
8+
BEGIN
9+
SELECT EXISTS (
10+
SELECT * FROM information_schema.views
11+
WHERE table_schema='public' AND table_name='datasets_view'
12+
) INTO datasets_view_exists;
13+
14+
IF datasets_view_exists THEN
15+
-- Altering is not allowed when the column is being used from views. So here,
16+
-- we temporarily drop the view before altering and recreate it.
17+
SELECT view_definition FROM information_schema.views
18+
WHERE table_schema='public' AND table_name='datasets_view'
19+
INTO datasets_view_definition;
20+
21+
DROP VIEW datasets_view;
22+
ALTER TABLE dataset_symlinks ALTER COLUMN name TYPE VARCHAR;
23+
EXECUTE format('CREATE VIEW datasets_view AS %s', datasets_view_definition);
24+
ELSE
25+
ALTER TABLE dataset_symlinks ALTER COLUMN name TYPE VARCHAR;
26+
END IF;
27+
END
28+
$$ LANGUAGE plpgsql;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- new column will be created with 'null' filled for existing rows
2+
ALTER TABLE lineage_events ADD created_at TIMESTAMP WITH TIME ZONE;
3+
4+
create index lineage_events_created_at_index on lineage_events (created_at desc NULLS LAST);
5+
6+
-- The new default set to UTC now() will only apply in subsequent INSERT or UPDATE commands; it does not cause rows already in the table to change.
7+
ALTER TABLE lineage_events ALTER COLUMN created_at SET DEFAULT (now() AT TIME ZONE 'UTC')::timestamptz;

api/src/test/java/marquez/ColumnLineageIntegrationTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414
import java.util.Optional;
1515
import marquez.api.JdbiUtils;
1616
import marquez.client.MarquezClient;
17+
import marquez.client.models.DatasetFieldId;
18+
import marquez.client.models.DatasetId;
19+
import marquez.client.models.JobId;
1720
import marquez.client.models.Node;
21+
import marquez.client.models.NodeId;
1822
import marquez.db.LineageTestUtils;
1923
import marquez.db.OpenLineageDao;
2024
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
@@ -64,7 +68,8 @@ public void tearDown(Jdbi jdbi) {
6468

6569
@Test
6670
public void testColumnLineageEndpointByDataset() {
67-
MarquezClient.Lineage lineage = client.getColumnLineageByDataset("namespace", "dataset_b");
71+
MarquezClient.Lineage lineage =
72+
client.getColumnLineage(NodeId.of(new DatasetId("namespace", "dataset_b")));
6873

6974
assertThat(lineage.getGraph()).hasSize(3);
7075
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
@@ -75,7 +80,7 @@ public void testColumnLineageEndpointByDataset() {
7580
@Test
7681
public void testColumnLineageEndpointByDatasetField() {
7782
MarquezClient.Lineage lineage =
78-
client.getColumnLineageByDataset("namespace", "dataset_b", "col_c");
83+
client.getColumnLineage(NodeId.of(new DatasetFieldId("namespace", "dataset_b", "col_c")));
7984

8085
assertThat(lineage.getGraph()).hasSize(3);
8186
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();
@@ -86,7 +91,8 @@ public void testColumnLineageEndpointByDatasetField() {
8691
@Test
8792
public void testColumnLineageEndpointWithDepthLimit() {
8893
MarquezClient.Lineage lineage =
89-
client.getColumnLineageByDatasetField("namespace", "dataset_c", "col_d", 1, false);
94+
client.getColumnLineage(
95+
NodeId.of(new DatasetFieldId("namespace", "dataset_c", "col_d")), 1, false);
9096

9197
assertThat(lineage.getGraph()).hasSize(2);
9298
assertThat(getNodeByFieldName(lineage, "col_c")).isPresent();
@@ -96,15 +102,16 @@ public void testColumnLineageEndpointWithDepthLimit() {
96102
@Test
97103
public void testColumnLineageEndpointWithDownstream() {
98104
MarquezClient.Lineage lineage =
99-
client.getColumnLineageByDatasetField("namespace", "dataset_b", "col_c", 10, true);
105+
client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 10, true);
100106

101107
assertThat(lineage.getGraph()).hasSize(4);
102108
assertThat(getNodeByFieldName(lineage, "col_d")).isPresent();
103109
}
104110

105111
@Test
106112
public void testColumnLineageEndpointByJob() {
107-
MarquezClient.Lineage lineage = client.getColumnLineageByJob("namespace", "job1");
113+
MarquezClient.Lineage lineage =
114+
client.getColumnLineage(NodeId.of(new JobId("namespace", "job1")), 1, false);
108115

109116
assertThat(lineage.getGraph()).hasSize(3);
110117
assertThat(getNodeByFieldName(lineage, "col_a")).isPresent();

api/src/test/java/marquez/api/OpenLineageResourceTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static org.mockito.ArgumentMatchers.any;
1010
import static org.mockito.ArgumentMatchers.anyBoolean;
1111
import static org.mockito.ArgumentMatchers.anyInt;
12+
import static org.mockito.ArgumentMatchers.anyString;
1213
import static org.mockito.Mockito.mock;
1314
import static org.mockito.Mockito.when;
1415

@@ -20,6 +21,7 @@
2021
import javax.ws.rs.core.Response;
2122
import marquez.common.Utils;
2223
import marquez.db.OpenLineageDao;
24+
import marquez.service.JobService;
2325
import marquez.service.LineageService;
2426
import marquez.service.ServiceFactory;
2527
import marquez.service.models.Lineage;
@@ -36,6 +38,8 @@ class OpenLineageResourceTest {
3638
static {
3739
LineageService lineageService = mock(LineageService.class);
3840
OpenLineageDao openLineageDao = mock(OpenLineageDao.class);
41+
JobService jobService = mock(JobService.class);
42+
when(jobService.exists(anyString(), anyString())).thenReturn(true);
3943

4044
Node testNode =
4145
Utils.fromJson(
@@ -45,7 +49,8 @@ class OpenLineageResourceTest {
4549
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);
4650

4751
ServiceFactory serviceFactory =
48-
ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService));
52+
ApiTestUtils.mockServiceFactory(
53+
Map.of(LineageService.class, lineageService, JobService.class, jobService));
4954

5055
UNDER_TEST =
5156
ResourceExtension.builder()
@@ -58,7 +63,7 @@ public void testGetLineage() {
5863
final Lineage lineage =
5964
UNDER_TEST
6065
.target("/api/v1/lineage")
61-
.queryParam("nodeId", "job:test")
66+
.queryParam("nodeId", "job:test-namespace:test-job")
6267
.request()
6368
.get()
6469
.readEntity(Lineage.class);

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import marquez.api.JdbiUtils;
1818
import marquez.common.models.DatasetName;
1919
import marquez.common.models.DatasetVersionId;
20+
import marquez.common.models.JobId;
2021
import marquez.common.models.JobName;
2122
import marquez.common.models.NamespaceName;
2223
import marquez.db.DatasetDao;
@@ -393,6 +394,38 @@ public void testLineageWithWithCycle() {
393394
.matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob"));
394395
}
395396

397+
@Test
398+
public void testLineageForOrphanedDataset() {
399+
UpdateLineageRow writeJob =
400+
LineageTestUtils.createLineageRow(
401+
openLineageDao,
402+
"writeJob",
403+
"COMPLETE",
404+
jobFacet,
405+
Arrays.asList(),
406+
Arrays.asList(dataset));
407+
408+
NodeId datasetNodeId =
409+
NodeId.of(new NamespaceName(dataset.getNamespace()), new DatasetName(dataset.getName()));
410+
Lineage lineage = lineageService.lineage(datasetNodeId, 2, false);
411+
assertThat(lineage.getGraph())
412+
.hasSize(2)
413+
.extracting(Node::getId)
414+
.containsExactlyInAnyOrder(
415+
NodeId.of(new JobId(new NamespaceName(NAMESPACE), new JobName("writeJob"))),
416+
datasetNodeId);
417+
418+
UpdateLineageRow updatedWriteJob =
419+
LineageTestUtils.createLineageRow(
420+
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
421+
422+
lineage = lineageService.lineage(datasetNodeId, 2, false);
423+
assertThat(lineage.getGraph())
424+
.hasSize(1)
425+
.extracting(Node::getId)
426+
.containsExactlyInAnyOrder(datasetNodeId);
427+
}
428+
396429
private boolean jobNameEquals(Node node, String writeJob) {
397430
return node.getId().asJobId().getName().getValue().equals(writeJob);
398431
}

0 commit comments

Comments
 (0)