1313from django .core .management .base import BaseCommand
1414from django .db import transaction
1515
16- from core .enums import BlobStorageLocationChoices
16+ import pyzstd
17+
18+ from core .enums import BlobStorageLocationChoices , CompressionTypeChoices
1719from core .models import Blob
1820from core .services .tiered_storage import TieredStorageService
1921
@@ -277,10 +279,6 @@ def _verify_blob_hash(self, obj_name: str, expected_sha_bytes: bytes) -> bool:
277279
278280 # The decrypted content is still compressed
279281 # We need to decompress to verify the original hash
280- import pyzstd
281-
282- from core .enums import CompressionTypeChoices
283-
284282 if blob .compression == CompressionTypeChoices .ZSTD :
285283 original = pyzstd .decompress (decrypted )
286284 else :
@@ -451,7 +449,13 @@ def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
451449
452450 blob .raw_content = encrypted
453451 blob .encryption_key_id = new_key_id
454- blob .save (update_fields = ["raw_content" , "encryption_key_id" ])
452+ blob .save (
453+ update_fields = [
454+ "raw_content" ,
455+ "encryption_key_id" ,
456+ "size_compressed" ,
457+ ]
458+ )
455459
456460 self .stdout .write (
457461 f" Re-encrypted blob { blob .id } (POSTGRES): "
@@ -470,52 +474,74 @@ def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
470474
471475 storage_key = self .service .compute_storage_key (bytes (blob .sha256 ))
472476
473- # Keep old content in memory so we can restore S3 if the
474- # transaction fails after the S3 overwrite.
475- old_encrypted = None
476- s3_updated = False
477-
477+ # Read & re-encrypt outside the DB transaction.
478478 try :
479- with transaction .atomic ():
480- blob = Blob .objects .select_for_update ().get (id = blob .id )
481- if blob .encryption_key_id == target_key_id :
482- return "skipped"
479+ with self .service .storage .open (storage_key , "rb" ) as f :
480+ old_encrypted = f .read ()
481+ except FileNotFoundError :
482+ self .stderr .write (
483+ self .style .ERROR (
484+ f" ERROR blob { blob .id } : not found in storage at { storage_key } "
485+ )
486+ )
487+ return "error"
483488
484- old_key_id = blob .encryption_key_id
489+ decrypted = self .service .decrypt (old_encrypted , blob .encryption_key_id )
490+ encrypted , new_key_id = self .service .encrypt (decrypted )
485491
492+ # Stage the new ciphertext at a temp key so the canonical object
493+ # is not touched until the DB transaction commits. If the DB
494+ # update fails, we drop the temp object and storage stays
495+ # consistent with the DB.
496+ temp_key = f"{ storage_key } .tmp.{ new_key_id } "
497+ self .service .storage .save (temp_key , ContentFile (encrypted ))
498+
499+ promote_done = {"value" : False }
500+
501+ def _promote_temp ():
502+ try :
503+ self .service .storage .save (storage_key , ContentFile (encrypted ))
504+ finally :
486505 try :
487- with self .service .storage .open (storage_key , "rb" ) as f :
488- old_encrypted = f .read ()
489- except FileNotFoundError :
506+ self .service .storage .delete (temp_key )
507+ except Exception as cleanup_err : # pylint: disable=broad-except
490508 self .stderr .write (
491509 self .style .ERROR (
492- f" ERROR blob { blob .id } : not found in storage at { storage_key } "
510+ f" WARN blob { blob .id } : failed to delete temp "
511+ f"{ temp_key } : { cleanup_err } "
493512 )
494513 )
495- return "error"
496-
497- decrypted = self .service .decrypt (old_encrypted , old_key_id )
498- encrypted , new_key_id = self .service .encrypt (decrypted )
514+ promote_done ["value" ] = True
499515
500- self .service .storage .save (storage_key , ContentFile (encrypted ))
501- s3_updated = True
516+ try :
517+ with transaction .atomic ():
518+ blob = Blob .objects .select_for_update ().get (id = blob .id )
519+ if blob .encryption_key_id == target_key_id :
520+ # Another worker beat us to it; clean up our temp.
521+ try :
522+ self .service .storage .delete (temp_key )
523+ except Exception as cleanup_err : # pylint: disable=broad-except
524+ self .stderr .write (
525+ self .style .ERROR (
526+ f" WARN blob { blob .id } : failed to delete temp "
527+ f"{ temp_key } : { cleanup_err } "
528+ )
529+ )
530+ return "skipped"
502531
532+ old_key_id = blob .encryption_key_id
503533 blob .encryption_key_id = new_key_id
504534 blob .save (update_fields = ["encryption_key_id" ])
535+ transaction .on_commit (_promote_temp )
505536 except Exception :
506- # If S3 was overwritten but the transaction failed (DB error
507- # or commit failure), restore the old S3 content to prevent
508- # leaving the blob in a corrupted state.
509- if s3_updated and old_encrypted is not None :
537+ if not promote_done ["value" ]:
510538 try :
511- self .service .storage .save (
512- storage_key , ContentFile (old_encrypted )
513- )
514- except Exception as restore_err : # pylint: disable=broad-except
539+ self .service .storage .delete (temp_key )
540+ except Exception as cleanup_err : # pylint: disable=broad-except
515541 self .stderr .write (
516542 self .style .ERROR (
517- f" CRITICAL : failed to restore S3 content for "
518- f"blob { blob . id } : { restore_err } "
543+ f" WARN blob { blob . id } : failed to delete temp "
544+ f"{ temp_key } : { cleanup_err } "
519545 )
520546 )
521547 raise
0 commit comments