Skip to content

Commit 6c43496

Browse files
committed
simplify
1 parent e2db8f5 commit 6c43496

12 files changed

Lines changed: 262 additions & 346 deletions

File tree

.github/workflows/messages.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ jobs:
3939
chown -R 1001:1001 data
4040
- name: Collect static files
4141
run: make collectstatic
42-
- name: Create msg-imports bucket
43-
run: make import-bucket
42+
- name: Create object storage buckets
43+
run: make create-buckets
4444
- name: Run backend tests
4545
run: make test-back-parallel
4646

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ bootstrap: ## Prepare the project for local development
100100
update: ## Update the project with latest changes
101101
@$(MAKE) data/media
102102
@$(MAKE) data/static
103-
@$(MAKE) import-bucket
103+
@$(MAKE) create-buckets
104104
@$(MAKE) create-env-files
105105
@$(MAKE) build
106106
@$(MAKE) collectstatic
@@ -161,11 +161,11 @@ restart-minimal: \
161161
start-minimal
162162
.PHONY: restart-minimal
163163

164-
import-bucket: ## create the message imports & blobs buckets in objectstorage
164+
create-buckets: ## create the message imports & blobs buckets in objectstorage
165165
@$(COMPOSE) up -d objectstorage --wait
166166
@$(MANAGE_DB) create_bucket --storage message-imports --expire-days 1
167167
@$(MANAGE_DB) create_bucket --storage message-blobs
168-
.PHONY: import-bucket
168+
.PHONY: create-buckets
169169

170170
shell-objectstorage: ## open a shell in the objectstorage container
171171
@$(COMPOSE) run --rm --build objectstorage bash

src/backend/core/admin.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1005,10 +1005,19 @@ class BlobAdmin(admin.ModelAdmin):
10051005
"size",
10061006
"size_compressed",
10071007
"compression",
1008+
"storage_location",
1009+
"encryption_key_id",
10081010
"created_at",
10091011
)
10101012
search_fields = ("mailbox__local_part", "mailbox__domain__name", "content_type")
1011-
list_filter = ("content_type", "compression", "created_at", "updated_at")
1013+
list_filter = (
1014+
"content_type",
1015+
"compression",
1016+
"storage_location",
1017+
"encryption_key_id",
1018+
"created_at",
1019+
"updated_at",
1020+
)
10121021
autocomplete_fields = ("mailbox",)
10131022

10141023
def get_queryset(self, request):

src/backend/core/api/viewsets/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ def get(self, request):
190190
]
191191
dict_settings = {}
192192
for setting in array_settings:
193-
dict_settings[setting] = getattr(settings, setting, None)
193+
if hasattr(settings, setting):
194+
dict_settings[setting] = getattr(settings, setting)
194195

195196
# AI Features
196197
dict_settings["AI_ENABLED"] = is_ai_enabled()

src/backend/core/management/commands/verify_tiered_storage.py

Lines changed: 106 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
from core.enums import BlobStorageLocationChoices, CompressionTypeChoices
1919
from core.models import Blob
20-
from core.services.tiered_storage import TieredStorageService
20+
from core.services.tiered_storage import TieredStorageService, sha256_advisory_lock
2121

2222

2323
class Command(BaseCommand):
@@ -31,11 +31,6 @@ def add_arguments(self, parser):
3131
help="Verification mode: db-to-storage (check DB records have storage backing), "
3232
"storage-to-db (find orphans in storage), or full (both)",
3333
)
34-
parser.add_argument(
35-
"--fix",
36-
action="store_true",
37-
help="Fix issues: delete orphans from storage (missing blobs are only reported)",
38-
)
3934
parser.add_argument(
4035
"--verify-hashes",
4136
action="store_true",
@@ -61,7 +56,6 @@ def add_arguments(self, parser):
6156

6257
def handle(self, *args, **options):
6358
self.service = TieredStorageService()
64-
self.fix = options["fix"]
6559
self.verify_hashes = options["verify_hashes"]
6660
self.limit = options["limit"]
6761
self.dry_run = options["dry_run"]
@@ -184,16 +178,6 @@ def verify_storage_to_db(self):
184178
if not refs_exist:
185179
orphan_count += 1
186180
self.stdout.write(self.style.WARNING(f"ORPHAN: {obj_name}"))
187-
if self.fix:
188-
try:
189-
self.service.storage.delete(obj_name)
190-
self.stdout.write(
191-
self.style.SUCCESS(f" -> Deleted orphan {obj_name}")
192-
)
193-
except Exception as e: # pylint: disable=broad-except
194-
self.stdout.write(
195-
self.style.ERROR(f" -> Failed to delete: {e}")
196-
)
197181

198182
# Optionally verify hash
199183
if self.verify_hashes and refs_exist:
@@ -400,183 +384,145 @@ def re_encrypt_blobs(self):
400384
)
401385

