Skip to content

Commit c15d768

Browse files
roeapwjones127
andauthored
feat: handle protocol compatibility (#1807)
# Description In preparation for further improvements to protocol support, this PR introduces a `ProtocolChecker` which validates that we can read form / write to / commit to a specific table. So far everything is expressed in table features in the hopes, that this keeps on giving correct behaviours as we add more table features. The existing support for append only is integrated and extended to handle enablement logic according to the protocol. --------- Co-authored-by: Will Jones <willjones127@gmail.com>
1 parent 4d103a7 commit c15d768

16 files changed

Lines changed: 521 additions & 89 deletions

File tree

crates/deltalake-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ tempfile = "3"
130130
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
131131
utime = "0.3"
132132
hyper = { version = "0.14", features = ["server"] }
133+
criterion = "0.5"
133134

134135
[features]
135136
azure = ["object_store/azure"]

crates/deltalake-core/benches/read_checkpoint.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use criterion::{criterion_group, criterion_main, Criterion};
2-
use deltalake::table::state::DeltaTableState;
3-
use deltalake::DeltaTableConfig;
2+
use deltalake_core::table::state::DeltaTableState;
3+
use deltalake_core::DeltaTableConfig;
44
use std::fs::File;
55
use std::io::Read;
66

crates/deltalake-core/src/kernel/actions/types.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::{HashMap, HashSet};
2+
use std::fmt;
23
use std::str::FromStr;
34
// use std::io::{Cursor, Read};
45
// use std::sync::Arc;
@@ -225,6 +226,24 @@ impl From<String> for ReaderFeatures {
225226
}
226227
}
227228

229+
impl AsRef<str> for ReaderFeatures {
230+
fn as_ref(&self) -> &str {
231+
match self {
232+
ReaderFeatures::ColumnMapping => "columnMapping",
233+
ReaderFeatures::DeleteionVecotrs => "deletionVectors",
234+
ReaderFeatures::TimestampWithoutTimezone => "timestampNtz",
235+
ReaderFeatures::V2Checkpoint => "v2Checkpoint",
236+
ReaderFeatures::Other(f) => f,
237+
}
238+
}
239+
}
240+
241+
impl fmt::Display for ReaderFeatures {
242+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243+
write!(f, "{}", self.as_ref())
244+
}
245+
}
246+
228247
/// Features table writers can support as well as let users know
229248
/// what is supported
230249
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
@@ -303,6 +322,33 @@ impl From<String> for WriterFeatures {
303322
}
304323
}
305324

