|
| 1 | +import os |
| 2 | + |
| 3 | +import pytest |
| 4 | +from arro3.core import Array, ChunkedArray, DataType, Table |
| 5 | +from numpy.random import standard_normal |
| 6 | + |
| 7 | +from deltalake import DeltaTable, QueryBuilder, write_deltalake |
| 8 | + |
| 9 | +# NOTE: make sure to run these in release mode with |
| 10 | +# MATURIN_EXTRA_ARGS=--release make develop |
| 11 | +# When profiling, use: |
| 12 | +# MATURIN_EXTRA_ARGS="--profile release-with-debug" make develop |
| 13 | + |
| 14 | + |
@pytest.fixture()
def sample_table() -> Table:
    """Build a ~128 MiB table: 20 float64 data columns plus an int64 index.

    The index column ``"i"`` lets benchmarks sort query results back into
    insertion order before comparing against this fixture.
    """
    max_size_bytes = 128 * 1024 * 1024
    ncols = 20
    # Derive the row count from ncols instead of repeating the literal 20,
    # so the two values cannot drift apart; 8 bytes per float64 cell.
    nrows = max_size_bytes // ncols // 8
    tab = Table.from_pydict({f"x{i}": standard_normal(nrows) for i in range(ncols)})
    # Add index column for sorting
    tab = tab.append_column(
        "i", ChunkedArray(Array(range(nrows), type=DataType.int64()))
    )
    return tab
| 26 | + |
| 27 | + |
@pytest.mark.benchmark(group="write")
def test_benchmark_write(benchmark, sample_table, tmp_path):
    """Benchmark overwriting a Delta table with ``sample_table``.

    After the timed writes, read the table back sorted on the index column
    and check it round-trips unchanged.
    """
    benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite")

    dt = DeltaTable(str(tmp_path))
    # The index column added by the sample_table fixture is named "i";
    # "order by id" referenced a nonexistent column and would error.
    assert (
        QueryBuilder().register("tbl", dt).execute("select * from tbl order by i")
        == sample_table
    )
| 37 | + |
| 38 | + |
@pytest.mark.pyarrow
@pytest.mark.benchmark(group="read")
def test_benchmark_read(benchmark, sample_table, tmp_path):
    """Benchmark reading a Delta table into a pyarrow Table via deltalake."""
    import pyarrow as pa

    table_uri = str(tmp_path)
    write_deltalake(table_uri, sample_table)
    delta_table = DeltaTable(table_uri)

    loaded = benchmark(delta_table.to_pyarrow_table)
    # Rows may come back in any order; sort on the index column before comparing.
    assert loaded.sort_by("i") == pa.table(sample_table)
| 49 | + |
| 50 | + |
@pytest.mark.pyarrow
@pytest.mark.benchmark(group="read")
def test_benchmark_read_pyarrow(benchmark, sample_table, tmp_path):
    """Benchmark reading a Delta table through an explicit pyarrow filesystem."""
    import pyarrow as pa
    import pyarrow.fs as pa_fs

    table_uri = str(tmp_path)
    write_deltalake(table_uri, sample_table)
    delta_table = DeltaTable(table_uri)

    # Scope a local filesystem to the table directory and read through it.
    subtree_fs = pa_fs.SubTreeFileSystem(table_uri, pa_fs.LocalFileSystem())
    loaded = benchmark(delta_table.to_pyarrow_table, filesystem=subtree_fs)
    # Rows may come back in any order; sort on the index column before comparing.
    assert loaded.sort_by("i") == pa.table(sample_table)
| 63 | + |
| 64 | + |
@pytest.mark.benchmark(group="optimize")
@pytest.mark.parametrize("max_tasks", [1, 5])
def test_benchmark_optimize(benchmark, sample_table, tmp_path, max_tasks):
    """Benchmark ``optimize.compact`` over a partitioned table of small files.

    Creates 5 partitions ("a".."e"), each containing 10 files holding a
    1/10th slice of ``sample_table``, then times compaction with
    ``max_concurrent_tasks=max_tasks``.
    """
    # 5 partitions, each with files_per_part small files of the same slice.
    files_per_part = 10
    parts = ["a", "b", "c", "d", "e"]

    nrows = int(sample_table.num_rows / files_per_part)
    for part in parts:
        tab = sample_table.slice(0, nrows)
        # Partition values are strings ("a".."e"), so the column must be a
        # string type; the original declared int64, which mismatches the
        # data. (Assumes arro3 exposes DataType.string() — TODO confirm.)
        tab = tab.append_column(
            "part", ChunkedArray(Array([part] * nrows, type=DataType.string()))
        )
        for _ in range(files_per_part):
            write_deltalake(tmp_path, tab, mode="append", partition_by=["part"])

    dt = DeltaTable(tmp_path)

    assert len(dt.files()) == files_per_part * len(parts)
    initial_version = dt.version()

    def setup():
        # Instead of recreating the table for each benchmark run, we just delete
        # the optimize commit's log file so compaction can be replayed.
        optimize_version = initial_version + 1
        try:
            os.remove(
                os.path.join(tmp_path, "_delta_log", f"{optimize_version:020}.json")
            )
        except FileNotFoundError:
            # First round: no optimize commit exists yet.
            pass

        # Reload the table after we have altered the log
        dt = DeltaTable(tmp_path)
        assert dt.version() == initial_version

        return (dt,), dict(max_concurrent_tasks=max_tasks)

    def func(dt, max_concurrent_tasks):
        return dt.optimize.compact(
            max_concurrent_tasks=max_concurrent_tasks, target_size=1024 * 1024 * 1024
        )

    # setup() resets the table state between rounds.
    results = benchmark.pedantic(func, setup=setup, rounds=5)

    # Compaction replaces all files_per_part files in each partition with one.
    assert results["numFilesRemoved"] == files_per_part * len(parts)
    assert results["numFilesAdded"] == len(parts)
    assert results["partitionsOptimized"] == len(parts)
0 commit comments