Skip to content

Commit ed10474

Browse files
authored
fix: honoring read_timeout at the cosmos client level (#44472)
* fix: honoring read_timeout at the cosmos client level * fix - addressing comments * fix - addressing comments * fix - fixing typo * fix - fixing bug where kwargs were not getting passed corectly
1 parent 8784d03 commit ed10474

14 files changed

Lines changed: 401 additions & 33 deletions

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#### Bugs Fixed
1414
* Fixed bug where a compound session token (containing multiple partition tokens) was sent for single-partition feed range queries. See [PR 44484](https://github.com/Azure/azure-sdk-for-python/pull/44484)
1515
* Fixed regression where `user_agent_overwrite` kwarg was not cleaned up properly, causing `TypeError` crash on sync client construction. See [PR 45653](https://github.com/Azure/azure-sdk-for-python/pull/45653)
16+
* Fixed bug where client-level `read_timeout` configuration was not being automatically applied to all queries. See [PR 44472](https://github.com/Azure/azure-sdk-for-python/pull/44472)
1617

1718
#### Other Changes
1819
* Enhanced error logging by attaching endpoint information to exceptions during database account retrieval. See [PR 44484](https://github.com/Azure/azure-sdk-for-python/pull/44484)

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,10 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
115115
for key, value in _COMMON_OPTIONS.items():
116116
if key in kwargs:
117117
options[value] = kwargs.pop(key)
118-
if 'read_timeout' in kwargs:
119-
options['read_timeout'] = kwargs['read_timeout']
120-
if 'timeout' in kwargs:
121-
options['timeout'] = kwargs['timeout']
118+
if Constants.Kwargs.READ_TIMEOUT in kwargs:
119+
options[Constants.Kwargs.READ_TIMEOUT] = kwargs[Constants.Kwargs.READ_TIMEOUT]
120+
if Constants.Kwargs.TIMEOUT in kwargs:
121+
options[Constants.Kwargs.TIMEOUT] = kwargs[Constants.Kwargs.TIMEOUT]
122122

123123

124124
options[Constants.OperationStartTime] = time.time()

sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ class Kwargs:
111111
EXCLUDED_LOCATIONS: Literal["excludedLocations"] = "excludedLocations"
112112
AVAILABILITY_STRATEGY: Literal["availabilityStrategy"] = "availabilityStrategy"
113113
"""Availability strategy config. Used either at client level or request level"""
114+
READ_TIMEOUT: Literal["read_timeout"] = "read_timeout"
115+
"""Socket read timeout in seconds. Used either at client level or request level."""
116+
TIMEOUT: Literal["timeout"] = "timeout"
117+
"""Absolute timeout in seconds for the combined HTTP request and response processing."""
114118

115119
class UserAgentFeatureFlags(IntEnum):
116120
"""

sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def is_circuit_breaker_applicable(self, request: RequestObject) -> bool:
5353
return self.global_partition_endpoint_manager_core.is_circuit_breaker_applicable(request)
5454

5555

56-
def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionKeyRangeWrapper]:
56+
def create_pk_range_wrapper(self, request: RequestObject, **kwargs) -> Optional[PartitionKeyRangeWrapper]:
5757
if HttpHeaders.IntendedCollectionRID in request.headers:
5858
container_rid = request.headers[HttpHeaders.IntendedCollectionRID]
5959
else:
@@ -75,12 +75,12 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK
7575
# get the partition key range for the given partition key
7676
epk_range = [partition_key._get_epk_range_for_partition_key(partition_key_value)] # pylint: disable=protected-access
7777
partition_ranges = (self.client._routing_map_provider # pylint: disable=protected-access
78-
.get_overlapping_ranges(container_link, epk_range, options))
78+
.get_overlapping_ranges(container_link, epk_range, options, **kwargs))
7979
partition_range = Range.PartitionKeyRangeToRange(partition_ranges[0])
8080
elif HttpHeaders.PartitionKeyRangeID in request.headers:
8181
pk_range_id = request.headers[HttpHeaders.PartitionKeyRangeID]
8282
epk_range =(self.client._routing_map_provider # pylint: disable=protected-access
83-
.get_range_by_partition_key_range_id(container_link, pk_range_id, options))
83+
.get_range_by_partition_key_range_id(container_link, pk_range_id, options, **kwargs))
8484
if not epk_range:
8585
self.global_partition_endpoint_manager_core.log_warn_or_debug(
8686
"Illegal state: partition key range cache not initialized correctly. "

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin
7979
pk_range_wrapper = None
8080
if args and (global_endpoint_manager.is_per_partition_automatic_failover_applicable(args[0]) or
8181
global_endpoint_manager.is_circuit_breaker_applicable(args[0])):
82-
pk_range_wrapper = global_endpoint_manager.create_pk_range_wrapper(args[0])
82+
pk_range_wrapper = global_endpoint_manager.create_pk_range_wrapper(args[0], **kwargs)
8383
# instantiate all retry policies here to be applied for each request execution
8484
endpointDiscovery_retry_policy = _endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy(
8585
client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,10 @@ async def _get_properties_with_options(self, options: Optional[dict[str, Any]] =
104104
kwargs['excluded_locations'] = options['excludedLocations']
105105
if Constants.OperationStartTime in options:
106106
kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
107-
if "timeout" in options:
108-
kwargs['timeout'] = options['timeout']
107+
if Constants.Kwargs.TIMEOUT in options:
108+
kwargs[Constants.Kwargs.TIMEOUT] = options[Constants.Kwargs.TIMEOUT]
109+
if Constants.Kwargs.READ_TIMEOUT in options:
110+
kwargs[Constants.Kwargs.READ_TIMEOUT] = options[Constants.Kwargs.READ_TIMEOUT]
109111

110112
return await self._get_properties(**kwargs)
111113

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ def _build_connection_policy(kwargs: dict[str, Any]) -> ConnectionPolicy:
8282
policy.RequestTimeout = kwargs.pop('request_timeout') / 1000.0
8383
else:
8484
policy.RequestTimeout = kwargs.pop('connection_timeout', policy.RequestTimeout)
85+
86+
policy.ReadTimeout = kwargs.pop(Constants.Kwargs.READ_TIMEOUT, policy.ReadTimeout)
87+
8588
policy.ConnectionMode = kwargs.pop('connection_mode', policy.ConnectionMode)
8689
policy.ProxyConfiguration = kwargs.pop('proxy_config', policy.ProxyConfiguration)
8790
policy.EnableEndpointDiscovery = kwargs.pop('enable_endpoint_discovery', policy.EnableEndpointDiscovery)
@@ -147,6 +150,9 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword
147150
More on consistency levels and possible values: https://aka.ms/cosmos-consistency-levels
148151
:keyword int timeout: An absolute timeout in seconds, for the combined HTTP request and response processing.
149152
:keyword int connection_timeout: The HTTP request timeout in seconds.
153+
:keyword float read_timeout: The socket read timeout in seconds. This is the time the client will wait for a
154+
response from the server after a connection has been established. If not specified, the default value of
155+
65 seconds is used. This can be overridden at the request level.
150156
:keyword str connection_mode: The connection mode for the client - currently only supports 'Gateway'.
151157
:keyword proxy_config: Connection proxy configuration.
152158
:paramtype proxy_config: ~azure.cosmos.ProxyConfiguration

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(self, client: "CosmosClientConnection"):
4949
self.global_partition_endpoint_manager_core = (
5050
_GlobalPartitionEndpointManagerForCircuitBreakerCore(client, self.location_cache))
5151

52-
async def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionKeyRangeWrapper]:
52+
async def create_pk_range_wrapper(self, request: RequestObject, **kwargs) -> Optional[PartitionKeyRangeWrapper]:
5353
if HttpHeaders.IntendedCollectionRID in request.headers:
5454
container_rid = request.headers[HttpHeaders.IntendedCollectionRID]
5555
else:
@@ -71,12 +71,12 @@ async def create_pk_range_wrapper(self, request: RequestObject) -> Optional[Part
7171
# get the partition key range for the given partition key
7272
epk_range = [partition_key._get_epk_range_for_partition_key(partition_key_value)]
7373
partition_ranges = await (self.client._routing_map_provider
74-
.get_overlapping_ranges(container_link, epk_range, options))
74+
.get_overlapping_ranges(container_link, epk_range, options, **kwargs))
7575
partition_range = Range.PartitionKeyRangeToRange(partition_ranges[0])
7676
elif HttpHeaders.PartitionKeyRangeID in request.headers:
7777
pk_range_id = request.headers[HttpHeaders.PartitionKeyRangeID]
7878
epk_range = await (self.client._routing_map_provider
79-
.get_range_by_partition_key_range_id(container_link, pk_range_id, options))
79+
.get_range_by_partition_key_range_id(container_link, pk_range_id, options, **kwargs))
8080
if not epk_range:
8181
self.global_partition_endpoint_manager_core.log_warn_or_debug(
8282
"Illegal state: partition key range cache not initialized correctly. "

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
8282
pk_range_wrapper = None
8383
if args and (global_endpoint_manager.is_per_partition_automatic_failover_applicable(args[0]) or
8484
global_endpoint_manager.is_circuit_breaker_applicable(args[0])):
85-
pk_range_wrapper = await global_endpoint_manager.create_pk_range_wrapper(args[0])
85+
pk_range_wrapper = await global_endpoint_manager.create_pk_range_wrapper(args[0], **kwargs)
8686
# instantiate all retry policies here to be applied for each request execution
8787
endpointDiscovery_retry_policy = _endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy(
8888
client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args

sdk/cosmos/azure-cosmos/azure/cosmos/container.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ def _get_properties_with_options(self, options: Optional[dict[str, Any]] = None)
105105
kwargs['excluded_locations'] = options['excludedLocations']
106106
if Constants.OperationStartTime in options:
107107
kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
108-
if "timeout" in options:
109-
kwargs['timeout'] = options['timeout']
108+
if Constants.Kwargs.TIMEOUT in options:
109+
kwargs[Constants.Kwargs.TIMEOUT] = options[Constants.Kwargs.TIMEOUT]
110+
if Constants.Kwargs.READ_TIMEOUT in options:
111+
kwargs[Constants.Kwargs.READ_TIMEOUT] = options[Constants.Kwargs.READ_TIMEOUT]
110112
return self._get_properties(**kwargs)
111113

112114
def _get_properties(self, **kwargs: Any) -> dict[str, Any]:

0 commit comments

Comments
 (0)