Skip to content

Commit a7aa49b

Browse files
authored
chore: remove peek_next_commit from LogStore (delta-io#4059)
# Description After removing deprecated methods from `DeltaTable` we no longer require or use `peek_next_commit`. Comments hint at that we were looking forward to remove this anyways. So this PR removed `peek_next_commit` from `LogStore` and removes associated code. Signed-off-by: Robert Pack <robstar.pack@gmail.com>
1 parent c033c96 commit a7aa49b

3 files changed

Lines changed: 0 additions & 89 deletions

File tree

crates/core/src/lib.rs

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ mod tests {
196196
use futures::TryStreamExt as _;
197197

198198
use super::*;
199-
use crate::table::PeekCommit;
200199
use test_utils::file_paths_from;
201200

202201
#[tokio::test]
@@ -623,39 +622,6 @@ mod tests {
623622
assert_eq!(history3.len(), 5);
624623
}
625624

626-
#[tokio::test]
627-
async fn test_poll_table_commits() -> DeltaResult<()> {
628-
let table_path = std::path::Path::new("../test/tests/data/simple_table_with_checkpoint")
629-
.canonicalize()
630-
.unwrap();
631-
let table_url = url::Url::from_directory_path(table_path).unwrap();
632-
let mut table = crate::open_table_with_version(table_url, 9).await.unwrap();
633-
assert_eq!(table.version(), Some(9));
634-
let peek = table
635-
.log_store()
636-
.peek_next_commit(table.version().unwrap())
637-
.await
638-
.unwrap();
639-
assert!(matches!(peek, PeekCommit::New(..)));
640-
641-
if let PeekCommit::New(version, actions) = peek {
642-
assert_eq!(table.version(), Some(9));
643-
assert_eq!(version, 10);
644-
assert_eq!(actions.len(), 2);
645-
646-
table.update_incremental(None).await.unwrap();
647-
assert_eq!(table.version(), Some(10));
648-
};
649-
650-
let peek = table
651-
.log_store()
652-
.peek_next_commit(table.version().unwrap())
653-
.await
654-
.unwrap();
655-
assert!(matches!(peek, PeekCommit::UpToDate));
656-
Ok(())
657-
}
658-
659625
#[tokio::test]
660626
async fn test_read_vacuumed_log() {
661627
let table_path = std::path::Path::new("../test/tests/data/checkpoints_vacuumed")

crates/core/src/logstore/mod.rs

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -216,16 +216,6 @@ pub enum CommitOrBytes {
216216
LogBytes(Bytes),
217217
}
218218

219-
/// The next commit that's available from underlying storage
220-
///
221-
#[derive(Debug)]
222-
pub enum PeekCommit {
223-
/// The next commit version and associated actions
224-
New(i64, Vec<Action>),
225-
/// Provided DeltaVersion is up to date
226-
UpToDate,
227-
}
228-
229219
/// Configuration parameters for a log store
230220
#[derive(Debug, Clone)]
231221
pub struct LogStoreConfig {
@@ -308,19 +298,6 @@ pub trait LogStore: Send + Sync + AsAny {
308298
/// Find latest version currently stored in the delta log.
309299
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;
310300

311-
/// Get the list of actions for the next commit
312-
async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
313-
let next_version = current_version + 1;
314-
let commit_log_bytes = match self.read_commit_entry(next_version).await {
315-
Ok(Some(bytes)) => Ok(bytes),
316-
Ok(None) => return Ok(PeekCommit::UpToDate),
317-
Err(err) => Err(err),
318-
}?;
319-
320-
let actions = crate::logstore::get_actions(next_version, &commit_log_bytes);
321-
Ok(PeekCommit::New(next_version, actions?))
322-
}
323-
324301
/// Get object store, can pass operation_id for object stores linked to an operation
325302
fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;
326303

@@ -463,10 +440,6 @@ impl<T: LogStore + ?Sized> LogStore for Arc<T> {
463440
T::get_latest_version(self, start_version).await
464441
}
465442

466-
async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
467-
T::peek_next_commit(self, current_version).await
468-
}
469-
470443
fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
471444
T::object_store(self, operation_id)
472445
}
@@ -975,31 +948,6 @@ pub(crate) mod tests {
975948
);
976949
}
977950

978-
/// <https://github.com/delta-io/delta-rs/issues/3297>:w
979-
#[tokio::test]
980-
async fn test_peek_with_invalid_json() -> DeltaResult<()> {
981-
use crate::logstore::object_store::memory::InMemory;
982-
let memory_store = Arc::new(InMemory::new());
983-
let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json");
984-
985-
let log_content = r#"{invalid_json"#;
986-
987-
memory_store
988-
.put(&log_path, log_content.into())
989-
.await
990-
.expect("Failed to write log file");
991-
992-
let table_uri = url::Url::parse("memory:///delta-table").unwrap();
993-
let table = crate::DeltaTableBuilder::from_url(table_uri.clone())
994-
.unwrap()
995-
.with_storage_backend(memory_store, table_uri)
996-
.build()?;
997-
998-
let result = table.log_store().peek_next_commit(0).await;
999-
assert!(result.is_err());
1000-
Ok(())
1001-
}
1002-
1003951
/// Collect list stream
1004952
pub(crate) async fn flatten_list_stream(
1005953
storage: &object_store::DynObjectStore,

crates/core/src/table/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@ use crate::logstore::{
2525
use crate::partitions::PartitionFilter;
2626
use crate::{DeltaResult, DeltaTableBuilder, DeltaTableError};
2727

28-
// NOTE: this use can go away when peek_next_commit is removed off of [DeltaTable]
29-
pub use crate::logstore::PeekCommit;
30-
3128
pub mod builder;
3229
pub mod config;
3330
pub mod state;

0 commit comments

Comments
 (0)