Skip to content

Commit 672d2e6

Browse files
committed
review fixes
1 parent 4fa8705 commit 672d2e6

6 files changed

Lines changed: 92 additions & 54 deletions

File tree

src/backend/core/management/commands/verify_tiered_storage.py

Lines changed: 62 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,9 @@
1313
from django.core.management.base import BaseCommand
1414
from django.db import transaction
1515

16-
from core.enums import BlobStorageLocationChoices
16+
import pyzstd
17+
18+
from core.enums import BlobStorageLocationChoices, CompressionTypeChoices
1719
from core.models import Blob
1820
from core.services.tiered_storage import TieredStorageService
1921

@@ -277,10 +279,6 @@ def _verify_blob_hash(self, obj_name: str, expected_sha_bytes: bytes) -> bool:
277279

278280
# The decrypted content is still compressed
279281
# We need to decompress to verify the original hash
280-
import pyzstd
281-
282-
from core.enums import CompressionTypeChoices
283-
284282
if blob.compression == CompressionTypeChoices.ZSTD:
285283
original = pyzstd.decompress(decrypted)
286284
else:
@@ -451,7 +449,13 @@ def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
451449

452450
blob.raw_content = encrypted
453451
blob.encryption_key_id = new_key_id
454-
blob.save(update_fields=["raw_content", "encryption_key_id"])
452+
blob.save(
453+
update_fields=[
454+
"raw_content",
455+
"encryption_key_id",
456+
"size_compressed",
457+
]
458+
)
455459

