OL facets - PR1 - create & write new events to new tables while not reading them #2350
Codecov Report
@@             Coverage Diff              @@
##               main    #2350      +/-   ##
============================================
+ Coverage     76.72%   76.81%   +0.08%
- Complexity     1177     1195      +18
============================================
  Files           222      226       +4
  Lines          5354     5473     +119
  Branches        429      443      +14
============================================
+ Hits           4108     4204      +96
- Misses          768      772       +4
- Partials        478      497      +19
enum DatasetFacet implements Facet {
  DOCUMENTATION(Type.DATASET, "documentation") {
    Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
      return Optional.ofNullable(facets.getDocumentation()).map(this::toPgObject);
    }
  },
  SCHEMA(Type.DATASET, "schema") {
    Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
      return Optional.ofNullable(facets.getSchema()).map(this::toPgObject);
    }
  },
  DATASOURCE(Type.DATASET, "dataSource") {
    Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
      return Optional.ofNullable(facets.getDataSource()).map(this::toPgObject);
    }
  },
  DESCRIPTION(Type.DATASET, "description") {
    Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
      return Optional.ofNullable(facets.getDescription()).map(this::toPgObject);
    }
  },
  LIFECYCLE_STATE_CHANGE(Type.DATASET, "lifecycleStateChange") {
    Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
      return Optional.ofNullable(facets.getLifecycleStateChange()).map(this::toPgObject);
    }
  },
  VERSION(Type.DATASET, "version"),
  COLUMN_LINEAGE(Type.DATASET, "columnLineage") {
    Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
      return Optional.ofNullable(facets.getColumnLineage()).map(this::toPgObject);
    }
  },
  OWNERSHIP(Type.DATASET, "ownership"),
  DATA_QUALITY_METRICS(Type.INPUT, "dataQualityMetrics"),
  DATA_QUALITY_ASSERTIONS(Type.INPUT, "dataQualityAssertions"),
  OUTPUT_STATISTICS(Type.OUTPUT, "outputStatistics");
We discussed this briefly the other day, but these enums should be unnecessary, especially for the facets that aren't mapped directly to Java fields (e.g., outputStatistics, dataQualityMetrics, etc.). Your call to datasetFacets.getAdditionalFacets() below should fetch all facets that haven't been mapped to a field in the DatasetFacets class. But since you have to convert everything to JSON anyway, why not convert the whole DatasetFacets class to a generic JsonNode object? You could do something like:
JsonNode jsonNode = Utils.getMapper().valueToTree(facets);
StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT),
        false)
    .forEach(fieldName -> {
      storeFacet(fieldName, toPgObject(jsonNode.get(fieldName)));
    });

You'll be able to support all currently known facets as well as new facets and custom facets without ever having to update the server code.
I got rid of the enums in RunFacetsDao and JobFacetsDao.
We still need one, in a reduced form, in DatasetFacetsDao to determine the facet type (DATASET, INPUT, OUTPUT or UNKNOWN).
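As a rough illustration of what such a reduced form could look like, the sketch below resolves a facet name to its type and falls back to UNKNOWN for anything it does not recognize. The FacetType enum, its of method, and the lookup table are hypothetical names chosen for this example; the name-to-type pairs simply mirror the enum constants quoted in the review above, and the actual DatasetFacetsDao code may differ.

```java
import java.util.Map;

// Hypothetical reduced enum: it only carries the facet category and no
// per-facet conversion logic (the JSON conversion is handled generically).
enum FacetType {
  DATASET,
  INPUT,
  OUTPUT,
  UNKNOWN;

  // Assumed lookup table, copied from the facet names quoted in the review.
  private static final Map<String, FacetType> KNOWN =
      Map.ofEntries(
          Map.entry("documentation", DATASET),
          Map.entry("schema", DATASET),
          Map.entry("dataSource", DATASET),
          Map.entry("description", DATASET),
          Map.entry("lifecycleStateChange", DATASET),
          Map.entry("version", DATASET),
          Map.entry("columnLineage", DATASET),
          Map.entry("ownership", DATASET),
          Map.entry("dataQualityMetrics", INPUT),
          Map.entry("dataQualityAssertions", INPUT),
          Map.entry("outputStatistics", OUTPUT));

  // Resolves a facet name to its type; custom or unrecognized facets
  // (and null names) map to UNKNOWN instead of failing.
  static FacetType of(String facetName) {
    if (facetName == null) {
      return UNKNOWN;
    }
    return KNOWN.getOrDefault(facetName, UNKNOWN);
  }
}

class FacetTypeDemo {
  public static void main(String[] args) {
    // "myCustomFacet" is a made-up name used to show the fallback.
    for (String name : new String[] {"schema", "outputStatistics", "myCustomFacet"}) {
      System.out.println(name + " -> " + FacetType.of(name));
    }
  }
}
```

With this shape, custom facets still get stored under their own name; only the type column falls back to UNKNOWN, so no server change is needed when a new facet appears.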
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
wslulciuc left a comment
Thanks, @pawel-big-lebowski 💯 🥇
…350) Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
Signed-off-by: Pawel Leszczynski leszczynski.pawel@gmail.com
Problem
Facets are stored in the lineage_events table as JSON containing whole events. This PR is the first of three PRs to create and make use of OpenLineage facet tables.
This PR was created as part of #2152. The content of this PR was mostly authored by @wslulciuc.
Solution
job_facets, run_facets and dataset_facets tables.
Checklist
CHANGELOG.md with details about your change under the "Unreleased" section (if relevant, depending on the change, this may not be necessary)
.sql database schema migration according to Flyway's naming convention (if relevant)