Skip to content

Commit 5d0f39a

Browse files
committed
add settings and address review feedback on potential concurrency issues
1 parent 8858822 commit 5d0f39a

9 files changed

Lines changed: 241 additions & 66 deletions

File tree

env.d/development/backend.defaults

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ STORAGE_MESSAGE_BLOBS_ENDPOINT_URL=http://objectstorage:9000
6262
STORAGE_MESSAGE_BLOBS_BUCKET_NAME=msg-blobs
6363
STORAGE_MESSAGE_BLOBS_ACCESS_KEY=st-messages
6464
STORAGE_MESSAGE_BLOBS_SECRET_KEY=password
65-
# Offload blobs older than N days
65+
# Tiered storage offload (disabled by default, enable to offload blobs to S3)
66+
TIERED_STORAGE_OFFLOAD_ENABLED=False
67+
# Offload blobs older than N days (0 = immediately)
6668
TIERED_STORAGE_OFFLOAD_AFTER_DAYS=3
6769

6870
# OIDC

src/backend/core/management/commands/verify_tiered_storage.py

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,10 @@ def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
405405
"""
406406
Re-encrypt a single blob with the target key.
407407
408+
Uses select_for_update to prevent concurrent modifications.
409+
For object storage blobs, keeps old content in memory so S3 can
410+
be restored if the DB update fails.
411+
408412
Args:
409413
blob: The blob to re-encrypt
410414
target_key_id: The encryption key ID to use for re-encryption
@@ -426,24 +430,25 @@ def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
426430
)
427431
return "success"
428432

429-
# Get the current encrypted/unencrypted content
430433
if blob.storage_location == BlobStorageLocationChoices.POSTGRES:
431-
if blob.raw_content is None:
432-
self.stdout.write(
433-
self.style.WARNING(
434-
f" SKIP blob {blob.id}: no content in PostgreSQL"
435-
)
436-
)
437-
return "skipped"
434+
with transaction.atomic():
435+
blob = Blob.objects.select_for_update().get(id=blob.id)
436+
if blob.encryption_key_id == target_key_id:
437+
return "skipped"
438438

439-
# Decrypt with old key (or passthrough if key_id=0)
440-
decrypted = self.service.decrypt(bytes(blob.raw_content), old_key_id)
439+
old_key_id = blob.encryption_key_id
441440

442-
# Re-encrypt with new key
443-
encrypted, new_key_id = self.service.encrypt(decrypted)
441+
if blob.raw_content is None:
442+
self.stdout.write(
443+
self.style.WARNING(
444+
f" SKIP blob {blob.id}: no content in PostgreSQL"
445+
)
446+
)
447+
return "skipped"
448+
449+
decrypted = self.service.decrypt(bytes(blob.raw_content), old_key_id)
450+
encrypted, new_key_id = self.service.encrypt(decrypted)
444451

445-
# Update blob
446-
with transaction.atomic():
447452
blob.raw_content = encrypted
448453
blob.encryption_key_id = new_key_id
449454
blob.save(update_fields=["raw_content", "encryption_key_id"])
@@ -463,31 +468,57 @@ def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
463468
)
464469
return "skipped"
465470

466-
# Download and decrypt
467471
storage_key = self.service.compute_storage_key(bytes(blob.sha256))
472+
473+
# Keep old content in memory so we can restore S3 if the
474+
# transaction fails after the S3 overwrite.
475+
old_encrypted = None
476+
s3_updated = False
477+
468478
try:
469-
with self.service.storage.open(storage_key, "rb") as f:
470-
encrypted_content = f.read()
471-
except FileNotFoundError:
472-
self.stderr.write(
473-
self.style.ERROR(
474-
f" ERROR blob {blob.id}: not found in storage at {storage_key}"
475-
)
476-
)
477-
return "error"
479+
with transaction.atomic():
480+
blob = Blob.objects.select_for_update().get(id=blob.id)
481+
if blob.encryption_key_id == target_key_id:
482+
return "skipped"
478483

479-
decrypted = self.service.decrypt(encrypted_content, old_key_id)
484+
old_key_id = blob.encryption_key_id
480485

481-
# Re-encrypt with new key
482-
encrypted, new_key_id = self.service.encrypt(decrypted)
486+
try:
487+
with self.service.storage.open(storage_key, "rb") as f:
488+
old_encrypted = f.read()
489+
except FileNotFoundError:
490+
self.stderr.write(
491+
self.style.ERROR(
492+
f" ERROR blob {blob.id}: not found in storage at {storage_key}"
493+
)
494+
)
495+
return "error"
483496

484-
# Upload new encrypted content (overwrites existing)
485-
self.service.storage.save(storage_key, ContentFile(encrypted))
497+
decrypted = self.service.decrypt(old_encrypted, old_key_id)
498+
encrypted, new_key_id = self.service.encrypt(decrypted)
486499

487-
# Update blob metadata
488-
with transaction.atomic():
489-
blob.encryption_key_id = new_key_id
490-
blob.save(update_fields=["encryption_key_id"])
500+
self.service.storage.save(storage_key, ContentFile(encrypted))
501+
s3_updated = True
502+
503+
blob.encryption_key_id = new_key_id
504+
blob.save(update_fields=["encryption_key_id"])
505+
except Exception:
506+
# If S3 was overwritten but the transaction failed (DB error
507+
# or commit failure), restore the old S3 content to prevent
508+
# leaving the blob in a corrupted state.
509+
if s3_updated and old_encrypted is not None:
510+
try:
511+
self.service.storage.save(
512+
storage_key, ContentFile(old_encrypted)
513+
)
514+
except Exception as restore_err: # pylint: disable=broad-except
515+
self.stderr.write(
516+
self.style.ERROR(
517+
f" CRITICAL: failed to restore S3 content for "
518+
f"blob {blob.id}: {restore_err}"
519+
)
520+
)
521+
raise
491522

492523
self.stdout.write(
493524
f" Re-encrypted blob {blob.id} (OBJECT_STORAGE): "

src/backend/core/models.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,23 +1642,6 @@ def get_content(self) -> bytes:
16421642
return pyzstd.decompress(compressed)
16431643
raise ValueError(f"Unsupported compression type: {self.compression}")
16441644

1645-
def delete(self, *args, **kwargs):
1646-
"""Delete blob, cleaning up object storage if orphaned."""
1647-
sha256_copy = bytes(self.sha256)
1648-
storage_location_copy = self.storage_location
1649-
1650-
# Delete from DB first
1651-
super().delete(*args, **kwargs)
1652-
1653-
# Then cleanup storage if was in object storage
1654-
if storage_location_copy == BlobStorageLocationChoices.OBJECT_STORAGE:
1655-
# pylint: disable-next=import-outside-toplevel
1656-
from core.services.tiered_storage import TieredStorageService
1657-
1658-
service = TieredStorageService()
1659-
if service.enabled:
1660-
service.delete_if_orphaned(sha256_copy)
1661-
16621645

16631646
class Attachment(BaseModel):
16641647
"""Attachment model to link messages with blobs."""

src/backend/core/services/tiered_storage.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,9 @@ class TieredStorageService:
3030

3131
def __init__(self):
3232
"""Initialize the service, checking if object storage is configured."""
33-
# Check if message-blobs storage has endpoint_url configured
3433
self._storage = None
35-
self.enabled = bool(
36-
settings.STORAGES.get("message-blobs", {})
37-
.get("OPTIONS", {})
38-
.get("endpoint_url")
39-
)
34+
opts = settings.STORAGES.get("message-blobs", {}).get("OPTIONS", {})
35+
self.enabled = bool(opts.get("endpoint_url") or opts.get("access_key"))
4036
# encryption_keys is a dict: {"1": "key1", "2": "key2"}
4137
self.encryption_keys = settings.MESSAGES_BLOB_ENCRYPTION_KEYS or {}
4238
self.active_key_id = settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID
@@ -245,6 +241,11 @@ def delete_if_orphaned(self, sha256_bytes: bytes) -> bool:
245241
"""
246242
Delete storage object only if no other blobs reference it.
247243
244+
Counts ALL blobs with the same SHA256 (any storage_location), not just
245+
OBJECT_STORAGE ones. This prevents a race condition where a concurrent
246+
offload task is about to upload a blob with the same SHA256 that is
247+
still in POSTGRES state.
248+
248249
Args:
249250
sha256_bytes: SHA256 hash of the blob to potentially delete
250251
@@ -256,11 +257,11 @@ def delete_if_orphaned(self, sha256_bytes: bytes) -> bool:
256257

