@@ -39,16 +39,54 @@ use crate::table::config::TablePropertiesExt as _;
3939
4040const DEFAULT_WRITER_BATCH_CHANNEL_SIZE : usize = 10 ;
4141
42+ fn parse_channel_size ( raw : Option < & str > ) -> usize {
43+ raw. and_then ( |s| s. parse :: < usize > ( ) . ok ( ) )
44+ . filter ( |size| * size > 0 )
45+ . unwrap_or ( DEFAULT_WRITER_BATCH_CHANNEL_SIZE )
46+ }
47+
4248fn channel_size ( ) -> usize {
4349 static CHANNEL_SIZE : OnceLock < usize > = OnceLock :: new ( ) ;
4450 * CHANNEL_SIZE . get_or_init ( || {
45- std:: env:: var ( "DELTARS_WRITER_BATCH_CHANNEL_SIZE" )
46- . ok ( )
47- . and_then ( |s| s. parse :: < usize > ( ) . ok ( ) )
48- . unwrap_or ( DEFAULT_WRITER_BATCH_CHANNEL_SIZE )
51+ parse_channel_size (
52+ std:: env:: var ( "DELTARS_WRITER_BATCH_CHANNEL_SIZE" )
53+ . ok ( )
54+ . as_deref ( ) ,
55+ )
4956 } )
5057}
5158
59+ #[ cfg( test) ]
60+ mod tests {
61+ use super :: { DEFAULT_WRITER_BATCH_CHANNEL_SIZE , parse_channel_size} ;
62+
63+ #[ test]
64+ fn channel_size_zero_falls_back_to_default ( ) {
65+ assert_eq ! (
66+ parse_channel_size( Some ( "0" ) ) ,
67+ DEFAULT_WRITER_BATCH_CHANNEL_SIZE
68+ ) ;
69+ }
70+
71+ #[ test]
72+ fn channel_size_positive_value_is_used ( ) {
73+ assert_eq ! ( parse_channel_size( Some ( "8" ) ) , 8 ) ;
74+ }
75+
76+ #[ test]
77+ fn channel_size_invalid_value_falls_back_to_default ( ) {
78+ assert_eq ! (
79+ parse_channel_size( Some ( "abc" ) ) ,
80+ DEFAULT_WRITER_BATCH_CHANNEL_SIZE
81+ ) ;
82+ }
83+
84+ #[ test]
85+ fn channel_size_missing_value_falls_back_to_default ( ) {
86+ assert_eq ! ( parse_channel_size( None ) , DEFAULT_WRITER_BATCH_CHANNEL_SIZE ) ;
87+ }
88+ }
89+
5290/// Cap on concurrent writer tasks. Each writer holds open multipart uploads
5391/// and in-memory buffers, so more writers means higher memory and FD usage.
5492/// Defaults to `num_cpus` (matching DataFusion's `target_partitions`),
0 commit comments