
Configure Celery for at-least-once delivery to prevent task loss on worker death#1497

Merged
JSv4 merged 4 commits into main from claude/resolve-issue-1493-vyetM on May 4, 2026

Conversation

Collaborator

@JSv4 JSv4 commented May 3, 2026

Summary

This PR configures Celery to use at-least-once delivery semantics instead of the default at-most-once, ensuring that tasks are not silently lost when workers die unexpectedly (OOM, SIGKILL, host failure, deploy eviction). Previously, long-running ingest/parse/embed tasks could fail mid-flight, leaving documents stuck with backend_lock=True and no parsed content.

Key Changes

  • Enable at-least-once delivery in Celery configuration (config/settings/base.py):

    • Set CELERY_TASK_ACKS_LATE = True to defer message acknowledgment until task completion
    • Set CELERY_TASK_REJECT_ON_WORKER_LOST = True to requeue tasks on hard worker kills instead of treating them as successful
    • Added comprehensive comments explaining the trade-offs and linking to Celery documentation
  • Document task idempotency requirements (docs/architecture/asynchronous-processing.md):

    • Added new section explaining the delivery semantics and the idempotency contract
    • Provided patterns and examples for making tasks idempotent (get_or_create, idempotency keys, state re-checks, etc.)
    • Documented the per-task opt-out mechanism for genuinely non-idempotent work
  • Add regression tests (opencontractserver/tests/test_celery_worker_death_resilience.py):

  • Four tests that pin both new Django settings and verify that they propagate correctly into the Celery app configuration
    • Prevents future settings refactors from silently reverting the resilience guarantee
  • Update changelog (CHANGELOG.md):

    • Documented the change with context on the problem, solution, and trade-offs
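
Concretely, the delivery-semantics settings described above amount to two flags in config/settings/base.py. This is a sketch; the PR's actual comments are more detailed:

```python
# config/settings/base.py (sketch; comment wording here is illustrative)

# At-least-once delivery: the broker keeps the message until the task
# finishes successfully, instead of acknowledging it on receipt.
CELERY_TASK_ACKS_LATE = True

# A hard-killed worker (OOM, SIGKILL) causes the broker to requeue the
# task rather than treating the delivery as complete.
CELERY_TASK_REJECT_ON_WORKER_LOST = True
```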

Implementation Details

The change leverages Celery's built-in configuration options rather than custom logic. The broker now only removes messages after successful task completion, and hard-killed workers trigger redelivery instead of silent task loss.

Trade-off: At-least-once delivery means tasks may execute multiple times. All Celery tasks in the project must be idempotent — the documentation provides clear patterns for achieving this (deterministic database operations, idempotency keys, state re-checks, etc.). For the rare case where a task cannot be made idempotent, a per-task opt-out is available with explicit documentation of why.

The regression tests ensure the configuration cannot be accidentally reverted through future refactoring.

https://claude.ai/code/session_017LtMNFSpJukVcW9NPQaxxw

Enable CELERY_TASK_ACKS_LATE and CELERY_TASK_REJECT_ON_WORKER_LOST
globally so a worker dying mid-task — OOM, SIGKILL, host loss, deploy
eviction — causes the broker to redeliver the message instead of
silently dropping it. Previously long-running ingest/parse/embed tasks
that died just before their final DB write left documents stuck with
backend_lock=True and no parsed content.

Document the at-least-once / idempotency contract in
docs/architecture/asynchronous-processing.md (including the per-task
opt-out for tasks that genuinely cannot be made idempotent), and pin
the new settings + their Django→Celery namespace propagation with a
regression test so a future settings refactor cannot silently revert
the resilience guarantee.

claude Bot commented May 3, 2026

Code Review

Overview

This PR correctly addresses a real reliability gap: Celery's default at-most-once delivery means OOM-killed or SIGKILL'd workers silently lose tasks, leaving documents stuck with backend_lock=True. The two settings added (CELERY_TASK_ACKS_LATE + CELERY_TASK_REJECT_ON_WORKER_LOST) are the right knobs, and the companion CELERY_PREFETCH_MULTIPLIER = 1 (already in place at line 675) means workers won't hold prefetched messages they never start — the critical companion setting for acks_late=True.

The documentation addition and regression tests are both good practice. Below are concerns that need addressing before this can be merged safely.


Critical: Redis visibility timeout is a hidden time bomb

The broker is Redis (CELERY_BROKER_URL = REDIS_URL), not RabbitMQ. With Redis, task_reject_on_worker_lost does not use AMQP-style rejects. Instead, Redis tracks unacknowledged tasks using a visibility timeout — and the Celery default is 1 hour (3600 s). Any task that runs longer than that will be redelivered to a second worker while still running on the first, guaranteeing a double execution even without a worker crash.

Several of the long-running ingest/parse/embed tasks this PR is trying to protect are exactly the ones most likely to exceed 1 hour on large documents.

The PR should either:

  1. Set CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": <N>} where N exceeds the maximum expected task runtime, or
  2. Document this explicitly in the new asynchronous-processing.md section so future developers know the failure mode.

Reference: https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout


Significant: Existing tasks are not idempotent, but now run at-least-once

The PR documentation states "all tasks MUST be idempotent" but makes no changes to existing tasks. A quick audit shows several tasks that will produce duplicate data on retry:

  • corpus_tasks.py:251: Datacell.objects.create(...) inside a loop. A retry creates duplicate Datacell rows.
  • corpus_tasks.py:589, 681: CorpusActionExecution.objects.create(...). Each retry produces a new execution record.
  • agent_tasks.py:148: ChatMessage.objects.create(...). A retry appends a duplicate message to the conversation.
  • import_tasks.py: three raw .create() calls vs. only two get_or_create calls. An interrupted import retried from the beginning will duplicate database objects.

These should either be fixed (switching to get_or_create/update_or_create keyed on a deterministic field) or explicitly opted out with @shared_task(acks_late=False, reject_on_worker_lost=False) before this PR ships. Silently switching 40+ tasks to at-least-once semantics while several are known-non-idempotent is a latent data integrity bug.


Minor: Test accessor pattern is unusual

# test imports the *module* then accesses .app on it
from config import celery_app
celery_app.app.conf.task_acks_late

This works but celery_app reads like the app object, not a module. Consider importing the app instance directly for clarity:

from config.celery_app import app as celery_app_instance
celery_app_instance.conf.task_acks_late

Minor: Celery conf tests have a subtle scope caveat worth documenting

celery_app.conf is populated at import time via config_from_object, not per-test, so @override_settings in other tests won't affect what these tests observe. The tests correctly pin the startup configuration — a short comment to this effect would prevent future developers from assuming they respond to @override_settings.


Positive callouts

  • The CHANGELOG entry is unusually thorough and well-placed.
  • Documenting both the Django setting name and the Celery conf key in the docs table is exactly the right level of detail.
  • The per-task opt-out pattern (@shared_task(acks_late=False, reject_on_worker_lost=False)) is the right escape hatch.
  • The existing CELERY_PREFETCH_MULTIPLIER = 1 being already in place is a good sign — this is the critical guard against a worker prefetching tasks it then dies holding.
  • Leaving the redundant acks_late=True in worker_uploads/tasks.py:76 as an explicit intent declaration is reasonable; a short inline comment noting it is now equivalent to the global default would help future readers.


codecov Bot commented May 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.


JSv4 added 2 commits May 3, 2026 17:47
Test fix (pytest CI failure):
- test_celery_worker_death_resilience: switch from celery_app.app.conf
  to celery_app.conf. config/__init__.py exposes celery_app as the Celery
  app instance (not the module), so .app was a Celery object with no
  such attribute, causing all 4 tests to fail with AttributeError.

Address PR #1497 review feedback:
- Set CELERY_BROKER_TRANSPORT_OPTIONS = {visibility_timeout: 12h} so
  long-running document tasks aren't double-delivered by the Redis
  broker default 1h timeout combined with task_acks_late=True.
- Add visibility-timeout regression test.
- Document Redis visibility-timeout caveat and list known
  non-idempotent tasks (corpus_tasks, agent_tasks, import_tasks) as
  follow-up hardening work in asynchronous-processing.md.
- Add docstring noting that celery_app.conf is import-time-cached
  and unaffected by @override_settings.
- Add inline comment on worker_uploads acks_late=True clarifying it
  is now redundant with the global default but kept as intent.
- Update CHANGELOG to mention visibility timeout and known-non-
  idempotent task tracking.

claude Bot commented May 3, 2026

Code Review: PR #1497 — Celery at-least-once delivery

Overview

This is a well-scoped infrastructure PR: three Django settings, documentation of the idempotency contract, and regression tests that pin the configuration. No application logic is touched. The change addresses a real production hazard — long-running ingest/parse/embed tasks being silently lost on worker death — using Celery's built-in mechanisms rather than custom code. Overall this is good work; the comments below are mostly minor.


Issues

1. Magic number violates project conventions (CLAUDE.md)

config/settings/base.py:

CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 12 * 60 * 60}

CLAUDE.md explicitly says "No magic numbers — use constants files in opencontractserver/constants/". The value 43200 (or 12 * 60 * 60) should be a named constant, e.g. in a new opencontractserver/constants/celery.py:

# Maximum expected duration for any document-processing task (seconds).
# Used to set the Redis visibility timeout above the Celery default of 3600s.
CELERY_REDIS_VISIBILITY_TIMEOUT_SECONDS = 12 * 60 * 60

Then base.py references it:

from opencontractserver.constants.celery import CELERY_REDIS_VISIBILITY_TIMEOUT_SECONDS
CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": CELERY_REDIS_VISIBILITY_TIMEOUT_SECONDS}

2. Regression test has an asymmetry: visibility timeout isn't verified against celery_app.conf

The test class verifies both Django settings AND celery_app.conf propagation for task_acks_late and task_reject_on_worker_lost — but only checks Django settings for the visibility timeout. If the namespace='CELERY' wiring ever broke specifically for CELERY_BROKER_TRANSPORT_OPTIONS, the test wouldn't catch it. A fifth test would close this gap:

def test_celery_app_picks_up_visibility_timeout(self) -> None:
    transport_options = celery_app.conf.broker_transport_options or {}
    visibility = transport_options.get("visibility_timeout")
    self.assertIsInstance(visibility, int)
    self.assertGreater(visibility, 3600)

3. assert isinstance double-narrowing is non-idiomatic

In test_redis_visibility_timeout_exceeds_celery_default:

self.assertIsInstance(visibility, int, ...)
assert isinstance(visibility, int)  # narrow for mypy

The bare assert is a mypy workaround, but it's unusual in a TestCase — a bare assert doesn't produce a useful failure message and can be stripped by the optimizer with python -O. Prefer typing.cast:

visibility = typing.cast(int, transport_options.get("visibility_timeout"))

or just add # type: ignore[arg-type] on the assertGreater line if the assertion above is already sufficient.

4. CHANGELOG entry is one very long paragraph

CLAUDE.md follows Keep a Changelog format. The current entry is a single run-on paragraph that's hard to skim. Suggest breaking it up:

### Changed

- **Celery configured for at-least-once delivery** (`config/settings/base.py:677`, Issue #1493):
  ...
  
### Technical Details

- `CELERY_TASK_ACKS_LATE = True` — broker only removes the message after task success.
- `CELERY_TASK_REJECT_ON_WORKER_LOST = True` — SIGKILL'd workers requeue rather than silently drop.
- `CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 12h}` — prevents Redis from double-delivering tasks that run longer than the 1-hour default.
- Trade-off: all tasks must be idempotent; per-task opt-out available via `@shared_task(acks_late=False, reject_on_worker_lost=False)`.

5. Approximate line numbers in the known non-idempotent tasks table will go stale immediately

| `process_corpus_action` (extract path) | `corpus_tasks.py` ~L251 | ...

Approximate line numbers in documentation tend to be wrong within days of the next refactor. Since the documentation itself explains the risk is low and these are follow-up items, consider removing line numbers entirely and just referencing task names — the names are searchable and stable:

| `process_corpus_action` (extract path) | `opencontractserver/tasks/corpus_tasks.py` | ...

Minor Observations

  • acks_late=True on process_pending_uploads: The new comment ("Redundant since the global default…") is clear and the decision to keep it as documentation is reasonable. Good call.

  • CELERY_BROKER_TRANSPORT_OPTIONS is a full dict replacement: If any future settings file (e.g. production SSL options) needs to add transport options, it will need to merge with this dict rather than override it. This is a latent footgun worth a one-line comment: # If adding new transport options in environment-specific settings, merge rather than replace this dict.

  • backend_lock=True stuck state on partial completion: The PR improves resilience for the case where a worker dies before any state is written. The case where a worker dies after setting backend_lock=True but before clearing it still leaves documents stuck (the task redelivers and re-runs, but if it was already partially complete the retry may hit idempotency issues). This is pre-existing and out of scope here, but worth a follow-up ticket.

  • Tests use SimpleTestCase: Correct choice — no DB needed, fast, appropriate for pinning settings.

  • Test settings use CELERY_TASK_ALWAYS_EAGER = True: The resilience settings are no-ops in eager mode (tasks run synchronously inline), so the regression tests are correctly testing production configuration rather than test behavior. The comment in the test class explaining this caveat is good.


Summary

Required before merge:

  1. Extract 12 * 60 * 60 to a named constant per project conventions.

Recommended improvements:

  2. Add a fifth test verifying celery_app.conf.broker_transport_options picks up the visibility timeout.
  3. Replace assert isinstance(...) with typing.cast or a type annotation.

Optional/minor:

  4. Break up the CHANGELOG entry into bullet points.
  5. Remove approximate line numbers from the docs table.
  6. Add a merge-vs-replace comment on CELERY_BROKER_TRANSPORT_OPTIONS.

The core change (the three settings) is correct and the documentation is thorough. The regression tests are well-motivated and appropriately scoped. Address item 1 and the PR is ready.

@JSv4 JSv4 merged commit 73f6678 into main May 4, 2026
6 checks passed