Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed bug where client-level `read_timeout` configuration was not being automatically applied to all queries. See [PR 44472](https://github.com/Azure/azure-sdk-for-python/pull/44472)

#### Other Changes

Expand Down
8 changes: 4 additions & 4 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
for key, value in _COMMON_OPTIONS.items():
if key in kwargs:
options[value] = kwargs.pop(key)
if 'read_timeout' in kwargs:
options['read_timeout'] = kwargs['read_timeout']
if 'timeout' in kwargs:
options['timeout'] = kwargs['timeout']
if Constants.Kwargs.READ_TIMEOUT in kwargs:
options[Constants.Kwargs.READ_TIMEOUT] = kwargs[Constants.Kwargs.READ_TIMEOUT]
if Constants.Kwargs.TIMEOUT in kwargs:
options[Constants.Kwargs.TIMEOUT] = kwargs[Constants.Kwargs.TIMEOUT]


options[Constants.OperationStartTime] = time.time()
Expand Down
4 changes: 4 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class Kwargs:
EXCLUDED_LOCATIONS: Literal["excludedLocations"] = "excludedLocations"
AVAILABILITY_STRATEGY: Literal["availabilityStrategy"] = "availabilityStrategy"
"""Availability strategy config. Used either at client level or request level"""
READ_TIMEOUT: Literal["read_timeout"] = "read_timeout"
"""Socket read timeout in seconds. Used either at client level or request level."""
TIMEOUT: Literal["timeout"] = "timeout"
"""Absolute timeout in seconds for the combined HTTP request and response processing."""

class UserAgentFeatureFlags(IntEnum):
"""
Expand Down
6 changes: 4 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ async def _get_properties_with_options(self, options: Optional[dict[str, Any]] =
kwargs['excluded_locations'] = options['excludedLocations']
if Constants.OperationStartTime in options:
kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
if "timeout" in options:
kwargs['timeout'] = options['timeout']
if Constants.Kwargs.TIMEOUT in options:
kwargs[Constants.Kwargs.TIMEOUT] = options[Constants.Kwargs.TIMEOUT]
if Constants.Kwargs.READ_TIMEOUT in options:
kwargs[Constants.Kwargs.READ_TIMEOUT] = options[Constants.Kwargs.READ_TIMEOUT]

return await self._get_properties(**kwargs)

Expand Down
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ def _build_connection_policy(kwargs: dict[str, Any]) -> ConnectionPolicy:
policy.RequestTimeout = kwargs.pop('request_timeout') / 1000.0
else:
policy.RequestTimeout = kwargs.pop('connection_timeout', policy.RequestTimeout)

policy.ReadTimeout = kwargs.pop(Constants.Kwargs.READ_TIMEOUT, policy.ReadTimeout)

policy.ConnectionMode = kwargs.pop('connection_mode', policy.ConnectionMode)
policy.ProxyConfiguration = kwargs.pop('proxy_config', policy.ProxyConfiguration)
policy.EnableEndpointDiscovery = kwargs.pop('enable_endpoint_discovery', policy.EnableEndpointDiscovery)
Expand Down Expand Up @@ -147,6 +150,9 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword
More on consistency levels and possible values: https://aka.ms/cosmos-consistency-levels
:keyword int timeout: An absolute timeout in seconds, for the combined HTTP request and response processing.
:keyword int connection_timeout: The HTTP request timeout in seconds.
:keyword float read_timeout: The socket read timeout in seconds. This is the time the client will wait for a
response from the server after a connection has been established. If not specified, the default value of
65 seconds is used. This can be overridden at the request level.
:keyword str connection_mode: The connection mode for the client - currently only supports 'Gateway'.
:keyword proxy_config: Connection proxy configuration.
:paramtype proxy_config: ~azure.cosmos.ProxyConfiguration
Expand Down
6 changes: 4 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ def _get_properties_with_options(self, options: Optional[dict[str, Any]] = None)
kwargs['excluded_locations'] = options['excludedLocations']
if Constants.OperationStartTime in options:
kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
if "timeout" in options:
kwargs['timeout'] = options['timeout']
if Constants.Kwargs.TIMEOUT in options:
kwargs[Constants.Kwargs.TIMEOUT] = options[Constants.Kwargs.TIMEOUT]
if Constants.Kwargs.READ_TIMEOUT in options:
kwargs[Constants.Kwargs.READ_TIMEOUT] = options[Constants.Kwargs.READ_TIMEOUT]
return self._get_properties(**kwargs)

def _get_properties(self, **kwargs: Any) -> dict[str, Any]:
Expand Down
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ def _build_connection_policy(kwargs: dict[str, Any]) -> ConnectionPolicy:
policy.RequestTimeout = kwargs.pop('request_timeout') / 1000.0
else:
policy.RequestTimeout = kwargs.pop('connection_timeout', policy.RequestTimeout)

policy.ReadTimeout = kwargs.pop(Constants.Kwargs.READ_TIMEOUT, policy.ReadTimeout)

policy.ConnectionMode = kwargs.pop('connection_mode', policy.ConnectionMode)
policy.ProxyConfiguration = kwargs.pop('proxy_config', policy.ProxyConfiguration)
policy.EnableEndpointDiscovery = kwargs.pop('enable_endpoint_discovery', policy.EnableEndpointDiscovery)
Expand Down Expand Up @@ -169,6 +172,9 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword
More on consistency levels and possible values: https://aka.ms/cosmos-consistency-levels
:keyword int timeout: An absolute timeout in seconds, for the combined HTTP request and response processing.
:keyword int connection_timeout: The HTTP request timeout in seconds.
:keyword float read_timeout: The socket read timeout in seconds. This is the time the client will wait for a
Comment thread
tvaron3 marked this conversation as resolved.
response from the server after a connection has been established. If not specified, the default value of
65 seconds is used. This can be overridden at the request level.
:keyword str connection_mode: The connection mode for the client - currently only supports 'Gateway'.
:keyword proxy_config: Connection proxy configuration.
:paramtype proxy_config: ~azure.cosmos.ProxyConfiguration
Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ class ConnectionPolicy: # pylint: disable=too-many-instance-attributes

__defaultRequestTimeout: int = 5 # seconds
__defaultDBAConnectionTimeout: int = 3 # seconds
__defaultReadTimeout: int = 65 # seconds
__defaultReadTimeout: float = 65 # seconds
__defaultRecoveryReadTimeout: int = 6 # seconds
__defaultDBAReadTimeout: int = 3 # seconds
__defaultMaxBackoff: int = 1 # seconds
Expand All @@ -352,7 +352,7 @@ def __init__(self) -> None:
# RequestTimeout is the connection timeout for all operations except database account
self.RequestTimeout: int = self.__defaultRequestTimeout
self.DBAConnectionTimeout: int = self.__defaultDBAConnectionTimeout
self.ReadTimeout: int = self.__defaultReadTimeout
self.ReadTimeout: float = self.__defaultReadTimeout
# The request timeout for a request trying to recover a unavailable partition
# This is only applicable if circuit breaker is enabled
self.RecoveryReadTimeout: int = self.__defaultRecoveryReadTimeout
Expand Down
211 changes: 197 additions & 14 deletions sdk/cosmos/azure-cosmos/tests/test_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,78 @@ def test_timeout_for_point_operation(self):
)
self.assertEqual(result['id'], 'test_item_1')

def test_request_level_timeout_overrides_client_read_timeout(self):
"""Test that request-level read_timeout overrides client-level timeout for reads and writes"""

# Create container with normal client
normal_container = self.databaseForTest.create_container(
id='request_timeout_container_' + str(uuid.uuid4()),
partition_key=PartitionKey(path="/pk")
)

try:
# Create test items
for i in range(5):
test_item = {
'id': f'test_item_{i}',
'pk': f'partition{i}',
'data': f'test_data_{i}'
}
normal_container.create_item(test_item)

# Create client with very short read timeout at client level
with cosmos_client.CosmosClient(
url=self.host,
credential=self.masterKey,
read_timeout=0.000000000001 # Very short timeout that would normally fail
) as timeout_client:
database = timeout_client.get_database_client(self.databaseForTest.id)
container = database.get_container_client(normal_container.id)

# Test 1: Point read with request-level timeout should succeed (overrides client timeout)
result = container.read_item(
item='test_item_0',
partition_key='partition0',
read_timeout=30 # Higher timeout at request level
)
self.assertEqual(result['id'], 'test_item_0')

# Test 2: Query with request-level timeout should succeed (overrides client timeout)
results = list(container.query_items(
query="SELECT * FROM c WHERE c.pk = @pk",
parameters=[{"name": "@pk", "value": "partition1"}],
read_timeout=30 # Higher timeout at request level
))
self.assertEqual(len(results), 1)
self.assertEqual(results[0]['id'], 'test_item_1')

# Test 3: Upsert (write) with request-level timeout should succeed
upsert_item = {
'id': 'test_item_0',
'pk': 'partition0',
'data': 'updated_data'
}
result = container.upsert_item(
body=upsert_item,
read_timeout=30 # Higher timeout at request level
)
self.assertEqual(result['data'], 'updated_data')

# Test 4: Create (write) with request-level timeout should succeed
new_item = {
'id': 'new_test_item',
'pk': 'new_partition',
'data': 'new_data'
}
result = container.create_item(
body=new_item,
read_timeout=30 # Higher timeout at request level
)
self.assertEqual(result['id'], 'new_test_item')

finally:
self.databaseForTest.delete_container(normal_container.id)

def test_point_operation_read_timeout(self):
"""Test that point operations respect client provided read timeout"""

Expand All @@ -1475,21 +1547,132 @@ def test_point_operation_read_timeout(self):
partition_key=PartitionKey(path="/pk")
)

# Create a test item
test_item = {
'id': 'test_item_1',
'pk': 'partition1',
'data': 'test_data'
}
container.create_item(test_item)
try:
container.read_item(
item='test_item_1',
partition_key='partition1',
read_timeout=0.000003
)
except Exception as e:
print(f"Exception is {e}")
# Create a test item
test_item = {
'id': 'test_item_1',
'pk': 'partition1',
'data': 'test_data'
}
container.create_item(test_item)

# Point read with extremely short timeout should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
container.read_item(
item='test_item_1',
partition_key='partition1',
read_timeout=0.000003
)
finally:
self.databaseForTest.delete_container(container.id)

def test_client_level_read_timeout_on_queries_and_point_operations(self):
"""Test that queries and point operations respect client-level read timeout"""

# Create container with normal client
normal_container = self.databaseForTest.create_container(
id='read_timeout_container_' + str(uuid.uuid4()),
partition_key=PartitionKey(path="/pk")
)

try:
# Create test items
for i in range(5):
test_item = {
'id': f'test_item_{i}',
'pk': f'partition{i}',
'data': f'test_data_{i}'
}
normal_container.create_item(test_item)

# Create client with very short read timeout
with cosmos_client.CosmosClient(
url=self.host,
credential=self.masterKey,
read_timeout=0.000000000001 # Very short timeout to force failure
) as timeout_client:
database = timeout_client.get_database_client(self.databaseForTest.id)
container = database.get_container_client(normal_container.id)

# Test 1: Point read operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
container.read_item(
item='test_item_0',
partition_key='partition0'
)

# Test 2: Query operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
list(container.query_items(
query="SELECT * FROM c WHERE c.pk = @pk",
parameters=[{"name": "@pk", "value": "partition0"}]
))
finally:
self.databaseForTest.delete_container(normal_container.id)

def test_policy_level_read_timeout_on_queries_and_point_operations(self):
Comment thread
dibahlfi marked this conversation as resolved.
"""Test that queries and point operations respect connection-policy level read timeout"""

# Create container with normal client
normal_container = self.databaseForTest.create_container(
id='read_timeout_container_' + str(uuid.uuid4()),
partition_key=PartitionKey(path="/pk")
)

try:
# Create test items
for i in range(5):
test_item = {
'id': f'test_item_{i}',
'pk': f'partition{i}',
'data': f'test_data_{i}'
}
normal_container.create_item(test_item)

# Create client with very short read timeout via connection policy
connection_policy = documents.ConnectionPolicy()
connection_policy.ReadTimeout = 0.000000000001
with cosmos_client.CosmosClient(
url=self.host,
credential=self.masterKey,
connection_policy=connection_policy
) as timeout_client:
database = timeout_client.get_database_client(self.databaseForTest.id)
container = database.get_container_client(normal_container.id)

# Test 1: Point read operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
container.read_item(
item='test_item_0',
partition_key='partition0'
)

# Test 2: Query operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
list(container.query_items(
query="SELECT * FROM c WHERE c.pk = @pk",
parameters=[{"name": "@pk", "value": "partition0"}]
))

# Test 3: Point read with request-level override should succeed
# Request-level read_timeout takes precedence over the short policy-level timeout
result = container.read_item(
item='test_item_0',
partition_key='partition0',
read_timeout=30
)
self.assertEqual(result['id'], 'test_item_0')

# Test 4: Query with request-level override should succeed
results = list(container.query_items(
query="SELECT * FROM c WHERE c.pk = @pk",
parameters=[{"name": "@pk", "value": "partition1"}],
read_timeout=30
))
self.assertEqual(len(results), 1)
self.assertEqual(results[0]['id'], 'test_item_1')
finally:
self.databaseForTest.delete_container(normal_container.id)

# TODO: for read timeouts azure-core returns a ServiceResponseError, needs to be fixed in azure-core and then this test can be enabled
@unittest.skip
Expand Down
Loading
Loading