Conversation
* Consolidate fetch logic into generalized funcs
* Move URL munging and config to models
* Refactor models to better represent API entity hierarchy
* Factor json dep out
* Add model to handle entity envelope
* Move more url munging out of api module
* Add generic api functions for incremental models with an exception for Stories since we fetch those per task
* Make incremental models aware of their event_type
* Fetch fresh copy of entity whenever we see an event action other than "deleted"
Alex-Bair left a comment
Thanks for the PR @enlore! I apologize for the delay in reviewing it. It looks pretty good! I didn't realize the Asana API was so complex, especially around sync token handling.
I have a handful of questions, comments, and suggested changes. Please let me know if I was unclear anywhere or you have any questions.
Like Nicolas did with source-posthog, I'm happy to do the finishing touches (e.g. add snapshot tests that use our encrypted dev Asana credentials) once my comments are addressed. Thanks again!
```python
from source_asana_native import Connector

if __name__ == "__main__":
```
nit: the `if __name__ == "__main__"` clause isn't needed here in the `__main__.py` file since this file only serves as the connector's entry point.
```python
api_key: Annotated[
    str,
    Field(
        description="Personal Access Token for Asana.",
        title="API Key",
        json_schema_extra={"secret": True},
    ),
]
```
Asana supports multiple types of authentication, not just personal access tokens. We'll eventually want to support more than one in source-asana-native, and it's easier to add a new authentication method if we nest all of them under a `credentials` field in the `EndpointConfig`. Instead of leaving `api_key` as it is now, can it be nested inside a `credentials` field so it's easier to add OAuth support later? Leveraging the `AccessToken` class from `estuary_cdk.flow` can help here too:
```diff
-api_key: Annotated[
-    str,
-    Field(
-        description="Personal Access Token for Asana.",
-        title="API Key",
-        json_schema_extra={"secret": True},
-    ),
-]
+credentials: AccessToken = Field(
+    discriminator="credentials_title",
+    title="Authentication",
+)
```
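To illustrate the value of nesting, here's a hedged sketch of a discriminated-union `credentials` field. The `AccessToken`, `OAuth2Credentials`, and `auth_header` names here are illustrative stand-ins, not the actual `estuary_cdk` classes, and plain dataclasses are used instead of pydantic to keep the sketch self-contained:

```python
from dataclasses import dataclass
from typing import Union

@dataclass
class AccessToken:
    # credentials_title is the discriminator field, as in the suggestion above.
    credentials_title: str
    access_token: str

@dataclass
class OAuth2Credentials:
    credentials_title: str
    client_id: str
    client_secret: str
    refresh_token: str

@dataclass
class EndpointConfig:
    # Nesting all auth methods under one `credentials` field means adding
    # OAuth later is a change inside this field, not a new top-level setting.
    credentials: Union[AccessToken, OAuth2Credentials]

def auth_header(config: EndpointConfig) -> dict:
    # Dispatch on the credential type, as a tagged-union validator would.
    if isinstance(config.credentials, AccessToken):
        return {"Authorization": f"Bearer {config.credentials.access_token}"}
    raise NotImplementedError("OAuth flow would be handled separately")
```

With pydantic (which the connector actually uses), the same shape falls out of a discriminated union on `credentials_title`.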
```python
        json_schema_extra={"advanced": True},
    ),
]
```
Per the Asana docs, all API requests use the same base URL, so `base_url` does not need to be a user-facing setting. It can just be a constant in `api.py`.
That simplification should also allow the removal of the `advanced` field from the `EndpointConfig`.
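For reference, the constant could be as simple as the following (this is Asana's documented base URL; the constant name is just a suggestion):

```python
# All Asana API requests go through this base URL per the Asana docs,
# so it can live as a module-level constant in api.py.
API_BASE_URL = "https://app.asana.com/api/1.0"
```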
```python
state,
task,
fetch_snapshot=fetch_fn,
tombstone=TOMBSTONE,
```
For snapshot bindings, it's sufficient to use `BaseDocument(_meta=BaseDocument.Meta(op="d"))` as the `tombstone` argument here instead of a subclass of `BaseDocument`. The `tombstone` argument is only used by the CDK here as part of the deletion inference mechanism described in the CDK's README, and I don't see many situations where it would be helpful to use a subclass of `BaseDocument` instead of `BaseDocument` itself as the tombstone.
```diff
-tombstone=TOMBSTONE,
+tombstone=BaseDocument(_meta=BaseDocument.Meta(op="d")),
```
Honestly, it makes sense to me for the CDK to fall back to using `BaseDocument` as a tombstone if one isn't provided in a `common.open_binding` call. I'll update the CDK to allow that so developing connectors is a smidge easier and more consistent.
```python
resources.append(
    common.Resource(
        name=name,
        key=["/gid"],
```
For the CDK's deletion inference for snapshot resources to work, the key needs to be the synthetic key `["/_meta/row_id"]`.
```diff
-key=["/gid"],
+key=["/_meta/row_id"],
```
This allows the CDK's deletion inference mechanism to work end-to-end through the Estuary platform. The CDK assigns each document an incrementing `row_id` on every snapshot cycle and emits tombstones for trailing positions when the count decreases. If the key is `["/gid"]`, those tombstones won't match any existing document and deletions won't propagate through to destination systems.
This is another thing I could simplify in the CDK since all snapshot bindings will use the same key. I'll make that update to the CDK too.
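To make the mechanism concrete, here is an illustrative sketch (not the CDK's actual code) of positional `row_id` assignment and trailing tombstones. Note the tombstone documents carry only `_meta`, which is why a `/gid` key could never match them:

```python
# Each snapshot cycle re-assigns row_ids positionally, then emits a
# tombstone (op="d") for every trailing row_id that existed last cycle
# but has no document this cycle.
def snapshot_cycle(docs, prev_count):
    out = []
    for row_id, doc in enumerate(docs):
        out.append({"_meta": {"row_id": row_id, "op": "u"}, **doc})
    for row_id in range(len(docs), prev_count):
        # Tombstone: keyed only by row_id, it overwrites-and-deletes the
        # position that "b" occupied in the previous cycle.
        out.append({"_meta": {"row_id": row_id, "op": "d"}})
    return out, len(docs)

# First cycle sees three docs; second cycle sees two (doc "b" was deleted).
docs, count = snapshot_cycle([{"gid": "a"}, {"gid": "b"}, {"gid": "c"}], prev_count=0)
docs, count = snapshot_cycle([{"gid": "a"}, {"gid": "c"}], prev_count=count)
```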
```python
common.open_binding(
    binding,
    binding_index,
    state,
    task,
    fetch_page=fp,
    fetch_changes=fc,
    tombstone=TOMBSTONE,
)
```
`tombstone` doesn't need to be provided to `open_binding` when there's no `fetch_snapshot` argument.
```diff
 common.open_binding(
     binding,
     binding_index,
     state,
     task,
     fetch_page=fp,
     fetch_changes=fc,
-    tombstone=TOMBSTONE,
 )
```
```python
projects = await _collect_projects(http, config, log)

for model in INCREMENTAL_MODELS:
    backfill_fn = _get_backfill_fn(model)

    fetch_page: dict[str, Any] = {}
    fetch_changes: dict[str, Any] = {}
    initial_inc: dict[str, ResourceState.Incremental] = {}
    initial_backfill: dict[str, ResourceState.Backfill] = {}

    for project in projects:
        fetch_page[project.gid] = functools.partial(
            backfill_fn,
            http,
            base_url,
            project.gid,
        )
        fetch_changes[project.gid] = functools.partial(
            fetch_events,
            model,
            http,
            base_url,
            project.gid,
        )
        initial_inc[project.gid] = ResourceState.Incremental(cursor=("",))
        initial_backfill[project.gid] = ResourceState.Backfill(
            cutoff=("",),
            next_page=None,
        )
```
Since the number of subtasks for each resource is dynamically determined by how many projects there are, we'll need to handle the case where a project is created after a capture has already been set up in Estuary. In that situation, the `initial_state` with the added project isn't used by the connector; it uses the state sent to it by the runtime instead. I talked about that challenge in this comment with Nicolas and pointed out how we've solved it elsewhere. Let me know if that comment or the examples are unclear and I can explain further.
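One way this is commonly handled (a hedged sketch of the general idea, not the solution linked above or the connector's actual code) is to diff the live project list against the per-project states the runtime sent back at startup, seeding initial state for any project the runtime doesn't know about yet:

```python
# Hypothetical helper: merge runtime-provided per-project state with
# projects discovered at startup, so projects created after the capture
# was set up still get a subtask with fresh initial state.
def seed_new_projects(runtime_state, live_project_gids):
    state = dict(runtime_state)
    for gid in live_project_gids:
        if gid not in state:
            # New project: start from the same initial state a fresh
            # capture would use (empty cursor => bootstrap a sync token).
            state[gid] = {"cursor": ("",)}
    return state

# "p1" already has a sync token from the runtime; "p2" is newly created.
state = seed_new_projects({"p1": {"cursor": ("tok1",)}}, ["p1", "p2"])
```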
```python
if e.code == 412:
    new_token = SyncTokenResponse.model_validate_json(e.body).sync
    yield (new_token,)
    return
```
If the sync token expires and we fetch a fresh one, we'll miss all the incremental changes between when the connector last checked for changes and when we fetch that new sync token, right? That seems to be the case, and the binding should be backfilled if that happens since that's a possible way for the connector to miss data. The estuary-cdk has triggers that can be yielded for the exact purpose of automatically triggering a backfill:
```diff
 if e.code == 412:
-    new_token = SyncTokenResponse.model_validate_json(e.body).sync
-    yield (new_token,)
-    return
+    log.warning("triggering automatic backfill due to sync token expiration")
+    yield Triggers.BACKFILL
```
You can see an example of that trigger in action in source-hubspot-native here. It essentially causes the binding's state to be wiped to the resource's `initial_state` the next time the connector restarts.
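The behavior described above can be sketched as follows. This is a toy simulation of the trigger semantics, not the CDK's implementation; the `Triggers` enum and `run_binding` driver here are stand-ins:

```python
import enum

class Triggers(enum.Enum):
    BACKFILL = enum.auto()

def fetch_changes(sync_token_expired):
    if sync_token_expired:
        # Fetching a fresh token here would silently skip every event
        # between the last poll and now, so request a full backfill instead.
        yield Triggers.BACKFILL
        return
    yield {"gid": "123", "action": "changed"}

def run_binding(state, initial_state, expired):
    # Stand-in for the framework: on BACKFILL, wipe the binding's state
    # back to initial_state; otherwise record documents as usual.
    for item in fetch_changes(expired):
        if item is Triggers.BACKFILL:
            return dict(initial_state)
        state["last_doc"] = item
    return state
```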
```python
async for task in _fetch_paginated(tasks_url, Task, http, log):
    url = Story.get_url(base_url, task.gid)
    async for story in _fetch_paginated(url, Story, http, log):
        yield story
```
Should the API calls to fetch stories be wrapped in a try/except to handle `Stories.tolerated_errors`, similar to what's done in `fetch_team_memberships` and a handful of the other functions above?
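For illustration, a hedged sketch of that pattern is below. `HTTPError`, the `404`-is-tolerated choice, and the helper names are assumptions standing in for the connector's actual types:

```python
import asyncio

class HTTPError(Exception):
    def __init__(self, code):
        self.code = code

# Assumption: a deleted task returning 404 between listing and fetching
# is a tolerable, expected error.
TOLERATED_ERRORS = {404}

async def _fetch_paginated(task_gid):
    # Fake paginated fetch: one task is gone by the time we fetch it.
    if task_gid == "gone":
        raise HTTPError(404)
    yield {"task": task_gid, "text": "a story"}

async def fetch_stories_for_tasks(task_gids):
    stories = []
    for gid in task_gids:
        try:
            async for story in _fetch_paginated(gid):
                stories.append(story)
        except HTTPError as e:
            if e.code not in TOLERATED_ERRORS:
                raise
            # Tolerated error: skip this task's stories and keep going.
    return stories

result = asyncio.run(fetch_stories_for_tasks(["t1", "gone", "t2"]))
```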
```python
async def fetch_stories(
    http: HTTPSession,
    config: EndpointConfig,
    log: Logger,
) -> AsyncGenerator[Story]:
    base_url = config.advanced.base_url

    for project in await _collect_projects(http, config, log):
        tasks_url = Task.get_url(base_url, project.gid)
        async for task in _fetch_paginated(tasks_url, Task, http, log):
            url = Story.get_url(base_url, task.gid)
            async for story in _fetch_paginated(url, Story, http, log):
                yield story
```
The `fetch_stories` function is unused. If it's not needed anywhere, can it be removed?
Following up on the action items I took.
@Alex-Bair I think you should take this forward if possible. Ping me on slack if you want to discuss.
Description:
MVP of an Asana source connector.
Workflow steps:
An Estuary Flow source connector for Asana.
Captured Resources
Not captured (potentially useful)
Setup
Prerequisites
flowctl (brew install estuary/tap/flowctl)
Installation
Install dependencies:
Configure credentials:
# Edit config.yaml with your Asana personal access token (gitignored)
Development
Run tests
Test with flowctl
Asana Event API Sync Token Flow
```mermaid
---
config:
  theme: redux-dark
---
graph TD
    subgraph CursorDict["Cursor Dict Structure"]
        direction LR
        CD["cursor_dict:<br/>{project_gid_1: sync_token_1,<br/> project_gid_2: sync_token_2,<br/> ...}"]
        CDNote["CDK manages each key as an<br/>independent concurrent subtask"]
        CD --- CDNote
    end
    subgraph Bootstrap["Bootstrap Phase"]
        direction TB
        B1["Fetch list of projects<br/>GET /projects"]
        B2["For each project_gid"]
        B3["GET /projects/{project_gid}/events<br/>(no sync token)"]
        B4["Asana returns 412<br/>Precondition Failed"]
        B5["Extract initial sync_token<br/>from 412 response body"]
        B6["Store in cursor dict:<br/>cursor_dict[project_gid] = sync_token"]
        B7["FetchPageFn runs concurrently:<br/>full paginated backfill of<br/>tasks for this project"]
        B1 --> B2
        B2 --> B3
        B3 --> B4
        B4 --> B5
        B5 --> B6
        B6 --> B7
    end
    subgraph Incremental["Incremental Phase"]
        direction TB
        I1["CDK spawns concurrent subtasks<br/>(one per project_gid in cursor dict)"]
        I2["GET /projects/{project_gid}/events<br/>?sync={token}"]
        I3{{"Asana returns 200?"}}
        I4["Receive events +<br/>has_more flag +<br/>new sync_token"]
        I5{"has_more?"}
        I6["Loop: call again<br/>with new token"]
        I7["Collect changed task GIDs<br/>from events, deduplicate"]
        I8{"Action type?"}
        I9["'deleted'<br/>Yield tombstone"]
        I10["'created' / 'changed' /<br/>'added' / 'removed'"]
        I11["Batch re-fetch full task docs<br/>GET /tasks/{gid}"]
        I12["Yield full documents"]
        I13["Yield new sync_token<br/>as updated cursor"]
        I1 --> I2
        I2 --> I3
        I3 -- "Yes (200)" --> I4
        I4 --> I5
        I5 -- "true" --> I6
        I6 --> I2
        I5 -- "false" --> I7
        I7 --> I8
        I8 -- "deleted" --> I9
        I8 -- "created / changed /<br/>added / removed" --> I10
        I10 --> I11
        I11 --> I12
        I9 --> I13
        I12 --> I13
    end
    subgraph Expiry["Token Expiry / Re-Backfill"]
        direction TB
        E1["Asana returns 412<br/>instead of 200"]
        E2["Extract new sync_token<br/>from 412 response body"]
        E3["Store new token in cursor dict:<br/>cursor_dict[project_gid] = new_token"]
        E4["Trigger re-backfill<br/>for that project scope"]
        E1 --> E2
        E2 --> E3
        E3 --> E4
    end
    B7 --> I1
    I3 -- "No (412)" --> E1
    E4 --> I1
    style CursorDict fill:#2d2d3d,stroke:#7c7ce0,color:#e0e0e0
    style Bootstrap fill:#1a2e1a,stroke:#4caf50,color:#e0e0e0
    style Incremental fill:#1a1a2e,stroke:#5c6bc0,color:#e0e0e0
    style Expiry fill:#2e1a1a,stroke:#ef5350,color:#e0e0e0
```
Documentation links affected:
N/A
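The bootstrap phase from the sync-token flow above can be sketched as follows. This is a toy simulation, with `fake_events_api` standing in for `GET /projects/{project_gid}/events`:

```python
def fake_events_api(project_gid, sync=None):
    # A call without a sync token gets 412 Precondition Failed, and the
    # response body carries the initial sync token.
    if not sync:
        return 412, {"sync": f"tok-{project_gid}-0"}
    return 200, {"data": [], "sync": f"tok-{project_gid}-1", "has_more": False}

def bootstrap(project_gids):
    # Seed the cursor dict: one sync token per project, obtained by
    # deliberately triggering the 412 on a token-less request.
    cursor_dict = {}
    for gid in project_gids:
        status, body = fake_events_api(gid, sync=None)
        assert status == 412
        cursor_dict[gid] = body["sync"]
    return cursor_dict

cursors = bootstrap(["p1", "p2"])
```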
Notes for reviewers:
TODO
Known Issues
flowctl preview: possibly due to the rate limit on a free tier account.