Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in your CodeRabbit review settings.
📝 Walkthrough

Adds tiered blob storage with optional encryption and configurable compression, object-storage backend settings, async offload tasks, DB model changes for storage location and key id, a management command for verification/key rotation, startup checks, admin exposure, and extensive tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Application
    participant Blob as Blob Model
    participant TierSvc as TieredStorageService
    participant Encrypt as AES-256-GCM
    participant ObjStor as Object Storage
    participant DB as PostgreSQL
    Client->>Blob: create_blob(raw_bytes)
    Blob->>TierSvc: encrypt(compressed_bytes)
    TierSvc->>Encrypt: encrypt(data, active_key)
    Encrypt-->>TierSvc: encrypted_bytes, key_id
    TierSvc-->>Blob: encrypted_bytes, key_id
    Blob->>DB: save(storage_location=OBJECT_STORAGE, encryption_key_id, raw_content=NULL)
    DB-->>Blob: saved
    Client->>Blob: get_content()
    Blob->>DB: read storage_location, encryption_key_id
    DB-->>Blob: location=OBJECT_STORAGE, key_id
    Blob->>TierSvc: download_blob(blob)
    TierSvc->>ObjStor: GET blobs/{key_id}/{shard}/{sha}
    ObjStor-->>TierSvc: encrypted_bytes
    TierSvc->>Encrypt: decrypt(encrypted_bytes, key_id)
    Encrypt-->>TierSvc: decrypted_bytes
    TierSvc-->>Blob: decompressed_bytes
    Blob-->>Client: decompressed_bytes
```
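The download path in the diagram reads objects at `blobs/{key_id}/{shard}/{sha}`. As a hedged illustration of that layout (the shard width is an assumption; the real `compute_storage_key` lives in `tiered_storage.py` and may differ), the key derivation might look like:

```python
import hashlib


def compute_storage_key(sha256_hex: str, key_id: int) -> str:
    """Sketch of the blobs/{key_id}/{shard}/{sha} layout from the diagram.

    The shard is assumed to be the first two hex characters of the
    SHA-256 digest, spreading objects across 256 prefixes; the actual
    implementation may choose a different shard width.
    """
    shard = sha256_hex[:2]
    return f"blobs/{key_id}/{shard}/{sha256_hex}"


sha = hashlib.sha256(b"example payload").hexdigest()
storage_key = compute_storage_key(sha, key_id=1)
```

Keying the path on `key_id` is what lets the re-encryption command below write the rotated object to a new location before deleting the old one.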
```mermaid
sequenceDiagram
    participant Beat as Celery Beat
    participant Task as offload_blobs_task
    participant DB as PostgreSQL
    participant BlobTask as offload_single_blob_task
    participant TierSvc as TieredStorageService
    participant ObjStor as Object Storage
    Beat->>Task: trigger (hourly)
    Task->>DB: SELECT eligible blobs (POSTGRES, aged, size)
    DB-->>Task: blob_ids
    Task->>BlobTask: enqueue per blob_id
    BlobTask->>DB: SELECT blob WHERE id=blob_id
    DB-->>BlobTask: blob
    BlobTask->>DB: acquire advisory_lock(sha256)
    DB-->>BlobTask: locked
    BlobTask->>TierSvc: upload_blob(raw_content)
    TierSvc->>TierSvc: encrypt(raw_content)
    TierSvc->>ObjStor: PUT blobs/{key_id}/{shard}/{sha}
    ObjStor-->>TierSvc: stored
    TierSvc-->>BlobTask: key_id
    BlobTask->>DB: UPDATE blob SET storage_location=OBJECT_STORAGE, encryption_key_id, raw_content=NULL
    DB-->>BlobTask: updated
```
```mermaid
sequenceDiagram
    participant Cmd as verify_tiered_storage --re-encrypt
    participant DB as PostgreSQL
    participant TierSvc as TieredStorageService
    participant ObjStor as Object Storage
    Cmd->>DB: SELECT blobs WHERE encryption_key_id != active_key_id
    DB-->>Cmd: blobs_to_rotate
    loop per blob
        Cmd->>DB: acquire advisory_lock(sha256)
        DB-->>Cmd: locked
        alt POSTGRES-backed
            Cmd->>DB: read raw_content
            DB-->>Cmd: encrypted_bytes
            Cmd->>TierSvc: decrypt(encrypted_bytes, old_key)
            TierSvc-->>Cmd: plaintext
            Cmd->>TierSvc: encrypt(plaintext, active_key)
            TierSvc-->>Cmd: new_encrypted
            Cmd->>DB: UPDATE blob raw_content=new_encrypted, encryption_key_id=active_key
        else OBJECT_STORAGE-backed
            Cmd->>ObjStor: GET blobs/{old_key}/{shard}/{sha}
            ObjStor-->>Cmd: encrypted_bytes
            Cmd->>TierSvc: decrypt(encrypted_bytes, old_key)
            TierSvc-->>Cmd: plaintext
            Cmd->>TierSvc: encrypt(plaintext, active_key)
            TierSvc->>ObjStor: PUT blobs/{active_key}/{shard}/{sha}
            Cmd->>DB: UPDATE blob encryption_key_id=active_key
            Cmd->>TierSvc: delete old object (best-effort)
        end
    end
    Cmd-->>Cmd: report summary
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 466-495: The current flow writes the newly encrypted object via
self.service.storage.save(storage_key, ...) before updating
blob.encryption_key_id inside transaction.atomic(), risking storage/DB
inconsistency if the DB update fails; instead, write the new encrypted bytes to
a temporary object (e.g. derive a temp key from storage_key and new_key_id)
using self.service.storage.save(temp_key, ContentFile(encrypted)), then perform
the DB update inside transaction.atomic() (update blob.encryption_key_id and
save), and only after the transaction succeeds atomically remove/rename the temp
object to the final storage_key (or copy temp→final and delete temp) so storage
and DB remain consistent; reference symbols: self.service.storage.save,
storage_key, temp_key (create), self.service.encrypt, blob.encryption_key_id,
transaction.atomic.
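A minimal, storage-agnostic sketch of the "write aside, commit DB, then promote" ordering the comment asks for (the `DictStorage` class, the `db_update` callable, and the temp-key suffix are all illustrative assumptions, not the project's actual django-storages API):

```python
# Hedged sketch of re-encryption that keeps storage and DB consistent:
# 1. write the new ciphertext to a temporary object,
# 2. commit the DB change (transaction.atomic() in the real code),
# 3. only then promote temp -> final and remove the temp object.


class DictStorage:
    """Toy in-memory object store, used only to illustrate the ordering."""

    def __init__(self):
        self.objects = {}

    def save(self, key, data):
        self.objects[key] = data

    def delete(self, key):
        self.objects.pop(key, None)


def reencrypt_object(storage, db_update, storage_key, new_key_id, encrypted):
    temp_key = f"{storage_key}.rekey-{new_key_id}"  # assumed temp-key scheme
    storage.save(temp_key, encrypted)      # 1. write new bytes aside
    try:
        db_update(new_key_id)              # 2. persist encryption_key_id
    except Exception:
        storage.delete(temp_key)           # DB failed: old object stays valid
        raise
    storage.save(storage_key, encrypted)   # 3. copy temp -> final
    storage.delete(temp_key)


store = DictStorage()
store.save("blobs/1/ab/abc", b"old-ciphertext")
reencrypt_object(store, db_update=lambda kid: None,
                 storage_key="blobs/1/ab/abc", new_key_id=2,
                 encrypted=b"new-ciphertext")
```

If the DB update fails, the final object is untouched and the temp object is cleaned up, so a retry can start from a consistent state.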
In `@src/backend/core/services/tiered_storage.py`:
- Around line 31-43: In __init__, the enabled gate currently checks for an
OPTIONS.endpoint_url which wrongly disables valid S3 setups; instead set
self.enabled based on presence of the "message-blobs" storage config itself
(e.g. check that settings.STORAGES contains a non-empty "message-blobs" entry).
Update the assignment to self.enabled to use
settings.STORAGES.get("message-blobs") (or "message-blobs" in settings.STORAGES
and truthy) rather than digging for OPTIONS.endpoint_url so AWS S3 configs
without endpoint_url remain enabled.
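A hedged sketch of the suggested gate (the exact shape of the `STORAGES` dict is assumed from Django's standard `STORAGES` setting; only the truthiness check matters):

```python
# The gate should depend on whether a "message-blobs" backend is configured
# at all, not on a nested OPTIONS["endpoint_url"], because plain AWS S3
# configurations legitimately omit endpoint_url.


def tiered_storage_enabled(storages: dict) -> bool:
    """Return True when a non-empty "message-blobs" entry is configured."""
    return bool(storages.get("message-blobs"))


# A plain S3 config without endpoint_url stays enabled under this check:
aws_cfg = {
    "message-blobs": {
        "BACKEND": "storages.backends.s3.S3Storage",  # assumed backend path
        "OPTIONS": {"bucket_name": "msg-blobs"},
    }
}
```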
🧹 Nitpick comments (3)
src/backend/core/services/tiered_storage_tasks.py (1)
68-133: Consider adding retry for transient failures. The task handles lock contention gracefully by returning "locked" status, but transient failures (network issues, temporary S3 unavailability) at line 131 are logged and returned as errors without retry. The periodic `offload_blobs_task` will eventually re-queue these blobs, but adding explicit retry behavior for transient exceptions (e.g., `ConnectionError`, `Timeout`) could improve reliability.

💡 Optional: Add retry for transient failures

```diff
-@celery_app.task(bind=True)
+@celery_app.task(bind=True, autoretry_for=(ConnectionError, TimeoutError), retry_backoff=True, max_retries=3)
 def offload_single_blob_task(self, blob_id: str) -> Dict[str, Any]:
```

src/backend/core/models.py (1)
1536-1557: Enforce storage_location/raw_content invariants at the DB layer. With `raw_content` now nullable, inconsistent states (e.g., OBJECT_STORAGE + non-null content) become possible and will surface as runtime errors in `get_content`. A check constraint makes the invariant explicit and avoids silent drift. This will require a migration.

♻️ Proposed constraint

```diff
 constraints = [
     models.CheckConstraint(
         check=(
             models.Q(mailbox__isnull=False)
             | models.Q(maildomain__isnull=False)
         ),
         name="blob_has_owner",
     ),
+    models.CheckConstraint(
+        check=(
+            models.Q(
+                storage_location=BlobStorageLocationChoices.POSTGRES,
+                raw_content__isnull=False,
+            )
+            | models.Q(
+                storage_location=BlobStorageLocationChoices.OBJECT_STORAGE,
+                raw_content__isnull=True,
+            )
+        ),
+        name="blob_storage_location_matches_content",
+    ),
 ]
```

As per coding guidelines, enforce data integrity with model constraints.

Also applies to: 1583-1589
src/backend/core/services/tiered_storage.py (1)
244-281: Guard against orphan-delete races and capture delete errors. There is a TOCTOU window between the reference count (lines 259-263) and deletion (lines 274-275); a concurrent offload could add a reference after the count and still have its object deleted. Consider an advisory lock keyed by SHA256 or a transactional guard around the check+delete. Also, capture the storage deletion exception to Sentry so cleanup failures are observable.

♻️ Suggested Sentry capture

```diff
 from cryptography.fernet import Fernet
+from sentry_sdk import capture_exception
@@
-        except Exception as e:  # pylint: disable=broad-except
-            logger.warning("Failed to delete blob from storage %s: %s", key, e)
+        except Exception as exc:  # pylint: disable=broad-except
+            capture_exception(exc)
+            logger.warning("Failed to delete blob from storage %s: %s", key, exc)
         return False
```

As per coding guidelines, capture and report exceptions to Sentry.
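One way to close the window is a Postgres transaction-scoped advisory lock keyed off the SHA-256. A hedged sketch of deriving the 64-bit lock key (the fold into int64 is an assumption; the real code would issue `SELECT pg_advisory_xact_lock(%s)` inside `transaction.atomic()` before the count-and-delete):

```python
import struct


def advisory_lock_key(sha256_hex: str) -> int:
    """Fold a SHA-256 hex digest into the signed 64-bit integer that
    pg_advisory_xact_lock expects, using the first 8 bytes (assumed scheme).
    """
    return struct.unpack(">q", bytes.fromhex(sha256_hex)[:8])[0]


# In the real service this would guard the check+delete, e.g.:
#   with transaction.atomic():
#       cursor.execute("SELECT pg_advisory_xact_lock(%s)",
#                      [advisory_lock_key(sha)])
#       ... count references, delete only if still zero ...
key = advisory_lock_key("ff" * 32)
```

Because the lock is transaction-scoped, it is released automatically on commit or rollback, so no explicit unlock step is needed.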
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
- compose.yaml
- env.d/development/backend.defaults
- src/backend/core/api/viewsets/config.py
- src/backend/core/enums.py
- src/backend/core/management/commands/verify_tiered_storage.py
- src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
- src/backend/core/models.py
- src/backend/core/services/search/search.py
- src/backend/core/services/tiered_storage.py
- src/backend/core/services/tiered_storage_tasks.py
- src/backend/core/signals.py
- src/backend/core/tests/commands/__init__.py
- src/backend/core/tests/commands/test_verify_tiered_storage.py
- src/backend/core/tests/conftest.py
- src/backend/core/tests/services/__init__.py
- src/backend/core/tests/services/test_tiered_storage.py
- src/backend/core/tests/tasks/__init__.py
- src/backend/core/tests/tasks/test_task_send_message.py
- src/backend/core/tests/tasks/test_tiered_storage_tasks.py
- src/backend/core/utils.py
- src/backend/messages/celery_app.py
- src/backend/messages/settings.py
🧰 Additional context used
📓 Path-based instructions (6)
src/backend/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
src/backend/**/*.py: Follow Django/PEP 8 style with a 100-character line limit
Use descriptive, snake_case names for variables and functions
Use Django ORM for database access; avoid raw SQL unless necessary for performance
Use Django’s built-in user model and authentication framework
Prefer try-except blocks to handle exceptions in business logic and views
Log expected and unexpected actions with appropriate log levels
Capture and report exceptions to Sentry; use capture_exception() for custom errors
Do not log sensitive information (tokens, passwords, financial/health data, PII)
Files:
- src/backend/messages/celery_app.py
- src/backend/core/tests/tasks/__init__.py
- src/backend/core/api/viewsets/config.py
- src/backend/core/tests/commands/test_verify_tiered_storage.py
- src/backend/core/services/search/search.py
- src/backend/core/management/commands/verify_tiered_storage.py
- src/backend/core/utils.py
- src/backend/core/signals.py
- src/backend/core/services/tiered_storage_tasks.py
- src/backend/core/tests/services/test_tiered_storage.py
- src/backend/core/tests/tasks/test_task_send_message.py
- src/backend/messages/settings.py
- src/backend/core/tests/conftest.py
- src/backend/core/models.py
- src/backend/core/tests/services/__init__.py
- src/backend/core/enums.py
- src/backend/core/services/tiered_storage.py
- src/backend/core/tests/commands/__init__.py
- src/backend/core/tests/tasks/test_tiered_storage_tasks.py
- src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
src/backend/**/{tests.py,tests/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
src/backend/**/{tests.py,tests/**/*.py}: Use Django’s testing tools (pytest-django) to ensure code quality and reliability
Unit tests should focus on a single use case, keep assertions minimal, and cover all possible cases
Files:
- src/backend/core/tests/tasks/__init__.py
- src/backend/core/tests/commands/test_verify_tiered_storage.py
- src/backend/core/tests/services/test_tiered_storage.py
- src/backend/core/tests/tasks/test_task_send_message.py
- src/backend/core/tests/conftest.py
- src/backend/core/tests/services/__init__.py
- src/backend/core/tests/commands/__init__.py
- src/backend/core/tests/tasks/test_tiered_storage_tasks.py
src/backend/**/{settings.py,middleware.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
Use middleware judiciously for cross-cutting concerns (authentication, logging, caching)
Files:
src/backend/messages/settings.py
src/backend/**/settings.py
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
src/backend/**/settings.py: Leverage Django’s caching framework (e.g., Redis/Memcached) where appropriate
Use Django’s cache framework with a backend like Redis or Memcached to reduce DB load
Optimize static file handling using Django’s staticfiles pipeline (e.g., WhiteNoise)
Files:
src/backend/messages/settings.py
src/backend/**/{models.py,forms.py,views.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
Keep business logic in models and forms; keep views thin and focused on request handling
Files:
src/backend/core/models.py
src/backend/**/{models.py,migrations/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
Implement database indexing and query optimization (Model Meta indexes, constraints)
Files:
- src/backend/core/models.py
- src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
🧠 Learnings (1)
📚 Learning: 2025-09-02T10:12:12.835Z
Learnt from: CR
Repo: suitenumerique/messages PR: 0
File: .cursor/rules/django-python.mdc:0-0
Timestamp: 2025-09-02T10:12:12.835Z
Learning: Applies to src/backend/**/{tests.py,tests/**/*.py} : Use Django’s testing tools (pytest-django) to ensure code quality and reliability
Applied to files:
- src/backend/core/tests/conftest.py
- src/backend/core/tests/commands/__init__.py
🧬 Code graph analysis (3)
src/backend/messages/settings.py (1)
src/backend/core/utils.py (1)
JSONValue(8-22)
src/backend/core/tests/conftest.py (1)
src/backend/core/services/tiered_storage.py (1)
storage(45-49)
src/backend/core/models.py (2)
src/backend/core/enums.py (2)
- BlobStorageLocationChoices (61-65)
- CompressionTypeChoices (54-58)

src/backend/core/services/tiered_storage.py (6)

- TieredStorageService (28-296)
- encrypt (68-91)
- compute_storage_key (52-66)
- decrypt (93-117)
- download_blob (208-242)
- delete_if_orphaned (244-280)
🪛 Ruff (0.14.11)
src/backend/core/tests/commands/test_verify_tiered_storage.py
31-31: import should be at the top-level of a file
(PLC0415)
180-180: import should be at the top-level of a file
(PLC0415)
208-208: import should be at the top-level of a file
(PLC0415)
275-275: import should be at the top-level of a file
(PLC0415)
357-357: import should be at the top-level of a file
(PLC0415)
397-397: import should be at the top-level of a file
(PLC0415)
415-415: import should be at the top-level of a file
(PLC0415)
442-442: import should be at the top-level of a file
(PLC0415)
470-470: import should be at the top-level of a file
(PLC0415)
506-506: import should be at the top-level of a file
(PLC0415)
552-552: import should be at the top-level of a file
(PLC0415)
596-596: import should be at the top-level of a file
(PLC0415)
630-630: import should be at the top-level of a file
(PLC0415)
src/backend/core/management/commands/verify_tiered_storage.py
280-280: import should be at the top-level of a file
(PLC0415)
282-282: import should be at the top-level of a file
(PLC0415)
src/backend/core/tests/services/test_tiered_storage.py
114-114: import should be at the top-level of a file
(PLC0415)
129-129: import should be at the top-level of a file
(PLC0415)
271-271: import should be at the top-level of a file
(PLC0415)
296-296: import should be at the top-level of a file
(PLC0415)
362-362: import should be at the top-level of a file
(PLC0415)
396-396: import should be at the top-level of a file
(PLC0415)
571-571: import should be at the top-level of a file
(PLC0415)
614-614: import should be at the top-level of a file
(PLC0415)
654-654: import should be at the top-level of a file
(PLC0415)
src/backend/core/tests/conftest.py
22-22: import should be at the top-level of a file
(PLC0415)
src/backend/core/models.py
1474-1474: import should be at the top-level of a file
(PLC0415)
1625-1625: import should be at the top-level of a file
(PLC0415)
1656-1656: import should be at the top-level of a file
(PLC0415)
src/backend/core/services/tiered_storage.py
132-132: import should be at the top-level of a file
(PLC0415)
149-149: import should be at the top-level of a file
(PLC0415)
257-257: import should be at the top-level of a file
(PLC0415)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: test-e2e
- GitHub Check: lint-back
- GitHub Check: build-front
- GitHub Check: test-front
- GitHub Check: check-api-state
- GitHub Check: lint-front
- GitHub Check: test-back
🔇 Additional comments (52)
src/backend/core/tests/tasks/__init__.py (1)

1-1: LGTM! Clean package initializer for the tasks test module.

src/backend/core/tests/commands/__init__.py (1)

1-1: LGTM! Appropriate package initializer for management command tests.

src/backend/core/tests/services/__init__.py (1)

1-1: LGTM! Appropriate package initializer for service tests.
src/backend/core/tests/tasks/test_task_send_message.py (3)

3-3: LGTM! Appropriate pylint disables for test file patterns: `no-value-for-parameter` for Celery task invocations without bound arguments, and `unused-argument` for pytest fixtures that establish state but aren't directly referenced.

45-45: Good hygiene. Using the `_mailbox` prefix correctly signals the variable is intentionally unused while still unpacking the fixture tuple.

99-99: Consistent with the pattern above.

src/backend/core/api/viewsets/config.py (1)
139-140: LGTM! Using `getattr(settings, setting, None)` ensures a consistent API response schema where all keys are always present, aligning with the OpenAPI specification that marks these fields as required. This is cleaner than conditional inclusion and provides predictable behavior for frontend consumers.

src/backend/core/utils.py (1)

14-22: LGTM! Returning `None` for empty/whitespace strings allows django-configurations to fall back to default values, which is appropriate for optional JSON configuration like encryption keys.

env.d/development/backend.defaults (1)

54-66: LGTM! The development defaults are well-documented with clear comments. Tiered storage is configured with a 3-day offload threshold, and credentials follow the existing pattern for `msg-imports`.

compose.yaml (1)

84-84: LGTM! The `msg-blobs` bucket creation follows the existing pattern. It correctly omits the ILM expiration rules since blobs are intended for long-term storage, unlike temporary imports.

src/backend/messages/celery_app.py (1)
48-52: LGTM! The new beat schedule entry follows the existing pattern, and an hourly interval is appropriate for blob offloading. The `core.services.tiered_storage_tasks.offload_blobs_task` is properly defined with the `@celery_app.task(bind=True)` decorator and has comprehensive test coverage.

src/backend/core/services/search/search.py (1)

40-42: No action required; direct access to `settings.OPENSEARCH_INDEX_THREADS` is safe. The setting is properly defined in `src/backend/messages/settings.py` as a `BooleanValue` with a default of `True`. It will always exist at runtime, and direct access does not risk `AttributeError`. This pattern is already used consistently throughout the codebase in multiple other files (`src/backend/core/signals.py`, `src/backend/core/services/search/tasks.py`).

Likely an incorrect or invalid review comment.
src/backend/core/enums.py (1)

61-66: LGTM! The new `BlobStorageLocationChoices` enum follows the existing patterns in this file with appropriate integer values and descriptive labels. Good placement in the logical order of the file.

src/backend/core/tests/conftest.py (1)

14-45: LGTM! The session-scoped fixture properly defers Django imports and handles missing storage configuration gracefully. The broad exception handling is appropriate here to prevent test setup failures when object storage isn't configured. The `pylint: disable` comment at line 3 correctly covers the in-function import pattern.

src/backend/core/services/tiered_storage_tasks.py (1)

26-65: LGTM! The task efficiently streams eligible blob IDs using `values_list` with `iterator()` to avoid memory pressure. The filtering criteria (age threshold + minimum size) appropriately limit the scope of each run.

src/backend/core/tests/commands/test_verify_tiered_storage.py (3)
1-7: LGTM! Comprehensive test coverage for the `verify_tiered_storage` management command. The tests appropriately cover disabled states, E2E verification modes, hash verification with corruption detection, and re-encryption workflows. The in-function imports are intentionally used for test isolation, and the `pylint: disable` comment at line 7 correctly covers this pattern.

78-80: Good cleanup pattern. Consistent use of `try/finally` with existence checks ensures test isolation and prevents storage pollution across test runs.

468-534: Thorough E2E test for object storage re-encryption. This test covers the complete workflow: encrypting with the old key, uploading to storage, rotating keys, re-encrypting, and verifying content integrity via download and decompression.

src/backend/core/signals.py (1)

51-52: The concern is unfounded. `OPENSEARCH_INDEX_THREADS` is always defined in settings with a default value (`True` in `src/backend/messages/settings.py:83-84` and `False` as a fallback at line 1090). Direct attribute access to `settings.OPENSEARCH_INDEX_THREADS` is safe and will not raise `AttributeError`. The change from `getattr()` to direct access is a valid simplification that removes unnecessary defensive programming.

Likely an incorrect or invalid review comment.
src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py (1)

12-27: Migration structure looks correct. The migration properly:

- Adds `storage_location` with `db_index=True` for efficient filtering
- Makes `raw_content` nullable to support object storage blobs
- Uses hardcoded choices in the migration (correct Django practice)

One consideration: if key rotation queries will frequently filter by `encryption_key_id` (e.g., finding all blobs encrypted with a specific key), adding an index on that field could be beneficial. However, this can be deferred based on actual query patterns.

src/backend/core/tests/services/test_tiered_storage.py (5)
24-181: Comprehensive unit test coverage for encryption/decryption. The unit tests thoroughly cover:

- Storage key computation with different SHA256 prefixes
- Encryption passthrough when disabled (key_id=0)
- Proper error handling for invalid/missing keys and corrupted data
- Key rotation scenarios maintaining backward compatibility

Good separation of concerns with no DB or storage dependencies.

183-263: Good database-level test coverage. Tests properly validate:

- Default storage location behavior
- Content retrieval from PostgreSQL
- Error handling when content is missing
- SHA256-based storage key derivation
- Deduplication detection via `check_already_uploaded`

389-420: Critical regression test for the double-encryption bug. This test (`test_offload_with_encryption_roundtrip`) is valuable as it explicitly guards against the double-encryption bug mentioned in the docstring. The test verifies that encrypted blobs can be offloaded and read back correctly, which is a common failure point.

494-562: Important deduplication behavior documented in the test. The test correctly validates that when two blobs with identical content are encrypted with different keys, deduplication uses the first blob's encryption key_id. The inline comment at line 554 clarifies this is expected behavior until key rotation is complete.

565-667: Key rotation tests cover both storage locations. The tests properly validate the re-encryption workflow for:

- PostgreSQL-stored blobs (decrypt with old key, encrypt with new key, update in place)
- Object storage blobs (download, decrypt, re-encrypt, upload, update metadata)

Both tests verify content integrity after rotation, which is critical.
src/backend/messages/settings.py (2)
234-259: New storage configuration follows existing patterns. The `message-blobs` storage configuration mirrors the existing `message-imports` pattern. The default bucket name `msg-blobs` is provided.

One difference: `endpoint_url` has no default value, whereas some configurations might expect a default for local development. Verify this is intentional and that the development environment properly sets `STORAGE_MESSAGE_BLOBS_ENDPOINT_URL`.

376-398: Well-documented encryption and offload configuration. The configuration properly:

- Documents the key format and the key_id=0 convention
- Uses appropriate types (`JSONValue` for dict, `PositiveIntegerValue` for IDs)
- Defaults to encryption disabled (key_id=0), requiring explicit opt-in
- Allows fine-grained control over offload timing and size thresholds
src/backend/core/tests/tasks/test_tiered_storage_tasks.py (5)

32-48: Disabled state test properly mocks settings. The test correctly mocks the storage configuration to verify the task gracefully handles the disabled state.

55-120: Good coverage of blob eligibility criteria. The tests properly validate:

- Age-based filtering using `TIERED_STORAGE_OFFLOAD_AFTER_DAYS`
- Size-based filtering using `TIERED_STORAGE_OFFLOAD_MIN_SIZE`
- The use of `Blob.objects.filter().update()` correctly bypasses any auto-update timestamps

The conditional assertion at line 119 (`if settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0`) adapts to the test environment settings, which is acceptable.

156-171: Consistent disabled-state handling test. Follows the same mocking pattern as the batch task test.

242-264: Good error handling test with rollback verification. The test `test_handles_upload_error` properly verifies that:

- Upload errors are caught and reported
- The blob state is preserved (transaction rolled back)
- Storage location remains `POSTGRES` and `raw_content` is intact

This ensures data integrity during upload failures.

335-352: Idempotency test validates concurrent safety. The `test_concurrent_offload_idempotent` test verifies that repeated offload attempts for the same blob are handled gracefully, returning `already_offloaded` on subsequent calls. This is important for Celery task retry scenarios.

src/backend/core/management/commands/verify_tiered_storage.py (6)
24-88: Well-designed CLI interface with clear modes. The command provides:

- Multiple verification modes (`db-to-storage`, `storage-to-db`, `full`)
- Safety features (`--dry-run`, `--limit`)
- Key rotation capability (`--re-encrypt`)
- Proper early exit when storage is not configured

89-134: DB-to-storage verification handles scale well. The method:

- Uses `iterator(chunk_size=1000)` to avoid memory issues with large datasets
- Reports missing blobs to stderr (appropriate severity for potential data loss)
- Respects the `--limit` option for sampling

136-218: Storage-to-DB verification with orphan cleanup. The method properly:

- Validates storage path format before processing
- Detects orphans (objects without DB references)
- Optionally deletes orphans with `--fix`
- Optionally verifies hashes (wisely marked as "slow")

Progress is reported every 100 objects (line 201), which provides good feedback during long operations.

220-254: Storage listing handles multiple backends. The method:

- Prefers direct boto3 access with pagination for efficiency
- Falls back to Django's `listdir` for compatibility
- Raises a clear error when listing is not supported

256-305: Hash verification covers the complete pipeline. The method correctly:

- Downloads encrypted content from storage
- Decrypts using the blob's `encryption_key_id`
- Decompresses based on `compression` type
- Computes and compares the SHA256 hash

The broad exception handling is acceptable here since the goal is to report issues, not crash.

307-402: Re-encryption workflow with proper validation. The method:

- Validates encryption configuration before starting
- Uses a smaller chunk size (100) appropriate for the heavier re-encryption operation
- Supports `--dry-run` for safe preview
- Provides clear progress reporting and a final summary
src/backend/core/models.py (6)
31-33: No review needed for this import change.

1472-1487: Encryption is integrated cleanly into blob creation. Storing encrypted bytes and the key id alongside compression keeps retrieval consistent.

1595-1599: Conditional `size_compressed` update makes sense. This avoids clobbering the stored size after offload clears `raw_content`.

1601-1609: No issues spotted.

1624-1643: Content retrieval flow reads cleanly. The PostgreSQL vs object-storage branches and decryption/decompression steps are clear.

1645-1660: Deletion flow aligns with object-storage cleanup.

src/backend/core/services/tiered_storage.py (8)
1-24: No review needed for the module header/imports.

44-49: Lazy storage initialization looks good.

51-66: Storage key sharding is straightforward.

68-117: Encryption/decryption flow is consistent with key_id semantics. Please double-check that the configured key formats align with Fernet requirements.

119-155: DB-backed dedup lookups are clear.

208-243: Download path is clear and well-scoped.

282-296: Existence check is straightforward.

157-207: The return value is correctly handled in production code. The call site in `src/backend/core/services/tiered_storage_tasks.py` (line 113) properly captures the returned `key_id` and updates `blob.encryption_key_id` accordingly. Dedup-specific tests also confirm this pattern works correctly when the same content is uploaded with different encryption keys.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 449-455: When re-encrypting Postgres blobs in
verify_tiered_storage.py (using self.service.decrypt and self.service.encrypt)
ensure you persist the recomputed size_compressed: after setting
blob.raw_content = encrypted and blob.encryption_key_id = new_key_id, call
blob.save(update_fields=["raw_content", "encryption_key_id", "size_compressed"])
so the recalculated size_compressed is written to the DB (match the approach
used in 0007_blob_size_compressed.py).
- Around line 280-282: Move the local imports of pyzstd and
CompressionTypeChoices out of the function and place them at the top of the
module alongside the existing BlobStorageLocationChoices import so they are
module-level imports; update the import section to include "import pyzstd" and
"from core.enums import CompressionTypeChoices" and remove the in-function
imports where pyzstd and CompressionTypeChoices are currently referenced in
verify_tiered_storage logic.
In `@src/backend/core/signals.py`:
- Around line 109-119: The post_delete signal handler cleanup_blob_storage
currently calls TieredStorageService().delete_if_orphaned() immediately, which
can run inside a transaction that may roll back; wrap the deletion call in
transaction.on_commit to defer it until after successful commit: import
django.db.transaction if needed, capture the instance.sha256 (or
bytes(instance.sha256)) and any required storage_location check inside
cleanup_blob_storage, create a small closure or lambda that calls
TieredStorageService().delete_if_orphaned(...) and register it with
transaction.on_commit, and ensure the existing guard checks
(BlobStorageLocationChoices.OBJECT_STORAGE and service.enabled) are preserved
before scheduling the on_commit callback.
In `@src/backend/core/tests/services/test_tiered_storage.py`:
- Line 114: Several intentional local test imports (e.g. "from
cryptography.fernet import InvalidToken" and the other test-only imports flagged
at the comment's listed locations) are meant to remain inside test functions;
add a trailing "# noqa: PLC0415" to each of those local import lines instead of
moving them to module level so Ruff/Pylint stops reporting
import-outside-toplevel while keeping the imports local to their tests. Ensure
you update each flagged import line (the ones referenced in the review) by
appending the exact comment "# noqa: PLC0415".
🧹 Nitpick comments (2)
src/backend/core/tests/services/test_tiered_storage.py (1)
265-563: Consider skipping E2E tests when object storage isn't configured.
The docstring says "when available," but these tests hard-fail if storage isn't configured. A `skipif` makes local/dev runs more resilient.
🔧 Suggested refactor
+# At module level
+_STORAGE_ENABLED = TieredStorageService().enabled
@@
-@pytest.mark.django_db
-class TestTieredStorageE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestTieredStorageE2E:
src/backend/core/tests/tasks/test_tiered_storage_tasks.py (1)
60-194: Consider skipping E2E task tests when object storage isn't configured.
These E2E tests will fail in environments without MinIO. A `skipif` keeps dev/test runs resilient while still exercising coverage in CI.
🔧 Suggested refactor
+# At module level
+_STORAGE_ENABLED = TieredStorageService().enabled
@@
-@pytest.mark.django_db
-class TestOffloadBlobsTaskE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestOffloadBlobsTaskE2E:
@@
-@pytest.mark.django_db
-class TestOffloadSingleBlobTaskE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestOffloadSingleBlobTaskE2E:
Also applies to: 221-399
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 475-535: The code currently locks and updates only the single Blob
row but the storage object is shared by all Blobs with the same sha256; fix by
selecting and locking the entire cohort (all Blob rows that share the sha256 and
OBJECT_STORAGE) inside the transaction, verify none already have
encryption_key_id == target_key_id (or treat as already-promoted if they do),
then update every locked row's encryption_key_id to new_key_id before calling
transaction.on_commit(_promote_temp). Use a select_for_update() queryset
filtered by blob.sha256 (and storage type/field used for OBJECT_STORAGE) instead
of Blob.objects.select_for_update().get(...), and perform a bulk update or
iterate the locked rows to set and save encryption_key_id so all siblings remain
readable after promotion.
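The cohort-wide promotion can be sketched with a plain list of dicts standing in for the `select_for_update()` queryset; `promote_cohort` and the dict field names are illustrative, not the command's real API:

```python
def promote_cohort(rows, sha256, new_key_id, target_key_id):
    """Promote every row sharing the sha256 (and stored in object storage)
    together, so no sibling is left pointing at a stale key id."""
    cohort = [
        r for r in rows
        if r["sha256"] == sha256 and r["location"] == "OBJECT_STORAGE"
    ]
    if any(r["key_id"] == target_key_id for r in cohort):
        return "already_promoted"  # some sibling already points at the target
    for row in cohort:  # bulk update of every locked sibling
        row["key_id"] = new_key_id
    return "promoted"

rows = [
    {"sha256": "ab", "location": "OBJECT_STORAGE", "key_id": 1},
    {"sha256": "ab", "location": "OBJECT_STORAGE", "key_id": 1},
    {"sha256": "cd", "location": "OBJECT_STORAGE", "key_id": 1},
]
result = promote_cohort(rows, "ab", new_key_id=2, target_key_id=3)
```

After the call both `"ab"` rows carry the new key id while the unrelated `"cd"` row is untouched, which is the invariant the review asks for.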
In `@src/backend/core/services/tiered_storage.py`:
- Around line 179-190: The dedup fast-path returns the existing
encryption_key_id without verifying the underlying object is present; change the
logic in the branch that uses check_already_uploaded(sha256_bytes) so it
computes the canonical storage key via compute_storage_key(sha256_bytes) and
verifies the object exists in the storage backend (e.g., storage.exists(key) or
trying to open/read the object) before calling
get_existing_key_id(sha256_bytes); if the object is missing or unreadable, fall
through to the upload path that uses blob.raw_content and storage.save(...) so
we don't convert the row to OBJECT_STORAGE pointing at nothing. Ensure you
reference check_already_uploaded, compute_storage_key, get_existing_key_id,
storage.exists/open, and storage.save in the change.
- Around line 47-61: compute_storage_key currently only uses the SHA of the
uncompressed payload, so blobs with identical source bytes but different
compression metadata collide; change compute_storage_key to include the storage
representation (e.g., compression) in the key or in the input used to derive the
key (either by accepting a compression/representation parameter and returning
something like "blobs/{compression}/{sha[:3]}/{sha}" or by computing the SHA
over the final compressed bytes), and update every call site (notably
upload_blob and the call sites around lines 187-190) to pass the representation
so deduplication is based on both content and representation.
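A minimal sketch of the first option (folding the representation into the key path); the `blobs/{compression}/{shard}/{sha}` layout follows the suggestion above and is not the service's confirmed format:

```python
import hashlib

def compute_storage_key(sha256_bytes: bytes, compression: str) -> str:
    """Fold the storage representation into the object key so identical
    source bytes stored with different compression settings map to
    distinct objects instead of deduping onto one another."""
    sha = sha256_bytes.hex()
    return f"blobs/{compression}/{sha[:3]}/{sha}"

digest = hashlib.sha256(b"same source bytes").digest()
key_zstd = compute_storage_key(digest, "zstd")
key_none = compute_storage_key(digest, "none")
assert key_zstd != key_none  # no cross-representation dedup collision
```

Every call site would then have to pass the blob's compression value so dedup is keyed on both content and representation.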
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 966620c0-2a4d-4de7-be72-51978a4576ac
📒 Files selected for processing (6)
src/backend/core/management/commands/verify_tiered_storage.py
src/backend/core/services/tiered_storage.py
src/backend/core/signals.py
src/backend/core/tests/commands/test_verify_tiered_storage.py
src/backend/core/tests/services/test_tiered_storage.py
src/backend/core/tests/tasks/test_tiered_storage_tasks.py
0706ad8 to 672d2e6
Actionable comments posted: 8
♻️ Duplicate comments (1)
src/backend/core/services/tiered_storage.py (1)
74-88: ⚠️ Potential issue | 🔴 Critical — Include compression in the storage identity.
`sha256` is computed from the uncompressed payload, but the stored object is the compressed/encrypted representation. Two blobs with identical source bytes and different `compression` values will collide on the same key, dedup together in `upload_blob()`, and later one of them will read back bytes that do not match its own metadata. Key derivation/dedup needs to include the storage representation, or uploads need to canonicalize representation before writing.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 74 - 88, compute_storage_key currently derives the object key only from the source SHA256 (sha256_bytes) which ignores the chosen compression/encryption, causing different storage representations to collide; update key derivation to include the storage representation (e.g., incorporate the compression string/enum and any representation-specific metadata into the input used to derive the key) or alternatively ensure upload_blob canonicalizes the bytes (compress/encrypt) before computing the SHA used by compute_storage_key so keys reflect the actual stored bytes; refer to compute_storage_key and upload_blob when making the change so the key always encodes the compression parameter (or is computed from post-compression bytes).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 332-356: The current loop limits raw queryset rows, but
OBJECT_STORAGE re-encrypts entire sha256 cohorts in
_re_encrypt_object_storage_blob, so change the logic in the command to build a
worklist of unique re-encryption work-units first (each work-unit represents
either a single Postgres row or a unique object-storage cohort identified by
sha256/object key), deduplicate those units, then apply the --limit to that
worklist before processing; update the code paths around Blob, queryset,
_re_encrypt_single_blob and _re_encrypt_object_storage_blob to consult the
prebuilt worklist (not the raw queryset) when counting, printing "Blobs to
re-encrypt", and iterating, and ensure current_key_id and OBJECT_STORAGE checks
are used to determine whether a row maps to a cohort work-unit.
In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`:
- Around line 13-17: The new migrations.AddField for model_name='blob' adding
name='encryption_key_id' should create an indexed column and also add a
composite index with storage_location to support key-rotation queries; update
the migration to set db_index=True on the SmallIntegerField for
encryption_key_id (the migrations.AddField entry) and add a migrations.AddIndex
entry that creates an index on ('encryption_key_id','storage_location') (or
equivalent Index/fields tuple) so queries filtering by encryption_key_id and
storage_location use an index.
In `@src/backend/core/models.py`:
- Around line 2189-2201: The new tiered-storage fields (storage_location,
created_at, size, sha256) need composite DB indexes to avoid broad scans; update
the Blob model by adding appropriate Meta.indexes entries (e.g., an index on
("storage_location","created_at","size") for offload scans and an index on
("sha256","storage_location") for dedup/orphan checks) and create a Django
migration that adds these same composite indexes to the database so the ORM and
DB stay in sync; reference the Blob class/Blob.Meta and the existing migration
pattern for adding indexes when implementing the change.
In `@src/backend/core/services/tiered_storage_tasks.py`:
- Around line 78-91: The second Blob lookup inside the transaction/lock (the
select_for_update().get(id=blob_id) call in tiered_storage_tasks.py) can raise
Blob.DoesNotExist if the row was deleted after the initial sha256 lookup; wrap
that select_for_update().get(...) in a try/except catching Blob.DoesNotExist and
return {"status": "not_found", "blob_id": blob_id} instead of letting it bubble
to the broad exception handler; apply the same fix to the other similar block
later in this file (the other select_for_update().get usage around the offload
completion code) so both places avoid logger.exception noise and return
not_found on concurrent delete.
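The race handling can be sketched with a plain dict standing in for the database; `DoesNotExist` and `fetch_locked` below are stand-ins for `Blob.DoesNotExist` and the locked ORM lookup, not the real API:

```python
class DoesNotExist(Exception):
    """Stand-in for Blob.DoesNotExist."""

def fetch_locked(rows: dict, blob_id: int) -> dict:
    """Translate a concurrent delete into a structured 'not_found'
    result instead of an exception that trips the broad error handler."""
    try:
        if blob_id not in rows:  # simulates .get() raising on a gone row
            raise DoesNotExist(blob_id)
        return {"status": "ok", "blob": rows[blob_id]}
    except DoesNotExist:
        return {"status": "not_found", "blob_id": blob_id}
```

Only the narrow exception is caught, so genuine failures still reach the task's generic error path.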
In `@src/backend/core/services/tiered_storage.py`:
- Around line 303-310: The current delete_if_orphaned implementation swallows
all exceptions from storage.delete and returns False, preventing
cleanup_orphaned_blob_task from retrying transient errors; change
delete_if_orphaned so that storage.delete failures are not converted into a
silent False: either re-raise the caught exception (allowing
cleanup_orphaned_blob_task to catch and retry) or return a distinct error result
that cleanup_orphaned_blob_task understands as retryable. Locate
compute_storage_key, delete_if_orphaned and the call to self.storage.delete in
tiered_storage.py and update the exception handling to
propagate/storage-delete-error semantics instead of treating every exception as
"still_referenced".
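One way to express the "propagate, don't swallow" option, assuming a hypothetical `StorageDeleteError` and a plain callable in place of `storage.delete`:

```python
class StorageDeleteError(Exception):
    """Raised instead of returning False so the cleanup task can retry."""

def delete_if_orphaned(delete_fn, key: str, still_referenced: bool) -> bool:
    """Only a genuine remaining reference yields False; backend failures
    propagate as a distinct, retryable error."""
    if still_referenced:
        return False  # real no-op: another row still points at the object
    try:
        delete_fn(key)
    except OSError as exc:
        raise StorageDeleteError(key) from exc  # retryable, not 'still_referenced'
    return True

def flaky_delete(key):
    raise OSError("transient backend failure")

try:
    delete_if_orphaned(flaky_delete, "blobs/abc", still_referenced=False)
    outcome = "swallowed"
except StorageDeleteError:
    outcome = "retryable"
```

The caller can now distinguish "still referenced" from "delete failed" and schedule a retry only for the latter.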
In `@src/backend/core/tests/conftest.py`:
- Around line 39-47: The current bucket bootstrap incorrectly catches
client.exceptions.NoSuchBucket (which doesn’t exist) and falls back to a broad
except that masks real failures; update the try/except around
client.head_bucket() to catch botocore.exceptions.ClientError (import
botocore.exceptions as needed), inspect the error (e.response['Error']['Code']
or HTTPStatus in e.response['ResponseMetadata']) and only call
client.create_bucket(Bucket=bucket_name) when the error indicates the bucket is
missing (404 / NoSuchBucket), otherwise re-raise or log the unexpected exception
so real setup failures aren’t swallowed; replace references to
client.exceptions.NoSuchBucket with this ClientError check and ensure
client.create_bucket remains the recovery path.
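The error classification can be isolated into a small pure function; `error_response` mirrors the shape of botocore's `ClientError.response`, and `bucket_is_missing` is a hypothetical helper name:

```python
def bucket_is_missing(error_response: dict) -> bool:
    """Decide whether a head_bucket failure means 'create the bucket'
    (404 / NoSuchBucket) or a real setup failure that must be re-raised."""
    code = error_response.get("Error", {}).get("Code", "")
    status = error_response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    return code in ("404", "NoSuchBucket") or status == 404

assert bucket_is_missing({"Error": {"Code": "404"}, "ResponseMetadata": {}})
assert not bucket_is_missing(
    {"Error": {"Code": "403"}, "ResponseMetadata": {"HTTPStatusCode": 403}}
)
```

In the conftest, the `except botocore.exceptions.ClientError as e:` block would call this with `e.response`, create the bucket when it returns True, and re-raise otherwise.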
In `@src/backend/core/tests/tasks/test_task_send_message.py`:
- Line 3: Remove the module-level "# pylint:
disable=no-value-for-parameter,unused-argument" and instead apply scoped
disables at the exact offending sites: add "# pylint:
disable=no-value-for-parameter" inline on the direct task invocation lines (the
direct task call sites in this test file) and add "# pylint:
disable=unused-argument" either inline on the specific test function signatures
or on the unused fixture parameter names where they are declared; ensure only
those lines reference the disables so future
unused-argument/no-value-for-parameter issues elsewhere in the file are not
globally suppressed.
In `@src/backend/messages/settings.py`:
- Around line 295-320: The "message-blobs" storage entry is always present in
settings.STORAGES which makes TieredStorageService.enabled truthy even when no
object-store is configured; change settings.py so the "message-blobs" key is
only added when real object-storage config is provided (e.g. check relevant env
vars such as STORAGE_MESSAGE_BLOBS_BUCKET_NAME or
STORAGE_MESSAGE_BLOBS_ENDPOINT_URL/ACCESS_KEY/SECRET in os.environ or via
values.Value().environ_name) instead of unconditionally defining the dict;
locate the "message-blobs" dict in src/backend/messages/settings.py and wrap its
creation/assignment in that conditional so
settings.STORAGES.get("message-blobs") is falsy when no blob config is supplied,
preserving the documented "empty = disabled" behavior used by
TieredStorageService.enabled.
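A simplified sketch of the conditional registration, with a plain dict in place of `os.environ` and the storage options reduced to two fields; `build_storages` is a hypothetical helper, not the settings module's real structure:

```python
def build_storages(environ: dict) -> dict:
    """Only register the "message-blobs" alias when a real backend is
    configured, so STORAGES.get("message-blobs") stays falsy otherwise."""
    storages = {
        "default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
    }
    if environ.get("STORAGE_MESSAGE_BLOBS_BUCKET_NAME") or environ.get(
        "STORAGE_MESSAGE_BLOBS_ENDPOINT_URL"
    ):
        storages["message-blobs"] = {
            "BACKEND": "storages.backends.s3.S3Storage",
            "OPTIONS": {
                "endpoint_url": environ.get("STORAGE_MESSAGE_BLOBS_ENDPOINT_URL"),
                "bucket_name": environ.get(
                    "STORAGE_MESSAGE_BLOBS_BUCKET_NAME", "msg-blobs"
                ),
            },
        }
    return storages

assert build_storages({}).get("message-blobs") is None  # disabled by default
```

With this shape, `TieredStorageService.enabled` evaluates to False whenever the blob env vars are left unset, preserving the "empty = disabled" contract.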
---
Duplicate comments:
In `@src/backend/core/services/tiered_storage.py`:
- Around line 74-88: compute_storage_key currently derives the object key only
from the source SHA256 (sha256_bytes) which ignores the chosen
compression/encryption, causing different storage representations to collide;
update key derivation to include the storage representation (e.g., incorporate
the compression string/enum and any representation-specific metadata into the
input used to derive the key) or alternatively ensure upload_blob canonicalizes
the bytes (compress/encrypt) before computing the SHA used by
compute_storage_key so keys reflect the actual stored bytes; refer to
compute_storage_key and upload_blob when making the change so the key always
encodes the compression parameter (or is computed from post-compression bytes).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 59f4d311-6f3d-406c-bdf2-bf1f1057f13c
📒 Files selected for processing (23)
.github/workflows/messages.yml
Makefile
env.d/development/backend.defaults
src/backend/core/admin.py
src/backend/core/enums.py
src/backend/core/management/commands/verify_tiered_storage.py
src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py
src/backend/core/models.py
src/backend/core/services/search/search.py
src/backend/core/services/tiered_storage.py
src/backend/core/services/tiered_storage_tasks.py
src/backend/core/signals.py
src/backend/core/tests/commands/__init__.py
src/backend/core/tests/commands/test_verify_tiered_storage.py
src/backend/core/tests/conftest.py
src/backend/core/tests/services/__init__.py
src/backend/core/tests/services/test_tiered_storage.py
src/backend/core/tests/tasks/__init__.py
src/backend/core/tests/tasks/test_task_send_message.py
src/backend/core/tests/tasks/test_tiered_storage_tasks.py
src/backend/core/utils.py
src/backend/messages/celery_app.py
src/backend/messages/settings.py
migrations.AddField(
    model_name='blob',
    name='encryption_key_id',
    field=models.SmallIntegerField(default=0, help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])', verbose_name='encryption key ID'),
),
Missing index on encryption_key_id will hurt large-scale key-rotation queries.
At the PR’s target scale, filtering blobs by encryption key can become a full-table scan. Please index this field (and ideally add a composite index with storage_location if queries combine both).
🔧 Suggested migration adjustment
migrations.AddField(
model_name='blob',
name='encryption_key_id',
- field=models.SmallIntegerField(default=0, help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])', verbose_name='encryption key ID'),
+ field=models.SmallIntegerField(
+ default=0,
+ db_index=True,
+ help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])',
+ verbose_name='encryption key ID',
+ ),
),
+ migrations.AddIndex(
+ model_name='blob',
+ index=models.Index(
+ fields=['storage_location', 'encryption_key_id'],
+ name='core_blob_storage_enc_idx',
+ ),
+ ),
As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: "Implement database indexing and query optimization (Model Meta indexes, constraints)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`
around lines 13 - 17, The new migrations.AddField for model_name='blob' adding
name='encryption_key_id' should create an indexed column and also add a
composite index with storage_location to support key-rotation queries; update
the migration to set db_index=True on the SmallIntegerField for
encryption_key_id (the migrations.AddField entry) and add a migrations.AddIndex
entry that creates an index on ('encryption_key_id','storage_location') (or
equivalent Index/fields tuple) so queries filtering by encryption_key_id and
storage_location use an index.
# Tiered storage fields
storage_location = models.SmallIntegerField(
    "storage location",
    choices=BlobStorageLocationChoices.choices,
    default=BlobStorageLocationChoices.POSTGRES,
    help_text="Where the blob content is stored",
    db_index=True,
)
encryption_key_id = models.SmallIntegerField(
    "encryption key ID",
    default=0,
    help_text="Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])",
)
Add composite indexes for the new tiered-storage lookups.
These new columns become the hot filter keys for the periodic offload scan (storage_location, created_at, size) and for dedup/orphan checks (sha256, storage_location). At the scale this PR targets, single-column indexes here will still drive very broad scans on messages_blob. Please add matching composite indexes in Blob.Meta and the migration.
Suggested index shape
class Meta:
db_table = "messages_blob"
verbose_name = "blob"
verbose_name_plural = "blobs"
ordering = ["-created_at"]
+ indexes = [
+ models.Index(
+ fields=["storage_location", "created_at", "size"],
+ name="msg_blob_offload_scan",
+ ),
+ models.Index(
+ fields=["sha256", "storage_location"],
+ name="msg_blob_sha_loc",
+ ),
+ ]
        constraints = [
As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: Implement database indexing and query optimization (Model Meta indexes, constraints).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/models.py` around lines 2189 - 2201, The new tiered-storage
fields (storage_location, created_at, size, sha256) need composite DB indexes to
avoid broad scans; update the Blob model by adding appropriate Meta.indexes
entries (e.g., an index on ("storage_location","created_at","size") for offload
scans and an index on ("sha256","storage_location") for dedup/orphan checks) and
create a Django migration that adds these same composite indexes to the database
so the ORM and DB stay in sync; reference the Blob class/Blob.Meta and the
existing migration pattern for adding indexes when implementing the change.
"message-blobs": {
    "BACKEND": "storages.backends.s3.S3Storage",
    "OPTIONS": {
        "endpoint_url": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_ENDPOINT_URL",
            environ_prefix=None,
        ),
        "bucket_name": values.Value(
            "msg-blobs",
            environ_name="STORAGE_MESSAGE_BLOBS_BUCKET_NAME",
            environ_prefix=None,
        ),
        "access_key": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_ACCESS_KEY",
            environ_prefix=None,
        ),
        "secret_key": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_SECRET_KEY",
            environ_prefix=None,
        ),
        "region_name": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_REGION_NAME",
            environ_prefix=None,
        ),
    },
},
Gate message-blobs registration on real object-storage config.
TieredStorageService.enabled is just bool(settings.STORAGES.get("message-blobs")), so defining this alias unconditionally here makes blob offloading look enabled even when STORAGE_MESSAGE_BLOBS_* is intentionally left empty. That breaks the documented “empty = disabled” contract and can send the offload path into a partially configured S3 backend instead of keeping blobs in Postgres.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/messages/settings.py` around lines 295 - 320, The "message-blobs"
storage entry is always present in settings.STORAGES which makes
TieredStorageService.enabled truthy even when no object-store is configured;
change settings.py so the "message-blobs" key is only added when real
object-storage config is provided (e.g. check relevant env vars such as
STORAGE_MESSAGE_BLOBS_BUCKET_NAME or
STORAGE_MESSAGE_BLOBS_ENDPOINT_URL/ACCESS_KEY/SECRET in os.environ or via
values.Value().environ_name) instead of unconditionally defining the dict;
locate the "message-blobs" dict in src/backend/messages/settings.py and wrap its
creation/assignment in that conditional so
settings.STORAGES.get("message-blobs") is falsy when no blob config is supplied,
preserving the documented "empty = disabled" behavior used by
TieredStorageService.enabled.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/backend/core/models.py (1)
2077-2102: ⚠️ Potential issue | 🟠 Major — This configurable default is still bypassed by `Mailbox.create_blob()`.
`BlobManager.create_blob()` now uses `compression=None` to mean "read `MESSAGES_BLOB_COMPRESS`", but `src/backend/core/models.py:752-778` still defaults `Mailbox.create_blob(..., compression=CompressionTypeChoices.ZSTD)` and forwards that value unconditionally. Callers using the mailbox helper will never reach this new fallback.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/models.py` around lines 2077 - 2102, Mailbox.create_blob currently supplies CompressionTypeChoices.ZSTD (hard default) and forwards it to BlobManager.create_blob which treats compression=None as "use settings.MESSAGES_BLOB_COMPRESS", so callers of Mailbox.create_blob never hit the new configurable default; change Mailbox.create_blob to accept compression: Optional[CompressionTypeChoices] = None (or if the signature must remain, ensure it passes None when the caller didn't explicitly request a compression) and forward that None to BlobManager.create_blob so BlobManager.parse_compression_spec can apply settings.MESSAGES_BLOB_COMPRESS; update references to the Mailbox.create_blob parameter handling to distinguish "unspecified" vs explicit CompressionTypeChoices.ZSTD and only pass ZSTD when explicitly requested.
♻️ Duplicate comments (6)
src/backend/core/services/tiered_storage.py (3)
291-298: ⚠️ Potential issue | 🟠 Major — Propagate delete failures so cleanup can retry them.
`cleanup_orphaned_blob_task()` only retries when `delete_if_orphaned()` raises. Converting storage delete failures into `False` makes the task report `"still_referenced"` on transient backend errors and leaks the orphaned object until manual repair.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 291 - 298, The current delete_if_orphaned implementation swallows storage.delete failures by logging and returning False, preventing cleanup_orphaned_blob_task from retrying; change the code so storage.delete failures propagate: in delete_if_orphaned (the block that calls self.compute_storage_key and self.storage.delete) remove or narrow the broad except that returns False and instead log the error and re-raise the exception (or allow it to bubble up) so cleanup_orphaned_blob_task sees the exception and will retry the orphaned object; reference functions: delete_if_orphaned, compute_storage_key, self.storage.delete, and cleanup_orphaned_blob_task.
185-192: ⚠️ Potential issue | 🟠 Major — Don't log full storage keys.
These log lines expose the object path, which embeds the blob SHA and key cohort. That is sensitive storage metadata and should not be written to normal application logs; log `blob.id` and the outcome instead.
Based on learnings, src/backend/**/*.py: Do not log sensitive information (tokens, passwords, financial/health data, PII).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 185 - 192, The debug/info logs currently emit sensitive storage keys/paths (see logger.debug and logger.info around the dedupe/save flow); change these to avoid logging the full storage key or existing_path returned by compute_storage_key_for_blob/storage.save and instead log only non-sensitive identifiers such as blob.id and the outcome (e.g., "deduped" or "uploaded") and, if needed, existing_key_id (only if it is non-sensitive); update the logger.debug that mentions existing_path and the logger.info after storage.save to remove the path variable and replace with a brief outcome message referencing blob.id and the operation result.
100-115: ⚠️ Potential issue | 🔴 Critical — Dedup still ignores the stored representation.
The storage key and sibling lookup are keyed by `sha256`/`key_id`, but the uploaded bytes are `blob.raw_content` after compression. Two blobs with identical source bytes and different `compression` values will reuse the same object and one of them will later read back bytes that do not match its own compression metadata.
Also applies to: 154-193
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 100 - 115, compute_storage_key and the sibling lookup currently key on sha256 of the raw bytes and key_id only, but the uploaded object is the compressed/stored representation (blob.raw_content), so different compression values can collide; change the logic to compute the SHA over the actual bytes written (i.e., the compressed/stored bytes) and include the representation/compression identifier in the storage key (and sibling lookup) so keys are unique per stored format; update compute_storage_key to accept the stored-bytes or a representation tag (e.g., compression) along with key_id, and make the sibling lookup code use that same computed key/sha-of-stored-bytes so read/write use the identical namespace.
src/backend/core/services/tiered_storage_tasks.py (1)
91-91: ⚠️ Potential issue | 🟠 Major — Treat the locked re-fetch delete race as `not_found`.
The row can still disappear between the sha256 lookup and the `select_for_update().get(...)` at Line 91. Right now that benign race falls into the catch-all and returns `"error"`. Catch `Blob.DoesNotExist` around the locked lookup and return `{"status": "not_found", "blob_id": blob_id}` instead.
Based on learnings, `logger.exception(...)` in src/backend/**/*.py is automatically reported to Sentry as an event.
Also applies to: 121-123
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage_tasks.py` at line 91, The selective row-lock fetch using Blob.objects.select_for_update().get(id=blob_id) can raise Blob.DoesNotExist if the row was deleted after the sha256 lookup; wrap that specific call in a try/except catching Blob.DoesNotExist and return {"status": "not_found", "blob_id": blob_id} instead of falling into the generic error path (do the same for the similar select_for_update().get(...) at the block around lines 121-123); ensure you only catch Blob.DoesNotExist (not broad Exception) and avoid using logger.exception for this benign race.
src/backend/messages/settings.py (1)
295-320: ⚠️ Potential issue | 🟠 Major — Only register `message-blobs` when blob storage is really configured.
This alias is always present, so `TieredStorageService.enabled` becomes truthy even when blob-storage env vars are intentionally unset. That pushes offload and verification down a half-configured backend instead of keeping tiered storage disabled.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/messages/settings.py` around lines 295 - 320, The "message-blobs" storage alias is always registered which makes TieredStorageService.enabled truthy even when blob env vars are unset; change the settings population so the "message-blobs" entry is only added when required blob configuration exists (e.g., STORAGE_MESSAGE_BLOBS_BUCKET_NAME or STORAGE_MESSAGE_BLOBS_ENDPOINT_URL is present). In practice, update the code that builds the storage aliases (the dict containing "message-blobs") to conditionally insert that key only if the relevant values.Value keys (access_key/secret_key/bucket_name or region/endpoint) are set/Truthy, ensuring TieredStorageService.enabled accurately reflects a fully configured blob backend.
src/backend/core/models.py (1)
2198-2243: ⚠️ Potential issue | 🟠 Major — Add composite indexes for the new tiered-storage queries.
These fields are now on the hot paths for offload scans and dedup/orphan checks, but `Blob.Meta` still has no matching composite indexes. At the scale described in this PR, single-column indexes will still leave very broad scans on `messages_blob`.
As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: Implement database indexing and query optimization (Model Meta indexes, constraints).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/models.py` around lines 2198 - 2243, Add composite DB indexes to the Blob model Meta to support tiered-storage queries: update Blob.Meta to include a list of models.Index entries for the hot-path combinations referencing the actual field names—at minimum add indexes on ("storage_location", "mailbox"), ("storage_location", "maildomain") and ("storage_location", "encryption_key_id") (give each index a descriptive name); this ensures queries in BlobManager and offload/dedup/orphan scans that filter by storage_location plus owner or key use the composite indexes instead of wide single-column scans.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/backend/core/services/tiered_storage_tasks.py`:
- Around line 87-89: The broad except in the task is catching
celery.exceptions.Retry raised by self.retry() (used after sha256_advisory_lock
contention and on transient errors), preventing Celery from requeueing; add an
explicit except celery.exceptions.Retry: raise (or re-raise) immediately before
the existing generic except Exception so Retry can propagate, leaving the rest
of the error handling unchanged — target the block that uses
sha256_advisory_lock and self.retry and the subsequent broad except to insert
this handler.
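The handler-ordering fix can be sketched without Celery; `Retry` below is a stand-in for `celery.exceptions.Retry`, and `run_task` stands in for the task body:

```python
class Retry(Exception):
    """Stand-in for celery.exceptions.Retry."""

def run_task(body):
    """Re-raise Retry before the broad handler so the worker can requeue."""
    try:
        return body()
    except Retry:
        raise  # must escape, or the retry becomes a swallowed 'error' result
    except Exception:  # mirrors the existing broad handler
        return {"status": "error"}

def contended():
    raise Retry()

requeued = False
try:
    run_task(contended)
except Retry:
    requeued = True
```

Ordering matters here: `except Retry: raise` has to appear before `except Exception`, otherwise the broad clause wins and Celery never sees the retry.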
In `@src/backend/core/services/tiered_storage.py`:
- Around line 230-257: rotate_blob() currently calls self.encrypt(...) which
always uses self.active_key_id, so when a caller passes a different
target_key_id the data is encrypted with the wrong key while paths/DB are
updated for the target; fix by either passing target_key_id into the encryption
routine (add an optional parameter to self.encrypt and call
self.encrypt(decrypted, key_id=target_key_id) in both the DB-blob and
OBJECT_STORAGE branches so encryption, compute_storage_key(sha256,
target_key_id) and DB updates use the same key) or, if you intend only to
support the active key, immediately assert target_key_id == self.active_key_id
at the start of rotate_blob() and remove the misleading parameter; update calls
around encrypt, compute_storage_key, and Blob.objects.filter(...).update(...)
accordingly to keep key IDs consistent.
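A key-selection sketch of the first option: an explicit `key_id` wins, otherwise the active key is used, so `rotate_blob(target_key_id=...)` encrypts with the same key its paths and DB updates reference. HMAC stands in for AES-256-GCM so the sketch runs without the `cryptography` package, and the key ring is hypothetical:

```python
import hashlib
import hmac

KEYS = [b"0" * 32, b"1" * 32]  # hypothetical key ring: ids 1 and 2

def encrypt(data: bytes, key_id=None, active_key_id=2):
    """Pin encryption to an explicit key id when the caller provides one;
    fall back to the active key otherwise. Returns (ciphertext, key_id)."""
    kid = key_id if key_id is not None else active_key_id
    tag = hmac.new(KEYS[kid - 1], data, hashlib.sha256).digest()
    return tag + data, kid

_, used = encrypt(b"payload", key_id=1)
assert used == 1  # pinned to the rotation target, not the active key
_, used = encrypt(b"payload")
assert used == 2  # default path still uses the active key
```

Both branches of `rotate_blob()` would then call `encrypt(decrypted, key_id=target_key_id)` so the ciphertext, `compute_storage_key(sha256, target_key_id)`, and the DB update all agree on the key id.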
In `@src/backend/core/tests/commands/test_verify_tiered_storage.py`:
- Around line 322-619: The tests that call the management command with
re_encrypt=True mutate the mocked TieredStorageService but fail to patch the
settings that re_encrypt_blobs() reads (MESSAGES_BLOB_ENCRYPTION_KEYS /
MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID), so the command uses ambient Django
settings instead of the test scenario; fix each re_encrypt=True test by patching
the same settings module that re_encrypt_blobs() uses (e.g., patch
"core.services.tiered_storage.settings" or django.conf.settings) and set
MESSAGES_BLOB_ENCRYPTION_KEYS and MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID to
match the service.encryption_keys and service.active_key_id before you patch
"core.management.commands.verify_tiered_storage.TieredStorageService", ensuring
the command and the mocked TieredStorageService see the same encryption config.
In `@src/backend/core/tests/tasks/test_tiered_storage_tasks.py`:
- Around line 61-159: The age-based offload tests fail when
TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0 because offload_blobs_task filters by
size__gte; update the tests (test_queues_eligible_blobs_by_age and
test_immediate_offload_with_zero_days) to ensure created blobs meet the size
threshold (e.g., make content length >=
settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE) or apply
override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0) for those tests so the
age/immediacy logic is exercised; reference the tests by name and the
offload_blobs_task and TieredStorageService.compute_storage_key_for_blob symbols
when making the change.
---
Outside diff comments:
In `@src/backend/core/models.py`:
- Around line 2077-2102: Mailbox.create_blob currently supplies
CompressionTypeChoices.ZSTD (hard default) and forwards it to
BlobManager.create_blob which treats compression=None as "use
settings.MESSAGES_BLOB_COMPRESS", so callers of Mailbox.create_blob never hit
the new configurable default; change Mailbox.create_blob to accept compression:
Optional[CompressionTypeChoices] = None (or if the signature must remain, ensure
it passes None when the caller didn't explicitly request a compression) and
forward that None to BlobManager.create_blob so
BlobManager.parse_compression_spec can apply settings.MESSAGES_BLOB_COMPRESS;
update references to the Mailbox.create_blob parameter handling to distinguish
"unspecified" vs explicit CompressionTypeChoices.ZSTD and only pass ZSTD when
explicitly requested.
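The "unspecified vs explicit" distinction can be sketched as below. `MESSAGES_BLOB_COMPRESS` stands in for the Django setting and both function names are hypothetical, mirroring the `Mailbox.create_blob` → `BlobManager.create_blob` call chain:

```python
from typing import Optional

MESSAGES_BLOB_COMPRESS = "zstd"  # stand-in for the configured default


def manager_create_blob(content: bytes, compression: Optional[str] = None) -> dict:
    # None means "apply the configured default" (what parse_compression_spec
    # does in the real manager); an explicit value is honored as-is.
    effective = MESSAGES_BLOB_COMPRESS if compression is None else compression
    return {"content": content, "compression": effective}


def mailbox_create_blob(content: bytes, compression: Optional[str] = None) -> dict:
    # Forward None untouched instead of hard-coding ZSTD at this layer.
    return manager_create_blob(content, compression=compression)
```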
---
Duplicate comments:
In `@src/backend/core/models.py`:
- Around line 2198-2243: Add composite DB indexes to the Blob model Meta to
support tiered-storage queries: update Blob.Meta to include a list of
models.Index entries for the hot-path combinations referencing the actual field
names—at minimum add indexes on ("storage_location", "mailbox"),
("storage_location", "maildomain") and ("storage_location", "encryption_key_id")
(give each index a descriptive name); this ensures queries in BlobManager and
offload/dedup/orphan scans that filter by storage_location plus owner or key use
the composite indexes instead of wide single-column scans.
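One possible shape for that `Meta`, as a sketch only (index names are illustrative, and the field list must match the actual model):

```python
class Blob(models.Model):
    # ... existing fields ...

    class Meta:
        indexes = [
            models.Index(
                fields=["storage_location", "mailbox"],
                name="blob_loc_mailbox_idx",
            ),
            models.Index(
                fields=["storage_location", "maildomain"],
                name="blob_loc_maildomain_idx",
            ),
            models.Index(
                fields=["storage_location", "encryption_key_id"],
                name="blob_loc_keyid_idx",
            ),
        ]
```

Adding these requires a migration; `makemigrations` will generate the `AddIndex` operations.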
In `@src/backend/core/services/tiered_storage_tasks.py`:
- Line 91: The selective row-lock fetch using
Blob.objects.select_for_update().get(id=blob_id) can raise Blob.DoesNotExist if
the row was deleted after the sha256 lookup; wrap that specific call in a
try/except catching Blob.DoesNotExist and return {"status": "not_found",
"blob_id": blob_id} instead of falling into the generic error path (do the same
for the similar select_for_update().get(...) at the block around lines 121-123);
ensure you only catch Blob.DoesNotExist (not broad Exception) and avoid using
logger.exception for this benign race.
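The benign-race handling reduces to the pattern below; `DoesNotExist` stands in for `Blob.DoesNotExist` and `fetch` for the `select_for_update().get(id=blob_id)` call:

```python
class DoesNotExist(Exception):
    """Stand-in for Blob.DoesNotExist."""


def lock_blob(blob_id, fetch):
    try:
        blob = fetch(blob_id)
    except DoesNotExist:
        # Row deleted between the sha256 lookup and the lock: benign race,
        # so return a status instead of falling into the error path.
        return {"status": "not_found", "blob_id": blob_id}
    return {"status": "locked", "blob": blob}
```

Catching only the specific exception keeps real failures (connection errors, etc.) on the existing error path.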
In `@src/backend/core/services/tiered_storage.py`:
- Around line 291-298: The current delete_if_orphaned implementation swallows
storage.delete failures by logging and returning False, preventing
cleanup_orphaned_blob_task from retrying; change the code so storage.delete
failures propagate: in delete_if_orphaned (the block that calls
self.compute_storage_key and self.storage.delete) remove or narrow the broad
except that returns False and instead log the error and re-raise the exception
(or allow it to bubble up) so cleanup_orphaned_blob_task sees the exception and
will retry the orphaned object; reference functions: delete_if_orphaned,
compute_storage_key, self.storage.delete, and cleanup_orphaned_blob_task.
- Around line 185-192: The debug/info logs currently emit sensitive storage
keys/paths (see logger.debug and logger.info around the dedupe/save flow);
change these to avoid logging the full storage key or existing_path returned by
compute_storage_key_for_blob/storage.save and instead log only non-sensitive
identifiers such as blob.id and the outcome (e.g., "deduped" or "uploaded") and,
if needed, existing_key_id (only if it is non-sensitive); update the
logger.debug that mentions existing_path and the logger.info after storage.save
to remove the path variable and replace with a brief outcome message referencing
blob.id and the operation result.
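The safe logging shape might look like this (function name hypothetical; the point is that only the blob id and the outcome appear in the message, never the storage key):

```python
import logging

logger = logging.getLogger("tiered_storage")


def log_upload_outcome(blob_id, deduped: bool) -> str:
    # Log non-sensitive identifiers and the outcome only; never the
    # path returned by compute_storage_key_for_blob or storage.save.
    outcome = "deduped" if deduped else "uploaded"
    message = f"blob {blob_id} {outcome} to object storage"
    logger.info(message)
    return message
```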
- Around line 100-115: compute_storage_key and the sibling lookup currently key
on sha256 of the raw bytes and key_id only, but the uploaded object is the
compressed/stored representation (blob.raw_content), so different compression
values can collide; change the logic to compute the SHA over the actual bytes
written (i.e., the compressed/stored bytes) and include the
representation/compression identifier in the storage key (and sibling lookup) so
keys are unique per stored format; update compute_storage_key to accept the
stored-bytes or a representation tag (e.g., compression) along with key_id, and
make the sibling lookup code use that same computed key/sha-of-stored-bytes so
read/write use the identical namespace.
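One way to realize this, extending the existing `blobs/{key_id}/{shard}/{sha}` layout with a compression segment (the exact layout is an assumption, not the shipped scheme):

```python
import hashlib


def compute_storage_key(stored_bytes: bytes, key_id: int, compression: str) -> str:
    # Hash the bytes actually written (the compressed representation),
    # and namespace by key id *and* compression, so the same raw content
    # stored under different compressions can never collide.
    sha = hashlib.sha256(stored_bytes).hexdigest()
    return f"blobs/{key_id}/{compression}/{sha[:2]}/{sha}"
```

The sibling/dedupe lookup must then hash the same stored bytes and pass the same compression tag, so reads and writes address the identical namespace.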
In `@src/backend/messages/settings.py`:
- Around line 295-320: The "message-blobs" storage alias is always registered
which makes TieredStorageService.enabled truthy even when blob env vars are
unset; change the settings population so the "message-blobs" entry is only added
when required blob configuration exists (e.g., STORAGE_MESSAGE_BLOBS_BUCKET_NAME
or STORAGE_MESSAGE_BLOBS_ENDPOINT_URL is present). In practice, update the code
that builds the storage aliases (the dict containing "message-blobs") to
conditionally insert that key only if the relevant values.Value keys
(access_key/secret_key/bucket_name or region/endpoint) are set/Truthy, ensuring
TieredStorageService.enabled accurately reflects a fully configured blob
backend.
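A sketch of the conditional registration (argument names are illustrative; the `S3Storage` backend path assumes django-storages, and the real settings module reads these from `values.Value` entries):

```python
def build_storages(blob_bucket_name=None, blob_endpoint_url=None) -> dict:
    """Register "message-blobs" only when blob storage is configured."""
    storages = {
        "default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
    }
    if blob_bucket_name or blob_endpoint_url:
        storages["message-blobs"] = {
            "BACKEND": "storages.backends.s3.S3Storage",
            "OPTIONS": {
                "bucket_name": blob_bucket_name,
                "endpoint_url": blob_endpoint_url,
            },
        }
    return storages
```

With this shape, `TieredStorageService.enabled` (which keys off the alias's presence) is truthy only for a fully configured backend.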
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9ce7b1d7-830c-4186-bee0-8fdf30b9a6db
📒 Files selected for processing (13)
- env.d/development/backend.defaults
- src/backend/core/apps.py
- src/backend/core/checks.py
- src/backend/core/enums.py
- src/backend/core/management/commands/verify_tiered_storage.py
- src/backend/core/models.py
- src/backend/core/services/tiered_storage.py
- src/backend/core/services/tiered_storage_tasks.py
- src/backend/core/signals.py
- src/backend/core/tests/commands/test_verify_tiered_storage.py
- src/backend/core/tests/services/test_tiered_storage.py
- src/backend/core/tests/tasks/test_tiered_storage_tasks.py
- src/backend/messages/settings.py
```python
def test_re_encrypt_no_keys_configured(self):
    """Test that re-encrypt fails when no keys are configured."""
    from unittest.mock import patch

    stdout = StringIO()
    stderr = StringIO()

    with patch("core.services.tiered_storage.settings") as mock_settings:
        mock_settings.STORAGES = {"message-blobs": {"OPTIONS": {}}}
        mock_settings.MESSAGES_BLOB_ENCRYPTION_KEYS = {}
        mock_settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID = 0

        call_command(
            "verify_tiered_storage",
            re_encrypt=True,
            stdout=stdout,
            stderr=stderr,
        )

    assert "No encryption keys configured" in stderr.getvalue()


def test_all_blobs_already_current_key(self):
    """Test that re-encrypt reports success when all blobs use current key."""
    key = secrets.token_hex(32)
    service = TieredStorageService()
    service.encryption_keys = {"1": key}
    service.active_key_id = 1

    mailbox = factories.MailboxFactory()
    blob = mailbox.create_blob(content=b"test", content_type="text/plain")

    # Manually encrypt with key 1
    compressed = bytes(blob.raw_content)
    encrypted, key_id = service.encrypt(compressed)
    blob.raw_content = encrypted
    blob.encryption_key_id = key_id
    blob.save()

    stdout = StringIO()
    stderr = StringIO()

    # Temporarily modify service in command
    from unittest.mock import patch

    with patch(
        "core.management.commands.verify_tiered_storage.TieredStorageService"
    ) as mock_svc_class:
        mock_svc_class.return_value = service

        call_command(
            "verify_tiered_storage",
            re_encrypt=True,
            stdout=stdout,
            stderr=stderr,
        )

    assert "All blobs already use the current encryption key" in stdout.getvalue()


def test_re_encrypt_postgres_blob(self):
    """Test re-encrypting a PostgreSQL blob with real encryption."""
    import pyzstd

    service = TieredStorageService()
    old_key = secrets.token_hex(32)
    new_key = secrets.token_hex(32)

    mailbox = factories.MailboxFactory()
    original_content = b"test content for re-encryption" * 20

    # Create blob and encrypt with old key (key_id=2)
    blob = mailbox.create_blob(content=original_content, content_type="text/plain")
    compressed = bytes(blob.raw_content)

    service.encryption_keys = {"2": old_key}
    service.active_key_id = 2
    encrypted, key_id = service.encrypt(compressed)
    blob.raw_content = encrypted
    blob.encryption_key_id = key_id
    blob.save()

    # Now configure service for key rotation (new key is "1", old is "2")
    service.encryption_keys = {"1": new_key, "2": old_key}
    service.active_key_id = 1

    stdout = StringIO()
    stderr = StringIO()

    from unittest.mock import patch

    with patch(
        "core.management.commands.verify_tiered_storage.TieredStorageService"
    ) as mock_svc_class:
        mock_svc_class.return_value = service

        call_command(
            "verify_tiered_storage",
            re_encrypt=True,
            stdout=stdout,
            stderr=stderr,
        )

    output = stdout.getvalue()
    assert "Re-encrypted" in output
    assert "Re-encrypted: 1" in output

    # Verify blob was updated
    blob.refresh_from_db()
    assert blob.encryption_key_id == 1

    # Verify content is still readable
    decrypted = service.decrypt(bytes(blob.raw_content), blob.encryption_key_id)
    assert pyzstd.decompress(decrypted) == original_content


@pytest.mark.django_db(transaction=True)
def test_re_encrypt_object_storage_blob(self):
    """Test re-encrypting an object storage blob with real encryption."""
    import pyzstd

    service = TieredStorageService()
    old_key = secrets.token_hex(32)
    new_key = secrets.token_hex(32)

    mailbox = factories.MailboxFactory()
    original_content = b"test content for object storage re-encryption" * 20

    # Create blob and encrypt with old key (key_id=2)
    blob = mailbox.create_blob(content=original_content, content_type="text/plain")
    compressed = bytes(blob.raw_content)

    service.encryption_keys = {"2": old_key}
    service.active_key_id = 2
    encrypted, key_id = service.encrypt(compressed)
    blob.raw_content = encrypted
    blob.encryption_key_id = key_id
    blob.save()

    old_path = TieredStorageService.compute_storage_key_for_blob(blob)
    new_path = TieredStorageService.compute_storage_key(bytes(blob.sha256), 1)

    try:
        # Upload to storage
        service.upload_blob(blob)
        blob.storage_location = BlobStorageLocationChoices.OBJECT_STORAGE
        blob.raw_content = None
        blob.save()

        # Configure for key rotation
        service.encryption_keys = {"1": new_key, "2": old_key}
        service.active_key_id = 1

        stdout = StringIO()
        stderr = StringIO()

        from unittest.mock import patch

        with patch(
            "core.management.commands.verify_tiered_storage.TieredStorageService"
        ) as mock_svc_class:
            mock_svc_class.return_value = service

            call_command(
                "verify_tiered_storage",
                re_encrypt=True,
                stdout=stdout,
                stderr=stderr,
            )

        output = stdout.getvalue()
        assert "Re-encrypted: 1" in output

        # Verify blob was updated
        blob.refresh_from_db()
        assert blob.encryption_key_id == 1

        # Verify content moved from old path to new path
        assert not service.storage.exists(old_path)
        assert service.storage.exists(new_path)
        downloaded = service.download_blob(blob)
        assert pyzstd.decompress(downloaded) == original_content
    finally:
        for k in (old_path, new_path):
            if service.storage.exists(k):
                service.storage.delete(k)


def test_dry_run(self):
    """Test that --dry-run shows what would be done without changes."""
    service = TieredStorageService()
    key = secrets.token_hex(32)
    service.encryption_keys = {"1": key}
    service.active_key_id = 1

    mailbox = factories.MailboxFactory()
    blob = mailbox.create_blob(content=b"test", content_type="text/plain")
    # key_id=0 means unencrypted, needs re-encryption
    blob.encryption_key_id = 0
    blob.save()

    stdout = StringIO()
    stderr = StringIO()

    from unittest.mock import patch

    with patch(
        "core.management.commands.verify_tiered_storage.TieredStorageService"
    ) as mock_svc_class:
        mock_svc_class.return_value = service

        call_command(
            "verify_tiered_storage",
            re_encrypt=True,
            dry_run=True,
            stdout=stdout,
            stderr=stderr,
        )

    output = stdout.getvalue()
    assert "DRY RUN" in output
    assert "Would re-encrypt" in output

    # Verify blob was NOT modified
    blob.refresh_from_db()
    assert blob.encryption_key_id == 0


def test_re_encrypt_with_limit(self):
    """Test that --limit restricts number of blobs re-encrypted."""
    service = TieredStorageService()
    key = secrets.token_hex(32)
    service.encryption_keys = {"1": key}
    service.active_key_id = 1

    mailbox = factories.MailboxFactory()

    # Create 3 blobs with key_id=0
    for i in range(3):
        blob = mailbox.create_blob(
            content=f"test content {i}".encode(),
            content_type="text/plain",
        )
        blob.encryption_key_id = 0
        blob.save()

    stdout = StringIO()
    stderr = StringIO()

    from unittest.mock import patch

    with patch(
        "core.management.commands.verify_tiered_storage.TieredStorageService"
    ) as mock_svc_class:
        mock_svc_class.return_value = service

        call_command(
            "verify_tiered_storage",
            re_encrypt=True,
            limit=2,
            stdout=stdout,
            stderr=stderr,
        )

    output = stdout.getvalue()
    assert "Blobs to re-encrypt: 2" in output


def test_re_encrypt_skips_blob_without_content(self):
    """Test that re-encrypt skips PostgreSQL blobs with no content."""
    service = TieredStorageService()
    key = secrets.token_hex(32)
    service.encryption_keys = {"1": key}
    service.active_key_id = 1

    mailbox = factories.MailboxFactory()
    blob = mailbox.create_blob(content=b"test", content_type="text/plain")
    blob.encryption_key_id = 0
    blob.raw_content = None  # Simulate missing content
    blob.save()

    stdout = StringIO()
    stderr = StringIO()

    from unittest.mock import patch

    with patch(
        "core.management.commands.verify_tiered_storage.TieredStorageService"
    ) as mock_svc_class:
        mock_svc_class.return_value = service

        call_command(
            "verify_tiered_storage",
            re_encrypt=True,
            stdout=stdout,
            stderr=stderr,
        )

    output = stdout.getvalue()
    assert "Skipped: 1" in output
    # Blob row left unchanged.
    blob.refresh_from_db()
    assert blob.encryption_key_id == 0
    assert blob.raw_content is None
```
The --re-encrypt tests are configuring the wrong source of truth.
re_encrypt_blobs() branches on django.conf.settings.MESSAGES_BLOB_ENCRYPTION_* before it uses the mocked TieredStorageService. In this block, the tests mostly mutate the returned service instance but never override those Django settings, so they depend on ambient test settings instead of the scenario each test is trying to cover.
🛠️ Suggested pattern

```diff
-        with patch(
-            "core.management.commands.verify_tiered_storage.TieredStorageService"
-        ) as mock_svc_class:
+        with override_settings(
+            MESSAGES_BLOB_ENCRYPTION_KEYS={"1": key},
+            MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID=1,
+        ), patch(
+            "core.management.commands.verify_tiered_storage.TieredStorageService"
+        ) as mock_svc_class:
             mock_svc_class.return_value = service
```

Apply the same idea to the other `re_encrypt=True` cases so the command and the mocked service see the same encryption configuration.
```python
def test_queues_eligible_blobs_by_age(self):
    """Test that task queues blobs older than cutoff date."""
    mailbox = factories.MailboxFactory()

    # Create an old blob (should be queued)
    old_blob = mailbox.create_blob(
        content=b"old content", content_type="text/plain"
    )
    Blob.objects.filter(id=old_blob.id).update(
        created_at=now()
        - timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1)
    )

    # Create a new blob (should not be queued)
    new_blob = mailbox.create_blob(
        content=b"new content", content_type="text/plain"
    )

    # Mock the delay call to track what gets queued
    queued_ids = []
    with patch.object(
        offload_single_blob_task,
        "delay",
        side_effect=queued_ids.append,
    ):
        result = offload_blobs_task()

    assert result["status"] == "success"
    assert str(old_blob.id) in queued_ids
    assert str(new_blob.id) not in queued_ids


def test_queues_eligible_blobs_by_size(self):
    """Test that task respects minimum size threshold."""
    mailbox = factories.MailboxFactory()

    # Create a small blob (may or may not be queued depending on OFFLOAD_MIN_SIZE)
    small_blob = mailbox.create_blob(content=b"small", content_type="text/plain")
    Blob.objects.filter(id=small_blob.id).update(
        created_at=now()
        - timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1)
    )

    # Create a large blob (should be queued if old enough)
    large_content = b"x" * (settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE + 1000)
    large_blob = mailbox.create_blob(
        content=large_content, content_type="text/plain"
    )
    Blob.objects.filter(id=large_blob.id).update(
        created_at=now()
        - timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1)
    )

    queued_ids = []
    with patch.object(
        offload_single_blob_task,
        "delay",
        side_effect=queued_ids.append,
    ):
        result = offload_blobs_task()

    assert result["status"] == "success"
    assert str(large_blob.id) in queued_ids

    # Small blob should only be queued if min_size is 0
    if settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0:
        assert str(small_blob.id) not in queued_ids


@override_settings(TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0)
def test_immediate_offload_with_zero_days(self):
    """Test that OFFLOAD_AFTER_DAYS=0 offloads blobs immediately.

    With CELERY_TASK_ALWAYS_EAGER=True (test settings), calling .delay()
    executes the task synchronously, simulating a worker running alongside.
    """
    service = TieredStorageService()
    mailbox = factories.MailboxFactory()
    content = b"immediate offload test content" * 20
    blob = mailbox.create_blob(content=content, content_type="text/plain")
    storage_key = TieredStorageService.compute_storage_key_for_blob(blob)

    try:
        # No age manipulation - blob was just created
        result = offload_blobs_task()

        assert result["status"] == "success"
        assert result["queued"] == 1

        # With CELERY_TASK_ALWAYS_EAGER, the single blob task already ran
        blob.refresh_from_db()
        assert blob.storage_location == BlobStorageLocationChoices.OBJECT_STORAGE
        assert blob.raw_content is None

        # Verify content is still accessible from S3
        retrieved = blob.get_content()
        assert retrieved == content
    finally:
        if service.storage.exists(storage_key):
            service.storage.delete(storage_key)
```
These age-based offload tests still depend on the size filter.
offload_blobs_task() also requires size__gte=settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE. Here the payloads are small, so the tests can fail without ever exercising the age/immediacy logic when the default min size is non-zero.
🛠️ Suggested fix

```diff
+    @override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0)
     def test_queues_eligible_blobs_by_age(self):
         """Test that task queues blobs older than cutoff date."""

-    @override_settings(TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0)
+    @override_settings(
+        TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0,
+        TIERED_STORAGE_OFFLOAD_MIN_SIZE=0,
+    )
     def test_immediate_offload_with_zero_days(self):
```
def test_immediate_offload_with_zero_days(self):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/tests/tasks/test_tiered_storage_tasks.py` around lines 61 -
159, The age-based offload tests fail when TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0
because offload_blobs_task filters by size__gte; update the tests
(test_queues_eligible_blobs_by_age and test_immediate_offload_with_zero_days) to
ensure created blobs meet the size threshold (e.g., make content length >=
settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE) or apply
override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0) for those tests so the
age/immediacy logic is exercised; reference the tests by name and the
offload_blobs_task and TieredStorageService.compute_storage_key_for_blob symbols
when making the change.
This allows using S3-compatible object storage to offload blobs, making Postgres much lighter. The design targets storing ~1B emails on a single instance.
…f sensitive information' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
> Blobs (raw RFC822 email bodies and attachments) live in PostgreSQL by
> default. Once a blob is older than `TIERED_STORAGE_OFFLOAD_AFTER_DAYS`,
> a periodic celery task moves its bytes to S3 and clears the PG row's
> `raw_content`. Reads transparently fetch from whichever location the
> row points at — application code only ever calls `blob.get_content()`.
Have you considered moving the message body onto the object store while keeping the headers around for longer?
Loading an email list is generally performance critical (no object-store hit and no heavy mail parsing), whereas it is fine to keep headers in the DB much longer (months?).
> - **Deduplication**: blobs sharing the same SHA-256 share the same S3
>   object. The DB is the source of truth; the existence check on S3 is
>   a defensive guard against external deletions.
Nice, but how is this GCed if data is deleted?
This allows using S3-compatible object storage to offload blobs, making Postgres much lighter. The design targets storing ~1B emails on a single instance.
Fixes #185.
Summary by CodeRabbit

- New Features
- Admin
- Chores
- Tests