456460
self.stdout.write(
457461
f" Re-encrypted blob {blob.id} (POSTGRES): "
@@ -470,52 +474,74 @@ def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
470474

471475
storage_key = self.service.compute_storage_key(bytes(blob.sha256))
472476

473-
# Keep old content in memory so we can restore S3 if the
474-
# transaction fails after the S3 overwrite.
475-
old_encrypted = None
476-
s3_updated = False
477-
477+
# Read & re-encrypt outside the DB transaction.
478478
try:
479-
with transaction.atomic():
480-
blob = Blob.objects.select_for_update().get(id=blob.id)
481-
if blob.encryption_key_id == target_key_id:
482-
return "skipped"
479+
with self.service.storage.open(storage_key, "rb") as f:
480+
old_encrypted = f.read()
481+
except FileNotFoundError:
482+
self.stderr.write(
483+
self.style.ERROR(
484+
f" ERROR blob {blob.id}: not found in storage at {storage_key}"
485+
)
486+
)
487+
return "error"
483488

484-
old_key_id = blob.encryption_key_id
489+
decrypted = self.service.decrypt(old_encrypted, blob.encryption_key_id)
490+
encrypted, new_key_id = self.service.encrypt(decrypted)
485491

492+
# Stage the new ciphertext at a temp key so the canonical object
493+
# is not touched until the DB transaction commits. If the DB
494+
# update fails, we drop the temp object and storage stays
495+
# consistent with the DB.
496+
temp_key = f"{storage_key}.tmp.{new_key_id}"
497+
self.service.storage.save(temp_key, ContentFile(encrypted))
498+
499+
promote_done = {"value": False}
500+
501+
def _promote_temp():
502+
try:
503+
self.service.storage.save(storage_key, ContentFile(encrypted))
504+
finally:
486505
try:
487-
with self.service.storage.open(storage_key, "rb") as f:
488-
old_encrypted = f.read()
489-
except FileNotFoundError:
506+
self.service.storage.delete(temp_key)
507+
except Exception as cleanup_err: # pylint: disable=broad-except
490508
self.stderr.write(
491509
self.style.ERROR(
492-
f" ERROR blob {blob.id}: not found in storage at {storage_key}"
510+
f" WARN blob {blob.id}: failed to delete temp "
511+
f"{temp_key}: {cleanup_err}"
493512
)
494513
)
495-
return "error"
496-
497-
decrypted = self.service.decrypt(old_encrypted, old_key_id)
498-
encrypted, new_key_id = self.service.encrypt(decrypted)
514+
promote_done["value"] = True
499515

500-
self.service.storage.save(storage_key, ContentFile(encrypted))
501-
s3_updated = True
516+
try:
517+
with transaction.atomic():
518+
blob = Blob.objects.select_for_update().get(id=blob.id)
519+
if blob.encryption_key_id == target_key_id:
520+
# Another worker beat us to it; clean up our temp.
521+
try:
522+
self.service.storage.delete(temp_key)
523+
except Exception as cleanup_err: # pylint: disable=broad-except
524+
self.stderr.write(
525+
self.style.ERROR(
526+
f" WARN blob {blob.id}: failed to delete temp "
527+
f"{temp_key}: {cleanup_err}"
528+
)
529+
)
530+
return "skipped"
502531

532+
old_key_id = blob.encryption_key_id
503533
blob.encryption_key_id = new_key_id
504534
blob.save(update_fields=["encryption_key_id"])
535+
transaction.on_commit(_promote_temp)
505536
except Exception:
506-
# If S3 was overwritten but the transaction failed (DB error
507-
# or commit failure), restore the old S3 content to prevent
508-
# leaving the blob in a corrupted state.
509-
if s3_updated and old_encrypted is not None:
537+
if not promote_done["value"]:
510538
try:
511-
self.service.storage.save(
512-
storage_key, ContentFile(old_encrypted)
513-
)
514-
except Exception as restore_err: # pylint: disable=broad-except
539+
self.service.storage.delete(temp_key)
540+
except Exception as cleanup_err: # pylint: disable=broad-except
515541
self.stderr.write(
516542
self.style.ERROR(
517-
f" CRITICAL: failed to restore S3 content for "
518-
f"blob {blob.id}: {restore_err}"
543+
f" WARN blob {blob.id}: failed to delete temp "
544+
f"{temp_key}: {cleanup_err}"
519545
)
520546
)
521547
raise

src/backend/core/services/tiered_storage.py

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,8 +31,7 @@ class TieredStorageService:
3131
def __init__(self):
3232
"""Initialize the service, checking if object storage is configured."""
3333
self._storage = None
34-
opts = settings.STORAGES.get("message-blobs", {}).get("OPTIONS", {})
35-
self.enabled = bool(opts.get("endpoint_url") or opts.get("access_key"))
34+
self.enabled = bool(settings.STORAGES.get("message-blobs"))
3635
# encryption_keys is a dict: {"1": "key1", "2": "key2"}
3736
self.encryption_keys = settings.MESSAGES_BLOB_ENCRYPTION_KEYS or {}
3837
self.active_key_id = settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID

src/backend/core/signals.py

Lines changed: 13 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -144,11 +144,20 @@ def cleanup_blob_storage(sender, instance, **kwargs):
144144
145145
Uses a post_delete signal instead of a model delete() override to ensure
146146
cleanup also runs during CASCADE and QuerySet bulk deletes.
147+
148+
Defers the actual storage deletion to transaction.on_commit so that an
149+
enclosing transaction rollback does not leave us with an object deleted
150+
from S3 while the blob row is still in the DB.
147151
"""
148-
if instance.storage_location == BlobStorageLocationChoices.OBJECT_STORAGE:
149-
service = TieredStorageService()
150-
if service.enabled:
151-
service.delete_if_orphaned(bytes(instance.sha256))
152+
if instance.storage_location != BlobStorageLocationChoices.OBJECT_STORAGE:
153+
return
154+
155+
service = TieredStorageService()
156+
if not service.enabled:
157+
return
158+
159+
sha256 = bytes(instance.sha256)
160+
transaction.on_commit(lambda: service.delete_if_orphaned(sha256))
152161

153162

154163
@receiver(pre_delete, sender=models.Message)

src/backend/core/tests/commands/test_verify_tiered_storage.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,9 +34,7 @@ def test_command_disabled_when_no_storage(self):
3434
stderr = StringIO()
3535

3636
with patch("core.services.tiered_storage.settings") as mock_settings:
37-
mock_settings.STORAGES = {
38-
"message-blobs": {"OPTIONS": {"endpoint_url": ""}}
39-
}
37+
mock_settings.STORAGES = {}
4038
mock_settings.MESSAGES_BLOB_ENCRYPTION_KEYS = {}
4139
mock_settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID = 0
4240

@@ -465,6 +463,7 @@ def test_re_encrypt_postgres_blob(self):
465463
decrypted = service.decrypt(bytes(blob.raw_content), blob.encryption_key_id)
466464
assert pyzstd.decompress(decrypted) == original_content
467465

466+
@pytest.mark.django_db(transaction=True)
468467
def test_re_encrypt_object_storage_blob(self):
469468
"""Test re-encrypting an object storage blob with real encryption."""
470469
import pyzstd

src/backend/core/tests/services/test_tiered_storage.py

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -315,6 +315,7 @@ def test_upload_download_with_encryption(self):
315315
finally:
316316
service.storage.delete(storage_key)
317317

318+
@pytest.mark.django_db(transaction=True)
318319
def test_deduplication_single_upload(self):
319320
"""Test that two blobs with same content result in single storage object."""
320321
service = TieredStorageService()
@@ -357,9 +358,12 @@ def test_deduplication_single_upload(self):
357358
if service.storage.exists(storage_key):
358359
service.storage.delete(storage_key)
359360

361+
@pytest.mark.django_db(transaction=True)
360362
def test_full_offload_workflow(self):
361363
"""Test the complete offload workflow: create blob, offload, read content."""
362-
from core.services.tiered_storage_tasks import offload_single_blob_task
364+
from core.services.tiered_storage_tasks import (
365+
offload_single_blob_task,
366+
)
363367

364368
mailbox = factories.MailboxFactory()
365369
content = b"Content for full offload workflow test" * 20
@@ -386,14 +390,17 @@ def test_full_offload_workflow(self):
386390
MESSAGES_BLOB_ENCRYPTION_KEYS={"1": _TEST_ENCRYPTION_KEY},
387391
MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID=1,
388392
)
393+
@pytest.mark.django_db(transaction=True)
389394
def test_offload_with_encryption_roundtrip(self):
390395
"""
391396
Test full offload workflow with encryption: create encrypted blob,
392397
offload to storage, read back content.
393398
394399
This is a critical regression test for the double-encryption bug.
395400
"""
396-
from core.services.tiered_storage_tasks import offload_single_blob_task
401+
from core.services.tiered_storage_tasks import (
402+
offload_single_blob_task,
403+
)
397404

398405
mailbox = factories.MailboxFactory()
399406
original_content = b"Test content for encryption offload roundtrip" * 50
@@ -419,6 +426,7 @@ def test_offload_with_encryption_roundtrip(self):
419426
finally:
420427
blob.delete()
421428

429+
@pytest.mark.django_db(transaction=True)
422430
def test_delete_if_orphaned(self):
423431
"""Test that delete_if_orphaned correctly handles references."""
424432
service = TieredStorageService()
@@ -562,7 +570,7 @@ def test_deduplication_with_different_encryption_keys(self):
562570
service.storage.delete(storage_key)
563571

564572

565-
@pytest.mark.django_db
573+
@pytest.mark.django_db(transaction=True)
566574
class TestTieredStorageCascadeDelete:
567575
"""Tests that S3 cleanup works during cascade and bulk deletes."""
568576

src/backend/core/tests/tasks/test_tiered_storage_tasks.py

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -44,9 +44,7 @@ def test_task_disabled_by_setting(self):
4444
def test_task_disabled_when_no_storage(self):
4545
"""Test that task returns disabled status when storage not configured."""
4646
with patch("core.services.tiered_storage.settings") as mock_settings:
47-
mock_settings.STORAGES = {
48-
"message-blobs": {"OPTIONS": {"endpoint_url": ""}}
49-
}
47+
mock_settings.STORAGES = {}
5048
mock_settings.MESSAGES_BLOB_ENCRYPTION_KEYS = {}
5149
mock_settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID = 0
5250

@@ -207,9 +205,7 @@ def test_task_disabled_by_setting(self):
207205
def test_task_disabled_when_no_storage(self):
208206
"""Test that task returns disabled status when storage not configured."""
209207
with patch("core.services.tiered_storage.settings") as mock_settings:
210-
mock_settings.STORAGES = {
211-
"message-blobs": {"OPTIONS": {"endpoint_url": ""}}
212-
}
208+
mock_settings.STORAGES = {}
213209
mock_settings.MESSAGES_BLOB_ENCRYPTION_KEYS = {}
214210
mock_settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID = 0
215211

@@ -310,6 +306,7 @@ def test_handles_upload_error(self):
310306
assert blob.storage_location == BlobStorageLocationChoices.POSTGRES
311307
assert blob.raw_content is not None
312308

309+
@pytest.mark.django_db(transaction=True)
313310
def test_deduplication_during_offload(self):
314311
"""Test that offload uses deduplication for identical content."""
315312
service = TieredStorageService()

0 commit comments

Comments
 (0)