
source-ashby: implement snapshot and child streams#4309

Open
nicolaslazo wants to merge 2 commits into main from nlazo/fix-source-ashby-stream-types

Conversation

@nicolaslazo
Contributor

Description:

All Ashby streams were treated as incremental and as requiring no parent entity ID parameters. This change:

  • Implements snapshot streams
  • Sets flags to pull additional data when available
  • Implements a guard for the scenario where docs are yielded but Ashby provides no sync token at the end of pagination
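The guard in the last bullet could be sketched roughly as follows. This is a minimal illustration, not the connector's actual code: the page shape (`results`, `syncToken`) and the wrapper name are assumptions based on the description above.

```python
import asyncio
from typing import Any, AsyncGenerator


async def fetch_with_sync_token_guard(
    pages: AsyncGenerator[dict[str, Any], None],
) -> AsyncGenerator[dict[str, Any], None]:
    """Yield docs from paginated responses; fail loudly if documents were
    yielded but Ashby never returned a sync token (field names assumed)."""
    yielded_docs = False
    sync_token: str | None = None

    async for page in pages:
        for doc in page.get("results", []):
            yielded_docs = True
            yield doc
        # Keep the most recent sync token seen, if any.
        sync_token = page.get("syncToken", sync_token)

    if yielded_docs and sync_token is None:
        raise RuntimeError(
            "documents were yielded but Ashby returned no sync token"
        )
```

Failing loudly here is safer than silently checkpointing: a missing sync token would otherwise force a full re-snapshot on the next sync without anyone noticing.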

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)

@nicolaslazo nicolaslazo requested a review from a team April 24, 2026 12:14
@nicolaslazo nicolaslazo self-assigned this Apr 24, 2026
Member

@Alex-Bair Alex-Bair left a comment


LGTM % the pagination question for snapshot resources and the timeout concern around concurrently fetching child records while streaming the response containing parent ids.

Comment on lines +135 to +146
return Resource(
name=entity_cls.name,
key=["/_meta/row_id"],
model=entity_cls,
open=open,
initial_state=ResourceState(),
initial_config=ResourceConfig(
name=entity_cls.name,
interval=timedelta(minutes=5),
),
schema_inference=True,
)
Member


question: Would it work to use the recently added SnapshotResource here instead of Resource? It's not necessary, but should remove the need to specify the key and initial_state during instantiation. Using Resource instead is fine, but if there's some rough edge that's preventing the use of SnapshotResource instead I'd like to know so I can smooth it out.

state,
task,
fetch_snapshot=functools.partial(snapshot_fn, entity_cls, http),
tombstone=BaseDocument(_meta=BaseDocument.Meta(op="d")),
Member


note: Similar to below, you shouldn't have to specify a tombstone here any longer as long as you're fine with the CDK defaulting to use BaseDocument for the tombstone like you are here. You can still specify it if you want, but it should work without it.

) -> AsyncGenerator[ChildEntity, None]:
url = f"{API_BASE_URL}/{entity_cls.path}"

async for parent in snapshot_entity(entity_cls.parent_entity, http, log):
Member


Sometimes we've seen that keeping a response open while we're iterating through parent ids & making requests for child resources can lead to TimeoutErrors (here's the spot in source-zendesk-support-native where we fixed that issue). That seems to be possible here too - the snapshot_entity call streams the API response for parent records, and we're keeping that response open while we make requests for child resources. We've handled this in other connectors by fetching an entire response's worth of parent records at a time, buffering the parent ids in a list, closing that response, then fetching the child records for those in-memory parent ids. That'd be a good pattern to use here too to avoid potential TimeoutErrors.

Comment on lines +43 to +66
async def snapshot_entity(
entity_cls: type[AshbyEntity],
http: HTTPSession,
log: Logger,
) -> AsyncGenerator[AshbyEntity, None]:
url = f"{API_BASE_URL}/{entity_cls.path}"

_, response = await http.request_stream(
log, url, method="POST", json={**entity_cls.extra_body}
)
processor = IncrementalJsonProcessor(
response(), "results.item", entity_cls, remainder_cls=ResponseMeta
)

async for item in processor:
yield item

meta = processor.get_remainder()
if not meta.success:
log.error(
"Ashby API error during snapshot",
extra={"entity": entity_cls.name, "errors": meta.errors},
)
raise RuntimeError(f"Ashby API error for {entity_cls.name}: {meta.errors}")
Member


Both here and in snapshot_child_entity, do we need to paginate through results using meta.nextCursor like we do in fetch_entity? The API docs' interviewEvent.list page seems to suggest pagination is necessary. It feels like the "paginate through all pages" logic could be abstracted to a helper and re-used across all three functions, although that's not always worth the extra effort if the code's fairly simple.
