Skip to content

Commit 19d36e0

Browse files
authored
Merge branch 'main' into fix/remove-limit
2 parents 20e3f50 + b5bcfbf commit 19d36e0

60 files changed

Lines changed: 1021 additions & 208 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/src/main/java/marquez/common/Utils.java

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,8 @@
3333
import java.time.Instant;
3434
import java.time.LocalDate;
3535
import java.time.ZonedDateTime;
36+
import java.util.Collection;
37+
import java.util.Collections;
3638
import java.util.List;
3739
import java.util.Map;
3840
import java.util.Optional;
@@ -59,6 +61,7 @@
5961
import marquez.service.models.LineageEvent;
6062
import marquez.service.models.LineageEvent.ParentRunFacet;
6163
import marquez.service.models.StreamMeta;
64+
import org.apache.commons.lang3.tuple.Pair;
6265
import org.apache.commons.lang3.tuple.Triple;
6366

6467
public final class Utils {
@@ -351,6 +354,36 @@ private static Version newDatasetVersionFor(DatasetVersionData data) {
351354
return Version.of(UUID.nameUUIDFromBytes(bytes));
352355
}
353356

357+
/**
358+
* Returns a new {@link Version} object based on the dataset namespace, dataset name and collection
359+
* of fields. A {@link Version} is generated by concatenating the provided metadata together
360+
* (delimited by a colon). For fields, only the name and type contribute. The resulting string is
361+
* then converted to a {@code byte} array and passed to {@link UUID#nameUUIDFromBytes(byte[])}.
362+
*
* <p>Fields are sorted before being joined, so the iteration order of {@code fields} does not
* affect the result; a {@code null} collection is handled like an empty one.
*
363+
* @param namespaceName The namespace of the dataset.
364+
* @param datasetName The dataset name.
365+
* @param fields The fields of the dataset as (name, type) pairs; may be {@code null}.
366+
* @return A {@link Version} object based on the specified dataset metadata.
367+
*/
368+
public static Version newDatasetSchemaVersionFor(
369+
String namespaceName, String datasetName, Collection<Pair<String, String>> fields) {
370+
final byte[] bytes =
371+
VERSION_JOINER
372+
.join(
373+
namespaceName,
374+
datasetName,
375+
Optional.ofNullable(fields).orElse(Collections.emptyList()).stream()
376+
.sorted()
377+
.map(Utils::joinField)
378+
.collect(joining(VERSION_DELIM)))
379+
.getBytes(UTF_8);
380+
return Version.of(UUID.nameUUIDFromBytes(bytes));
381+
}
382+
383+
/** Joins the left and right elements of {@code field} using {@code VERSION_JOINER}. */
private static String joinField(Pair<String, String> field) {
384+
return VERSION_JOINER.join(field.getLeft(), field.getRight());
385+
}
386+
354387
/** Joins the left, middle and right elements of {@code field} using {@code VERSION_JOINER}. */
private static String joinField(Triple<String, String, String> field) {
355388
return VERSION_JOINER.join(field.getLeft(), field.getMiddle(), field.getRight());
356389
}

api/src/main/java/marquez/db/BaseDao.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,9 @@ public interface BaseDao extends SqlObject {
1818
@CreateSqlObject
1919
DatasetVersionDao createDatasetVersionDao();
2020

21+
@CreateSqlObject
22+
DatasetSchemaVersionDao createDatasetSchemaVersionDao();
23+
2124
@CreateSqlObject
2225
JobDao createJobDao();
2326

api/src/main/java/marquez/db/Columns.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ private Columns() {}
4949
public static final String NAMESPACE_UUID = "namespace_uuid";
5050
public static final String DATASET_UUID = "dataset_uuid";
5151
public static final String DATASET_VERSION_UUID = "dataset_version_uuid";
52+
public static final String DATASET_SCHEMA_VERSION_UUID = "dataset_schema_version_uuid";
5253
public static final String JOB_VERSION_UUID = "job_version_uuid";
5354
public static final String CURRENT_VERSION_UUID = "current_version_uuid";
5455
public static final String CHECKSUM = "checksum";

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -107,7 +107,7 @@ default void setFields(Dataset ds) {
107107
ds.getCurrentVersion()
108108
.ifPresent(
109109
dsv -> {
110-
ds.setFields(datasetFieldDao.find(dsv));
110+
ds.setFields(datasetFieldDao.findByDatasetVersion(dsv));
111111
});
112112
}
113113

@@ -363,7 +363,7 @@ default Dataset upsertDatasetMeta(
363363
DatasetVersionRow dvRow =
364364
createDatasetVersionDao()
365365
.upsertDatasetVersion(
366-
datasetRow.getUuid(),
366+
datasetRow,
367367
now,
368368
namespaceName.getValue(),
369369
datasetName.getValue(),

api/src/main/java/marquez/db/DatasetFieldDao.java

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -220,7 +220,18 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
220220
+ "FROM dataset_fields f "
221221
+ "INNER JOIN dataset_versions_field_mapping fm on fm.dataset_field_uuid = f.uuid "
222222
+ "WHERE fm.dataset_version_uuid = :datasetVersionUuid")
223-
List<Field> find(UUID datasetVersionUuid);
223+
List<Field> findByDatasetVersion(UUID datasetVersionUuid);
224+
225+
/**
 * Finds the fields mapped to a dataset schema version, each with the names of its tags.
 *
 * <p>Selects every {@code dataset_fields} row joined through {@code
 * dataset_schema_versions_field_mapping} to the given schema version, plus a {@code tags} array
 * built from {@code dataset_fields_tag_mapping} and {@code tags}.
 *
 * @param datasetSchemaVersionUuid value matched against {@code
 *     dataset_schema_versions_field_mapping.dataset_schema_version_uuid}.
 * @return the fields mapped to that schema version.
 */
@SqlQuery(
226+
"SELECT f.*, "
227+
+ "ARRAY(SELECT t.name "
228+
+ " FROM dataset_fields_tag_mapping m "
229+
+ " INNER JOIN tags t on t.uuid = m.tag_uuid "
230+
+ " WHERE m.dataset_field_uuid = f.uuid) AS tags "
231+
+ "FROM dataset_fields f "
232+
+ "INNER JOIN dataset_schema_versions_field_mapping fm on fm.dataset_field_uuid = f.uuid "
233+
+ "WHERE fm.dataset_schema_version_uuid = :datasetSchemaVersionUuid")
234+
List<Field> findByDatasetSchemaVersion(UUID datasetSchemaVersionUuid);
224235

225236
@SqlQuery(
226237
"""
api/src/main/java/marquez/db/DatasetSchemaVersionDao.java

Lines changed: 64 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2018-2024 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db;
7+
8+
import java.time.Instant;
9+
import java.util.List;
10+
import java.util.Optional;
11+
import java.util.UUID;
12+
import java.util.stream.Collectors;
13+
import marquez.common.Utils;
14+
import marquez.common.models.Version;
15+
import marquez.db.mappers.DatasetSchemaVersionRowMapper;
16+
import marquez.db.models.DatasetFieldRow;
17+
import marquez.db.models.DatasetRow;
18+
import marquez.db.models.DatasetSchemaVersionRow;
19+
import org.apache.commons.lang3.tuple.Pair;
20+
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
21+
import org.jdbi.v3.sqlobject.statement.SqlBatch;
22+
import org.jdbi.v3.sqlobject.statement.SqlQuery;
23+
24+
/**
 * DAO for dataset schema versions: rows in {@code dataset_schema_versions} and their field
 * mappings in {@code dataset_schema_versions_field_mapping}.
 */
@RegisterRowMapper(DatasetSchemaVersionRowMapper.class)
25+
public interface DatasetSchemaVersionDao extends BaseDao {
26+
/**
 * Computes the schema version for a dataset and stores it if it is not stored yet.
 *
 * <p>The version is derived by {@link Utils#newDatasetSchemaVersionFor} from the dataset's
 * namespace name, its name and the set of (name, type) pairs of {@code datasetFields}. The
 * version value is used as the {@code uuid} of the schema version row. Field mappings are only
 * written when the insert returns a row.
 *
 * @param datasetRow the dataset the schema belongs to.
 * @param datasetFields the field rows that make up the schema.
 * @param now timestamp stored as {@code created_at} of a newly inserted row.
 * @return the computed schema version, whether or not a new row was inserted.
 */
default Version upsertSchemaVersion(
27+
DatasetRow datasetRow, List<DatasetFieldRow> datasetFields, Instant now) {
28+
final Version computedVersion =
29+
Utils.newDatasetSchemaVersionFor(
30+
datasetRow.getNamespaceName(),
31+
datasetRow.getName(),
32+
datasetFields.stream()
33+
.map(field -> Pair.of(field.getName(), field.getType()))
34+
.collect(Collectors.toSet()));
35+
upsertSchemaVersion(computedVersion.getValue(), datasetRow.getUuid(), now)
36+
.ifPresent(
37+
newRow -> {
38+
// a present row means a new insert, so we have to write the field mappings as well;
39+
// an empty result means the version already exists, and so the field mappings must
40+
// already exist
41+
upsertFieldMappings(
42+
newRow.getUuid(),
43+
datasetFields.stream()
44+
.map(DatasetFieldRow::getUuid)
45+
.collect(Collectors.toList()));
46+
});
47+
return computedVersion;
48+
}
49+
50+
/**
 * Inserts a row into {@code dataset_schema_versions}, doing nothing on conflict.
 *
 * @param uuid the schema version uuid.
 * @param datasetUuid the uuid of the owning dataset.
 * @param now value stored in {@code created_at}.
 * @return the inserted row; presumably empty when a conflicting row already exists, since {@code
 *     ON CONFLICT DO NOTHING} then yields no {@code RETURNING} row (confirm against PostgreSQL
 *     docs).
 */
@SqlQuery(
51+
"INSERT INTO dataset_schema_versions "
52+
+ "(uuid, dataset_uuid, created_at) "
53+
+ "VALUES (:uuid, :datasetUuid, :now) "
54+
+ "ON CONFLICT DO NOTHING "
55+
+ "RETURNING *")
56+
Optional<DatasetSchemaVersionRow> upsertSchemaVersion(UUID uuid, UUID datasetUuid, Instant now);
57+
58+
/**
 * Batch-inserts schema-version-to-field mappings, doing nothing on conflict.
 *
 * <p>NOTE(review): relies on Jdbi's {@code @SqlBatch} reusing the single {@code
 * schemaVersionUuid} for every element of {@code fieldUuid} - confirm.
 *
 * @param schemaVersionUuid the schema version the fields are mapped to.
 * @param fieldUuid the uuids of the fields to map, one statement per element.
 */
@SqlBatch(
59+
"INSERT INTO dataset_schema_versions_field_mapping "
60+
+ "(dataset_schema_version_uuid, dataset_field_uuid) "
61+
+ "VALUES (:schemaVersionUuid, :fieldUuid) "
62+
+ "ON CONFLICT DO NOTHING")
63+
void upsertFieldMappings(UUID schemaVersionUuid, Iterable<UUID> fieldUuid);
64+
}

api/src/main/java/marquez/db/DatasetVersionDao.java

Lines changed: 40 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
import marquez.db.mappers.DatasetVersionRowMapper;
2626
import marquez.db.mappers.ExtendedDatasetVersionRowMapper;
2727
import marquez.db.models.DatasetFieldRow;
28+
import marquez.db.models.DatasetRow;
2829
import marquez.db.models.DatasetVersionRow;
2930
import marquez.db.models.ExtendedDatasetVersionRow;
3031
import marquez.db.models.TagRow;
@@ -47,7 +48,7 @@ public interface DatasetVersionDao extends BaseDao {
4748

4849
@Transaction
4950
default DatasetVersionRow upsertDatasetVersion(
50-
UUID datasetUuid,
51+
DatasetRow datasetRow,
5152
Instant now,
5253
String namespaceName,
5354
String datasetName,
@@ -56,14 +57,38 @@ default DatasetVersionRow upsertDatasetVersion(
5657
TagDao tagDao = createTagDao();
5758
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
5859

60+
List<DatasetFieldRow> datasetFields = new ArrayList<>();
61+
List<DatasetFieldTag> datasetFieldTags = new ArrayList<>();
62+
for (Field field : datasetMeta.getFields()) {
63+
DatasetFieldRow datasetFieldRow =
64+
datasetFieldDao.upsert(
65+
UUID.randomUUID(),
66+
now,
67+
field.getName().getValue(),
68+
field.getType(),
69+
field.getDescription().orElse(null),
70+
datasetRow.getUuid());
71+
datasetFields.add(datasetFieldRow);
72+
for (TagName tagName : field.getTags()) {
73+
TagRow tag = tagDao.upsert(UUID.randomUUID(), now, tagName.getValue());
74+
datasetFieldTags.add(new DatasetFieldTag(datasetFieldRow.getUuid(), tag.getUuid(), now));
75+
}
76+
}
77+
datasetFieldDao.updateTags(datasetFieldTags);
78+
5979
Version version = Utils.newDatasetVersionFor(namespaceName, datasetName, datasetMeta);
6080
UUID newDatasetVersionUuid = UUID.randomUUID();
81+
UUID datasetSchemaVersionUuid =
82+
createDatasetSchemaVersionDao()
83+
.upsertSchemaVersion(datasetRow, datasetFields, now)
84+
.getValue();
6185
DatasetVersionRow datasetVersionRow =
6286
upsert(
6387
newDatasetVersionUuid,
6488
now,
65-
datasetUuid,
89+
datasetRow.getUuid(),
6690
version.getValue(),
91+
datasetSchemaVersionUuid,
6792
datasetMeta.getRunId().map(RunId::getValue).orElse(null),
6893
toPgObjectFields(datasetMeta.getFields()),
6994
namespaceName,
@@ -84,29 +109,13 @@ default DatasetVersionRow upsertDatasetVersion(
84109
}
85110

86111
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();
87-
List<DatasetFieldTag> datasetFieldTag = new ArrayList<>();
88-
89-
for (Field field : datasetMeta.getFields()) {
90-
DatasetFieldRow datasetFieldRow =
91-
datasetFieldDao.upsert(
92-
UUID.randomUUID(),
93-
now,
94-
field.getName().getValue(),
95-
field.getType(),
96-
field.getDescription().orElse(null),
97-
datasetUuid);
98-
for (TagName tagName : field.getTags()) {
99-
TagRow tag = tagDao.upsert(UUID.randomUUID(), now, tagName.getValue());
100-
datasetFieldTag.add(new DatasetFieldTag(datasetFieldRow.getUuid(), tag.getUuid(), now));
101-
}
112+
for (DatasetFieldRow datasetFieldRow : datasetFields) {
102113
datasetFieldMappings.add(
103114
new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid()));
104115
}
105-
106116
datasetFieldDao.updateFieldMapping(datasetFieldMappings);
107-
datasetFieldDao.updateTags(datasetFieldTag);
108117

109-
createDatasetDao().updateVersion(datasetUuid, now, datasetVersionRow.getUuid());
118+
createDatasetDao().updateVersion(datasetRow.getUuid(), now, datasetVersionRow.getUuid());
110119
return datasetVersionRow;
111120
}
112121

@@ -166,8 +175,8 @@ WITH selected_dataset_versions AS (
166175
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
167176
)
168177
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
169-
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
170-
t.tags, f.facets
178+
dv.created_at, dv.version, dv.dataset_schema_version_uuid, dv.fields, dv.run_uuid AS createdByRunUuid,
179+
sv.schema_location, t.tags, f.facets
171180
FROM selected_dataset_versions dv
172181
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
173182
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
@@ -197,8 +206,8 @@ WITH selected_dataset_versions AS (
197206
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown' OR df.type ILIKE 'input')
198207
)
199208
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
200-
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
201-
t.tags, f.facets
209+
dv.created_at, dv.version, dv.dataset_schema_version_uuid, dv.fields, dv.run_uuid AS createdByRunUuid,
210+
sv.schema_location, t.tags, f.facets
202211
FROM selected_dataset_versions dv
203212
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
204213
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
@@ -246,8 +255,8 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
246255
"""
247256
WITH dataset_info AS (
248257
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
249-
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
250-
t.tags, f.facets, f.lineage_event_time, f.dataset_version_uuid, facet_name
258+
dv.created_at, dv.version, dv.dataset_schema_version_uuid, dv.fields, dv.run_uuid AS createdByRunUuid,
259+
sv.schema_location, t.tags, f.facets, f.lineage_event_time, f.dataset_version_uuid, facet_name
251260
FROM dataset_versions dv
252261
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
253262
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
@@ -272,12 +281,12 @@ LEFT JOIN (
272281
)
273282
SELECT
274283
type, name, physical_name, namespace_name, source_name, description, lifecycle_state,
275-
created_at, version, fields, createdByRunUuid, schema_location,
284+
created_at, version, dataset_schema_version_uuid, fields, createdByRunUuid, schema_location,
276285
tags, dataset_version_uuid,
277286
JSONB_AGG(facets ORDER BY lineage_event_time ASC) AS facets
278287
FROM dataset_info
279288
GROUP BY type, name, physical_name, namespace_name, source_name, description, lifecycle_state,
280-
created_at, version, fields, createdByRunUuid, schema_location,
289+
created_at, version, dataset_schema_version_uuid, fields, createdByRunUuid, schema_location,
281290
tags, dataset_version_uuid
282291
ORDER BY created_at DESC
283292
""")
@@ -302,9 +311,9 @@ default List<DatasetVersion> findAllWithRun(
302311

303312
@SqlQuery(
304313
"INSERT INTO dataset_versions "
305-
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name, lifecycle_state) "
314+
+ "(uuid, created_at, dataset_uuid, version, dataset_schema_version_uuid, run_uuid, fields, namespace_name, dataset_name, lifecycle_state) "
306315
+ "VALUES "
307-
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleState) "
316+
+ "(:uuid, :now, :datasetUuid, :version, :schemaVersionUuid, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleState) "
308317
+ "ON CONFLICT(version) "
309318
+ "DO UPDATE SET "
310319
+ "run_uuid = EXCLUDED.run_uuid "
@@ -314,6 +323,7 @@ DatasetVersionRow upsert(
314323
Instant now,
315324
UUID datasetUuid,
316325
UUID version,
326+
UUID schemaVersionUuid,
317327
UUID runUuid,
318328
PGobject fields,
319329
String namespaceName,

api/src/main/java/marquez/db/JobVersionDao.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -442,6 +442,7 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco
442442
d.getDatasetRow().getCreatedAt(),
443443
d.getDatasetVersionRow().getDatasetUuid(),
444444
d.getDatasetVersionRow().getVersion(),
445+
d.getDatasetVersionRow().getSchemaVersionUuid().orElse(null),
445446
null,
446447
null,
447448
d.getDatasetRow().getNamespaceName(),

0 commit comments

Comments
 (0)