Skip to content

Commit 0c4dbe0

Browse files
authored
Merge pull request #584 from latchbio/aidan/better-retries-in-tinyurl
fix: tinyurl needs better backoff scheme
2 parents 7474c9a + 9974623 commit 0c4dbe0

5 files changed

Lines changed: 87 additions & 31 deletions

File tree

CHANGELOG.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,25 @@ Types of changes
1616

1717
# Latch SDK Changelog
1818

19+
## 2.69.0 - 2026-03-02
20+
21+
### Added
22+
23+
* `Execution.abort()` method to abort a running execution
24+
25+
### Fixed
26+
27+
* Add to retryable statuses in `tinyrequests`
28+
1929
## 2.68.4 - 2026-02-25
2030

21-
## Fixed
31+
### Fixed
2232

2333
* Fix thread-safety issue in GQL client causing `TransportAlreadyConnected` errors under concurrent usage
2434

2535
## 2.68.3 - 2026-02-09
2636

27-
## Fixed
37+
### Fixed
2838

2939
* Add guard to directory and file type transformer
3040

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ include = ["src/**/*.py", "src/**/py.typed", "src/latch_cli/services/init/*"]
1212

1313
[project]
1414
name = "latch"
15-
version = "2.68.4"
15+
version = "2.69.0"
1616
description = "The Latch SDK"
1717
authors = [{ name = "Kenny Workman", email = "kenny@latch.bio" }]
1818
maintainers = [{ name = "Ayush Kamat", email = "ayush@latch.bio" }]

src/latch/functions/secrets.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
from urllib.parse import urljoin
22

3-
from latch_sdk_config.latch import NUCLEUS_URL
4-
53
from latch.utils import current_workspace
64
from latch_cli.tinyrequests import post
75
from latch_cli.utils import get_auth_header
6+
from latch_sdk_config.latch import NUCLEUS_URL
87

98

109
def get_secret(secret_name: str):
@@ -24,10 +23,7 @@ def get_secret(secret_name: str):
2423

2524
resp = post(
2625
url=urljoin(NUCLEUS_URL, "/secrets/get-new"),
27-
json={
28-
"name": secret_name,
29-
"ws_id": current_workspace(),
30-
},
26+
json={"name": secret_name, "ws_id": current_workspace()},
3127
headers={"Authorization": get_auth_header()},
3228
)
3329

src/latch_cli/services/launch/launch_v2.py

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,43 @@ async def wait(self) -> Union[CompletedExecution, None]:
201201

202202
return None
203203

204+
def abort(self) -> bool:
205+
if self.status in {
206+
"SUCCEEDED",
207+
"FAILED",
208+
"ABORTED",
209+
"SKIPPED",
210+
"ABORTING",
211+
"CANCELLED",
212+
}:
213+
return False
214+
215+
response = tinyrequests.post(
216+
config.api.execution.abort,
217+
headers={"Authorization": get_auth_header()},
218+
json={"execution_id": self.id},
219+
)
220+
221+
try:
222+
response_data = response.json()
223+
except JSONDecodeError as e:
224+
raise RuntimeError(
225+
f"Could not parse abort response as JSON: ({response.status_code}) {response}"
226+
) from e
227+
228+
if response.status_code != 200:
229+
error = response_data.get("error", response_data)
230+
raise RuntimeError(
231+
f"Execution abort failed (HTTP {response.status_code}): {error}"
232+
)
233+
234+
if response_data.get("success") is False:
235+
error = response_data.get("error", response_data)
236+
raise RuntimeError(f"Execution abort failed: {error}")
237+
238+
self.status = "ABORTING"
239+
return True
240+
204241