257258
from core.models import Blob
258259

259-
# Count remaining references
260-
refs = Blob.objects.filter(
261-
sha256=sha256_bytes,
262-
storage_location=BlobStorageLocationChoices.OBJECT_STORAGE,
263-
).count()
260+
# Count ALL remaining references (any storage location).
261+
# A blob in POSTGRES with the same SHA256 could be offloaded later
262+
# and would need this S3 object. Orphaned S3 objects from blobs that
263+
# are never offloaded are cleaned up by the verify command.
264+
refs = Blob.objects.filter(sha256=sha256_bytes).count()
264265

265266
if refs > 0:
266267
logger.debug(

src/backend/core/services/tiered_storage_tasks.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from django.db import OperationalError, transaction
1313
from django.utils.timezone import now
1414

15+
from botocore.exceptions import BotoCoreError
1516
from celery.utils.log import get_task_logger
1617

1718
from core.enums import BlobStorageLocationChoices
@@ -22,6 +23,15 @@
2223

2324
logger = get_task_logger(__name__)
2425

26+
# Transient exceptions that should trigger a Celery retry.
27+
# BotoCoreError covers connection-level errors (timeouts, DNS, etc.).
28+
# ClientError is intentionally excluded - it covers HTTP API errors
29+
# (403, 404, etc.) which are usually permanent and shouldn't be retried.
30+
_TRANSIENT_EXCEPTIONS = (
31+
OSError,
32+
BotoCoreError,
33+
)
34+
2535

2636
@celery_app.task(bind=True)
2737
def offload_blobs_task(self) -> Dict[str, Any]:
@@ -36,9 +46,13 @@ def offload_blobs_task(self) -> Dict[str, Any]:
3646
Returns:
3747
Dict with status and number of blobs queued
3848
"""
49+
if not settings.TIERED_STORAGE_OFFLOAD_ENABLED:
50+
logger.debug("Tiered storage offload disabled, skipping")
51+
return {"status": "disabled", "queued": 0}
52+
3953
service = TieredStorageService()
4054
if not service.enabled:
41-
logger.debug("Tiered storage not enabled, skipping offload task")
55+
logger.debug("Object storage not configured, skipping offload task")
4256
return {"status": "disabled", "queued": 0}
4357

4458
offload_after_days = settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS
@@ -65,7 +79,7 @@ def offload_blobs_task(self) -> Dict[str, Any]:
6579
return {"status": "success", "queued": queued_count}
6680

6781

68-
@celery_app.task(bind=True)
82+
@celery_app.task(bind=True, max_retries=3)
6983
def offload_single_blob_task(self, blob_id: str) -> Dict[str, Any]:
7084
"""
7185
Offload a single blob to object storage atomically.
@@ -76,12 +90,18 @@ def offload_single_blob_task(self, blob_id: str) -> Dict[str, Any]:
7690
3. Updates the blob record and clears raw_content
7791
4. All within a transaction for atomicity
7892
93+
Transient S3/network errors are automatically retried with exponential
94+
backoff (up to 3 retries).
95+
7996
Args:
8097
blob_id: UUID of the blob to offload
8198
8299
Returns:
83100
Dict with status and blob_id
84101
"""
102+
if not settings.TIERED_STORAGE_OFFLOAD_ENABLED:
103+
return {"status": "disabled", "blob_id": blob_id}
104+
85105
service = TieredStorageService()
86106
if not service.enabled:
87107
return {"status": "disabled", "blob_id": blob_id}
@@ -128,6 +148,15 @@ def offload_single_blob_task(self, blob_id: str) -> Dict[str, Any]:
128148

129149
return {"status": "success", "blob_id": blob_id, "key_id": key_id}
130150

151+
except _TRANSIENT_EXCEPTIONS as e:
152+
logger.warning(
153+
"Transient error offloading blob %s (attempt %d/%d): %s",
154+
blob_id,
155+
self.request.retries + 1,
156+
self.max_retries + 1,
157+
e,
158+
)
159+
raise self.retry(exc=e, countdown=60 * (2**self.request.retries)) from e
131160
except Exception as e: # pylint: disable=broad-except
132161
logger.exception("Failed to offload blob %s: %s", blob_id, e)
133162
return {"status": "error", "blob_id": blob_id, "error": str(e)}

src/backend/core/signals.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
from django.dispatch import receiver
99

1010
from core import models
11+
from core.enums import BlobStorageLocationChoices
1112
from core.services.identity.keycloak import (
1213
sync_mailbox_to_keycloak_user,
1314
sync_maildomain_to_keycloak_group,
1415
)
1516
from core.services.search import MESSAGE_INDEX, get_opensearch_client
1617
from core.services.search.tasks import index_message_task, reindex_thread_task
18+
from core.services.tiered_storage import TieredStorageService
1719

1820
logger = logging.getLogger(__name__)
1921

@@ -104,6 +106,19 @@ def index_thread_post_save(sender, instance, created, **kwargs):
104106
)
105107

106108

109+
@receiver(post_delete, sender=models.Blob)
110+
def cleanup_blob_storage(sender, instance, **kwargs):
111+
"""Clean up object storage when a blob is deleted.
112+
113+
Uses a post_delete signal instead of a model delete() override to ensure
114+
cleanup also runs during CASCADE and QuerySet bulk deletes.
115+
"""
116+
if instance.storage_location == BlobStorageLocationChoices.OBJECT_STORAGE:
117+
service = TieredStorageService()
118+
if service.enabled:
119+
service.delete_if_orphaned(bytes(instance.sha256))
120+
121+
107122
@receiver(pre_delete, sender=models.Message)
108123
def delete_message_blobs(sender, instance, **kwargs):
109124
"""Delete the blobs associated with a message."""

0 commit comments

Comments
 (0)