325+
impl AsRef<str> for WriterFeatures {
326+
fn as_ref(&self) -> &str {
327+
match self {
328+
WriterFeatures::AppendOnly => "appendOnly",
329+
WriterFeatures::Invariants => "invariants",
330+
WriterFeatures::CheckConstraints => "checkConstraints",
331+
WriterFeatures::ChangeDataFeed => "changeDataFeed",
332+
WriterFeatures::GeneratedColumns => "generatedColumns",
333+
WriterFeatures::ColumnMapping => "columnMapping",
334+
WriterFeatures::IdentityColumns => "identityColumns",
335+
WriterFeatures::DeleteionVecotrs => "deletionVectors",
336+
WriterFeatures::RowTracking => "rowTracking",
337+
WriterFeatures::TimestampWithoutTimezone => "timestampNtz",
338+
WriterFeatures::DomainMetadata => "domainMetadata",
339+
WriterFeatures::V2Checkpoint => "v2Checkpoint",
340+
WriterFeatures::IcebergCompatV1 => "icebergCompatV1",
341+
WriterFeatures::Other(f) => f,
342+
}
343+
}
344+
}
345+
346+
impl fmt::Display for WriterFeatures {
347+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348+
write!(f, "{}", self.as_ref())
349+
}
350+
}
351+
306352
#[cfg(all(not(feature = "parquet2"), feature = "parquet"))]
307353
impl From<&parquet::record::Field> for WriterFeatures {
308354
fn from(value: &parquet::record::Field) -> Self {

crates/deltalake-core/src/operations/create.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ use std::sync::Arc;
77
use futures::future::BoxFuture;
88
use serde_json::{Map, Value};
99

10-
use super::transaction::commit;
11-
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
10+
use super::transaction::{commit, PROTOCOL};
1211
use crate::errors::{DeltaResult, DeltaTableError};
1312
use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType};
1413
use crate::logstore::{LogStore, LogStoreRef};
@@ -245,8 +244,8 @@ impl CreateBuilder {
245244
_ => unreachable!(),
246245
})
247246
.unwrap_or_else(|| Protocol {
248-
min_reader_version: MAX_SUPPORTED_READER_VERSION,
249-
min_writer_version: MAX_SUPPORTED_WRITER_VERSION,
247+
min_reader_version: PROTOCOL.default_reader_version(),
248+
min_writer_version: PROTOCOL.default_writer_version(),
250249
writer_features: None,
251250
reader_features: None,
252251
});
@@ -391,8 +390,14 @@ mod tests {
391390
.await
392391
.unwrap();
393392
assert_eq!(table.version(), 0);
394-
assert_eq!(table.get_min_reader_version(), MAX_SUPPORTED_READER_VERSION);
395-
assert_eq!(table.get_min_writer_version(), MAX_SUPPORTED_WRITER_VERSION);
393+
assert_eq!(
394+
table.get_min_reader_version(),
395+
PROTOCOL.default_reader_version()
396+
);
397+
assert_eq!(
398+
table.get_min_writer_version(),
399+
PROTOCOL.default_writer_version()
400+
);
396401
assert_eq!(table.schema().unwrap(), &schema);
397402

398403
// check we can overwrite default settings via adding actions

crates/deltalake-core/src/operations/delete.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use parquet::file::properties::WriterProperties;
3434
use serde::Serialize;
3535
use serde_json::Value;
3636

37+
use super::datafusion_utils::Expression;
38+
use super::transaction::PROTOCOL;
3739
use crate::delta_datafusion::expr::fmt_expr_to_sql;
3840
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
3941
use crate::errors::{DeltaResult, DeltaTableError};
@@ -44,8 +46,6 @@ use crate::protocol::DeltaOperation;
4446
use crate::table::state::DeltaTableState;
4547
use crate::DeltaTable;
4648

47-
use super::datafusion_utils::Expression;
48-
4949
/// Delete Records from the Delta Table.
5050
/// See this module's documentation for more information
5151
pub struct DeleteBuilder {
@@ -274,6 +274,8 @@ impl std::future::IntoFuture for DeleteBuilder {
274274
let mut this = self;
275275

276276
Box::pin(async move {
277+
PROTOCOL.can_write_to(&this.snapshot)?;
278+
277279
let state = this.state.unwrap_or_else(|| {
278280
let session = SessionContext::new();
279281

crates/deltalake-core/src/operations/load.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
66
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
77
use futures::future::BoxFuture;
88

9+
use super::transaction::PROTOCOL;
910
use crate::errors::{DeltaResult, DeltaTableError};
1011
use crate::logstore::LogStoreRef;
1112
use crate::table::state::DeltaTableState;
@@ -46,6 +47,8 @@ impl std::future::IntoFuture for LoadBuilder {
4647
let this = self;
4748

4849
Box::pin(async move {
50+
PROTOCOL.can_read_from(&this.snapshot)?;
51+
4952
let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
5053
let schema = table.state.arrow_schema()?;
5154
let projection = this

crates/deltalake-core/src/operations/merge.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use serde::Serialize;
6464
use serde_json::Value;
6565

6666
use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
67-
use super::transaction::commit;
67+
use super::transaction::{commit, PROTOCOL};
6868
use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
6969
use crate::delta_datafusion::{register_store, DeltaScanBuilder};
7070
use crate::kernel::{Action, Remove};
@@ -1208,6 +1208,8 @@ impl std::future::IntoFuture for MergeBuilder {
12081208
let mut this = self;
12091209

12101210
Box::pin(async move {
1211+
PROTOCOL.can_write_to(&this.snapshot)?;
1212+
12111213
let state = this.state.unwrap_or_else(|| {
12121214
let session = SessionContext::new();
12131215

crates/deltalake-core/src/operations/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,6 @@ pub mod write;
5050
#[cfg(all(feature = "arrow", feature = "parquet"))]
5151
pub mod writer;
5252

53-
/// Maximum supported writer version
54-
pub const MAX_SUPPORTED_WRITER_VERSION: i32 = 1;
55-
/// Maximum supported reader version
56-
pub const MAX_SUPPORTED_READER_VERSION: i32 = 1;
57-
5853
/// High level interface for executing commands against a DeltaTable
5954
pub struct DeltaOps(pub DeltaTable);
6055

crates/deltalake-core/src/operations/optimize.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use parquet::errors::ParquetError;
3838
use parquet::file::properties::WriterProperties;
3939
use serde::{Deserialize, Serialize};
4040

41-
use super::transaction::commit;
41+
use super::transaction::{commit, PROTOCOL};
4242
use super::writer::{PartitionWriter, PartitionWriterConfig};
4343
use crate::errors::{DeltaResult, DeltaTableError};
4444
use crate::kernel::{Action, Remove};
@@ -260,6 +260,8 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
260260
let this = self;
261261

262262
Box::pin(async move {
263+
PROTOCOL.can_write_to(&this.snapshot)?;
264+
263265
let writer_properties = this.writer_properties.unwrap_or_else(|| {
264266
WriterProperties::builder()
265267
.set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))

crates/deltalake-core/src/operations/restore.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ async fn execute(
245245
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
246246
},
247247
&actions,
248-
&snapshot,
249248
None,
250249
)
251250
.await?;

0 commit comments

Comments
 (0)