Sorry @nickcomua, your pull request is larger than the review limit of 150000 diff characters
Walkthrough

Replaces task-row orchestration with a domain-driven reconciler: moves Convex APIs into …

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Frontend
    participant Convex
    participant Reconciler
    participant Service
    rect rgba(100, 200, 150, 0.5)
        Note over Frontend,Service: Domain-Driven Reconciler Flow
        Frontend->>Convex: humanMutation(domainEntity)
        Convex->>Convex: persist domain state, mark pendingWork
        Reconciler->>Convex: subscribe pendingWork(chats|clients|media|qrAuths)
        Convex-->>Reconciler: workItem {service, key, handler}
        Reconciler->>Reconciler: dedupe & dispatch
        Reconciler->>Service: HTTP call with {"entity_id": key}
        Service->>Convex: workerQuery(entity_id) — fetch live state
        Convex-->>Service: current domain record
        Service->>Convex: workerMutation(entity_id, updates)
        Reconciler->>Convex: subscribe domain watcher (phase/step/status)
        Convex-->>Reconciler: terminal state -> cancel/stop dispatch
    end
```
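The dedupe-and-dispatch step in the diagram can be sketched in isolation. The `WorkItem` shape and the `service:key` flight-key format below are illustrative, not the actual Rust types:

```typescript
// Hypothetical shape of a reconciler work item; field names follow the
// diagram above, not the real structs in the PR.
interface WorkItem {
  service: string;
  key: string;
  handler: string;
}

// Dedupe step: skip items whose (service, key) pair is already in flight,
// returning only the items that should be dispatched this round.
function dedupe(items: WorkItem[], inFlight: Set<string>): WorkItem[] {
  const dispatched: WorkItem[] = [];
  for (const item of items) {
    const flightKey = `${item.service}:${item.key}`;
    if (!inFlight.has(flightKey)) {
      inFlight.add(flightKey);
      dispatched.push(item);
    }
  }
  return dispatched;
}
```

The in-flight set is what makes repeated `pendingWork` subscription updates safe: re-delivered work items are ignored until the watcher observes a terminal state and prunes the key.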
Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
bins/crm-worker/src/main.rs (1)
191-195: ⚠️ Potential issue | 🟡 Minor

Update shutdown log labels to “Reconciler” for consistency.
The runtime branch still logs “Orchestrator exited/panicked”, which is now misleading after the reconciler migration.
Suggested fix
```diff
 result = orchestrator_handle => {
     match result {
-        Ok(()) => info!("Orchestrator exited"),
-        Err(e) => tracing::error!(error = %e, "Orchestrator task panicked"),
+        Ok(()) => info!("Reconciler exited"),
+        Err(e) => tracing::error!(error = %e, "Reconciler task panicked"),
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/main.rs` around lines 191 - 195, The shutdown log messages still refer to "Orchestrator" but the component was migrated to the Reconciler; update the success and error log labels in the orchestrator_handle match arm so that info!("Orchestrator exited") becomes info!("Reconciler exited") and tracing::error!(error = %e, "Orchestrator task panicked") becomes tracing::error!(error = %e, "Reconciler task panicked"), ensuring the same variables (result, e) and structures are unchanged.
🧹 Nitpick comments (18)
bins/crm-chat-web/src/components/search-dialog.tsx (1)
7-7: Use the configured package alias for the cross-package import.

The relative import `../../../convex-backend/convex/model/messages` should use the package alias `crm-chat-convex-backend` defined in `package.json`. Change line 7 to:

```ts
import type { TextByKeywordsParameters } from "crm-chat-convex-backend/convex/model/messages";
```

This is consistent with how `lib/convex.ts` imports from the same package and avoids fragile relative paths.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-chat-web/src/components/search-dialog.tsx` at line 7, Replace the fragile relative import of the TextByKeywordsParameters type in search-dialog.tsx with the configured package alias; locate the import that references "../../../convex-backend/convex/model/messages" and change it to import the type from the package alias "crm-chat-convex-backend/convex/model/messages" so it matches how lib/convex.ts imports the same package and avoids brittle relative paths.

bins/crm-worker/src/services/phone_auth.rs (1)
73-76: Unused `cancel_token` in `run_inner`.

The `cancel_token` is created and passed to `wait_for_step`, but it's never actually cancelled from within `run_inner`. The workflow relies entirely on detecting terminal states (`Cancelled`, `Failed`, `Connected`) via the subscription rather than external cancellation signals.

If external cancellation isn't needed, consider removing the `cancel_token` parameter from `wait_for_step` to simplify the code. If it is intended for future external cancellation (e.g., from Restate), this is fine as-is.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/services/phone_auth.rs` around lines 73 - 76, The CancellationToken created by CancellationToken::new() in run_inner is never cancelled or used beyond being passed into wait_for_step; either remove the unused cancel_token parameter from wait_for_step and all related call sites (so run_inner no longer creates a CancellationToken) or, if external cancellation is intended, wire a cancellation source into run_inner (keep CancellationToken creation) and ensure it is triggered from the external cancellation path (e.g., Restate) and propagated into wait_for_step; update references to cancel_token, run_inner, and wait_for_step accordingly.

bins/crm-chat-web/src/components/client-settings.tsx (2)
48-48: Missing handler for the new `"Queued"` scan phase.

The `scanPhase` type now includes `"Queued"`, but `getScanStatus` doesn't explicitly handle it. Currently, a `"Queued"` chat would fall through to lines 99-100 showing "Syncing..." if `scanEnabled` is true, which may be acceptable but inconsistent with the explicit handling of other phases.

Consider adding explicit handling for visual consistency:
💡 Suggested explicit Queued handling
```diff
 function getScanStatus(chat: ChatDoc): { label: string; className: string } {
+  // Queued = waiting to start scanning
+  if (chat.scanPhase === "Queued") {
+    return {
+      label: "Queued",
+      className: "bg-blue-500/15 text-blue-700",
+    };
+  }
   // Active scanning phases take priority — show progress while work is happening
   if (chat.scanPhase === "ScanningMessages") {
```

Also applies to: 80-103
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-chat-web/src/components/client-settings.tsx` at line 48, The new scanPhase union includes "Queued" but getScanStatus does not explicitly handle it; update the getScanStatus function in client-settings.tsx to add a case for scanPhase === "Queued" (alongside existing "ScanningMessages", "DownloadingMedia", "Listening" branches) and return the appropriate label/icon/state for queued (e.g., "Queued" or "Waiting to scan") so the UI is consistent; ensure any conditional checks that use scanEnabled and scanPhase (inside getScanStatus and where it's consumed) also account for "Queued" so it doesn't unintentionally fall through to the generic "Syncing..." message.
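A self-contained sketch of the suggested precedence follows. Only the explicit `Queued` branch is the point; the phase list, other labels, and class names are placeholders, not the component's actual values:

```typescript
// Assumed subset of the chat document; only the fields getScanStatus reads.
type ScanPhase = "Queued" | "ScanningMessages" | "DownloadingMedia" | "Listening";

interface ChatScanState {
  scanPhase?: ScanPhase;
  scanEnabled: boolean;
}

// Sketch of getScanStatus with an explicit "Queued" branch so queued chats
// no longer fall through to the generic "Syncing..." label.
function getScanStatus(chat: ChatScanState): { label: string; className: string } {
  if (chat.scanPhase === "Queued") {
    return { label: "Queued", className: "bg-blue-500/15 text-blue-700" };
  }
  // Active scanning phases take priority over the generic fallback.
  if (chat.scanPhase === "ScanningMessages") {
    return { label: "Scanning", className: "bg-amber-500/15 text-amber-700" };
  }
  if (chat.scanPhase === "DownloadingMedia") {
    return { label: "Downloading media", className: "bg-amber-500/15 text-amber-700" };
  }
  if (chat.scanEnabled) {
    return { label: "Syncing...", className: "bg-muted text-muted-foreground" };
  }
  return { label: "Idle", className: "bg-muted text-muted-foreground" };
}
```

Placing the `Queued` check first keeps the precedence explicit even if more fallback branches are added later.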
456-458: Remove the type assertion and align `ChatDoc` with the generated Convex types.

The cast `as ChatDoc[] | undefined` masks a type mismatch: the backend schema defines `_id: v.id("chats")` and `_creationTime: v.number()`, but the frontend `ChatDoc` interface uses `_id: string` without `_creationTime`. Use the generated `Doc<"chats">` type instead (as done in `search-dialog.tsx`) or ensure `ChatDoc` properly extends the schema fields. The `useQuery` call will infer the correct return type without the assertion once the interface is aligned with the backend schema.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-chat-web/src/components/client-settings.tsx` around lines 456 - 458, Remove the unsafe type assertion on the `chats` result and align the frontend doc type with the Convex-generated type: update the `ChatDoc` usage so `useQuery(api.model.chats.listByClient, { clientId })` can infer its return type (either replace `ChatDoc` with the generated `Doc<"chats">` type as done in `search-dialog.tsx`, or change your `ChatDoc` interface to include `_id: DocId`/the proper `v.id("chats")` type and `_creationTime: number`). After that, delete `as ChatDoc[] | undefined` so `chats` uses the correct inferred type from `useQuery`.

bins/crm-worker/src/ops/media.rs (1)
59-72: Consider grouping related parameters into a struct.

The function has 12 parameters, which triggered the `clippy::too_many_arguments` suppression. While the suppression is acceptable for now, consider extracting related parameters (e.g., `mime_type`, `file_name`, `width`, `height`, `duration`, `known_file_size`) into a `MediaMetadata` struct for improved readability and maintainability in a future refactor.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/ops/media.rs` around lines 59 - 72, The function download_and_upload currently takes many related parameters; extract the media-specific ones into a new MediaMetadata struct (e.g., struct MediaMetadata { mime_type: Option<String>, file_name: Option<String>, width: Option<f64>, height: Option<f64>, duration: Option<f64>, known_file_size: Option<usize> }) and change download_and_upload to accept a MediaMetadata instance instead of the six individual fields; update all call sites to construct MediaMetadata (preserving Option types and ownership/borrowing as needed) and adjust any pattern matching or field accesses inside download_and_upload to use metadata.mime_type, metadata.file_name, etc., keeping error semantics and return type intact.

bins/crm-worker/src/services/update_listener.rs (1)
214-243: Background media download task is fire-and-forget.

The spawned download task (lines 225-242) logs errors but doesn't propagate them back. This is acceptable for real-time media downloads where the primary message flow shouldn't be blocked, but consider whether failed downloads should be queued for retry via the reconciler pattern used elsewhere in this PR.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/services/update_listener.rs` around lines 214 - 243, The fire-and-forget task spawned in the UpdateListener (the tokio::spawn block that calls download_and_upload_media) currently only logs errors; modify the Err branch inside that spawned async to also enqueue the failed media for retry via your reconciler/retry queue (e.g., call the existing reconciler enqueue function with dl_media_ext, dl_chat_ext, dl_msg_ext and dl_summary) so failed downloads are retried; locate the error handling in the tokio::spawn for download_and_upload_media and add a call to the reconciler/retry API (rather than only warn!) to persist retry metadata for the reconciler loop.

bins/convex-backend/tests/integration_test.rs (1)
88-89: Consider asserting specific error messages.

The assertion `assert_mutation_error(result, "")` with an empty string likely only checks that some error occurred. For more robust tests, consider asserting the expected error message or error type (e.g., "invalid document ID" or "validator").

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/convex-backend/tests/integration_test.rs` around lines 88 - 89, The test currently uses assert_mutation_error(result, "") which only checks for any error; update the assertion to verify the specific error message or type returned for invalid document IDs so the test is robust. Locate the call to assert_mutation_error and change it to assert that the mutation result contains the expected text (e.g., "invalid document ID" or "validator") or unwrap the error from result and assert_eq! or assert!(error.message.contains("invalid document ID")) against the concrete error string/type produced by your validation code so the test fails only when the wrong error is returned.

bins/crm-worker/src/services/reconciler.rs (2)
67-84: Consider parallelizing Restate dispatch for better throughput.

The `reconcile_batch` function dispatches items sequentially with `await` on each `restate_send`. For high-throughput scenarios, parallel dispatch could reduce latency.

⚡ Optional parallel dispatch
```diff
+use futures::future::join_all;
+
 async fn reconcile_batch(
     items: &[WorkItem],
     in_flight: &HashSet<(String, String)>,
     http: &reqwest::Client,
     ingress_url: &str,
 ) -> Vec<(String, String)> {
-    let mut newly_dispatched = Vec::new();
-    for item in items {
-        let flight_key = (item.service.clone(), item.key.clone());
-        if !in_flight.contains(&flight_key) {
-            restate_send(http, ingress_url, &item.service, &item.key, &item.handler).await;
-            info!(service = %item.service, key = %item.key, handler = %item.handler, "Work dispatched");
-            newly_dispatched.push(flight_key);
-        }
-    }
-    newly_dispatched
+    let to_dispatch: Vec<_> = items
+        .iter()
+        .filter(|item| !in_flight.contains(&(item.service.clone(), item.key.clone())))
+        .collect();
+
+    let futures = to_dispatch.iter().map(|item| async {
+        restate_send(http, ingress_url, &item.service, &item.key, &item.handler).await;
+        info!(service = %item.service, key = %item.key, handler = %item.handler, "Work dispatched");
+        (item.service.clone(), item.key.clone())
+    });
+
+    join_all(futures).await
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/services/reconciler.rs` around lines 67 - 84, reconcile_batch currently awaits restate_send sequentially; change it to dispatch all non-duplicate items concurrently and then await their completion while still collecting the (service,key) pairs in newly_dispatched. Locate the reconcile_batch function and replace the per-item await with spawning or collecting futures (e.g., build a Vec of futures or use FuturesUnordered / futures::stream::FuturesUnordered) for restate_send(http, ingress_url, &item.service, &item.key, &item.handler) and info! logging (ensure logging happens after the send future resolves or capture success/failure), then await all futures (or use a bounded concurrency stream if needed) and push the corresponding flight_key into newly_dispatched only for items that were actually dispatched; keep the same function signature and return value.
161-175: Pruning on every stream event may be inefficient.

The pruning logic iterates all five key sets and retains matching items on every single subscription update. For large in-flight sets, this O(n) operation per event could become a performance concern.

Consider pruning less frequently (e.g., on a timer or when `in_flight` exceeds a threshold) if profiling reveals this as a bottleneck.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/services/reconciler.rs` around lines 161 - 175, The current code always rebuilds all_current and calls in_flight.retain(...) on every stream event which makes pruning O(n) per event; modify the logic so pruning happens only conditionally (e.g., when in_flight.len() exceeds a threshold or a timer/interval has elapsed). Add a pruning gate such as a PRUNE_THRESHOLD constant and/or a last_prune: Instant field (or counter) and check (in_flight.len() > PRUNE_THRESHOLD || last_prune.elapsed() >= PRUNE_INTERVAL) before computing all_current and calling in_flight.retain; when you run the prune update last_prune and compute pruned and debug exactly as now. Keep references to the existing symbols (in_flight, phone_auth_keys, qr_auth_keys, clients_keys, chats_keys, media_keys, all_current, pruned) so the retain logic is identical but guarded by the new conditional.

bins/crm-chat-web/src/hooks/use-qr-auth.ts (2)
73-79: Use `async/await` instead of promise chain and remove `console.error`.

The coding guidelines specify using `async/await` syntax instead of promise chains and removing `console.log`/`console.error` from production code.

♻️ Proposed refactor
```diff
-  const startQrAuth = (): void => {
-    setAuthId(null);
-    startMutation({}).then(
-      (id) => setAuthId(id),
-      (error) => console.error("[qrAuth.start]", error)
-    );
+  const startQrAuth = async (): Promise<void> => {
+    setAuthId(null);
+    try {
+      const id = await startMutation({});
+      setAuthId(id);
+    } catch (error) {
+      // Consider using a proper error reporting mechanism
+      // or exposing error state to the caller
+    }
   };
```

As per coding guidelines: "Use `async/await` syntax instead of promise chains for better readability" and "Remove `console.log`, `debugger`, and `alert` statements from production code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-chat-web/src/hooks/use-qr-auth.ts` around lines 73 - 79, startQrAuth currently uses a promise chain and console.error; change it to an async function that awaits startMutation and sets the result via setAuthId, and replace the console.error with proper error handling (e.g., call the existing error handler or state updater instead of console). Specifically, update startQrAuth to be async, call setAuthId(null) first, then try { const id = await startMutation({}); setAuthId(id); } catch (err) { /* invoke project error/reporting mechanism or set error state here instead of console.error */ } so you reference startMutation and setAuthId in the fix.
56-63: Consider validating `queryResult.step` before type assertion.

The type assertion `queryResult.step as QrAuthStep` on line 58 assumes the backend always returns a valid step value. If the backend returns an unexpected step, this could cause subtle bugs downstream.

🛡️ Proposed defensive approach
```diff
+const VALID_STEPS = new Set<QrAuthStep>([
+  "Pending", "Generating", "Token", "Authorized",
+  "AlreadyAuthorized", "Failed", "Cancelled"
+]);
+
 let progress: QrAuthProgress | null = null;
 if (queryResult) {
+  const step = VALID_STEPS.has(queryResult.step as QrAuthStep)
+    ? (queryResult.step as QrAuthStep)
+    : "Failed";
   progress = {
-    step: queryResult.step as QrAuthStep,
+    step,
     qrUrl: queryResult.qrUrl,
     qrExpires: queryResult.qrExpires,
     error: queryResult.error,
   };
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-chat-web/src/hooks/use-qr-auth.ts` around lines 56 - 63, Validate queryResult.step before force-casting to QrAuthStep: check that queryResult.step is one of the allowed QrAuthStep enum/string values and only then set progress.step = queryResult.step; otherwise set a safe default (e.g., QrAuthStep.Unknown or null) and log or surface an error. Update the block that assigns progress (referencing queryResult.step, QrAuthStep, and progress) to perform this guard and fallback so downstream code never receives an invalid step value.

bins/convex-backend/convex/model/presence.ts (2)
103-108: Frequent scheduled mutations may accumulate.

Each heartbeat schedules a new `checkOffline` mutation. With many concurrent users sending heartbeats every few seconds, this creates a large number of scheduled mutations. While they're idempotent, this could add scheduler overhead.

Consider tracking a single scheduled check per user (e.g., via a table or state) to avoid redundant scheduling.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/convex-backend/convex/model/presence.ts` around lines 103 - 108, The heartbeat handler currently calls ctx.scheduler.runAfter each time (scheduling internal.model.presence.checkOffline) which can pile up; change it to track one scheduled check per user (e.g., add a presence schedule table or field keyed by userId like presenceSchedule with scheduledAt/timeoutId) and only create a new scheduled job if there isn't an active scheduled check for ctx.caller.tokenIdentifier, otherwise update/reschedule or skip creating a duplicate; use the presence table/state to store the scheduled-check marker and clear it when internal.model.presence.checkOffline runs so future heartbeats can schedule again.
135-148: Full table scan may not scale well.

The `disconnect` mutation performs a full scan of the `qrAuths` table (`ctx.db.query("qrAuths").collect()`). While the comment notes "very few users have active QR auths at any given time," this assumption may not hold as the system scales.

Consider using an index on non-terminal steps if this becomes a bottleneck:

📊 Index-based alternative (requires schema change)

If scaling becomes an issue, consider:
- Add a composite index on `(step, userId)`
- Query only non-terminal steps: `withIndex("by_step", q => q.eq("step", "Pending")).or(...)`

For now, the current approach is acceptable if QR auth usage remains low-volume.
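Whichever query shape is chosen, centralizing the non-terminal step set keeps the index query and any fallback filter in agreement. A small sketch, with step names assumed from this finding:

```typescript
// Non-terminal QR auth steps, as named in this finding; adjust if the
// schema's step union differs.
const NON_TERMINAL_STEPS = ["Pending", "Generating", "Token"] as const;

interface QrAuthRow {
  userId: string;
  step: string;
}

// Collect the users that still have an active (non-terminal) QR auth.
// With a by_step index, each step would be fetched via withIndex instead of
// scanning the whole table; the set logic below stays the same either way.
function usersWithActiveQrAuth(rows: QrAuthRow[]): Set<string> {
  const users = new Set<string>();
  for (const row of rows) {
    if ((NON_TERMINAL_STEPS as readonly string[]).includes(row.step)) {
      users.add(row.userId);
    }
  }
  return users;
}
```

Keeping the step list in one constant also makes it harder for a new step value to be handled in the index query but missed in the scan fallback.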
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/convex-backend/convex/model/presence.ts` around lines 135 - 148, The current disconnect mutation does a full table scan via ctx.db.query("qrAuths").collect() to build usersToCheck from qrAuths with non-terminal steps, which won't scale; change the logic to query only relevant rows by using an indexed query on step (e.g., use a withIndex or filtered query on "step" to fetch only entries where step is "Pending"|"Generating"|"Token") instead of collecting the whole table, keeping the rest of the loop that adds auth.userId to usersToCheck and referring to the same qrAuths variable and step checks.

bins/convex-backend/convex/functions.ts (1)
46-51: Consider a more precise worker detection pattern.

The check `caller.tokenIdentifier.includes("|mch_")` could theoretically match a human user whose user ID happens to contain `|mch_` (unlikely but possible). A more precise pattern would match specifically after the `|` separator.

🔒 More precise pattern
```diff
 function isWorkerCaller(caller: UserIdentity): boolean {
   // Convex tokenIdentifier = "{issuer}|{subject}"
   // Clerk M2M tokens have subject "mch_*", human tokens have "user_*"
-  return caller.tokenIdentifier.includes("|mch_");
+  const parts = caller.tokenIdentifier.split("|");
+  const subject = parts[parts.length - 1];
+  return subject?.startsWith("mch_") ?? false;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/convex-backend/convex/functions.ts` around lines 46 - 51, The current isWorkerCaller(caller: UserIdentity) uses caller.tokenIdentifier.includes("|mch_") which can false-positive match elsewhere; change the check to parse the tokenIdentifier exactly into issuer and subject (e.g., split on '|' or use a regex that requires the '|' separator) and then test that the subject startsWith "mch_" (or matches /^mch_/), ensuring you reference caller.tokenIdentifier and the isWorkerCaller function when making the change.

bins/crm-worker/src/services/qr_auth.rs (1)
81-89: Use typed enum comparison instead of converting to string.

The `auth.step` field is a generated enum type (matching the `v.union()` definition in the TypeScript schema), not a plain string. The code at line 82 converts it to a string for comparison, which is less type-safe than direct enum comparison. The codebase already establishes the idiomatic pattern in `phone_auth.rs` with a `step_to_str()` helper function that matches on enum variants. Adopt the same approach here for consistency and to leverage the type system for safety.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/services/qr_auth.rs` around lines 81 - 89, The code converts auth.step to a string and compares it to "Pending"; instead, compare the enum directly and use the existing step_to_str() pattern for logging: change the idempotency guard to match on auth.step (or compare to the Pending variant of the enum that defines auth.step) rather than converting to string, and when logging or constructing the error use the step_to_str() helper (as in phone_auth.rs) to produce the human-readable step for the info! and error message; update the block around auth.step, step_str, and the returned QrAuthResult accordingly.

bins/crm-worker/src/ops/domain_watcher.rs (1)
127-173: Dead code annotation is appropriate.
`spawn_media_status_watcher` and `spawn_chat_scan_watcher` are marked `#[allow(dead_code)]`, which is reasonable if they're planned for future use. Consider removing them entirely if not needed soon, or adding a comment explaining the planned usage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/ops/domain_watcher.rs` around lines 127 - 173, The functions spawn_media_status_watcher and spawn_chat_scan_watcher are annotated with #[allow(dead_code)] but lack context; either remove these unused functions if they won't be used soon, or retain them and add a short explanatory comment (e.g., TODO/FIXME) above each function describing the planned usage and why they are kept (for example: reserved for future background watchers that cancel via CancellationToken when media/chat records change); update the comment for spawn_media_status_watcher and spawn_chat_scan_watcher and keep the #[allow(dead_code)] if you choose to retain them so future readers know they are intentionally dormant.

bins/crm-worker/src/services/chat_scanner.rs (1)
103-107: External ID extraction logic is fragile.

The `strip_prefix` pattern assumes `chat_id` always has the format `"clientId:externalId"`. If this format changes or a malformed `chat_id` is encountered, `unwrap_or` falls back to the full `chat_id`, which may cause issues downstream.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/services/chat_scanner.rs` around lines 103 - 107, The extraction of chat_external_id using chat.chat_id.strip_prefix(&format!("{}:", chat.client_id)).unwrap_or(&chat.chat_id) is fragile; replace it with explicit validation: check whether chat.chat_id starts_with(&format!("{}:", chat.client_id)) and then splitn(2, ':') (or otherwise parse) to extract the external id, and if the expected prefix/format is not present log a warning (using the module logger) including chat.chat_id and chat.client_id and either return an error/None or use a well-documented fallback rather than silently using the full chat_id; update any callers of chat_external_id to handle the possible error/None if you choose that path.

bins/convex-backend/convex/model/media.ts (1)
646-671: `pendingWork` uses filter scan instead of index.

The query uses `.filter((q) => q.eq(q.field("status"), "Pending"))`, which scans all records. Consider using the existing `by_userId_status` index or adding a dedicated index for pending work queries to improve performance at scale.

♻️ Consider using an index-based query

If there's no user constraint for worker queries, you might need a new index like `by_status` or use a different query pattern. For example:

```diff
-      const allPending = await ctx.db
-        .query("media")
-        .filter((q) => q.eq(q.field("status"), "Pending"))
-        .collect();
+      // Consider adding .index("by_status", ["status"]) to mediaTable
+      const allPending = await ctx.db
+        .query("media")
+        .withIndex("by_status", (q) => q.eq("status", "Pending"))
+        .collect();
```
Verify each finding against the current code and only fix it if needed. In `@bins/convex-backend/convex/model/media.ts` around lines 646 - 671, The handler that builds pending work currently scans the media table via .filter(q => q.eq(q.field("status"), "Pending")) causing full-table scans; change both queries (the Pending and Downloading lookups) to use an index-based query on the media table (e.g., use an index like by_status or the existing by_userId_status if applicable) by calling the query with the index and matching the status value, or add a new by_status index and query against it; update the handler logic that computes slots and returns work items to use those index-backed collections instead of .filter scans.
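The slot-limiting half of `pendingWork` can be sketched independently of the query shape. The `maxConcurrent` cap and the exact status values below are assumptions drawn from this finding, not the module's actual constants:

```typescript
// Hypothetical media rows; only status matters for the slot math.
interface MediaRow {
  id: string;
  status: "Pending" | "Downloading" | "Done" | "Failed";
}

// Return up to (maxConcurrent - currently downloading) pending items,
// so the number of in-progress downloads never exceeds the cap.
function pendingWork(rows: MediaRow[], maxConcurrent: number): MediaRow[] {
  const downloading = rows.filter((r) => r.status === "Downloading").length;
  const slots = Math.max(0, maxConcurrent - downloading);
  return rows.filter((r) => r.status === "Pending").slice(0, slots);
}
```

Whether the Pending and Downloading sets come from `.filter` scans or `withIndex` queries, this arithmetic stays the same; only the cost of producing the two sets changes.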
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@bins/convex-backend/build.rs`:
- Around line 31-35: The directory traversal in collect_function_files silently
returns on std::fs::read_dir errors which can drop parts of the Convex tree and
produce incomplete function_paths; update collect_function_files to fail loudly
by propagating the error (return a Result) or panic with the directory path
(e.g., use expect/unwrap_or_else with dir.display()) so the build fails when
read_dir(dir) errors, ensuring callers (and the build) see the failure instead
of continuing with missing inputs.
- Around line 9-12: The build script currently emits rerun-if-changed only for
"convex/schema.ts" and for each discovered file collected into function_paths by
collect_function_files, which misses additions/renames in subdirectories and
skipped dirs; update the build.rs logic to also emit a directory-level watch by
printing "cargo:rerun-if-changed=convex" (so Cargo recursively watches the
entire convex/ tree) in addition to the existing per-file prints — locate the
code that constructs function_paths and the per-file prints
(collect_function_files and where function_paths is iterated) and add the single
directory-level println before or after those per-file emit calls.
- Line 2: The function collect_function_files (and the sibling parameter on line
~31) currently takes &PathBuf; change its signature to accept &std::path::Path
instead and update any callers to pass .as_path() or a &Path reference (e.g.,
my_pathbuf.as_path()); also update the use/import from PathBuf to Path where
appropriate. This keeps the function zero-copy, satisfies Clippy's ptr_arg lint,
and requires no ownership changes inside collect_function_files
(std::fs::read_dir(dir) will continue to accept &Path).
In `@bins/convex-backend/convex/model/chats.ts`:
- Around line 228-229: Remove the debug artifact comment by deleting the line
containing "// TODO buillshit" immediately above the export of purgeChatData
(the internalMutation export named purgeChatData); ensure no other stray
TODO/development-only comments remain in that vicinity so the export block is
clean.
In `@bins/convex-backend/convex/model/clients.ts`:
- Around line 76-78: The deleteClient mutation currently hard-deletes the client
row (so getPhase returns null) which prevents consumers
(DialogSync/UpdateListener) from seeing a Disconnected phase; change
deleteClient to atomically set the client's phase to "Disconnected" (and
optional timestamp/metadata) instead of removing the row, so getPhase resolves
to "Disconnected" and teardown watchers receive the cancellation signal; if
physical removal is required, schedule or perform the hard-delete only after
worker shutdown completes (or add a separate cleanup job), and remove the
current hard-delete call (lines that perform the delete) in favor of an update
that sets phase to "Disconnected".
- Around line 86-88: Replace the inline ownership checks that compare
client.userId to ctx.caller.tokenIdentifier with the shared requireOwner helper
from convex/helpers/auth.ts: import and call requireOwner(ctx, client.userId)
(or the appropriate requireOwner signature) wherever you currently throw new
Error("Unauthorized..."), including the occurrences referencing client.userId at
the shown snippet and the similar check around lines 190-192; this ensures
consistent auth semantics and surfaces authorization failures via the shared
auth contract instead of generic thrown errors.
- Around line 241-248: Fetch of the client (ctx.db.get(clientId)) must not
unconditionally set phase to "Listening" because a late/duplicate worker can
overwrite newer transitions; after loading client and confirming it exists,
check client.phase === "Syncing" and only then call ctx.db.patch(clientId, {
phase: "Listening", photosSynced: false }); otherwise return without patching to
preserve newer states like "NeedsSync" or "Disconnected".
In `@bins/convex-backend/convex/model/messages.ts`:
- Around line 378-426: textByKeywordsValidator's scope.chatId is an Id<"chats">
but the messages.chatId field is a string, so the .eq("chatId", s.chatId) filter
will not match; in the textByKeywords handler (function textByKeywords) change
the "chat" case to resolve the chat document ID to the stored string chatId by
fetching the chat (use ctx.db.get(s.chatId)), handle a missing chat by returning
an empty paginated query (e.g., .take(0).paginate(args.paginationOpts) or
equivalent), and then use the fetched chat.chatId string in the .eq("chatId",
...) filter for ctx.db.query("messages").withSearchIndex("search_text", ...).
In `@bins/convex-backend/convex/model/phoneAuth.ts`:
- Around line 211-213: Replace the inline ownership checks that compare
auth.userId to ctx.caller.tokenIdentifier with the shared Convex auth helper
requireOwner from convex/helpers/auth.ts: locate the blocks in phoneAuth.ts
where auth.userId !== ctx.caller.tokenIdentifier is used (and the similar
occurrences around the other noted spots) and call requireOwner(ctx, auth)
instead so authorization failures go through the shared path rather than
throwing generic Errors; ensure you import requireOwner at the top and remove
the manual throw statements so the helper handles the error flow.
- Around line 293-296: Terminal transitions in phoneAuth currently only update
step/error and leave transient secrets (loginCode, passwordToken, password) on
the document; modify every place that sets step to a terminal value
("Cancelled", "Connected", "Failed", etc.) to also scrub these fields by
patching them to null (or removing them) and updating updatedAt. Specifically,
in the ctx.db.patch calls that set step to "Cancelled"/"Connected"/"Failed"
(e.g., the call using authId), include loginCode: null, passwordToken: null,
password: null (and preserve existing error/step/updatedAt logic), and make the
same change for every other terminal-path ctx.db.patch in this module so no
transient auth material remains on terminal phoneAuth rows.
In `@bins/crm-worker/src/services/media_downloader.rs`:
- Line 105: The direct cast media.file_size.map(|s| s as usize) is unsafe for
f64 -> usize; update the code around media.file_size mapping in
media_downloader.rs to avoid a blind as-cast: either keep the field as
Option<f64> (pass Option<f64> through so the callee (see media.rs line 117)
handles it) or perform a guarded conversion that checks for non-negative, finite
values and only then converts to usize (returning None otherwise). Replace the
map(|s| s as usize) usage with one of these safe patterns and adjust any
downstream consumers accordingly.
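The guarded-conversion pattern described above can be sketched in plain Rust; the helper name `file_size_to_usize` is illustrative, not from the codebase:

```rust
// Guarded f64 -> usize conversion: reject NaN, infinities, negatives, and
// values beyond usize range instead of relying on a blind `as` cast.
fn file_size_to_usize(size: Option<f64>) -> Option<usize> {
    size.and_then(|s| {
        if s.is_finite() && s >= 0.0 && s <= usize::MAX as f64 {
            Some(s as usize)
        } else {
            None
        }
    })
}

fn main() {
    assert_eq!(file_size_to_usize(Some(1024.0)), Some(1024));
    assert_eq!(file_size_to_usize(Some(-1.0)), None);
    assert_eq!(file_size_to_usize(Some(f64::NAN)), None);
    assert_eq!(file_size_to_usize(None), None);
}
```

Note that float-to-int `as` casts in modern Rust saturate rather than invoke undefined behavior, but the guard still matters: it turns a nonsensical size into an explicit `None` the callee can handle.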
---
Outside diff comments:
In `@bins/crm-worker/src/main.rs`:
- Around line 191-195: The shutdown log messages still refer to "Orchestrator"
but the component was migrated to the Reconciler; update the success and error
log labels in the orchestrator_handle match arm so that info!("Orchestrator
exited") becomes info!("Reconciler exited") and tracing::error!(error = %e,
"Orchestrator task panicked") becomes tracing::error!(error = %e, "Reconciler
task panicked"), ensuring the same variables (result, e) and structures are
unchanged.
---
Nitpick comments:
In `@bins/convex-backend/convex/functions.ts`:
- Around line 46-51: The current isWorkerCaller(caller: UserIdentity) uses
caller.tokenIdentifier.includes("|mch_") which can false-positive match
elsewhere; change the check to parse the tokenIdentifier exactly into issuer and
subject (e.g., split on '|' or use a regex that requires the '|' separator) and
then test that the subject startsWith "mch_" (or matches /^mch_/), ensuring you
reference caller.tokenIdentifier and the isWorkerCaller function when making the
change.
In `@bins/convex-backend/convex/model/media.ts`:
- Around line 646-671: The handler that builds pending work currently scans the
media table via .filter(q => q.eq(q.field("status"), "Pending")) causing
full-table scans; change both queries (the Pending and Downloading lookups) to
use an index-based query on the media table (e.g., use an index like by_status
or the existing by_userId_status if applicable) by calling the query with the
index and matching the status value, or add a new by_status index and query
against it; update the handler logic that computes slots and returns work items
to use those index-backed collections instead of .filter scans.
In `@bins/convex-backend/convex/model/presence.ts`:
- Around line 103-108: The heartbeat handler currently calls
ctx.scheduler.runAfter each time (scheduling
internal.model.presence.checkOffline) which can pile up; change it to track one
scheduled check per user (e.g., add a presence schedule table or field keyed by
userId like presenceSchedule with scheduledAt/timeoutId) and only create a new
scheduled job if there isn't an active scheduled check for
ctx.caller.tokenIdentifier, otherwise update/reschedule or skip creating a
duplicate; use the presence table/state to store the scheduled-check marker and
clear it when internal.model.presence.checkOffline runs so future heartbeats can
schedule again.
- Around line 135-148: The current disconnect mutation does a full table scan
via ctx.db.query("qrAuths").collect() to build usersToCheck from qrAuths with
non-terminal steps, which won't scale; change the logic to query only relevant
rows by using an indexed query on step (e.g., use a withIndex or filtered query
on "step" to fetch only entries where step is "Pending"|"Generating"|"Token")
instead of collecting the whole table, keeping the rest of the loop that adds
auth.userId to usersToCheck and referring to the same qrAuths variable and step
checks.
In `@bins/convex-backend/tests/integration_test.rs`:
- Around line 88-89: The test currently uses assert_mutation_error(result, "")
which only checks for any error; update the assertion to verify the specific
error message or type returned for invalid document IDs so the test is robust.
Locate the call to assert_mutation_error and change it to assert that the
mutation result contains the expected text (e.g., "invalid document ID" or
"validator") or unwrap the error from result and assert_eq! or
assert!(error.message.contains("invalid document ID")) against the concrete
error string/type produced by your validation code so the test fails only when
the wrong error is returned.
In `@bins/crm-chat-web/src/components/client-settings.tsx`:
- Line 48: The new scanPhase union includes "Queued" but getScanStatus does not
explicitly handle it; update the getScanStatus function in client-settings.tsx
to add a case for scanPhase === "Queued" (alongside existing "ScanningMessages",
"DownloadingMedia", "Listening" branches) and return the appropriate
label/icon/state for queued (e.g., "Queued" or "Waiting to scan") so the UI is
consistent; ensure any conditional checks that use scanEnabled and scanPhase
(inside getScanStatus and where it's consumed) also account for "Queued" so it
doesn't unintentionally fall through to the generic "Syncing..." message.
- Around line 456-458: Remove the unsafe type assertion on the `chats` result
and align the frontend doc type with the Convex-generated type: update the
`ChatDoc` usage so `useQuery(api.model.chats.listByClient, { clientId })` can
infer its return type (either replace `ChatDoc` with the generated
`Doc<"chats">` type as done in `search-dialog.tsx`, or change your `ChatDoc`
interface to include `_id: DocId`/the proper `v.id("chats")` type and
`_creationTime: number`). After that, delete `as ChatDoc[] | undefined` so
`chats` uses the correct inferred type from `useQuery`.
In `@bins/crm-chat-web/src/components/search-dialog.tsx`:
- Line 7: Replace the fragile relative import of the TextByKeywordsParameters
type in search-dialog.tsx with the configured package alias; locate the import
that references "../../../convex-backend/convex/model/messages" and change it to
import the type from the package alias
"crm-chat-convex-backend/convex/model/messages" so it matches how lib/convex.ts
imports the same package and avoids brittle relative paths.
In `@bins/crm-chat-web/src/hooks/use-qr-auth.ts`:
- Around line 73-79: startQrAuth currently uses a promise chain and
console.error; change it to an async function that awaits startMutation and sets
the result via setAuthId, and replace the console.error with proper error
handling (e.g., call the existing error handler or state updater instead of
console). Specifically, update startQrAuth to be async, call setAuthId(null)
first, then try { const id = await startMutation({}); setAuthId(id); } catch
(err) { /* invoke project error/reporting mechanism or set error state here
instead of console.error */ } so you reference startMutation and setAuthId in
the fix.
- Around line 56-63: Validate queryResult.step before force-casting to
QrAuthStep: check that queryResult.step is one of the allowed QrAuthStep
enum/string values and only then set progress.step = queryResult.step; otherwise
set a safe default (e.g., QrAuthStep.Unknown or null) and log or surface an
error. Update the block that assigns progress (referencing queryResult.step,
QrAuthStep, and progress) to perform this guard and fallback so downstream code
never receives an invalid step value.
In `@bins/crm-worker/src/ops/domain_watcher.rs`:
- Around line 127-173: The functions spawn_media_status_watcher and
spawn_chat_scan_watcher are annotated with #[allow(dead_code)] but lack context;
either remove these unused functions if they won't be used soon, or retain them
and add a short explanatory comment (e.g., TODO/FIXME) above each function
describing the planned usage and why they are kept (for example: reserved for
future background watchers that cancel via CancellationToken when media/chat
records change); update the comment for spawn_media_status_watcher and
spawn_chat_scan_watcher and keep the #[allow(dead_code)] if you choose to retain
them so future readers know they are intentionally dormant.
In `@bins/crm-worker/src/ops/media.rs`:
- Around line 59-72: The function download_and_upload currently takes many
related parameters; extract the media-specific ones into a new MediaMetadata
struct (e.g., struct MediaMetadata { mime_type: Option<String>, file_name:
Option<String>, width: Option<f64>, height: Option<f64>, duration: Option<f64>,
known_file_size: Option<usize> }) and change download_and_upload to accept a
MediaMetadata instance instead of the six individual fields; update all call
sites to construct MediaMetadata (preserving Option types and
ownership/borrowing as needed) and adjust any pattern matching or field accesses
inside download_and_upload to use metadata.mime_type, metadata.file_name, etc.,
keeping error semantics and return type intact.
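The parameter-object refactor suggested above can be sketched as follows; the field names mirror the review comment, while `describe` is a made-up consumer shown only to illustrate call-site ergonomics:

```rust
// Group the six media-specific parameters into one struct so
// `download_and_upload` takes a single value instead of a long argument list.
#[derive(Debug, Default, Clone)]
struct MediaMetadata {
    mime_type: Option<String>,
    file_name: Option<String>,
    width: Option<f64>,
    height: Option<f64>,
    duration: Option<f64>,
    known_file_size: Option<usize>,
}

// Hypothetical consumer: field access replaces positional parameters.
fn describe(meta: &MediaMetadata) -> String {
    format!(
        "{} ({} bytes)",
        meta.file_name.as_deref().unwrap_or("unnamed"),
        meta.known_file_size.unwrap_or(0)
    )
}

fn main() {
    let meta = MediaMetadata {
        file_name: Some("photo.jpg".to_string()),
        known_file_size: Some(2048),
        ..Default::default()
    };
    assert_eq!(describe(&meta), "photo.jpg (2048 bytes)");
}
```

With `Default` derived, call sites can fill only the fields they have (`..Default::default()`), which keeps optional metadata additions from rippling through every caller signature.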
In `@bins/crm-worker/src/services/chat_scanner.rs`:
- Around line 103-107: The extraction of chat_external_id using
chat.chat_id.strip_prefix(&format!("{}:",
chat.client_id)).unwrap_or(&chat.chat_id) is fragile; replace it with explicit
validation: check whether chat.chat_id starts_with(&format!("{}:",
chat.client_id)) and then splitn(2, ':') (or otherwise parse) to extract the
external id, and if the expected prefix/format is not present log a warning
(using the module logger) including chat.chat_id and chat.client_id and either
return an error/None or use a well-documented fallback rather than silently
using the full chat_id; update any callers of chat_external_id to handle the
possible error/None if you choose that path.
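The explicit prefix validation suggested above might look like this standalone sketch (the function name and error type are illustrative, not the actual chat_scanner.rs code):

```rust
// Validate the "{client_id}:{external_id}" shape explicitly instead of
// silently falling back to the full chat_id when the prefix is missing.
fn extract_external_id(chat_id: &str, client_id: &str) -> Result<String, String> {
    let prefix = format!("{}:", client_id);
    match chat_id.strip_prefix(prefix.as_str()) {
        Some(external) if !external.is_empty() => Ok(external.to_string()),
        _ => Err(format!(
            "chat_id {:?} does not match expected prefix {:?}",
            chat_id, prefix
        )),
    }
}

fn main() {
    assert_eq!(
        extract_external_id("client1:12345", "client1"),
        Ok("12345".to_string())
    );
    assert!(extract_external_id("12345", "client1").is_err());
    assert!(extract_external_id("client1:", "client1").is_err());
}
```

Callers then decide whether a malformed `chat_id` is a logged warning, a skipped record, or a hard error, instead of the failure mode being hidden inside an `unwrap_or` fallback.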
In `@bins/crm-worker/src/services/phone_auth.rs`:
- Around line 73-76: The CancellationToken created by CancellationToken::new()
in run_inner is never cancelled or used beyond being passed into wait_for_step;
either remove the unused cancel_token parameter from wait_for_step and all
related call sites (so run_inner no longer creates a CancellationToken) or, if
external cancellation is intended, wire a cancellation source into run_inner
(keep CancellationToken creation) and ensure it is triggered from the external
cancellation path (e.g., Restate) and propagated into wait_for_step; update
references to cancel_token, run_inner, and wait_for_step accordingly.
In `@bins/crm-worker/src/services/qr_auth.rs`:
- Around line 81-89: The code converts auth.step to a string and compares it to
"Pending"; instead, compare the enum directly and use the existing step_to_str()
pattern for logging: change the idempotency guard to match on auth.step (or
compare to the Pending variant of the enum that defines auth.step) rather than
converting to string, and when logging or constructing the error use the
step_to_str() helper (as in phone_auth.rs) to produce the human-readable step
for the info! and error message; update the block around auth.step, step_str,
and the returned QrAuthResult accordingly.
In `@bins/crm-worker/src/services/reconciler.rs`:
- Around line 67-84: reconcile_batch currently awaits restate_send sequentially;
change it to dispatch all non-duplicate items concurrently and then await their
completion while still collecting the (service,key) pairs in newly_dispatched.
Locate the reconcile_batch function and replace the per-item await with spawning
or collecting futures (e.g., build a Vec of futures or use FuturesUnordered /
futures::stream::FuturesUnordered) for restate_send(http, ingress_url,
&item.service, &item.key, &item.handler) and info! logging (ensure logging
happens after the send future resolves or capture success/failure), then await
all futures (or use a bounded concurrency stream if needed) and push the
corresponding flight_key into newly_dispatched only for items that were actually
dispatched; keep the same function signature and return value.
- Around line 161-175: The current code always rebuilds all_current and calls
in_flight.retain(...) on every stream event which makes pruning O(n) per event;
modify the logic so pruning happens only conditionally (e.g., when
in_flight.len() exceeds a threshold or a timer/interval has elapsed). Add a
pruning gate such as a PRUNE_THRESHOLD constant and/or a last_prune: Instant
field (or counter) and check (in_flight.len() > PRUNE_THRESHOLD ||
last_prune.elapsed() >= PRUNE_INTERVAL) before computing all_current and calling
in_flight.retain; when you run the prune update last_prune and compute pruned
and debug exactly as now. Keep references to the existing symbols (in_flight,
phone_auth_keys, qr_auth_keys, clients_keys, chats_keys, media_keys,
all_current, pruned) so the retain logic is identical but guarded by the new
conditional.
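A minimal sketch of the gated-prune idea, assuming an in-flight set of dispatch keys; the threshold, interval, and names are illustrative rather than the real reconciler.rs symbols:

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

// Illustrative values; tune for the real workload.
const PRUNE_THRESHOLD: usize = 1024;
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

struct InFlight {
    keys: HashSet<String>,
    last_prune: Instant,
}

impl InFlight {
    /// Only pay the O(n) retain cost when the set is large or stale.
    /// Returns true if a prune actually ran.
    fn maybe_prune(&mut self, all_current: &HashSet<String>) -> bool {
        if self.keys.len() > PRUNE_THRESHOLD || self.last_prune.elapsed() >= PRUNE_INTERVAL {
            self.keys.retain(|k| all_current.contains(k));
            self.last_prune = Instant::now();
            return true;
        }
        false
    }
}

fn main() {
    let mut inflight = InFlight {
        keys: (0..2000).map(|i| format!("k{i}")).collect(),
        last_prune: Instant::now(),
    };
    let current: HashSet<String> = (0..10).map(|i| format!("k{i}")).collect();
    // Over the threshold, so the prune runs and drops stale keys.
    assert!(inflight.maybe_prune(&current));
    assert_eq!(inflight.keys.len(), 10);
    // Immediately after pruning, a small set skips the scan.
    assert!(!inflight.maybe_prune(&current));
}
```

The stream handler would call `maybe_prune` on every event; most calls return immediately, and the full rebuild of `all_current` happens only behind the same gate.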
In `@bins/crm-worker/src/services/update_listener.rs`:
- Around line 214-243: The fire-and-forget task spawned in the UpdateListener
(the tokio::spawn block that calls download_and_upload_media) currently only
logs errors; modify the Err branch inside that spawned async to also enqueue the
failed media for retry via your reconciler/retry queue (e.g., call the existing
reconciler enqueue function with dl_media_ext, dl_chat_ext, dl_msg_ext and
dl_summary) so failed downloads are retried; locate the error handling in the
tokio::spawn for download_and_upload_media and add a call to the
reconciler/retry API (rather than only warn!) to persist retry metadata for the
reconciler loop.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 56b51a23-e5eb-44a8-b970-947876f2534d
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `bins/convex-backend/convex/_generated/api.d.ts` is excluded by `!**/_generated/**`
📒 Files selected for processing (71)
- `bins/convex-backend/Cargo.toml`
- `bins/convex-backend/build.rs`
- `bins/convex-backend/convex/README.md`
- `bins/convex-backend/convex/chats.ts`
- `bins/convex-backend/convex/clients.ts`
- `bins/convex-backend/convex/functions.ts`
- `bins/convex-backend/convex/helpers/auth.ts`
- `bins/convex-backend/convex/helpers/tasks.ts`
- `bins/convex-backend/convex/helpers/validators.ts`
- `bins/convex-backend/convex/model/chats.ts`
- `bins/convex-backend/convex/model/clients.ts`
- `bins/convex-backend/convex/model/media.ts`
- `bins/convex-backend/convex/model/messages.ts`
- `bins/convex-backend/convex/model/notifications.ts`
- `bins/convex-backend/convex/model/phoneAuth.ts`
- `bins/convex-backend/convex/model/presence.ts`
- `bins/convex-backend/convex/model/qrAuth.ts`
- `bins/convex-backend/convex/notifications.ts`
- `bins/convex-backend/convex/presence.ts`
- `bins/convex-backend/convex/qrAuth.ts`
- `bins/convex-backend/convex/schema.ts`
- `bins/convex-backend/convex/search.ts`
- `bins/convex-backend/convex/testHelpers.ts`
- `bins/convex-backend/convex/workerOps.ts`
- `bins/convex-backend/convex/workerTasks.ts`
- `bins/convex-backend/eslint.config.js`
- `bins/convex-backend/tests/integration_test.rs`
- `bins/convex-backend/tests/schema_test.rs`
- `bins/convex-backend/tsconfig.json`
- `bins/crm-chat-web/eslint.config.js`
- `bins/crm-chat-web/src/components/chat-list.tsx`
- `bins/crm-chat-web/src/components/chats-page.tsx`
- `bins/crm-chat-web/src/components/client-settings.tsx`
- `bins/crm-chat-web/src/components/download-manager.tsx`
- `bins/crm-chat-web/src/components/media-renderer.tsx`
- `bins/crm-chat-web/src/components/message-list.tsx`
- `bins/crm-chat-web/src/components/notifications-panel.tsx`
- `bins/crm-chat-web/src/components/right-sidebar.tsx`
- `bins/crm-chat-web/src/components/search-dialog.tsx`
- `bins/crm-chat-web/src/components/telegram-clients-manager.tsx`
- `bins/crm-chat-web/src/hooks/use-presence.ts`
- `bins/crm-chat-web/src/hooks/use-qr-auth.ts`
- `bins/crm-chat-web/tests/client-deletion.spec.ts`
- `bins/crm-chat-web/tests/e2e-telegram/media-rendering.spec.ts`
- `bins/crm-chat-web/tests/e2e-telegram/media-visual.spec.ts`
- `bins/crm-chat-web/tests/e2e-telegram/qr-auth-real.spec.ts`
- `bins/crm-chat-web/tests/e2e-telegram/qr-auth.spec.ts`
- `bins/crm-chat-web/tests/e2e-telegram/scan-chats.spec.ts`
- `bins/crm-chat-web/tests/helpers.ts`
- `bins/crm-chat-web/tests/scroll-to-message.spec.ts`
- `bins/crm-chat-web/tests/settings.spec.ts`
- `bins/crm-worker/src/main.rs`
- `bins/crm-worker/src/ops/cancel_watcher.rs`
- `bins/crm-worker/src/ops/convex.rs`
- `bins/crm-worker/src/ops/domain_watcher.rs`
- `bins/crm-worker/src/ops/media.rs`
- `bins/crm-worker/src/ops/mod.rs`
- `bins/crm-worker/src/ops/telegram.rs`
- `bins/crm-worker/src/services/chat_scanner.rs`
- `bins/crm-worker/src/services/dialog_sync.rs`
- `bins/crm-worker/src/services/media_downloader.rs`
- `bins/crm-worker/src/services/mod.rs`
- `bins/crm-worker/src/services/phone_auth.rs`
- `bins/crm-worker/src/services/profile_photo_sync.rs`
- `bins/crm-worker/src/services/qr_auth.rs`
- `bins/crm-worker/src/services/reconciler.rs`
- `bins/crm-worker/src/services/update_listener.rs`
- `libs/messanger-interface/src/error.rs`
- `libs/messanger-telegram/src/lib.rs`
- `libs/messanger-telegram/src/messenger.rs`
- `tests/e2e-telegram/tests/integration_test.rs`
💤 Files with no reviewable changes (13)
- bins/convex-backend/Cargo.toml
- bins/convex-backend/convex/README.md
- bins/convex-backend/convex/notifications.ts
- bins/convex-backend/convex/helpers/tasks.ts
- bins/crm-worker/src/ops/cancel_watcher.rs
- bins/convex-backend/convex/workerOps.ts
- bins/convex-backend/convex/helpers/auth.ts
- bins/convex-backend/convex/search.ts
- bins/convex-backend/convex/qrAuth.ts
- bins/convex-backend/convex/presence.ts
- bins/convex-backend/convex/chats.ts
- bins/convex-backend/convex/clients.ts
- bins/convex-backend/convex/workerTasks.ts
```ts
const textByKeywordsValidator = v.object({
  paginationOpts: paginationOptsValidator,
  keywords: v.string(),
  scope: v.union(
    v.object({ type: v.literal("all") }),
    v.object({ type: v.literal("client"), clientId: v.id("clients") }),
    v.object({ type: v.literal("chat"), chatId: v.id("chats") })
  ),
});

export type TextByKeywordsParameters = Infer<typeof textByKeywordsValidator>;

export const textByKeywords = humanQuery({
  args: textByKeywordsValidator,
  handler: async (ctx, args) => {
    const { caller } = ctx;

    const keywords = args.keywords.trim();

    const scopedQuery = ((s) => {
      switch (s.type) {
        case "all":
          return ctx.db
            .query("messages")
            .withSearchIndex("search_text", (q) =>
              q.search("text", keywords).eq("userId", caller.tokenIdentifier)
            );
        case "client":
          return ctx.db
            .query("messages")
            .withSearchIndex("search_text", (q) =>
              q
                .search("text", keywords)
                .eq("userId", caller.tokenIdentifier)
                .eq("clientId", s.clientId)
            );
        case "chat":
          return ctx.db
            .query("messages")
            .withSearchIndex("search_text", (q) =>
              q
                .search("text", keywords)
                .eq("userId", caller.tokenIdentifier)
                .eq("chatId", s.chatId)
            );
        default:
          throw new Error(
            `Unknown search scope type: ${(s as { type: string }).type}`
          );
      }
    })(args.scope);
```
Type mismatch: chatId in scope is v.id("chats") but message's chatId is a string.
The textByKeywords validator defines scope.chatId as v.id("chats") (Line 384), but the messages table stores chatId as a string (Line 28). The search index filter on Line 419 (.eq("chatId", s.chatId)) compares a string field against an Id<"chats">, which will fail to match.
🐛 Proposed fix
```diff
  scope: v.union(
    v.object({ type: v.literal("all") }),
    v.object({ type: v.literal("client"), clientId: v.id("clients") }),
-   v.object({ type: v.literal("chat"), chatId: v.id("chats") })
+   v.object({ type: v.literal("chat"), chatId: v.string() })
  ),
```

Or if you need to accept the chat document ID and resolve the string chatId:
```ts
case "chat":
  const chat = await ctx.db.get(s.chatId);
  if (!chat) return await ctx.db.query("messages").take(0).paginate(args.paginationOpts);
  return ctx.db
    .query("messages")
    .withSearchIndex("search_text", (q) =>
      q
        .search("text", keywords)
        .eq("userId", caller.tokenIdentifier)
        .eq("chatId", chat.chatId) // use the string chatId from the chat doc
    );
```
);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@bins/convex-backend/convex/model/messages.ts` around lines 378 - 426,
textByKeywordsValidator's scope.chatId is an Id<"chats"> but the messages.chatId
field is a string, so the .eq("chatId", s.chatId) filter will not match; in the
textByKeywords handler (function textByKeywords) change the "chat" case to
resolve the chat document ID to the stored string chatId by fetching the chat
(use ctx.db.get(s.chatId)), handle a missing chat by returning an empty
paginated query (e.g., .take(0).paginate(args.paginationOpts) or equivalent),
and then use the fetched chat.chatId string in the .eq("chatId", ...) filter for
ctx.db.query("messages").withSearchIndex("search_text", ...).
Actionable comments posted: 3
🧹 Nitpick comments (2)
bins/convex-backend/build.rs (1)
36-36: Consider explicit error handling for individual directory entries.

While the directory-level error now panics (good), `.flatten()` silently drops `Err` entries from the iterator. This is inconsistent with the fail-fast approach and could hide I/O errors for specific entries.

♻️ Optional: Handle entry errors explicitly

```diff
-    for entry in entries.flatten() {
+    for entry in entries {
+        let entry = entry
+            .unwrap_or_else(|e| panic!("failed to read entry in {}: {}", dir.display(), e));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/convex-backend/build.rs` at line 36, Replace the silent .flatten() over directory entries so individual errors are not dropped: iterate over entries as Result<DirEntry, io::Error> (e.g., for entry_result in entries) and explicitly handle Err by panicking or logging and exiting (use entry_result.unwrap_or_else(|e| panic!(...)) or match to include the error), then proceed with the successful DirEntry; update the loop that currently reads "for entry in entries.flatten()" to use the explicit error-handling approach so any per-entry I/O error surfaces (reference the entries iterator and the loop handling in build.rs).

bins/crm-worker/src/services/media_downloader.rs (1)
39-52: Consider returning `Ok(())` for missing media to avoid retry churn.

In the reconciler pattern, there's a race window where media may be deleted between the `pendingWork` query dispatch and this handler executing. Returning an error here causes Restate to retry the invocation, which will fail repeatedly.

Since the idempotency guard on line 49 already handles the "nothing to do" case gracefully by returning `Ok(())`, applying the same pattern for missing media would be consistent:

♻️ Suggested change

```diff
         let media = self
             .convex
             .query_media_get_for_download(MediaGetForDownloadArgs {
                 mediaId: media_id.clone(),
             })
             .await
-            .map_err(|e| anyhow::anyhow!("Failed to query media: {e}"))?
-            .ok_or_else(|| anyhow::anyhow!("Media record {} not found", media_id))?;
+            .map_err(|e| anyhow::anyhow!("Failed to query media: {e}"))?;
+
+        let Some(media) = media else {
+            info!(media_id, "MediaDownloader: media not found, skipping");
+            return Ok(());
+        };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@bins/crm-worker/src/services/media_downloader.rs` around lines 39 - 52, The handler treats a missing media row as an error (the .ok_or_else(...) after query_media_get_for_download), which causes retries; instead treat None as a benign "nothing to do" and return Ok(()). Update the code that calls convex.query_media_get_for_download(MediaGetForDownloadArgs { mediaId: ... }) so it handles the Option result explicitly (e.g., match or if let None) and, on None, log an informational message including media_id and return Ok(()); keep the existing idempotency check for media.status when Some(media).
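The "missing row is benign" pattern behind this suggestion can be reduced to a toy sketch (names are illustrative; the real handler talks to Convex and Restate):

```rust
// Treat Option::None as a no-op success instead of an error that would
// make the durable-execution runtime retry forever.
fn handle_media(lookup: Option<&str>) -> Result<String, String> {
    let Some(media) = lookup else {
        // Nothing to do; returning Ok avoids retry churn in the reconciler.
        return Ok("skipped".to_string());
    };
    Ok(format!("downloaded {media}"))
}

fn main() {
    assert_eq!(handle_media(None), Ok("skipped".to_string()));
    assert_eq!(handle_media(Some("m1")), Ok("downloaded m1".to_string()));
}
```

The key distinction is between transient failures (network, rate limits), where an `Err` and a retry are correct, and permanently-absent state, where retrying can never succeed.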
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@bins/convex-backend/convex/model/chats.ts`:
- Around line 215-234: The mutation updateScanEnabled currently performs
unbatched deletions via ctx.db.query(...).collect() and immediate
ctx.db.delete/ctx.storage.delete calls which can exceed Convex read/write
limits; change it to schedule an internal purge job via ctx.scheduler (e.g.,
create an internal mutation name like "purgeChatData") that accepts chatId and
performs paginated/batched deletes of messages and media, and replace the
immediate delete block in updateScanEnabled with a scheduler enqueue call;
ensure the scheduled purge implementation uses indexed queries with pagination
and deletes storage files before deleting media records to avoid hitting the
8,192 read/write limit.
In `@bins/convex-backend/convex/model/clients.ts`:
- Around line 256-259: The loop in workerCompleteSync that re-queues chats only
sets scanPhase to "Queued" and may leave stale progress in syncedMessages and
totalMessages; update the ctx.db.patch call inside the for loop (where chats are
iterated and chat.scanEnabled && !chat.fullScanned is checked) to also reset
syncedMessages to 0 and totalMessages to 0 (matching the behavior in the rescan
mutation) so re-queued chats start with clean progress state alongside
scanPhase: "Queued".
In `@bins/convex-backend/convex/model/phoneAuth.ts`:
- Around line 571-595: pendingWork currently calls
ctx.db.query("phoneAuths").withIndex("by_step", ...).collect() for each step
which can materialize an unbounded result set; change pendingWork (the
workerQuery handler) to accept a limit/slots argument and page/slice results
instead of using .collect() unboundedly: replace the .collect() usage with a
bounded fetch pattern (e.g., fetch or collect only a window or use query
pagination or collect then slice like allPending.slice(0, slots)) for the
phoneAuths query keyed by index "by_step" so you only push up to the requested
limit of work items for the PhoneAuthWorkflow run handler. Ensure function
signature in pendingWork accepts the limit param and enforce it when building
the work array.
---
Nitpick comments:
In `@bins/convex-backend/build.rs`:
- Line 36: Replace the silent .flatten() over directory entries so individual
errors are not dropped: iterate over entries as Result<DirEntry, io::Error>
(e.g., for entry_result in entries) and explicitly handle Err by panicking or
logging and exiting (use entry_result.unwrap_or_else(|e| panic!(...)) or match
to include the error), then proceed with the successful DirEntry; update the
loop that currently reads "for entry in entries.flatten()" to use the explicit
error-handling approach so any per-entry I/O error surfaces (reference the
entries iterator and the loop handling in build.rs).
In `@bins/crm-worker/src/services/media_downloader.rs`:
- Around line 39-52: The handler treats a missing media row as an error (the
.ok_or_else(...) after query_media_get_for_download), which causes retries;
instead treat None as a benign "nothing to do" and return Ok(()). Update the
code that calls convex.query_media_get_for_download(MediaGetForDownloadArgs {
mediaId: ... }) so it handles the Option result explicitly (e.g., match or if
let None) and, on None, log an informational message including media_id and
return Ok(()); keep the existing idempotency check for media.status when
Some(media).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 58a72b0d-1c05-4261-89b4-d9646cd86563
📒 Files selected for processing (6)
- `bins/convex-backend/build.rs`
- `bins/convex-backend/convex/model/chats.ts`
- `bins/convex-backend/convex/model/clients.ts`
- `bins/convex-backend/convex/model/phoneAuth.ts`
- `bins/crm-worker/src/main.rs`
- `bins/crm-worker/src/services/media_downloader.rs`
🚧 Files skipped from review as they are similar to previous changes (1)
- bins/crm-worker/src/main.rs
```ts
for (const chat of chats) {
  if (chat.scanEnabled && !chat.fullScanned) {
    await ctx.db.patch(chat._id, { scanPhase: "Queued" });
  }
```
Consider resetting progress fields when re-queuing chats.
When workerCompleteSync queues chats for scanning, it only sets scanPhase: "Queued" without resetting syncedMessages and totalMessages. The rescan mutation (lines 298-303 in chats.ts) resets these fields when queuing. This inconsistency could leave stale progress offsets for chats being re-queued after a new dialog sync.
🛠️ Proposed fix
```diff
  for (const chat of chats) {
    if (chat.scanEnabled && !chat.fullScanned) {
-     await ctx.db.patch(chat._id, { scanPhase: "Queued" });
+     await ctx.db.patch(chat._id, {
+       scanPhase: "Queued",
+       syncedMessages: undefined,
+       totalMessages: undefined,
+     });
    }
  }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@bins/convex-backend/convex/model/clients.ts` around lines 256 - 259, The loop
in workerCompleteSync that re-queues chats only sets scanPhase to "Queued" and
may leave stale progress in syncedMessages and totalMessages; update the
ctx.db.patch call inside the for loop (where chats are iterated and
chat.scanEnabled && !chat.fullScanned is checked) to also reset syncedMessages
to 0 and totalMessages to 0 (matching the behavior in the rescan mutation) so
re-queued chats start with clean progress state alongside scanPhase: "Queued".
```ts
export const pendingWork = workerQuery({
  args: {},
  returns: v.array(workItem),
  handler: async (ctx) => {
    const work: { service: string; key: string; handler: string }[] = [];
    for (const step of [
      "SendingCode",
      "VerifyingCode",
      "VerifyingPassword",
    ] as const) {
      const auths = await ctx.db
        .query("phoneAuths")
        .withIndex("by_step", (q) => q.eq("step", step))
        .collect();
      for (const a of auths) {
        work.push({
          service: "PhoneAuthWorkflow",
          key: a._id,
          handler: "run",
        });
      }
    }
    return work;
  },
});
```
🧩 Analysis chain
🏁 Script executed:
# First, verify the file exists and check the exact content at the specified lines
fd -p "phoneAuth.ts" | head -20

Repository: nickcomua/crm-chat
Length of output: 107
🏁 Script executed:
# Get the file and check its structure
cat -n bins/convex-backend/convex/model/phoneAuth.ts | sed -n '560,605p'

Repository: nickcomua/crm-chat
Length of output: 1242
🏁 Script executed:
# Also check what workerQuery is and how it's used
rg "workerQuery" bins/convex-backend/convex --type ts -A 3 | head -50Repository: nickcomua/crm-chat
Length of output: 3381
🏁 Script executed:
# Check if there are other queries in the codebase that handle large datasets with pagination
rg "\.paginate\(\)" bins/convex-backend/convex --type ts -B 3 -A 3 | head -80Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Search for how phoneAuths entries are created to understand potential size
rg "phoneAuths" bins/convex-backend/convex --type ts | grep -E "(insert|create|new)" | head -20Repository: nickcomua/crm-chat
Length of output: 147
🏁 Script executed:
# Check if there are limits/cleanup mechanisms for phoneAuths
rg "phoneAuths" bins/convex-backend/convex/model/phoneAuth.ts --type ts | head -30Repository: nickcomua/crm-chat
Length of output: 607
🏁 Script executed:
# Look for other uses of workerQuery to see if there's a pattern for handling large datasets
rg "workerQuery" bins/convex-backend/convex/model --type ts -A 10 | grep -E "(paginate|collect|limit)" | head -30Repository: nickcomua/crm-chat
Length of output: 566
🏁 Script executed:
# Check the structure of phoneAuths and its indexes to understand the data distribution
rg "phoneAuthsTable\|defineTable.*phoneAuth" bins/convex-backend/convex --type ts -A 30Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Look for how phoneAuths entries transition between states to understand retention
rg "step.*SendingCode\|step.*VerifyingCode\|step.*VerifyingPassword" bins/convex-backend/convex/model/phoneAuth.ts --type ts -B 2 -A 2Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Let me view the entire phoneAuth.ts file to understand the table structure and data flow
wc -l bins/convex-backend/convex/model/phoneAuth.tsRepository: nickcomua/crm-chat
Length of output: 111
🏁 Script executed:
# Check the table schema definition
rg "phoneAuthFields\|const phoneAuth" bins/convex-backend/convex/model/phoneAuth.ts --type ts -A 15Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Look at the beginning of the file to see imports and table definitions
head -100 bins/convex-backend/convex/model/phoneAuth.tsRepository: nickcomua/crm-chat
Length of output: 2977
🏁 Script executed:
# Check for cleanup/transition logic - look for where step is updated to terminal states
rg "step.*Connected\|step.*Failed\|step.*Cancelled" bins/convex-backend/convex/model/phoneAuth.ts --type ts -B 3 -A 3Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Check if there's any expiry/TTL logic or document deletion
rg "delete\|remove\|ttl\|expiry\|expire" bins/convex-backend/convex/model/phoneAuth.ts --type ts -i -B 2 -A 2Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Look at the full pendingWork context and surrounding code
sed -n '565,595p' bins/convex-backend/convex/model/phoneAuth.tsRepository: nickcomua/crm-chat
Length of output: 943
🏁 Script executed:
# Search for where step transitions happen to understand state machine
rg "step.*:" bins/convex-backend/convex/model/phoneAuth.ts --type ts | head -40Repository: nickcomua/crm-chat
Length of output: 920
🏁 Script executed:
# Search for any pagination patterns or limits being used in the codebase
rg "limit\|take\|skip\|page" bins/convex-backend/convex --type ts | grep -i "query\|db" | head -20Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Check if entries get cleaned up from phoneAuths - look for patch/update removing items from these steps
rg "patch\|update" bins/convex-backend/convex/model/phoneAuth.ts --type ts -B 2 -A 5 | head -60Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Check the full handler logic for step transitions to see if items stay indefinitely
sed -n '100,200p' bins/convex-backend/convex/model/phoneAuth.tsRepository: nickcomua/crm-chat
Length of output: 3038
🏁 Script executed:
# Search for any reconciler/cleanup code that might remove stale entries
rg "reconcil\|cleanup\|stale\|ttl" bins/convex-backend/convex --type ts -iRepository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Check if there's a .paginate() method available in Convex queries
rg "paginate\|page\|batch" bins/convex-backend/convex --type ts | head -20Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Look at how the pendingWork result is actually used/consumed
rg "pendingWork" bins/convex-backend/convex --type ts -B 3 -A 3Repository: nickcomua/crm-chat
Length of output: 4439
🏁 Script executed:
# Check if there's any size monitoring or metrics around these queries
rg "metric\|count\|size\|monitor" bins/convex-backend/convex/model --type ts | head -20Repository: nickcomua/crm-chat
Length of output: 44
🏁 Script executed:
# Look at the step transitions more carefully - see if terminal states are ever reached
sed -n '200,400p' bins/convex-backend/convex/model/phoneAuth.tsRepository: nickcomua/crm-chat
Length of output: 6088
🏁 Script executed:
# Check the media.ts pendingWork to see how it limits results
sed -n '500,550p' bins/convex-backend/convex/model/media.tsRepository: nickcomua/crm-chat
Length of output: 1226
🏁 Script executed:
# Check qrAuth.ts pendingWork as another example
rg "export const pendingWork" bins/convex-backend/convex/model/qrAuth.ts -A 20Repository: nickcomua/crm-chat
Length of output: 431
🏁 Script executed:
# Check the media.ts pendingWork implementation to see how it limits results
rg "export const pendingWork" bins/convex-backend/convex/model/media.ts -A 25Repository: nickcomua/crm-chat
Length of output: 878
🏁 Script executed:
# Look for clients.ts pendingWork to see if there are any other patterns
rg "export const pendingWork" bins/convex-backend/convex/model/clients.ts -A 20Repository: nickcomua/crm-chat
Length of output: 792
pendingWork should paginate results instead of using unbounded .collect().
Each step bucket is loaded with .collect() and fully materialized into the work array. Per the Convex API documentation, .collect() should only be used for tightly bounded result sets. Although items normally transition through these states (SendingCode → WaitingCode → VerifyingCode → etc.) quickly, accumulated stuck sessions or transient bugs can grow any of these buckets without bound, stalling the entire dispatcher.
The pattern used in media.ts:pendingWork demonstrates the correct approach: accept a limit parameter and slice results (e.g., allPending.slice(0, slots)). Apply the same pattern here.
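The bounded-dispatch idea can be sketched as a pure helper. The names below (collectBoundedWork, the bucket map, the round-robin order) are illustrative, not the actual Convex handler; in the real query each per-step fetch would itself be capped (e.g. with .take(limit) instead of .collect()) before this kind of merge runs:

```typescript
type WorkItem = { service: string; key: string; handler: string };

// Illustrative sketch: emit at most `limit` work items per poll,
// round-robining across step buckets so one stuck bucket cannot
// starve the others. `buckets` maps a step name to the auth ids
// already fetched for that step (each fetch capped upstream).
function collectBoundedWork(
  buckets: Record<string, string[]>,
  limit: number,
): WorkItem[] {
  const work: WorkItem[] = [];
  const queues = Object.values(buckets).map((ids) => [...ids]);
  // Take one id from each non-empty bucket per pass until the limit is hit.
  while (work.length < limit && queues.some((q) => q.length > 0)) {
    for (const q of queues) {
      if (work.length >= limit) break;
      const id = q.shift();
      if (id !== undefined) {
        work.push({ service: "PhoneAuthWorkflow", key: id, handler: "run" });
      }
    }
  }
  return work;
}
```

The round-robin order is a design choice, not a requirement; a simple concatenate-then-slice (as media.ts does with allPending.slice(0, slots)) is equally valid when fairness across steps does not matter.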
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@bins/convex-backend/convex/model/phoneAuth.ts` around lines 571 - 595,
pendingWork currently calls ctx.db.query("phoneAuths").withIndex("by_step",
...).collect() for each step which can materialize an unbounded result set;
change pendingWork (the workerQuery handler) to accept a limit/slots argument
and page/slice results instead of using .collect() unboundedly: replace the
.collect() usage with a bounded fetch pattern (e.g., fetch or collect only a
window or use query pagination or collect then slice like allPending.slice(0,
slots)) for the phoneAuths query keyed by index "by_step" so you only push up to
the requested limit of work items for the PhoneAuthWorkflow run handler. Ensure
function signature in pendingWork accepts the limit param and enforce it when
building the work array.