feat(canvas): checkpoint logic (3/4) #9807
benwu408 wants to merge 2 commits into onyx-dot-app:main
Add _load_from_checkpoint with staged processing (pages → assignments → announcements) per course, time-window filtering, per-document failure isolation via ConnectorFailure, and proper checkpoint state advancement. Security-critical pagination errors (host/scheme mismatch) propagate while recoverable API errors trigger retries via has_more=True. Implements load_from_checkpoint, load_from_checkpoint_with_perm_sync, build_dummy_checkpoint, and validate_checkpoint_json. Includes unit tests for checkpoint lifecycle, stage advancement, time filtering, failure handling, and perm sync attachment.
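The stage and course advancement described above can be sketched roughly as follows. This is a simplified illustration, not the connector's code: the `Checkpoint` dataclass and the `advance` helper are hypothetical stand-ins, and only the field names (`course_ids`, `current_course_index`, `stage`, `next_url`, `has_more`) come from this PR. The sketch also assumes `has_more` flips to `False` as soon as the last course finishes, whereas the PR may perform that check at the start of the next call.

```python
from dataclasses import dataclass, field, replace
from typing import Optional

STAGES = ("pages", "assignments", "announcements")


@dataclass(frozen=True)
class Checkpoint:
    """Hypothetical, simplified stand-in for CanvasConnectorCheckpoint."""

    course_ids: list[int] = field(default_factory=list)
    current_course_index: int = 0
    stage: str = "pages"
    next_url: Optional[str] = None
    has_more: bool = True


def advance(cp: Checkpoint, result_next_url: Optional[str]) -> Checkpoint:
    """Return the checkpoint to use after one page of results was processed."""
    if result_next_url:
        # More pages remain in the current stage: keep the stage, save the cursor.
        return replace(cp, next_url=result_next_url, has_more=True)

    stage_pos = STAGES.index(cp.stage)
    if stage_pos + 1 < len(STAGES):
        # Current stage exhausted: move to the next stage of the same course.
        return replace(cp, stage=STAGES[stage_pos + 1], next_url=None, has_more=True)

    # Last stage exhausted: move to the next course and reset the stage.
    next_index = cp.current_course_index + 1
    return replace(
        cp,
        current_course_index=next_index,
        stage=STAGES[0],
        next_url=None,
        has_more=next_index < len(cp.course_ids),  # assumption, see lead-in
    )
```

Keeping the transition in a pure function like this makes each step easy to unit test, since every call maps one checkpoint to the next without touching the network.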
4 issues found across 2 files
Confidence score: 3/5
- There is a concrete runtime risk in `backend/onyx/connectors/canvas/connector.py`: checkpoint loading appears to retry on all API errors and does not surface 401/403, which can cause unauthorized/expired credentials to loop indefinitely instead of failing clearly. `backend/onyx/connectors/canvas/connector.py` also has duplicated `_handle_canvas_api_error` logic that overrides prior behavior, potentially misclassifying 5xx upstream failures as validation errors and changing error handling paths.
- Test reliability is reduced in `backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py` because duplicate class/test names overwrite earlier definitions, so some newly added cases are skipped and coverage drops.
- Pay close attention to `backend/onyx/connectors/canvas/connector.py` and `backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py`: API error classification and duplicated tests may hide regressions.
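To illustrate the duplicate-definition issue: in Python, a second `class` (or `def`) statement with the same name in the same scope rebinds that name, so anything that inspects the module afterward sees only the last definition. The class and test names below are hypothetical and are not taken from the PR's test file.

```python
class TestUrlNormalization:
    def test_strips_trailing_slash(self) -> None:
        assert "https://canvas.example.com/".rstrip("/") == "https://canvas.example.com"


# Same name again: this rebinds TestUrlNormalization, so the class above
# is no longer reachable by name from the module namespace.
class TestUrlNormalization:  # noqa: F811
    def test_lowercases_host(self) -> None:
        assert "CANVAS.example.com".lower() == "canvas.example.com"


# Only the methods of the second definition remain visible.
visible_tests = sorted(
    name for name in vars(TestUrlNormalization) if name.startswith("test_")
)
```

Here `visible_tests` contains only `test_lowercases_host`; the first class's test is silently dropped. A linter rule for redefined names (pyflakes reports it as F811) can catch this before review.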
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py">
<violation number="1" location="backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py:701">
P2: Duplicate TestConnectorUrlNormalization class definitions overwrite the earlier class, so the newly added tests in the first class are skipped by pytest.</violation>
<violation number="2" location="backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py:946">
P3: Duplicate test_validate_insufficient_permissions definition overwrites the earlier test, so one variant never runs and coverage is reduced.</violation>
</file>
<file name="backend/onyx/connectors/canvas/connector.py">
<violation number="1" location="backend/onyx/connectors/canvas/connector.py:65">
P2: Duplicate `_handle_canvas_api_error` overrides the original 5xx handling logic, so 5xx Canvas errors are misclassified as validation errors.</violation>
<violation number="2" location="backend/onyx/connectors/canvas/connector.py:519">
P2: Checkpoint loading treats all API errors as retryable and never raises for 401/403, so expired or unauthorized credentials will loop forever instead of surfacing a credential error.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
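One way to address the 401/403 concern above is to classify a failed fetch before deciding whether to retry. The sketch below is an assumption-laden illustration: `ErrorAction` and `classify_fetch_error` are hypothetical names, and the mapping (no status means a locally raised security error, 401/403 are fatal, everything else retries) is inferred from the review comments and the PR description rather than copied from the connector.

```python
from enum import Enum
from typing import Optional


class ErrorAction(Enum):
    """Hypothetical outcomes for a failed Canvas API fetch."""

    RAISE_SECURITY = "raise_security"
    RAISE_CREDENTIAL_EXPIRED = "raise_credential_expired"
    RAISE_INSUFFICIENT_PERMISSIONS = "raise_insufficient_permissions"
    RETRY = "retry"


def classify_fetch_error(status_code: Optional[int]) -> ErrorAction:
    """Decide how the checkpoint loop should react to a fetch failure."""
    if status_code is None:
        # No HTTP status: the error was raised locally (for example a
        # host/scheme mismatch on a pagination URL) and must propagate.
        return ErrorAction.RAISE_SECURITY
    if status_code == 401:
        return ErrorAction.RAISE_CREDENTIAL_EXPIRED
    if status_code == 403:
        return ErrorAction.RAISE_INSUFFICIENT_PERMISSIONS
    # Other API failures (such as 429 or 5xx) are treated as recoverable:
    # the caller keeps has_more=True and leaves the stage unchanged.
    return ErrorAction.RETRY
```

Separating the decision from the side effects keeps the retry loop from swallowing credential failures, which is the infinite-loop risk flagged in this review.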
backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py
Greptile Summary
This PR implements the core checkpoint-based indexing for the Canvas connector:
Key changes:
Confidence Score: 5/5
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A([load_from_checkpoint called]) --> B{course_ids empty?}
B -- Yes --> C[_list_courses]
C -- failure --> D[has_more=True\nreturn same checkpoint]
C -- success, 0 courses --> E[has_more=False\nreturn checkpoint]
C -- success, N courses --> F[populate course_ids\nstage=pages, index=0\nhas_more=True\nreturn — no docs yielded]
B -- No --> G{current_index ≥\nlen course_ids?}
G -- Yes --> H[has_more=False\nreturn]
G -- No --> I[Validate stage\npages / assignments / announcements]
I --> J{next_url set?}
J -- Yes --> K[GET full_url]
J -- No --> L[GET endpoint + params]
K --> M{HTTP error?}
L --> M
M -- 401/403 --> N[raise CredentialExpired /\nInsufficientPermissions]
M -- security error no status --> N
M -- 429 / 5xx --> O[has_more=True\nreturn — stage unchanged]
M -- OK --> P[Iterate items]
P --> Q{item in\ntime window?}
Q -- No / no timestamp --> R[skip]
Q -- Yes --> S[convert to Document\nyield doc]
S -- conversion error --> T[yield ConnectorFailure]
P --> U{result_next_url?}
U -- Yes --> V[checkpoint.next_url = result_next_url\nhas_more=True]
U -- No --> W{next stage?}
W -- pages→assignments\nassignments→announcements --> X[advance stage\nnext_url=None]
W -- announcements→done --> Y[advance_course\nreset stage+next_url]
V --> Z([return new_checkpoint])
X --> Z
Y --> Z
Prompt To Fix All With AI
This is a comment left during a code review.
Path: backend/onyx/connectors/canvas/connector.py
Line: 486-493
Comment:
**`stage_config` eagerly built even when unused**
`stage_config` and `config` are constructed on every invocation, including when `new_checkpoint.next_url` is set — in that branch, `config["endpoint"]` and `config["params"]` are never accessed. Moving the dict construction into the `else` branch keeps the hot path (pagination resume) cleaner and avoids building three endpoint strings on every page-turn.
```suggestion
try:
    if new_checkpoint.next_url:
        response, result_next_url = self.canvas_client.get(
            full_url=new_checkpoint.next_url
        )
    else:
        stage_config: dict[str, dict[str, Any]] = {
            "pages": {
                "endpoint": f"courses/{course_id}/pages",
                "params": {"per_page": "100", "include[]": "body", "published": "true"},
            },
            "assignments": {
                "endpoint": f"courses/{course_id}/assignments",
                "params": {"per_page": "100", "published": "true"},
            },
            "announcements": {
                "endpoint": "announcements",
                "params": {
                    "per_page": "100",
                    "context_codes[]": f"course_{course_id}",
                    "active_only": "true",
                },
            },
        }
        config = stage_config[stage]
        response, result_next_url = self.canvas_client.get(
            config["endpoint"], params=config["params"]
        )
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py
Line: 330-344
Comment:
**No test coverage for `next_url` pagination resume path**
`_run_checkpoint` always exhausts `StopIteration` after one generator pass, and `_mock_response` always returns an empty `Link` header — so `result_next_url` is always `None` in every test. The `next_url` branch in `_load_from_checkpoint` (lines 487–490 of `connector.py`) is never exercised. A test like the one below would cover mid-pagination resume and verify that the checkpoint correctly carries `next_url` between calls:
```python
@patch("onyx.connectors.canvas.client.rl_requests")
def test_pagination_resume_via_next_url(self, mock_requests: MagicMock) -> None:
    """When a page has a next_url, checkpoint carries it and resumes from there."""
    page1 = _mock_page(10, updated_at="2025-06-15T12:00:00Z")
    page2 = _mock_page(11, title="Page 2", updated_at="2025-06-15T13:00:00Z")
    call_count = 0

    def _dispatcher(url: str, **kwargs: Any) -> MagicMock:
        nonlocal call_count
        call_count += 1
        next_url = f"{FAKE_BASE_URL}/api/v1/courses/1/pages?page=2" if call_count == 1 else ""
        return _mock_response(
            json_data=[page1] if call_count == 1 else [page2],
            link_header=f'<{next_url}>; rel="next"' if next_url else "",
        )

    mock_requests.get.side_effect = _dispatcher
    connector = _build_connector()
    cp = CanvasConnectorCheckpoint(
        has_more=True, course_ids=[1], current_course_index=0, stage="pages"
    )
    start = datetime(2025, 6, 1, tzinfo=timezone.utc).timestamp()
    end = datetime(2025, 6, 30, tzinfo=timezone.utc).timestamp()

    items1, cp = _run_checkpoint(connector, cp, start, end)
    assert len(items1) == 1
    assert cp.next_url is not None  # cursor saved
    assert cp.stage == "pages"  # stage not advanced yet

    items2, cp = _run_checkpoint(connector, cp, start, end)
    assert len(items2) == 1
    assert cp.next_url is None  # exhausted
    assert cp.stage == "assignments"
```
How can I resolve this? If you propose a fix, please make it concise.
Reviews (2): Last reviewed commit: "fix duplicate tests/functions from earli..."
backend/tests/unit/onyx/connectors/canvas/test_canvas_connector.py
    except OnyxError as oe:
        # Re-raise security errors from _parse_next_link (host/scheme
        # mismatch on pagination URLs) — these must not be silenced.
        # Security errors have no HTTP status code override (they are
        # raised locally, not from an API response).
        is_api_error = oe._status_code_override is not None
        if not is_api_error:
            raise
        logger.warning(
            f"Failed to fetch {stage} for course {course_id}: {oe}"
        )
        new_checkpoint.has_more = True
        return new_checkpoint
**Accessing private `_status_code_override` to classify errors**
The code inspects `oe._status_code_override` to distinguish HTTP API errors from locally-raised security errors. This couples to an internal detail of `OnyxError`. Consider adding a public property on `OnyxError` to expose whether the error originated from an HTTP response, or catching the security-error types explicitly.
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/onyx/connectors/canvas/connector.py
Line: 514-526
Comment:
**Accessing private `_status_code_override` to classify errors**
The code inspects `oe._status_code_override` to distinguish HTTP API errors from locally-raised security errors. This couples to an internal detail of `OnyxError`. Consider adding a public property on `OnyxError` to expose whether the error originated from an HTTP response, or catching the security-error types explicitly.
How can I resolve this? If you propose a fix, please make it concise.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
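The suggestion to expose a public property could look something like the sketch below. `ApiAwareError` is a hypothetical stand-in for `OnyxError`, whose real constructor and attributes are not shown in this thread; only the `_status_code_override` attribute name comes from the diff. The property name `is_api_error` and the `should_retry` helper are also assumptions.

```python
from typing import Optional


class ApiAwareError(Exception):
    """Hypothetical stand-in for OnyxError; the real class may differ."""

    def __init__(self, message: str, status_code_override: Optional[int] = None) -> None:
        super().__init__(message)
        self._status_code_override = status_code_override

    @property
    def is_api_error(self) -> bool:
        """True when the error carries an HTTP status from an API response."""
        return self._status_code_override is not None


def should_retry(err: ApiAwareError) -> bool:
    """Callers use the public property instead of reading the private field."""
    return err.is_api_error
```

With a property like this, the connector's `except` block would not need to know how the error stores its status, so a later refactor of the error class would not silently change the retry behavior.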
        include_permissions: bool = False,
    ) -> CheckpointOutput[CanvasConnectorCheckpoint]:
        """Shared implementation for load_from_checkpoint and load_from_checkpoint_with_perm_sync."""
        new_checkpoint = checkpoint.model_copy(deep=True)
is model_copy just making a copy of the object?
yep, makes a copy that we modify then return. it's a pydantic method
    # First call: materialize the list of course IDs
    if not new_checkpoint.course_ids:
        try:
            courses = self._list_courses()
maybe i don't have a good understanding of what exactly "checkpoint" means - but from reading this i'd be concerned that we're just using the old course_ids if they exist - what if someone joins / drops a course since we last checkpointed?
This lowkey confused me and I had to spend some time understanding it, but basically indexing runs every minute or whatever, when it sorts thru everything and finds new stuff. Every indexing run makes a new empty checkpoint, calls list courses, then finds every single document and yields the ones in the window since the last indexing run. Checkpoints are made at every single stage in the indexing process: each load-from-checkpoint call processes a page (at most 100) of things, and then returns a new checkpoint that is passed to the next call. So basically every new index you'd get the full course list again.
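The loop described in this reply can be sketched as follows. Everything here is a hypothetical simplification: `Checkpoint`, `run_indexing_attempt`, and `fake_step` are illustrative names, and the real framework that drives `load_from_checkpoint` is not part of this PR. The sketch only assumes what the thread states: each run starts from a fresh checkpoint, each call yields documents and returns the next checkpoint, and the run stops when `has_more` is `False`.

```python
from dataclasses import dataclass
from typing import Callable, Generator


@dataclass
class Checkpoint:
    """Hypothetical minimal checkpoint: a flag plus a call counter."""

    has_more: bool = True
    calls: int = 0


# A step yields documents (plain strings here) and returns the next checkpoint.
Step = Callable[[Checkpoint], Generator[str, None, Checkpoint]]


def run_indexing_attempt(step: Step) -> list[str]:
    """Drive one indexing run, starting from a fresh, empty checkpoint."""
    checkpoint = Checkpoint()
    docs: list[str] = []
    while checkpoint.has_more:
        gen = step(checkpoint)
        while True:
            try:
                docs.append(next(gen))
            except StopIteration as stop:
                # The generator's return value is the next checkpoint.
                checkpoint = stop.value
                break
    return docs


def fake_step(cp: Checkpoint) -> Generator[str, None, Checkpoint]:
    """Stand-in for load_from_checkpoint: one document per call, three calls."""
    yield f"doc-{cp.calls}"
    return Checkpoint(has_more=cp.calls < 2, calls=cp.calls + 1)


collected = run_indexing_attempt(fake_step)
```

Because `run_indexing_attempt` always begins with a new `Checkpoint()`, nothing from a previous run is reused, which is why the course list is fetched again on every run.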
        courses = self._list_courses()
    except Exception as e:
        logger.warning(f"Failed to list Canvas courses: {e}")
        new_checkpoint.has_more = True
the current indexing run still has more docs to go thru
…perly in checkpoint loop
Description
Add _load_from_checkpoint with staged processing (pages → assignments → announcements) per course, time-window filtering, per-document failure isolation via ConnectorFailure, and proper checkpoint state advancement. Security-critical pagination errors (host/scheme mismatch) propagate while recoverable API errors (404, 429, 5xx) trigger retries. Implements load_from_checkpoint, load_from_checkpoint_with_perm_sync, build_dummy_checkpoint, and validate_checkpoint_json.
Includes unit tests for checkpoint lifecycle, stage advancement, time filtering, failure handling, and perm sync attachment.
How Has This Been Tested?
Unit tests covering checkpoint lifecycle: first-call course materialization, stage processing (pages → assignments → announcements), stage advancement across all 3 stages, time-window filtering, announcement skip on missing posted_at, stage failure retry (no advancement), per-document conversion failure yielding ConnectorFailure, terminal state (has_more=False), invalid stage rejection, and load_from_checkpoint_with_perm_sync attaching ExternalAccess.
Also manually tested against a live Canvas instance.
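As a rough illustration of the time-window filtering that these tests exercise, the sketch below checks an ISO 8601 timestamp against a `[start, end]` window given as epoch seconds. The helper name `in_time_window` is hypothetical, and treating both bounds as inclusive is an assumption; the connector's actual comparison may differ. Items with no timestamp are skipped, matching the behavior described for announcements missing `posted_at`.

```python
from datetime import datetime, timezone
from typing import Optional


def in_time_window(timestamp: Optional[str], start: float, end: float) -> bool:
    """Return True if an ISO 8601 timestamp falls inside [start, end]."""
    if not timestamp:
        # No timestamp (for example a missing posted_at): skip the item.
        return False
    # Canvas-style timestamps end in "Z"; normalize so fromisoformat accepts
    # them on older Python versions as well.
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: treat naive timestamps as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return start <= parsed.timestamp() <= end
```

For example, with a window covering June 2025, a page updated on `2025-06-15T12:00:00Z` passes the check, while one updated in July or one without a timestamp does not.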
PR Stack
Stacked PRs on my fork (each targets the previous branch for isolated review):
PRs on upstream onyx (all target main):
Summary by cubic
Adds checkpoint-based indexing to the Canvas connector with staged per‑course processing and time-window filtering. Also adds a permission-sync mode and tighter error handling, including surfacing 401/403 and retrying only recoverable API errors.
New Features
- `_load_from_checkpoint` used by `load_from_checkpoint` and `load_from_checkpoint_with_perm_sync`, plus `build_dummy_checkpoint` and `validate_checkpoint_json`.
- `next_url` and correct stage/course advancement.
- `posted_at`.
- `ConnectorFailure` without stopping other items.
- `has_more=True`.
Bug Fixes
- `_handle_canvas_api_error` (no silent retry), while other API failures continue to retry.
Written for commit 23933c5. Summary will update on new commits.