@@ -816,44 +816,34 @@ fn find_nested_field<'a>(
816816mod tests {
817817 use std:: collections:: HashMap ;
818818
819- use chrono:: Utc ;
820- use deltalake_test:: utils:: * ;
821819 use futures:: TryStreamExt ;
822820 use itertools:: Itertools ;
823821
824822 use super :: log_segment:: tests:: { concurrent_checkpoint, test_log_segment} ;
825823 use super :: replay:: tests:: test_log_replay;
826824 use super :: * ;
827- use crate :: kernel:: Remove ;
828825 use crate :: protocol:: { DeltaOperation , SaveMode } ;
829- use crate :: test_utils:: ActionFactory ;
826+ use crate :: test_utils:: { assert_batches_sorted_eq , ActionFactory , TestResult , TestTables } ;
830827
831828 #[ tokio:: test]
832829 async fn test_snapshots ( ) -> TestResult {
833- let context = IntegrationContext :: new ( Box :: < LocalStorageIntegration > :: default ( ) ) ?;
834- context. load_table ( TestTables :: Checkpoints ) . await ?;
835- context. load_table ( TestTables :: Simple ) . await ?;
836- context. load_table ( TestTables :: SimpleWithCheckpoint ) . await ?;
837- context. load_table ( TestTables :: WithDvSmall ) . await ?;
838-
839- test_log_segment ( & context) . await ?;
840- test_log_replay ( & context) . await ?;
841- test_snapshot ( & context) . await ?;
842- test_eager_snapshot ( & context) . await ?;
830+ test_log_segment ( ) . await ?;
831+ test_log_replay ( ) . await ?;
832+ test_snapshot ( ) . await ?;
833+ test_eager_snapshot ( ) . await ?;
843834
844835 Ok ( ( ) )
845836 }
846837
847838 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
848839 async fn test_concurrent_checkpoint ( ) -> TestResult {
849- let context = IntegrationContext :: new ( Box :: < LocalStorageIntegration > :: default ( ) ) ?;
850- concurrent_checkpoint ( & context) . await ?;
840+ concurrent_checkpoint ( ) . await ?;
851841 Ok ( ( ) )
852842 }
853843
854- async fn test_snapshot ( context : & IntegrationContext ) -> TestResult {
855- let store = context
856- . table_builder ( TestTables :: Simple )
844+ async fn test_snapshot ( ) -> TestResult {
845+ let store = TestTables :: Simple
846+ . table_builder ( )
857847 . build_storage ( ) ?
858848 . object_store ( None ) ;
859849
@@ -900,8 +890,8 @@ mod tests {
900890 ] ;
901891 assert_batches_sorted_eq ! ( expected, & batches) ;
902892
903- let store = context
904- . table_builder ( TestTables :: Checkpoints )
893+ let store = TestTables :: Checkpoints
894+ . table_builder ( )
905895 . build_storage ( ) ?
906896 . object_store ( None ) ;
907897
@@ -924,9 +914,9 @@ mod tests {
924914 Ok ( ( ) )
925915 }
926916
927- async fn test_eager_snapshot ( context : & IntegrationContext ) -> TestResult {
928- let store = context
929- . table_builder ( TestTables :: Simple )
917+ async fn test_eager_snapshot ( ) -> TestResult {
918+ let store = TestTables :: Simple
919+ . table_builder ( )
930920 . build_storage ( ) ?
931921 . object_store ( None ) ;
932922
@@ -942,8 +932,8 @@ mod tests {
942932 let expected: StructType = serde_json:: from_str ( schema_string) ?;
943933 assert_eq ! ( snapshot. schema( ) , & expected) ;
944934
945- let store = context
946- . table_builder ( TestTables :: Checkpoints )
935+ let store = TestTables :: Checkpoints
936+ . table_builder ( )
947937 . build_storage ( ) ?
948938 . object_store ( None ) ;
949939
@@ -962,76 +952,6 @@ mod tests {
962952 Ok ( ( ) )
963953 }
964954
965- #[ tokio:: test]
966- async fn test_eager_snapshot_advance ( ) -> TestResult {
967- let context = IntegrationContext :: new ( Box :: < LocalStorageIntegration > :: default ( ) ) ?;
968- context. load_table ( TestTables :: Simple ) . await ?;
969-
970- let store = context
971- . table_builder ( TestTables :: Simple )
972- . build_storage ( ) ?
973- . object_store ( None ) ;
974-
975- let mut snapshot =
976- EagerSnapshot :: try_new ( & Path :: default ( ) , store. clone ( ) , Default :: default ( ) , None )
977- . await ?;
978-
979- let version = snapshot. version ( ) ;
980-
981- let files = snapshot. file_actions ( ) ?. enumerate ( ) . collect_vec ( ) ;
982- let num_files = files. len ( ) ;
983-
984- let split = files. split ( |( idx, _) | * idx == num_files / 2 ) . collect_vec ( ) ;
985- assert ! ( split. len( ) == 2 && !split[ 0 ] . is_empty( ) && !split[ 1 ] . is_empty( ) ) ;
986- let ( first, second) = split. into_iter ( ) . next_tuple ( ) . unwrap ( ) ;
987-
988- let removes = first
989- . iter ( )
990- . map ( |( _, add) | {
991- Remove {
992- path : add. path . clone ( ) ,
993- size : Some ( add. size ) ,
994- data_change : add. data_change ,
995- deletion_timestamp : Some ( Utc :: now ( ) . timestamp_millis ( ) ) ,
996- extended_file_metadata : Some ( true ) ,
997- partition_values : Some ( add. partition_values . clone ( ) ) ,
998- tags : add. tags . clone ( ) ,
999- deletion_vector : add. deletion_vector . clone ( ) ,
1000- base_row_id : add. base_row_id ,
1001- default_row_commit_version : add. default_row_commit_version ,
1002- }
1003- . into ( )
1004- } )
1005- . collect_vec ( ) ;
1006-
1007- let operation = DeltaOperation :: Write {
1008- mode : SaveMode :: Append ,
1009- partition_by : None ,
1010- predicate : None ,
1011- } ;
1012-
1013- let actions = vec ! [ CommitData :: new(
1014- removes,
1015- operation,
1016- HashMap :: new( ) ,
1017- Vec :: new( ) ,
1018- ) ] ;
1019-
1020- let new_version = snapshot. advance ( & actions) ?;
1021- assert_eq ! ( new_version, version + 1 ) ;
1022-
1023- let new_files = snapshot. file_actions ( ) ?. map ( |f| f. path ) . collect :: < Vec < _ > > ( ) ;
1024- assert_eq ! ( new_files. len( ) , num_files - first. len( ) ) ;
1025- assert ! ( first
1026- . iter( )
1027- . all( |( _, add) | { !new_files. contains( & add. path) } ) ) ;
1028- assert ! ( second
1029- . iter( )
1030- . all( |( _, add) | { new_files. contains( & add. path) } ) ) ;
1031-
1032- Ok ( ( ) )
1033- }
1034-
1035955 #[ test]
1036956 fn test_partition_schema ( ) {
1037957 let schema = StructType :: new ( vec ! [
0 commit comments