|
| 1 | +# Proposal: Optimize query performance for OpenLineage facets |
| 2 | + |
| 3 | +Author: Willy Lulciuc ([@wslulciuc](https://github.com/wslulciuc)), Paweł Leszczyński ([@pawel-big-lebowski](https://github.com/pawel-big-lebowski)) |
| 4 | + |
| 5 | +Created: 2022-08-18 |
| 6 | + |
| 7 | +Discussion: [https://github.com/MarquezProject/marquez/issues/2078](https://github.com/MarquezProject/marquez/issues/2078) |
| 8 | + |
| 9 | +## Overview |
| 10 | + |
| 11 | +[OpenLineage](https://openlineage.io) was initially prototyped using Marquez, with the [initial draft](https://github.com/OpenLineage/OpenLineage/blob/main/CHANGELOG.md#010---2021-08-12) of the spec taking inspiration from Marquez's [data model](https://lucid.app/lucidchart/f918ce01-9eb4-4900-b266-49935da271b8/view?page=8xAE.zxyknLQ#). OpenLineage events are collected via [`POST` `/lineage`](https://marquezproject.github.io/marquez/openapi.html#tag/Lineage) calls, and can be queried via the `lineage_events` table using the `run_uuid` associated with the event. The _current_ schema for the `lineage_events` table is defined below: |
| 12 | + |
| 13 | +### Table `lineage_events` |
| 14 | + |
| 15 | +| **COLUMN** | **TYPE** | |
| 16 | +|---------------|--------------| |
| 17 | +| run_uuid | `UUID` | |
| 18 | +| event_time | `TIMESTAMPTZ` | |
| 19 | +| event_type | `TEXT` | |
| 20 | +| event | `JSONB` | |
| 21 | +| job_name | `TEXT` | |
| 22 | +| job_namespace | `TEXT` | |
| 23 | +| producer | `TEXT` | |
| 24 | + |
| 25 | +> **Note:** The table has an index on `run_uuid` and a _multicolumn_ index on `job_name`, `job_namespace`. |
| 26 | +
|
| 27 | +The `event` column contains the raw OpenLineage event. When Marquez handles the event, metadata for datasets, jobs, and runs is inserted into their respective tables. |
| 28 | + |
| 29 | +OpenLineage's core model is extensible via _facets_. A `facet` is user-defined metadata and enables entity enrichment. Initially, returning dataset, job, and run facets via the REST API was not supported; support was added in release [`0.14.0`](https://github.com/MarquezProject/marquez/compare/0.13.1...0.14.0). The implementation was simple: when querying the `datasets`, `jobs`, or `runs` tables, also query the `lineage_events` table for facets. |
| 30 | + |
| 31 | +We knew the initial implementation would have to be revisited eventually. The `lineage_events` table may have multiple events for a given `run_uuid`, and a single event can easily exceed **`10MB`**. Because facet queries first load each raw `event` into memory and only then filter for relevant facets, this can result in out-of-memory (OOM) errors. For example, the query snippet below is used to query the `datasets` table: |
| 32 | + |
| 33 | +```sql |
| 34 | +. |
| 35 | +. |
| 36 | +LEFT JOIN LATERAL ( |
| 37 | + SELECT run_uuid, event_time, event FROM lineage_events |
| 38 | + WHERE run_uuid = dv.run_uuid |
| 39 | +) e ON e.run_uuid = dv.run_uuid |
| 40 | +. |
| 41 | +. |
| 42 | +LEFT JOIN ( |
| 43 | + SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets |
| 44 | + FROM dataset_runs d2, |
| 45 | + jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds |
| 46 | + WHERE d2.run_uuid = d2.run_uuid |
| 47 | + AND ds -> 'facets' IS NOT NULL |
| 48 | + AND ds ->> 'name' = d2.name |
| 49 | + AND ds ->> 'namespace' = d2.namespace_name |
| 50 | + GROUP BY d2.uuid |
| 51 | +) f ON f.dataset_uuid = d.uuid
| 52 | +``` |
| 53 | +
|
| 54 | +In the above query, the `inputs` and `outputs` dataset facets for each event are aggregated in order of event time. This proposal outlines how we can optimize query performance for OpenLineage facets by limiting access to the `lineage_events` table.
| 55 | +
|
| 56 | +## Proposal |
| 57 | +
|
| 58 | +To improve query performance for facets, and avoid querying the `lineage_events` table, we propose adding the following tables to group facets by how they will be accessed: |
| 59 | +
|
| 60 | +### Table `dataset_facets` |
| 61 | +
|
| 62 | +| **COLUMN** | **TYPE** | |
| 63 | +|--------------------|---------------| |
| 64 | +| uuid **(PK)** | `UUID` | |
| 65 | +| created_at | `TIMESTAMPTZ` | |
| 66 | +| run_uuid | `UUID` | |
| 67 | +| lineage_event_time | `TIMESTAMPTZ` | |
| 68 | +| lineage_event_type | `VARCHAR` | |
| 69 | +| type | `VARCHAR` | |
| 70 | +| name | `VARCHAR` | |
| 71 | +| facet | `JSONB` | |
| 72 | +
|
| 73 | +> **Table 1:** Facets for a given dataset. |
| 74 | +
|
| 75 | +> **Note:** The `type` column is used to determine the type of facet: `DATASET`, `INPUT`, `OUTPUT` (see the [_Standard Facets_](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.md#dataset-facets) section in the OpenLineage spec).
| 76 | +
|
| 77 | +### Table `job_facets` |
| 78 | +
|
| 79 | +| **COLUMN** | **TYPE** | |
| 80 | +|--------------------|---------------| |
| 81 | +| uuid **(PK)** | `UUID` | |
| 82 | +| created_at | `TIMESTAMPTZ` | |
| 83 | +| run_uuid | `UUID` | |
| 84 | +| lineage_event_time | `TIMESTAMPTZ` | |
| 85 | +| lineage_event_type | `VARCHAR` | |
| 86 | +| name | `VARCHAR` | |
| 87 | +| facet | `JSONB` | |
| 88 | +
|
| 89 | +> **Table 2:** Facets for a given job. |
| 90 | +
|
| 91 | +### Table `run_facets` |
| 92 | +
|
| 93 | +| **COLUMN** | **TYPE** | |
| 94 | +|--------------------|---------------| |
| 95 | +| uuid **(PK)** | `UUID` | |
| 96 | +| created_at | `TIMESTAMPTZ` | |
| 97 | +| run_uuid | `UUID` | |
| 98 | +| lineage_event_time | `TIMESTAMPTZ` | |
| 99 | +| lineage_event_type | `VARCHAR` | |
| 100 | +| name | `VARCHAR` | |
| 101 | +| facet | `JSONB` | |
| 102 | +
|
| 103 | +> **Table 3:** Facets for a given run. |
| 104 | +
|
| 105 | +Note that the facet tables will be:
| 106 | +
|
| 107 | +* Append-only, mirroring the current insertion pattern of the `lineage_events` table; this avoids facet conflicts
| 108 | +* Merged in _first-to-last_ received order: facet rows will be merged post-query using [`MapperUtils.toFacetsOrNull()`](https://github.com/MarquezProject/marquez/blob/main/api/src/main/java/marquez/db/mappers/MapperUtils.java#L50), mirroring the current logic (i.e., newer facets will be added or will override older facet values based on when the OpenLineage event was received)
| 109 | +* Indexed on `run_uuid` |
| 110 | +
|
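
As a rough sketch of the schema and access pattern described above, the DDL below creates `dataset_facets` with the columns from Table 1 and the `run_uuid` index, then reads facets back in first-to-last order so they can be merged after the query. The index name `dataset_facets_run_uuid_idx`, the unbounded `VARCHAR` columns, the absence of `NOT NULL` constraints, and the `:run_uuid` bind parameter are assumptions made for illustration, not part of the proposal.

```sql
-- Sketch only: column names and types follow Table 1; constraints are left open.
CREATE TABLE dataset_facets (
  uuid               UUID PRIMARY KEY,
  created_at         TIMESTAMPTZ,
  run_uuid           UUID,
  lineage_event_time TIMESTAMPTZ,
  lineage_event_type VARCHAR,
  type               VARCHAR,   -- DATASET, INPUT, or OUTPUT
  name               VARCHAR,
  facet              JSONB
);

-- Hypothetical index name; the proposal only states "indexed on run_uuid".
CREATE INDEX dataset_facets_run_uuid_idx ON dataset_facets (run_uuid);

-- Read facets oldest-first so that, when merged in order after the query,
-- newer facet values override older ones.
SELECT name, facet
FROM dataset_facets
WHERE run_uuid = :run_uuid
ORDER BY lineage_event_time ASC;
```

The `job_facets` and `run_facets` tables would presumably follow the same shape, minus the `type` column.
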
| 111 | +## Implementation |
| 112 | +
|
| 113 | +The implementation requires: |
| 114 | +
|
| 115 | +1. Schema changes to create the facet tables outlined above. |
| 116 | +2. Using the facet tables instead of the `lineage_events` table to query for facets.
| 117 | +3. Lazy migration: the facet tables will be queried first, and the `lineage_events` table will be queried only if no facets are returned; this approach avoids requiring a backfill up front, although one will still be needed.
| 118 | +
|
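
One way the lazy-migration lookup in step 3 might be expressed is a single query that prefers `run_facets` and falls back to the raw event only when no facet rows exist for the run. This is a sketch under assumptions: it assumes the `facet` column stores a single-key object of the form `{"<name>": <value>}`, and `:run_uuid` is a bind parameter; the proposal does not prescribe either.

```sql
-- Preferred path: facets already written to (or migrated into) run_facets.
SELECT rf.name, rf.facet, rf.lineage_event_time
FROM run_facets rf
WHERE rf.run_uuid = :run_uuid

UNION ALL

-- Fallback path: expand run facets from the raw event, but only when
-- run_facets has no rows for this run.
SELECT f.key, jsonb_build_object(f.key, f.value), le.event_time
FROM lineage_events le,
     jsonb_each(le.event -> 'run' -> 'facets') AS f
WHERE le.run_uuid = :run_uuid
  AND NOT EXISTS (SELECT 1 FROM run_facets WHERE run_uuid = :run_uuid)

-- Oldest first, matching the first-to-last merge order.
ORDER BY lineage_event_time ASC;
```

The fallback branch still loads raw events, so it keeps the original memory cost for runs that have not been migrated yet; the same decision could equally be made in application code with two separate queries.
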
| 119 | +## Migration procedure |
| 120 | +
|
| 121 | +The following challenges need to be addressed to provide a successful migration procedure:
| 122 | +* Rewriting existing `lineage_events` rows into the facet tables can be an expensive DB operation for large Marquez instances.
| 123 | +* The migration procedure should minimize downtime for processing new OpenLineage events.
| 124 | +* Users need to be able to revert in case of migration failure.
| 125 | +
|
| 126 | +Based on that, the migration procedure will be split into two steps. The first, revertible step will:
| 127 | +* create the new tables: `dataset_facets`, `job_facets` and `run_facets`,
| 128 | +* introduce a code change that writes into the new tables,
| 129 | +* provide a procedure to migrate facets from the `lineage_events` table into `dataset_facets`, `job_facets` and `run_facets`.
| 130 | +
|
| 131 | +The second, irreversible step will clean up the `lineage_events` table.
| 132 | +This second migration step will be published in a future release.
| 133 | +
|
| 134 | +We distinguish two migration modes: |
| 135 | +* `BASIC` when there are up to 100K rows in `lineage_events`.
| 136 | +* `PRO` for Marquez instances with more than 100K rows.
| 137 | +
|
| 138 | +The migration script will run the `BASIC` version automatically if the condition is met.
| 139 | +`PRO` mode will require extra manual steps.
| 140 | +
|
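
The mode selection above could be checked with a bounded count, sketched below. The column alias `migration_mode` and the subquery alias `capped` are illustrative names, and treating exactly 100K rows as `BASIC` follows the "up to 100K" wording.

```sql
-- Stop scanning after 100,001 rows: that is enough to know whether the
-- table exceeds the 100K threshold without counting a very large table in full.
SELECT CASE WHEN COUNT(*) <= 100000 THEN 'BASIC' ELSE 'PRO' END AS migration_mode
FROM (SELECT 1 FROM lineage_events LIMIT 100001) AS capped;
```
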
| 141 | +### BASIC MODE < 100K records |
| 142 | +
|
| 143 | +A Flyway DB script to create the tables and migrate data will be created. This will be recommended for users who
| 144 | +have up to 100K records in the `lineage_events` table, assuming each event is ~20KB in size.
| 145 | +A performance test for this scenario should be run during the implementation phase.
| 146 | +Such users may experience a few minutes of downtime, which should be acceptable provided they are
| 147 | +clearly informed of it in the `CHANGELOG`.
| 148 | +
|
| 149 | +### PRO MODE > 100K records |
| 150 | +
|
| 151 | +The Flyway migration engine runs migrations in transactions.
| 152 | +Updating the whole `lineage_events` table in a single transaction could be dangerous.
| 153 | +The upgrade procedure will look the same as in `BASIC` mode, except that the data migration from the `lineage_events` table to the
| 154 | +`dataset_facets`, `job_facets` and `run_facets` tables will neither be triggered automatically nor done
| 155 | +in a single run. A migration command will be introduced and will have to be triggered manually.
| 156 | +Facets will be migrated from `lineage_events` in chunks, and the chunk size will be configurable.
| 157 | +The API will be able to receive incoming OpenLineage events while the data migration script is running.
| 158 | +
|
| 159 | +An extra `v55_migration_lock` table will be introduced: |
| 160 | +
|
| 161 | +| **COLUMN** | **TYPE** | |
| 162 | +|------------|---------------| |
| 163 | +| run_uuid | `UUID` | |
| 164 | +| created_at | `TIMESTAMPTZ` | |
| 165 | +
|
| 166 | +Data migration will run in chunks; each chunk will
| 167 | +contain events older than the rows already recorded in the `v55_migration_lock` table.
| 168 | +
|
| 169 | +```sql
| 170 | +WITH events_chunk AS (
| 171 | + SELECT le.* FROM lineage_events le
| 172 | + WHERE (le.created_at, le.run_uuid) < ( -- only events older than the oldest lock row
| 173 | +   SELECT m.created_at, m.run_uuid FROM v55_migration_lock m
| 174 | +   ORDER BY m.created_at ASC, m.run_uuid ASC LIMIT 1)
| 175 | + ORDER BY le.created_at DESC, le.run_uuid DESC -- start with latest and move to older events
| 176 | + LIMIT :chunk_size |
| 177 | +), |
| 178 | +insert_datasets AS ( |
| 179 | + INSERT INTO dataset_facets |
| 180 | + SELECT ... FROM events_chunk |
| 181 | +), |
| 182 | +insert_runs AS ( |
| 183 | + INSERT INTO run_facets |
| 184 | + SELECT ... FROM events_chunk |
| 185 | +), |
| 186 | +insert_jobs AS ( |
| 187 | + INSERT INTO job_facets |
| 188 | + SELECT ... FROM events_chunk |
| 189 | +)
| 190 | +INSERT INTO v55_migration_lock (created_at, run_uuid) -- insert lock for the oldest event migrated
| 191 | +SELECT events_chunk.created_at, events_chunk.run_uuid
| 192 | +FROM events_chunk
| 193 | +ORDER BY created_at ASC, run_uuid ASC
| 194 | +LIMIT 1 |
| 195 | +``` |
| 196 | +This query will be run repeatedly until the oldest `created_at` and `run_uuid` in `v55_migration_lock` equal the result of:
| 197 | +```sql
| 198 | +SELECT run_uuid, created_at FROM lineage_events ORDER BY created_at ASC, run_uuid ASC LIMIT 1; |
| 199 | +``` |
| 200 | +The second migration step will not start unless this condition is met.
| 201 | +For users who attempt to run both migration steps in a single run,
| 202 | +the second step will fail and ask them to run the data migration command manually, then retry the migration after
| 203 | +the command completes successfully. The `v55_migration_lock` table will be dropped at the end of the second migration step.