205242
def launch_from_launch_plan(
206243
*, wf_name: str, lp_name: str, version: Optional[str] = None
@@ -291,7 +328,9 @@ def launch_from_launch_plan(
291328

292329
raw_python_outputs = meta.get("python_outputs")
293330
if raw_python_outputs is None:
294-
print("No python outputs found. If your workflow has outputs, re-register workflow with latch version >= 2.65.1 in workflow environment to access outputs in Execution object.")
331+
print(
332+
"No python outputs found. If your workflow has outputs, re-register workflow with latch version >= 2.65.1 in workflow environment to access outputs in Execution object."
333+
)
295334
break
296335

297336
try:
@@ -311,7 +350,11 @@ def launch_from_launch_plan(
311350

312351

313352
def launch(
314-
*, wf_name: str, params: dict[str, Any], version: Optional[str] = None, best_effort: bool = True
353+
*,
354+
wf_name: str,
355+
params: dict[str, Any],
356+
version: Optional[str] = None,
357+
best_effort: bool = True,
315358
) -> Execution:
316359
"""Create an execution of workflow `wf_name` with version `version` and parameters `params`.
317360
@@ -329,7 +372,9 @@ def launch(
329372
Execution ID of the launched workflow.
330373
"""
331374
target_account_id = current_workspace()
332-
wf_id, interface, defaults = get_workflow_interface(target_account_id, wf_name, version)
375+
wf_id, interface, defaults = get_workflow_interface(
376+
target_account_id, wf_name, version
377+
)
333378

334379
flyte_interface_types: dict[str, Variable] = VariableMap.from_flyte_idl(
335380
gpjson.ParseDict(interface, _interface_pb2.VariableMap())
@@ -348,13 +393,17 @@ def launch(
348393
raw_python_outputs = meta.get("python_outputs")
349394

350395
if raw_python_outputs is None:
351-
print("No python outputs found. If your workflow has outputs, re-register workflow with latch version >= 2.65.1 in workflow environment to access outputs in Execution object.")
396+
print(
397+
"No python outputs found. If your workflow has outputs, re-register workflow with latch version >= 2.65.1 in workflow environment to access outputs in Execution object."
398+
)
352399
break
353400

354401
try:
355402
python_outputs = dill.loads(base64.b64decode(raw_python_outputs)) # noqa: S301
356403
except dill.UnpicklingError as e:
357-
raise RuntimeError("Failed to decode the workflow outputs. Ensure your python version matches the version in the workflow environment.") from e
404+
raise RuntimeError(
405+
"Failed to decode the workflow outputs. Ensure your python version matches the version in the workflow environment."
406+
) from e
358407

359408
break
360409

@@ -364,8 +413,7 @@ def launch(
364413
params_for_launch: dict[str, Any] = params
365414
if best_effort:
366415
fixed_literals = convert_inputs_to_literals(
367-
params=params,
368-
flyte_interface_types=flyte_interface_types,
416+
params=params, flyte_interface_types=flyte_interface_types
369417
)
370418
defaults = defaults["parameters"]
371419

@@ -407,7 +455,9 @@ def _is_optional_none(var_type: dict[str, Any]) -> bool:
407455
params_for_launch = params_json
408456
else:
409457
if raw_python_interface_with_defaults is None:
410-
raise RuntimeError("Missing python interface in workflow metadata. Try using with best_effort=True.")
458+
raise RuntimeError(
459+
"Missing python interface in workflow metadata. Try using with best_effort=True."
460+
)
411461

412462
python_interface_with_defaults: Union[dict[str, tuple[type, Any]], None] = None
413463
try:
@@ -441,7 +491,9 @@ def _is_optional_none(var_type: dict[str, Any]) -> bool:
441491
ctx,
442492
incoming_values=params,
443493
flyte_interface_types=flyte_interface_types,
444-
native_types={k: v[0] for k, v in python_interface_with_defaults.items()},
494+
native_types={
495+
k: v[0] for k, v in python_interface_with_defaults.items()
496+
},
445497
)
446498
except TypeTransformerFailedError as e:
447499
if "is not an instance of" in str(e):
@@ -450,14 +502,11 @@ def _is_optional_none(var_type: dict[str, Any]) -> bool:
450502
) from e
451503
raise
452504

453-
params_for_launch = {k: gpjson.MessageToDict(v.to_flyte_idl()) for k, v in fixed_literals.items()}
505+
params_for_launch = {
506+
k: gpjson.MessageToDict(v.to_flyte_idl()) for k, v in fixed_literals.items()
507+
}
454508

455-
return launch_workflow(
456-
target_account_id,
457-
wf_id,
458-
params_for_launch,
459-
python_outputs,
460-
)
509+
return launch_workflow(target_account_id, wf_id, params_for_launch, python_outputs)
461510

462511

463512
def launch_workflow(

src/latch_cli/tinyrequests.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ def _req(
111111
return TinyResponse(resp, url, stream=stream)
112112

113113

114+
_retryable_status_codes = {429, 500, 502, 503, 504}
115+
116+
114117
def request(
115118
method: str,
116119
url: str,
@@ -121,8 +124,8 @@ def request(
121124
stream: bool = False,
122125
num_retries: int = 3,
123126
) -> TinyResponse:
124-
"""Send HTTP request. Retry on 500s or ConnectionErrors.
125-
Implements exponential backoff between retries.
127+
"""Send HTTP request. Retry on transient errors (429, 5xx, ConnectionError)
128+
with exponential backoff.
126129
"""
127130
err = None
128131
res = None
@@ -135,15 +138,13 @@ def request(
135138
res = _req(
136139
method, url, headers=headers, data=data, json=json, stream=stream
137140
)
138-
if res.status_code < 500:
141+
if res.status_code not in _retryable_status_codes:
139142
return res
140143
except ConnectionError as e:
141144
err = e
142145

143146
if attempt < num_retries:
144-
# todo(rahul): tune the sleep interval based on the startup time of the server
145-
# todo(rahul): change sleep interval based on which service we are calling
146-
time.sleep(2**attempt * 5)
147+
time.sleep(2**attempt)
147148

148149
if res is None:
149150
raise err

0 commit comments

Comments
 (0)