@@ -35,7 +35,7 @@ use tracing::*;
3535use super :: { CustomExecuteHandler , Operation } ;
3636use crate :: errors:: { DeltaResult , DeltaTableError } ;
3737use crate :: kernel:: transaction:: { CommitBuilder , CommitProperties } ;
38- use crate :: kernel:: { EagerSnapshot , resolve_snapshot} ;
38+ use crate :: kernel:: { EagerSnapshot , TombstoneView , resolve_snapshot} ;
3939use crate :: logstore:: { LogStore , LogStoreRef } ;
4040use crate :: protocol:: DeltaOperation ;
4141use crate :: table:: config:: TablePropertiesExt as _;
@@ -280,6 +280,8 @@ impl VacuumBuilder {
280280 _ => HashSet :: new ( ) ,
281281 } ;
282282
283+ let mut file_count = 0 ;
284+
283285 let expired_tombstones =
284286 get_stale_files ( snapshot, retention_period, now_millis, & self . log_store ) . await ?;
285287 let valid_files: HashSet < _ > = snapshot
@@ -288,66 +290,62 @@ impl VacuumBuilder {
288290 . try_collect ( )
289291 . await ?;
290292
293+ let partition_columns = snapshot. metadata ( ) . partition_columns ( ) ;
294+
291295 let mut files_to_delete = vec ! [ ] ;
292296 let mut file_sizes = vec ! [ ] ;
293- let object_store = self . log_store . object_store ( None ) ;
294-
295- let list_span = info_span ! ( "list_files" , operation = "vacuum" ) ;
296- let mut all_files = list_span. in_scope ( || object_store. list ( None ) ) ;
297- let partition_columns = snapshot. metadata ( ) . partition_columns ( ) ;
298297
299- let mut file_count = 0 ;
300- while let Some ( obj_meta) = all_files. next ( ) . await {
301- // TODO should we allow NotFound here in case we have a temporary commit file in the list
302- let obj_meta = obj_meta. map_err ( DeltaTableError :: from) ?;
303- file_count += 1 ;
304- // file is still being tracked in table
305- if valid_files. contains ( & obj_meta. location ) {
306- continue ;
307- }
308- // file is associated with a version that we are keeping
309- if keep_files. contains ( & obj_meta. location . to_string ( ) ) {
310- debug ! (
311- "The file {:?} is in a version specified to be kept by the user, skipping" ,
312- & obj_meta. location
313- ) ;
314- continue ;
315- }
316- if is_hidden_directory ( partition_columns, & obj_meta. location ) ? {
317- continue ;
298+ // VacuumMode::Lite file set
299+ // Expired tombstones are *always deleted (*unless in keep list)
300+ for tombs in expired_tombstones. iter ( ) {
301+ let path = Path :: from ( tombs. path ( ) . to_string ( ) ) ;
302+ if ok_to_delete ( & path, & valid_files, & keep_files, partition_columns) ? {
303+ files_to_delete. push ( path) ;
304+ file_sizes. push ( tombs. size ( ) . unwrap_or ( 0 ) ) ;
318305 }
319- // file is not an expired tombstone _and_ this is a "Lite" vacuum
320- // If the file is not an expired tombstone and we have gotten to here with a
321- // VacuumMode::Full then it should be added to the deletion plan
322- if !expired_tombstones. contains ( obj_meta. location . as_ref ( ) ) {
323- // For files without tombstones (uncommitted or orphaned files),
324- // check their physical age to protect recently written files from deletion.
325- // This prevents race conditions where a concurrent writer's uncommitted files
326- // could be deleted before the transaction is committed.
327- let file_age_millis = now_millis - obj_meta. last_modified . timestamp_millis ( ) ;
328- if file_age_millis < retention_period. num_milliseconds ( ) {
329- debug ! (
330- "The file {:?} is not in the log but too recent , protecting from vacuum" ,
306+ }
307+
308+ if self . mode == VacuumMode :: Full {
309+ let object_store = self . log_store . object_store ( None ) ;
310+
311+ let list_span = info_span ! ( "list_files" , operation = "vacuum" ) ;
312+ let mut all_files = list_span. in_scope ( || object_store. list ( None ) ) ;
313+
314+ let already_queued: HashSet < Path > = files_to_delete. iter ( ) . cloned ( ) . collect ( ) ;
315+
316+ while let Some ( obj_meta) = all_files. next ( ) . await {
317+ // TODO should we allow NotFound here in case we have a temporary commit file in the list
318+ let obj_meta = obj_meta. map_err ( DeltaTableError :: from) ?;
319+ // If the file is not an expired tombstone
320+ if !already_queued. contains ( & obj_meta. location )
321+ && ok_to_delete (
331322 & obj_meta. location ,
332- ) ;
333- continue ;
334- }
335- if self . mode == VacuumMode :: Lite {
336- debug ! (
337- "The file {:?} was not referenced in a log file, but VacuumMode::Lite means it will not be vacuumed" ,
338- & obj_meta. location
339- ) ;
340- continue ;
341- } else {
323+ & valid_files,
324+ & keep_files,
325+ partition_columns,
326+ ) ?
327+ {
328+ // For files without tombstones (uncommitted or orphaned files),
329+ // check their physical age to protect recently written files from deletion.
330+ // This prevents race conditions where a concurrent writer's uncommitted files
331+ // could be deleted before the transaction is committed.
332+ let file_age_millis = now_millis - obj_meta. last_modified . timestamp_millis ( ) ;
333+ if file_age_millis < retention_period. num_milliseconds ( ) {
334+ debug ! (
335+ "The file {:?} is not in the log but too recent , protecting from vacuum" ,
336+ & obj_meta. location,
337+ ) ;
338+ continue ;
339+ }
342340 debug ! (
343341 "The file {:?} was not referenced in a log file, but VacuumMode::Full means it *will be vacuumed*" ,
344342 & obj_meta. location
345343 ) ;
344+ files_to_delete. push ( obj_meta. location ) ;
345+ file_sizes. push ( obj_meta. size as i64 ) ;
346+ file_count += 1 ;
346347 }
347348 }
348-
349- files_to_delete. push ( obj_meta. location ) ;
350- file_sizes. push ( obj_meta. size as i64 ) ;
351349 }
352350 info ! (
353351 files_scanned = file_count,
@@ -530,13 +528,29 @@ fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool
530528 . any ( |partition_column| path_name. starts_with ( partition_column) ) )
531529}
532530
531+ /// Returns true if the file at `location` is a candidate for deletion.
532+ /// A file should NOT be deleted if it is still tracked in the table,
533+ /// associated with a kept version, or is a hidden directory.
534+ fn ok_to_delete (
535+ location : & Path ,
536+ valid_files : & HashSet < Path > ,
537+ keep_files : & HashSet < String > ,
538+ partition_columns : & [ String ] ,
539+ ) -> Result < bool , DeltaTableError > {
540+ Ok (
541+ !( valid_files. contains ( location) // file is still being tracked in table
542+ || keep_files. contains ( & location. to_string ( ) ) // file is associated with a version that we are keeping
543+ || is_hidden_directory ( partition_columns, location) ?) ,
544+ )
545+ }
546+
533547/// List files no longer referenced by a Delta table and are older than the retention threshold.
534548async fn get_stale_files (
535549 snapshot : & EagerSnapshot ,
536550 retention_period : Duration ,
537551 now_timestamp_millis : i64 ,
538552 store : & dyn LogStore ,
539- ) -> DeltaResult < HashSet < String > > {
553+ ) -> DeltaResult < Vec < TombstoneView > > {
540554 let tombstone_retention_timestamp = now_timestamp_millis - retention_period. num_milliseconds ( ) ;
541555 snapshot
542556 . snapshot ( )
@@ -546,8 +560,7 @@ async fn get_stale_files(
546560 // then it's considered as a stale file
547561 ready ( tombstone. deletion_timestamp ( ) . unwrap_or ( 0 ) < tombstone_retention_timestamp)
548562 } )
549- . map_ok ( |tombstone| tombstone. path ( ) . to_string ( ) )
550- . try_collect :: < HashSet < _ > > ( )
563+ . try_collect :: < Vec < _ > > ( )
551564 . await
552565}
553566
0 commit comments