Commit 87ff469

Add proposal for ol facet tables (#2076)
* Add proposal for ol facet tables Signed-off-by: wslulciuc <willy@datakin.com>
* continued: Add proposal for ol facet tables Signed-off-by: wslulciuc <willy@datakin.com>
* Add implementation step for lazy migration Signed-off-by: wslulciuc <willy@datakin.com>
* Link issue Signed-off-by: wslulciuc <willy@datakin.com>
* continued: Link issue Signed-off-by: wslulciuc <willy@datakin.com>
* Add index details for facet tables Signed-off-by: wslulciuc <willy@datakin.com>
* Add uuid as pk for facet tables Signed-off-by: wslulciuc <willy@datakin.com>
* Update tables and expand overview section Signed-off-by: wslulciuc <willy@datakin.com>
* Add info on facet tables Signed-off-by: wslulciuc <willy@datakin.com>
* Add type column in dataset_facets table Signed-off-by: wslulciuc <willy@datakin.com>
* include proposal for migration procedure Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

Signed-off-by: wslulciuc <willy@datakin.com>
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Co-authored-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 3f187ff commit 87ff469

1 file changed

Lines changed: 203 additions & 0 deletions

@@ -0,0 +1,203 @@
# Proposal: Optimize query performance for OpenLineage facets

Author: Willy Lulciuc ([@wslulciuc](https://github.com/wslulciuc)), Paweł Leszczyński ([@pawel-big-lebowski](https://github.com/pawel-big-lebowski))

Created: 2022-08-18

Discussion: [https://github.com/MarquezProject/marquez/issues/2078](https://github.com/MarquezProject/marquez/issues/2078)

## Overview

[OpenLineage](https://openlineage.io) was initially prototyped using Marquez, with the [initial draft](https://github.com/OpenLineage/OpenLineage/blob/main/CHANGELOG.md#010---2021-08-12) of the spec taking inspiration from Marquez's [data model](https://lucid.app/lucidchart/f918ce01-9eb4-4900-b266-49935da271b8/view?page=8xAE.zxyknLQ#). OpenLineage events are collected via [`POST` `/lineage`](https://marquezproject.github.io/marquez/openapi.html#tag/Lineage) calls, and can be queried via the `lineage_events` table using the `run_uuid` associated with the event. The _current_ schema for the `lineage_events` table is defined below:

### Table `lineage_events`

| **COLUMN**    | **TYPE**      |
|---------------|---------------|
| run_uuid      | `UUID`        |
| event_time    | `TIMESTAMPTZ` |
| event_type    | `TEXT`        |
| event         | `JSONB`       |
| job_name      | `TEXT`        |
| job_namespace | `TEXT`        |
| producer      | `TEXT`        |

> **Note:** The table has an index on `run_uuid` and a _multicolumn_ index on `job_name`, `job_namespace`.

The `event` column contains the raw OpenLineage event. When Marquez handles the event, metadata for datasets, jobs, and runs is inserted into their respective tables.

OpenLineage's core model is extensible via _facets_. A `facet` is user-defined metadata and enables entity enrichment. Initially, returning dataset, job, and run facets via the REST API was not supported; support was eventually added in release [`0.14.0`](https://github.com/MarquezProject/marquez/compare/0.13.1...0.14.0). The implementation was simple: when querying the `datasets`, `jobs`, or `runs` tables, also query the `lineage_events` table for facets.

We knew the initial implementation would have to be revisited eventually. That is, the `lineage_events` table may have multiple events for a given `run_uuid`, and each event can easily exceed **`10MB`**, resulting in out-of-memory (OOM) errors because facet queries must first load the raw `event` into memory and then filter for any relevant facets. For example, the query snippet below is used to query the `datasets` table:

```sql
.
.
LEFT JOIN LATERAL (
    SELECT run_uuid, event_time, event FROM lineage_events
    WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
.
.
LEFT JOIN (
    SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
    FROM dataset_runs d2,
         jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
    WHERE d2.run_uuid = d2.run_uuid
      AND ds -> 'facets' IS NOT NULL
      AND ds ->> 'name' = d2.name
      AND ds ->> 'namespace' = d2.namespace_name
    GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid
```

In the above query, the `inputs` and `outputs` dataset facets for each event are aggregated, ordered by event time. This proposal outlines how we can optimize query performance for OpenLineage facets by limiting access to the `lineage_events` table.

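To gauge whether an existing instance is exposed to the memory pressure described above, a query along the following lines could list the runs whose events occupy the most storage. This is a rough diagnostic sketch and not part of the proposal; note that `pg_column_size` reports the stored (possibly compressed) size, so the in-memory size of an event may be larger.

```sql
-- Hypothetical diagnostic (not part of the proposal): the ten runs whose
-- events take up the most storage in lineage_events.
SELECT run_uuid,
       COUNT(*)                                   AS event_count,
       pg_size_pretty(SUM(pg_column_size(event))) AS stored_event_size
FROM lineage_events
GROUP BY run_uuid
ORDER BY SUM(pg_column_size(event)) DESC
LIMIT 10;
```
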
## Proposal

To improve query performance for facets, and avoid querying the `lineage_events` table, we propose adding the following tables to group facets by how they will be accessed:

### Table `dataset_facets`

| **COLUMN**         | **TYPE**      |
|--------------------|---------------|
| uuid **(PK)**      | `UUID`        |
| created_at         | `TIMESTAMPTZ` |
| run_uuid           | `UUID`        |
| lineage_event_time | `TIMESTAMPTZ` |
| lineage_event_type | `VARCHAR`     |
| type               | `VARCHAR`     |
| name               | `VARCHAR`     |
| facet              | `JSONB`       |

> **Table 1:** Facets for a given dataset.

> **Note:** The `type` column is used to determine the type of facet: `DATASET`, `INPUT`, `OUTPUT` (see the [_Standard Facets_](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.md#dataset-facets) section in the OpenLineage spec).

### Table `job_facets`

| **COLUMN**         | **TYPE**      |
|--------------------|---------------|
| uuid **(PK)**      | `UUID`        |
| created_at         | `TIMESTAMPTZ` |
| run_uuid           | `UUID`        |
| lineage_event_time | `TIMESTAMPTZ` |
| lineage_event_type | `VARCHAR`     |
| name               | `VARCHAR`     |
| facet              | `JSONB`       |

> **Table 2:** Facets for a given job.

### Table `run_facets`

| **COLUMN**         | **TYPE**      |
|--------------------|---------------|
| uuid **(PK)**      | `UUID`        |
| created_at         | `TIMESTAMPTZ` |
| run_uuid           | `UUID`        |
| lineage_event_time | `TIMESTAMPTZ` |
| lineage_event_type | `VARCHAR`     |
| name               | `VARCHAR`     |
| facet              | `JSONB`       |

> **Table 3:** Facets for a given run.

Note that facet tables will be:

* Append only, mirroring the current insertion pattern of the `lineage_events` table, thereby avoiding facet conflicts
* Merged in _first-to-last_ received order; that is, facet rows will be merged post query using [`MapperUtils.toFacetsOrNull()`](https://github.com/MarquezProject/marquez/blob/main/api/src/main/java/marquez/db/mappers/MapperUtils.java#L50), mirroring the current logic (i.e., newer facets will be added to, or override, older facet values based on when the OpenLineage event was received)
* Indexed on `run_uuid`

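As an illustration of the schema above, the `dataset_facets` table and its `run_uuid` index might be created roughly as follows. This is a minimal sketch: the columns and types come from Table 1, while the `NOT NULL` constraints and the index name `dataset_facets_run_uuid_idx` are assumptions that the proposal does not specify. The `job_facets` and `run_facets` tables would presumably follow the same shape without the `type` column.

```sql
-- Sketch only: columns and types follow Table 1; the NOT NULL constraints
-- and the index name are assumptions, not part of the proposal.
CREATE TABLE dataset_facets (
    uuid               UUID PRIMARY KEY,
    created_at         TIMESTAMPTZ NOT NULL,
    run_uuid           UUID,
    lineage_event_time TIMESTAMPTZ NOT NULL,
    lineage_event_type VARCHAR,
    type               VARCHAR NOT NULL, -- DATASET, INPUT, or OUTPUT
    name               VARCHAR NOT NULL,
    facet              JSONB
);

-- Facet tables are indexed on run_uuid.
CREATE INDEX dataset_facets_run_uuid_idx ON dataset_facets (run_uuid);
```
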
## Implementation

The implementation requires:

1. Schema changes to create the facet tables outlined above.
2. Using the facet tables instead of the `lineage_events` table to query for facets.
3. Lazy migration: the facet tables will be queried first and, if no facets are returned, the `lineage_events` table will be queried instead. This approach avoids an upfront backfill, but one will still be needed.

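Steps 2 and 3 could be combined in a single read, sketched here for run facets. Several details are assumptions rather than part of the proposal: `:run_uuid` is a bind parameter, each `run_facets` row is taken to hold one facet body keyed by its `name`, and the raw event is taken to keep run facets under `run.facets` as in the OpenLineage spec. Rows come back in first-to-last order so they can be merged after the query.

```sql
-- Sketch of a lazy-migration read for run facets (assumptions noted above).
WITH migrated AS (
    -- Preferred source: the new facet table.
    SELECT lineage_event_time AS event_time,
           jsonb_build_object(name, facet) AS facets
    FROM run_facets
    WHERE run_uuid = :run_uuid
),
fallback AS (
    -- Only contributes rows when no facets were found in run_facets.
    SELECT event_time,
           event -> 'run' -> 'facets' AS facets
    FROM lineage_events
    WHERE run_uuid = :run_uuid
      AND event -> 'run' -> 'facets' IS NOT NULL
      AND NOT EXISTS (SELECT 1 FROM migrated)
)
SELECT event_time, facets FROM migrated
UNION ALL
SELECT event_time, facets FROM fallback
ORDER BY event_time ASC;
```
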
## Migration procedure

The following challenges need to be addressed to provide a successful migration procedure:
* Rewriting existing `lineage_events` rows into the new facet tables can be an expensive DB operation for large Marquez instances.
* The migration procedure should minimize downtime for processing new OL events.
* Users need to be able to revert in case of migration failure.

Based on that, the migration procedure will be split into two steps. The first, revertible step will:
* create the new tables: `dataset_facets`, `job_facets` and `run_facets`,
* introduce a code change that writes into the new tables,
* provide a procedure to migrate facets from the `lineage_events` table into `dataset_facets`, `job_facets` and `run_facets`.

The second, irreversible step will perform the cleanup of the `lineage_events` table.
The second migration step will be published in a future release.

We distinguish two migration modes:
* `BASIC` when there are up to 100K rows in `lineage_events`.
* `PRO` for Marquez instances with more than 100K records.

The migration script will run the `BASIC` version automatically if the condition is met.
`PRO` mode will require extra manual steps.

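The mode that applies to a given instance can be checked with a simple row count. The query below is only a sketch using the 100K threshold stated above; how the migration script actually performs this check is not specified in the proposal.

```sql
-- Sketch: which migration mode applies, based on the 100K row threshold.
SELECT CASE WHEN COUNT(*) <= 100000 THEN 'BASIC' ELSE 'PRO' END AS migration_mode
FROM lineage_events;
```
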
### BASIC MODE < 100K records

A Flyway DB script to create the tables and migrate data will be created. This will be recommended for users who
have up to 100K records in the `lineage_events` table, assuming each event is ~20KB in size.
A performance test for this scenario should be run during the implementation phase.
Such users may experience a few minutes of downtime, which should be acceptable provided they are
clearly informed of it in the `CHANGELOG`.

### PRO MODE > 100K records

The Flyway migration engine runs migrations in transactions.
Updating the whole `lineage_events` table in a single transaction could be dangerous.
The upgrade procedure will look the same as in `BASIC` mode, except that the data migration from the `lineage_events` table to the
`dataset_facets`, `job_facets` and `run_facets` tables will neither be triggered automatically nor done
in a single run. A migration command will be introduced that must be triggered manually.
Migrating facets from `lineage_events` will be done in chunks, and the chunk size will be configurable.
The API will be able to receive incoming OpenLineage events while the data migration script is running.

An extra `v55_migration_lock` table will be introduced:

| **COLUMN** | **TYPE**      |
|------------|---------------|
| run_uuid   | `UUID`        |
| created_at | `TIMESTAMPTZ` |

The data migration will run in chunks; each chunk will
contain events older than the rows in the `v55_migration_lock` table.

```sql
WITH events_chunk AS (
    -- select only event columns so created_at and run_uuid stay unambiguous
    SELECT e.* FROM lineage_events e
    CROSS JOIN v55_migration_lock m
    WHERE e.created_at < m.created_at
       OR (e.created_at = m.created_at AND e.run_uuid < m.run_uuid)
    ORDER BY e.created_at DESC, e.run_uuid DESC -- start with latest and move to older events
    LIMIT :chunk_size
),
insert_datasets AS (
    INSERT INTO dataset_facets
    SELECT ... FROM events_chunk
),
insert_runs AS (
    INSERT INTO run_facets
    SELECT ... FROM events_chunk
),
insert_jobs AS (
    INSERT INTO job_facets
    SELECT ... FROM events_chunk
)
INSERT INTO v55_migration_lock (run_uuid, created_at) -- insert lock for the oldest event migrated
SELECT events_chunk.run_uuid, events_chunk.created_at
FROM events_chunk
ORDER BY created_at ASC, run_uuid ASC
LIMIT 1
```
This query will be rerun until the `created_at` and `run_uuid` values in `v55_migration_lock` equal the result of:
```sql
SELECT run_uuid, created_at FROM lineage_events ORDER BY created_at ASC, run_uuid ASC LIMIT 1;
```
The second migration step will not start unless this condition is met.
For users who attempt to run both migration steps in a single run,
the second step will fail and ask them to manually run the data migration command and retry the migration after
the command completes successfully. The `v55_migration_lock` table will be dropped at the end of the second migration step.
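
The completion condition described above could be checked with a query of roughly this shape. The table and column names come from the proposal; the query itself is an assumption about how such a check might be written, not the actual implementation.

```sql
-- Sketch of the completion check: true once a lock row matches the oldest
-- event in lineage_events.
SELECT EXISTS (
    SELECT 1
    FROM v55_migration_lock m
    JOIN (
        SELECT run_uuid, created_at
        FROM lineage_events
        ORDER BY created_at ASC, run_uuid ASC
        LIMIT 1
    ) oldest
      ON oldest.run_uuid = m.run_uuid
     AND oldest.created_at = m.created_at
) AS migration_complete;
```
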
