Skip to content

Commit 6436ddc

Browse files
OL facets - PR1 - create & feed new tables while not reading them (#2350)
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
1 parent 656b2e6 commit 6436ddc

32 files changed

Lines changed: 1580 additions & 46 deletions

api/src/main/java/marquez/db/BaseDao.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,13 @@ public interface BaseDao extends SqlObject {
5656

5757
@CreateSqlObject
5858
ColumnLineageDao createColumnLineageDao();
59+
60+
@CreateSqlObject
61+
DatasetFacetsDao createDatasetFacetsDao();
62+
63+
@CreateSqlObject
64+
JobFacetsDao createJobFacetsDao();
65+
66+
@CreateSqlObject
67+
RunFacetsDao createRunFacetsDao();
5968
}

api/src/main/java/marquez/db/Columns.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import lombok.extern.slf4j.Slf4j;
2626
import marquez.common.Utils;
2727
import org.postgresql.util.PGInterval;
28+
import org.postgresql.util.PGobject;
2829

2930
@Slf4j
3031
public final class Columns {
@@ -299,4 +300,17 @@ public static ImmutableMap<String, String> mapOrNull(final ResultSet results, fi
299300
final String mapAsString = results.getString(column);
300301
return Utils.fromJson(mapAsString, new TypeReference<>() {});
301302
}
303+
304+
public static PGobject toPgObject(@NonNull final Object object) {
305+
final PGobject jsonObject = new PGobject();
306+
jsonObject.setType("jsonb");
307+
final String json = Utils.toJson(object);
308+
try {
309+
jsonObject.setValue(json);
310+
} catch (SQLException e) {
311+
log.error("Error when ...", e);
312+
return null;
313+
}
314+
return jsonObject;
315+
}
302316
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db;
7+
8+
import com.fasterxml.jackson.databind.JsonNode;
9+
import java.time.Instant;
10+
import java.util.Arrays;
11+
import java.util.Spliterator;
12+
import java.util.Spliterators;
13+
import java.util.UUID;
14+
import java.util.stream.StreamSupport;
15+
import lombok.NonNull;
16+
import marquez.common.Utils;
17+
import marquez.service.models.LineageEvent;
18+
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
19+
import org.jdbi.v3.sqlobject.transaction.Transaction;
20+
import org.postgresql.util.PGobject;
21+
22+
/** The DAO for {@code dataset} facets. */
23+
public interface DatasetFacetsDao {
24+
/* An {@code enum} used ... */
25+
enum Type {
26+
DATASET,
27+
INPUT,
28+
OUTPUT,
29+
UNKNOWN;
30+
}
31+
32+
/* An {@code enum} used to determine the dataset facet. */
33+
enum DatasetFacet {
34+
DOCUMENTATION(Type.DATASET, "documentation"),
35+
DESCRIPTION(Type.DATASET, "description"),
36+
SCHEMA(Type.DATASET, "schema"),
37+
DATASOURCE(Type.DATASET, "dataSource"),
38+
LIFECYCLE_STATE_CHANGE(Type.DATASET, "lifecycleStateChange"),
39+
VERSION(Type.DATASET, "version"),
40+
COLUMN_LINEAGE(Type.DATASET, "columnLineage"),
41+
OWNERSHIP(Type.DATASET, "ownership"),
42+
DATA_QUALITY_METRICS(Type.INPUT, "dataQualityMetrics"),
43+
DATA_QUALITY_ASSERTIONS(Type.INPUT, "dataQualityAssertions"),
44+
OUTPUT_STATISTICS(Type.OUTPUT, "outputStatistics");
45+
46+
final Type type;
47+
final String name;
48+
49+
DatasetFacet(@NonNull final Type type, @NonNull final String name) {
50+
this.type = type;
51+
this.name = name;
52+
}
53+
54+
Type getType() {
55+
return type;
56+
}
57+
58+
String getName() {
59+
return name;
60+
}
61+
62+
/** ... */
63+
public static Type typeFromName(@NonNull final String name) {
64+
return Arrays.stream(DatasetFacet.values())
65+
.filter(facet -> facet.getName().equalsIgnoreCase(name))
66+
.map(facet -> facet.getType())
67+
.findFirst()
68+
.orElse(Type.UNKNOWN);
69+
}
70+
}
71+
72+
/**
73+
* @param uuid
74+
* @param createdAt
75+
* @param datasetUuid
76+
* @param runUuid
77+
* @param lineageEventTime
78+
* @param lineageEventType
79+
* @param type
80+
* @param name
81+
* @param facet
82+
*/
83+
@SqlUpdate(
84+
"""
85+
INSERT INTO dataset_facets (
86+
uuid,
87+
created_at,
88+
dataset_uuid,
89+
run_uuid,
90+
lineage_event_time,
91+
lineage_event_type,
92+
type,
93+
name,
94+
facet
95+
) VALUES (
96+
:uuid,
97+
:createdAt,
98+
:datasetUuid,
99+
:runUuid,
100+
:lineageEventTime,
101+
:lineageEventType,
102+
:type,
103+
:name,
104+
:facet
105+
)
106+
""")
107+
void insertDatasetFacet(
108+
UUID uuid,
109+
Instant createdAt,
110+
UUID datasetUuid,
111+
UUID runUuid,
112+
Instant lineageEventTime,
113+
String lineageEventType,
114+
Type type,
115+
String name,
116+
PGobject facet);
117+
118+
/**
119+
* @param datasetUuid
120+
* @param runUuid
121+
* @param lineageEventTime
122+
* @param lineageEventType
123+
* @param datasetFacets
124+
*/
125+
@Transaction
126+
default void insertDatasetFacetsFor(
127+
@NonNull UUID datasetUuid,
128+
@NonNull UUID runUuid,
129+
@NonNull Instant lineageEventTime,
130+
@NonNull String lineageEventType,
131+
@NonNull LineageEvent.DatasetFacets datasetFacets) {
132+
final Instant now = Instant.now();
133+
134+
JsonNode jsonNode = Utils.getMapper().valueToTree(datasetFacets);
135+
StreamSupport.stream(
136+
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
137+
.forEach(
138+
fieldName ->
139+
insertDatasetFacet(
140+
UUID.randomUUID(),
141+
now,
142+
datasetUuid,
143+
runUuid,
144+
lineageEventTime,
145+
lineageEventType,
146+
DatasetFacet.typeFromName(fieldName),
147+
fieldName,
148+
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
149+
}
150+
151+
record DatasetFacetRow(
152+
UUID uuid,
153+
Instant createdAt,
154+
UUID datasetUuid,
155+
UUID runUuid,
156+
Instant lineageEventTime,
157+
String lineageEventType,
158+
DatasetFacetsDao.Type type,
159+
String name,
160+
PGobject facet) {}
161+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db;
7+
8+
import com.fasterxml.jackson.databind.node.ObjectNode;
9+
import lombok.NonNull;
10+
import marquez.common.Utils;
11+
import org.postgresql.util.PGobject;
12+
13+
public class FacetUtils {
14+
15+
static ObjectNode asJson(@NonNull final String facetName, @NonNull Object facetValue) {
16+
final ObjectNode facetAsJson = Utils.getMapper().createObjectNode();
17+
facetAsJson.putPOJO(facetName, facetValue);
18+
return facetAsJson;
19+
}
20+
21+
static PGobject toPgObject(String name, Object o) {
22+
return Columns.toPgObject(asJson(name, o));
23+
}
24+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db;
7+
8+
import com.fasterxml.jackson.databind.JsonNode;
9+
import java.time.Instant;
10+
import java.util.Spliterator;
11+
import java.util.Spliterators;
12+
import java.util.UUID;
13+
import java.util.stream.StreamSupport;
14+
import lombok.NonNull;
15+
import marquez.common.Utils;
16+
import marquez.service.models.LineageEvent;
17+
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
18+
import org.jdbi.v3.sqlobject.transaction.Transaction;
19+
import org.postgresql.util.PGobject;
20+
21+
/** The DAO for {@code job} facets. */
22+
public interface JobFacetsDao {
23+
24+
@SqlUpdate(
25+
"""
26+
INSERT INTO job_facets (
27+
uuid,
28+
created_at,
29+
job_uuid,
30+
run_uuid,
31+
lineage_event_time,
32+
lineage_event_type,
33+
name,
34+
facet
35+
) VALUES (
36+
:uuid,
37+
:createdAt,
38+
:jobUuid,
39+
:runUuid,
40+
:lineageEventTime,
41+
:lineageEventType,
42+
:name,
43+
:facet
44+
)
45+
""")
46+
void insertJobFacet(
47+
UUID uuid,
48+
Instant createdAt,
49+
UUID jobUuid,
50+
UUID runUuid,
51+
Instant lineageEventTime,
52+
String lineageEventType,
53+
String name,
54+
PGobject facet);
55+
56+
@Transaction
57+
default void insertJobFacetsFor(
58+
@NonNull UUID jobUuid,
59+
@NonNull UUID runUuid,
60+
@NonNull Instant lineageEventTime,
61+
@NonNull String lineageEventType,
62+
@NonNull LineageEvent.JobFacet jobFacet) {
63+
final Instant now = Instant.now();
64+
65+
JsonNode jsonNode = Utils.getMapper().valueToTree(jobFacet);
66+
StreamSupport.stream(
67+
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
68+
.forEach(
69+
fieldName ->
70+
insertJobFacet(
71+
UUID.randomUUID(),
72+
now,
73+
jobUuid,
74+
runUuid,
75+
lineageEventTime,
76+
lineageEventType,
77+
fieldName,
78+
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
79+
}
80+
81+
record JobFacetRow(
82+
UUID uuid,
83+
Instant createdAt,
84+
UUID jobUuid,
85+
UUID runUuid,
86+
Instant lineageEventTime,
87+
String lineageEventType,
88+
String name,
89+
PGobject facet) {}
90+
}

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import marquez.db.mappers.JobDataMapper;
1515
import marquez.db.mappers.JobRowMapper;
1616
import marquez.db.mappers.RunMapper;
17-
import marquez.db.models.DatasetData;
18-
import marquez.db.models.JobData;
17+
import marquez.service.models.DatasetData;
18+
import marquez.service.models.JobData;
1919
import marquez.service.models.Run;
2020
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
2121
import org.jdbi.v3.sqlobject.customizer.BindList;

0 commit comments

Comments
 (0)