402386
def _re_encrypt_single_blob(self, blob: Blob, target_key_id: int) -> str:
403-
"""
404-
Re-encrypt a single blob with the target key.
405-
406-
Uses select_for_update to prevent concurrent modifications.
407-
For object storage blobs, keeps old content in memory so S3 can
408-
be restored if the DB update fails.
387+
"""Re-encrypt a single blob (and its dedup cohort) with the target key.
409388
410-
Args:
411-
blob: The blob to re-encrypt
412-
target_key_id: The encryption key ID to use for re-encryption
389+
Holds the per-sha256 advisory lock for the duration so concurrent
390+
offload, cleanup, or another re-encrypt of the same content cannot
391+
interleave.
413392
414-
Returns:
415-
"success", "skipped", or "error"
393+
Returns "success", "skipped", or "error".
416394
"""
417-
old_key_id = blob.encryption_key_id
418-
419395
if self.dry_run:
420-
location = (
421-
"POSTGRES"
422-
if blob.storage_location == BlobStorageLocationChoices.POSTGRES
423-
else "OBJECT_STORAGE"
424-
)
396+
location = blob.get_storage_location_display()
425397
self.stdout.write(
426398
f" Would re-encrypt blob {blob.id} ({location}): "
427-
f"key_id {old_key_id} -> {target_key_id}"
399+
f"key_id {blob.encryption_key_id} -> {target_key_id}"
428400
)
429401
return "success"
430402

403+
sha256 = bytes(blob.sha256)
404+
431405
if blob.storage_location == BlobStorageLocationChoices.POSTGRES:
432-
with transaction.atomic():
433-
blob = Blob.objects.select_for_update().get(id=blob.id)
434-
if blob.encryption_key_id == target_key_id:
435-
return "skipped"
406+
return self._re_encrypt_postgres_blob(blob, sha256, target_key_id)
407+
return self._re_encrypt_object_storage_blob(blob, sha256, target_key_id)
408+
409+
def _re_encrypt_postgres_blob(
410+
self, blob: Blob, sha256: bytes, target_key_id: int
411+
) -> str:
412+
"""Re-encrypt a single PostgreSQL-backed blob row in place."""
413+
with transaction.atomic(), sha256_advisory_lock(sha256):
414+
blob = Blob.objects.select_for_update().get(id=blob.id)
415+
if blob.encryption_key_id == target_key_id:
416+
return "skipped"
417+
if blob.raw_content is None:
418+
self.stdout.write(
419+
self.style.WARNING(
420+
f" SKIP blob {blob.id}: no content in PostgreSQL"
421+
)
422+
)
423+
return "skipped"
436424

437-
old_key_id = blob.encryption_key_id
425+
old_key_id = blob.encryption_key_id
426+
decrypted = self.service.decrypt(bytes(blob.raw_content), old_key_id)
427+
encrypted, new_key_id = self.service.encrypt(decrypted)
438428

439-
if blob.raw_content is None:
440-
self.stdout.write(
441-
self.style.WARNING(
442-
f" SKIP blob {blob.id}: no content in PostgreSQL"
443-
)
444-
)
445-
return "skipped"
429+
blob.raw_content = encrypted
430+
blob.encryption_key_id = new_key_id
431+
blob.save(
432+
update_fields=["raw_content", "encryption_key_id", "size_compressed"]
433+
)
446434

447-
decrypted = self.service.decrypt(bytes(blob.raw_content), old_key_id)
448-
encrypted, new_key_id = self.service.encrypt(decrypted)
435+
self.stdout.write(
436+
f" Re-encrypted blob {blob.id} (POSTGRES): "
437+
f"key_id {old_key_id} -> {new_key_id}"
438+
)
439+
return "success"
449440

450-
blob.raw_content = encrypted
451-
blob.encryption_key_id = new_key_id
452-
blob.save(
453-
update_fields=[
454-
"raw_content",
455-
"encryption_key_id",
456-
"size_compressed",
457-
]
458-
)
441+
def _re_encrypt_object_storage_blob(
442+
self, blob: Blob, sha256: bytes, target_key_id: int
443+
) -> str:
444+
"""Re-encrypt the storage object backing an OBJECT_STORAGE cohort.
459445
446+
Stages the new ciphertext at a temp key so a crash between DB
447+
commit and storage promote leaves storage and DB consistent.
448+
Concurrency is handled by the per-sha256 advisory lock; the
449+
bulk-update flips every sibling to the new key in one round trip.
450+
"""
451+
if not self.service.enabled:
460452
self.stdout.write(
461-
f" Re-encrypted blob {blob.id} (POSTGRES): "
462-
f"key_id {old_key_id} -> {new_key_id}"
453+
self.style.WARNING(
454+
f" SKIP blob {blob.id}: object storage not configured"
455+
)
463456
)
457+
return "skipped"
464458

465-
else:
466-
# Object storage blob
467-
if not self.service.enabled:
468-
self.stdout.write(
469-
self.style.WARNING(
470-
f" SKIP blob {blob.id}: object storage not configured"
471-
)
472-
)
473-
return "skipped"
459+
storage_key = self.service.compute_storage_key(sha256)
460+
temp_key = None
474461

475-
storage_key = self.service.compute_storage_key(bytes(blob.sha256))
462+
try:
463+
with transaction.atomic(), sha256_advisory_lock(sha256):
464+
cohort = Blob.objects.filter(
465+
sha256=sha256,
466+
storage_location=BlobStorageLocationChoices.OBJECT_STORAGE,
467+
).exclude(encryption_key_id=target_key_id)
468+
old_blob = cohort.first()
469+
if old_blob is None:
470+
return "skipped"
471+
old_key_id = old_blob.encryption_key_id
476472

477-
# Read & re-encrypt outside the DB transaction.
478-
try:
479-
with self.service.storage.open(storage_key, "rb") as f:
480-
old_encrypted = f.read()
481-
except FileNotFoundError:
482-
self.stderr.write(
483-
self.style.ERROR(
484-
f" ERROR blob {blob.id}: not found in storage at {storage_key}"
473+
try:
474+
with self.service.storage.open(storage_key, "rb") as f:
475+
old_encrypted = f.read()
476+
except FileNotFoundError:
477+
self.stderr.write(
478+
self.style.ERROR(
479+
f" ERROR blob {blob.id}: not found in storage at "
480+
f"{storage_key}"
481+
)
485482
)
486-
)
487-
return "error"
483+
return "error"
488484

489-
decrypted = self.service.decrypt(old_encrypted, blob.encryption_key_id)
490-
encrypted, new_key_id = self.service.encrypt(decrypted)
485+
decrypted = self.service.decrypt(old_encrypted, old_key_id)
486+
encrypted, new_key_id = self.service.encrypt(decrypted)
491487

492-
# Stage the new ciphertext at a temp key so the canonical object
493-
# is not touched until the DB transaction commits. If the DB
494-
# update fails, we drop the temp object and storage stays
495-
# consistent with the DB.
496-
temp_key = f"{storage_key}.tmp.{new_key_id}"
497-
self.service.storage.save(temp_key, ContentFile(encrypted))
488+
temp_key = f"{storage_key}.tmp.{new_key_id}"
489+
self.service.storage.save(temp_key, ContentFile(encrypted))
498490

499-
promote_done = {"value": False}
491+
cohort.update(encryption_key_id=new_key_id)
500492

501-
def _promote_temp():
502-
try:
503-
self.service.storage.save(storage_key, ContentFile(encrypted))
504-
finally:
505-
try:
506-
self.service.storage.delete(temp_key)
507-
except Exception as cleanup_err: # pylint: disable=broad-except
508-
self.stderr.write(
509-
self.style.ERROR(
510-
f" WARN blob {blob.id}: failed to delete temp "
511-
f"{temp_key}: {cleanup_err}"
512-
)
513-
)
514-
promote_done["value"] = True
493+
staged_temp = temp_key
515494

516-
try:
517-
with transaction.atomic():
518-
# The storage object is shared by every Blob row that has
519-
# the same sha256 and is in OBJECT_STORAGE (deduplication
520-
# in upload_blob). Lock the entire cohort so we can flip
521-
# all of them to new_key_id atomically — otherwise the
522-
# promote rewrites the object with new ciphertext while
523-
# un-rotated siblings still hold the old key_id.
524-
siblings = list(
525-
Blob.objects.select_for_update().filter(
526-
sha256=blob.sha256,
527-
storage_location=BlobStorageLocationChoices.OBJECT_STORAGE,
528-
)
529-
)
530-
if not siblings:
531-
# Row was deleted between iteration and lock.
532-
try:
533-
self.service.storage.delete(temp_key)
534-
except Exception as cleanup_err: # pylint: disable=broad-except
535-
self.stderr.write(
536-
self.style.ERROR(
537-
f" WARN blob {blob.id}: failed to delete "
538-
f"temp {temp_key}: {cleanup_err}"
539-
)
540-
)
541-
return "skipped"
542-
543-
pending = [
544-
s for s in siblings if s.encryption_key_id != target_key_id
545-
]
546-
if not pending:
547-
# Another worker rotated this cohort already.
495+
def _promote_temp():
496+
try:
497+
self.service.storage.save(storage_key, ContentFile(encrypted))
498+
finally:
548499
try:
549-
self.service.storage.delete(temp_key)
500+
self.service.storage.delete(staged_temp)
550501
except Exception as cleanup_err: # pylint: disable=broad-except
551502
self.stderr.write(
552503
self.style.ERROR(
553504
f" WARN blob {blob.id}: failed to delete "
554-
f"temp {temp_key}: {cleanup_err}"
505+
f"temp {staged_temp}: {cleanup_err}"
555506
)
556507
)
557-
return "skipped"
558508

559-
old_key_id = pending[0].encryption_key_id
560-
Blob.objects.filter(id__in=[s.id for s in pending]).update(
561-
encryption_key_id=new_key_id
562-
)
563-
transaction.on_commit(_promote_temp)
564-
except Exception:
565-
if not promote_done["value"]:
566-
try:
567-
self.service.storage.delete(temp_key)
568-
except Exception as cleanup_err: # pylint: disable=broad-except
569-
self.stderr.write(
570-
self.style.ERROR(
571-
f" WARN blob {blob.id}: failed to delete temp "
572-
f"{temp_key}: {cleanup_err}"
573-
)
509+
transaction.on_commit(_promote_temp)
510+
# Promotion now owns temp_key; don't double-delete on rollback.
511+
temp_key = None
512+
finally:
513+
if temp_key is not None:
514+
try:
515+
self.service.storage.delete(temp_key)
516+
except Exception as cleanup_err: # pylint: disable=broad-except
517+
self.stderr.write(
518+
self.style.ERROR(
519+
f" WARN blob {blob.id}: failed to delete temp "
520+
f"{temp_key}: {cleanup_err}"
574521
)
575-
raise
576-
577-
self.stdout.write(
578-
f" Re-encrypted blob {blob.id} (OBJECT_STORAGE): "
579-
f"key_id {old_key_id} -> {new_key_id}"
580-
)
522+
)
581523

524+
self.stdout.write(
525+
f" Re-encrypted blob {blob.id} (OBJECT_STORAGE): "
526+
f"key_id {old_key_id} -> {new_key_id}"
527+
)
582528
return "success"

0 commit comments

Comments
 (0)