Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@

#### Features Added
* Added ability to set `throughput_bucket` header at the client level and for all requests. See [PR 40340](https://github.com/Azure/azure-sdk-for-python/pull/40340).
* Added ability to use Filters from Logging module on Diagnostics Logging based on Http request/response related attributes. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897)
* Added ability to use Filters from Logging module on Diagnostics Logging based on Http request/response related attributes. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897).

#### Breaking Changes

#### Bugs Fixed
* Fixed how the environment variables in the sdk are parsed. See [PR 40303](https://github.com/Azure/azure-sdk-for-python/pull/40303).
* Fixed health check to check the first write region when it is not specified in the preferred regions. See [PR 40588](https://github.com/Azure/azure-sdk-for-python/pull/40588).
* Fixed bug where writes were being retried for 5xx status codes for patch and replace. See [PR 40672](https://github.com/Azure/azure-sdk-for-python/pull/40672).

#### Other Changes
* Optimized Diagnostics Logging by reducing time spent on logging. Logged Errors are more readable and formatted. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897)
* Optimized Diagnostics Logging by reducing time spent on logging. Logged Errors are more readable and formatted. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897).
* Health checks are now done concurrently and for all regions for async apis. See [PR 40588](https://github.com/Azure/azure-sdk-for-python/pull/40588).


Expand Down
5 changes: 4 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from . import _timeout_failover_retry_policy
from . import exceptions
from .documents import _OperationType
from .exceptions import CosmosHttpResponseError
from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes, ResourceType


Expand Down Expand Up @@ -337,11 +338,13 @@ def send(self, request):
self.sleep(retry_settings, request.context.transport)
continue
raise err
except CosmosHttpResponseError as err:
raise err
except AzureError as err:
retry_error = err
if _has_database_account_header(request.http_request.headers):
raise err
if self._is_method_retryable(retry_settings, request.http_request):
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
self.sleep(retry_settings, request.context.transport)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from .._retry_utility import (_configure_timeout, _has_read_retryable_headers,
_handle_service_response_retries, _handle_service_request_retries,
_has_database_account_header)
from ..exceptions import CosmosHttpResponseError
from ..http_constants import HttpHeaders, StatusCodes, SubStatusCodes


Expand Down Expand Up @@ -305,11 +306,13 @@ async def send(self, request):
except ImportError:
raise err # pylint: disable=raise-missing-from
raise err
except CosmosHttpResponseError as err:
raise err
except AzureError as err:
retry_error = err
if _has_database_account_header(request.http_request.headers):
raise err
if self._is_method_retryable(retry_settings, request.http_request):
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
await self.sleep(retry_settings, request.context.transport)
Expand Down
17 changes: 12 additions & 5 deletions sdk/cosmos/azure-cosmos/tests/_fault_injection_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.core.exceptions import ServiceRequestError, ServiceResponseError

from azure.cosmos.http_constants import ResourceType, HttpHeaders

class FaultInjectionTransport(RequestsTransport):
logger = logging.getLogger('azure.cosmos.fault_injection_transport')
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -130,21 +132,26 @@ def predicate_req_for_document_with_id(r: HttpRequest, id_value: str) -> bool:

@staticmethod
def predicate_is_database_account_call(r: HttpRequest) -> bool:
is_db_account_read = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'databaseaccount'
and r.headers.get('x-ms-thinclient-proxy-operation-type') == 'Read')
is_db_account_read = (r.headers.get(HttpHeaders.ThinClientProxyResourceType) == ResourceType.DatabaseAccount
and r.headers.get(HttpHeaders.ThinClientProxyOperationType) == documents._OperationType.Read)

return is_db_account_read

@staticmethod
def predicate_is_document_operation(r: HttpRequest) -> bool:
is_document_operation = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'docs')

is_document_operation = r.headers.get(HttpHeaders.ThinClientProxyResourceType) == ResourceType.Document
return is_document_operation

@staticmethod
def predicate_is_operation_type(r: HttpRequest, operation_type: str) -> bool:
is_operation_type = r.headers.get(HttpHeaders.ThinClientProxyOperationType) == operation_type

return is_operation_type

@staticmethod
def predicate_is_write_operation(r: HttpRequest, uri_prefix: str) -> bool:
is_write_document_operation = documents._OperationType.IsWriteOperation(
str(r.headers.get('x-ms-thinclient-proxy-operation-type')))
str(r.headers.get(HttpHeaders.ThinClientProxyOperationType)),)

return is_write_document_operation and uri_prefix in r.url

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.core.exceptions import ServiceRequestError, ServiceResponseError

from azure.cosmos.http_constants import ResourceType, HttpHeaders

class FaultInjectionTransportAsync(AioHttpTransport):
logger = logging.getLogger('azure.cosmos.fault_injection_transport_async')
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -128,17 +130,23 @@ def predicate_req_for_document_with_id(r: HttpRequest, id_value: str) -> bool:

@staticmethod
def predicate_is_database_account_call(r: HttpRequest) -> bool:
is_db_account_read = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'databaseaccount'
and r.headers.get('x-ms-thinclient-proxy-operation-type') == 'Read')
is_db_account_read = (r.headers.get(HttpHeaders.ThinClientProxyResourceType) == ResourceType.DatabaseAccount
and r.headers.get(HttpHeaders.ThinClientProxyOperationType) == documents._OperationType.Read)

return is_db_account_read

@staticmethod
def predicate_is_document_operation(r: HttpRequest) -> bool:
is_document_operation = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'docs')
is_document_operation = (r.headers.get(HttpHeaders.ThinClientProxyResourceType) ==
ResourceType.Document)

return is_document_operation

@staticmethod
def predicate_is_operation_type(r: HttpRequest, operation_type: str) -> bool:
is_operation_type = r.headers.get(HttpHeaders.ThinClientProxyOperationType) == operation_type
return is_operation_type

@staticmethod
def predicate_is_write_operation(r: HttpRequest, uri_prefix: str) -> bool:
is_write_document_operation = documents._OperationType.IsWriteOperation(
Expand Down
34 changes: 22 additions & 12 deletions sdk/cosmos/azure-cosmos/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import unittest
import uuid

from azure.cosmos._retry_utility import _has_database_account_header, _has_read_retryable_headers
from azure.cosmos.cosmos_client import CosmosClient
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.cosmos.http_constants import StatusCodes
from azure.cosmos.partition_key import PartitionKey
from azure.cosmos import (ContainerProxy, DatabaseProxy, documents, exceptions, ConnectionRetryPolicy,
from azure.cosmos import (ContainerProxy, DatabaseProxy, documents, exceptions,
http_constants, _retry_utility)
from azure.cosmos.aio import _retry_utility_async
from azure.core.exceptions import AzureError, ServiceRequestError, ServiceResponseError
Expand Down Expand Up @@ -289,7 +291,7 @@ def body(self):


class MockConnectionRetryPolicy(RetryPolicy):
def __init__(self, resource_type, error, **kwargs):
def __init__(self, resource_type, error=None, **kwargs):
self.resource_type = resource_type
self.error = error
self.counter = 0
Expand All @@ -310,7 +312,8 @@ def send(self, request):
# raise the passed in exception for the passed in resource + operation combination
if request.http_request.headers.get(http_constants.HttpHeaders.ThinClientProxyResourceType) == self.resource_type:
self.request_endpoints.append(request.http_request.url)
raise self.error
if self.error:
raise self.error
response = self.next.send(request)
break
except ServiceRequestError as err:
Expand All @@ -336,8 +339,14 @@ def send(self, request):
self.sleep(retry_settings, request.context.transport)
continue
raise err
except CosmosHttpResponseError as err:
raise err
except AzureError as err:
if self._is_method_retryable(retry_settings, request.http_request):
retry_error = err
if _has_database_account_header(request.http_request.headers):
raise err
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
self.counter += 1
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
self.sleep(retry_settings, request.context.transport)
Expand All @@ -353,7 +362,7 @@ def send(self, request):

class MockConnectionRetryPolicyAsync(AsyncRetryPolicy):

def __init__(self, resource_type, error, **kwargs):
def __init__(self, resource_type, error = None, **kwargs):
self.resource_type = resource_type
self.error = error
self.counter = 0
Expand Down Expand Up @@ -387,7 +396,8 @@ async def send(self, request):
if request.http_request.headers.get(
http_constants.HttpHeaders.ThinClientProxyResourceType) == self.resource_type:
self.request_endpoints.append(request.http_request.url)
raise self.error
if self.error:
raise self.error
_retry_utility._configure_timeout(request, absolute_timeout, per_request_timeout)
response = await self.next.send(request)
break
Expand All @@ -403,7 +413,6 @@ async def send(self, request):
if retry_settings['connect'] > 0:
self.counter += 1
retry_active = self.increment(retry_settings, response=request, error=err)
print("Basic Retry in retry utility: ", retry_active)
if retry_active:
await self.sleep(retry_settings, request.context.transport)
continue
Expand All @@ -422,18 +431,19 @@ async def send(self, request):
await self.sleep(retry_settings, request.context.transport)
continue
raise err
except CosmosHttpResponseError as err:
raise err
except AzureError as err:
retry_error = err
if self._is_method_retryable(retry_settings, request.http_request):
if _has_database_account_header(request.http_request.headers):
raise err
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
retry_active = self.increment(retry_settings, response=request, error=err)
self.counter += 1
if retry_active:
await self.sleep(retry_settings, request.context.transport)
continue
raise err
finally:
end_time = time.time()
if absolute_timeout:
absolute_timeout -= (end_time - start_time)

self.update_context(response.context, retry_settings)
return response
43 changes: 42 additions & 1 deletion sdk/cosmos/azure-cosmos/tests/test_retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,20 @@
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
import test_config
from azure.cosmos import _retry_utility, PartitionKey
from azure.cosmos import _retry_utility, PartitionKey, documents
from azure.cosmos.http_constants import HttpHeaders, StatusCodes
from _fault_injection_transport import FaultInjectionTransport


def setup_method_with_custom_transport(
custom_transport,
**kwargs):
connection_retry_policy = test_config.MockConnectionRetryPolicy(resource_type="docs")
client = cosmos_client.CosmosClient(test_config.TestConfig.host, test_config.TestConfig.masterKey,
transport=custom_transport, connection_retry_policy=connection_retry_policy, **kwargs)
db = client.get_database_client(test_config.TestConfig.TEST_DATABASE_ID)
container = db.get_container_client(test_config.TestConfig.TEST_SINGLE_PARTITION_CONTAINER_ID)
return {"client": client, "db": db, "col": container, "retry_policy": connection_retry_policy}


@pytest.mark.cosmosEmulator
Expand Down Expand Up @@ -442,6 +454,35 @@ def test_resource_throttle_and_connection_retry_total_retry_with_max_backoff(sel
finally:
_retry_utility.ExecuteFunction = self.original_execute_function

def test_patch_replace_no_retry(self):
doc = {'id': str(uuid.uuid4()),
'pk': str(uuid.uuid4()),
'name': 'sample document',
'key': 'value'}
custom_transport = FaultInjectionTransport()
predicate = lambda r: (FaultInjectionTransport.predicate_is_operation_type(r, documents._OperationType.Patch)
or FaultInjectionTransport.predicate_is_operation_type(r, documents._OperationType.Replace))
custom_transport.add_fault(predicate, lambda r: FaultInjectionTransport.error_after_delay(
0,
exceptions.CosmosHttpResponseError(
status_code=502,
message="Some random reverse proxy error.")))

initialized_objects = setup_method_with_custom_transport(
custom_transport,
)
container = initialized_objects["col"]
connection_retry_policy = initialized_objects["retry_policy"]
container.create_item(body=doc)
operations = [{"op": "incr", "path": "/company", "value": 3}]
with self.assertRaises(exceptions.CosmosHttpResponseError):
container.patch_item(item=doc['id'], partition_key=doc['pk'], patch_operations=operations)
assert connection_retry_policy.counter == 0
with self.assertRaises(exceptions.CosmosHttpResponseError):
doc['name'] = "something else"
container.replace_item(item=doc['id'], body=doc)
assert connection_retry_policy.counter == 0

def _MockExecuteFunction(self, function, *args, **kwargs):
response = test_config.FakeResponse({HttpHeaders.RetryAfterInMilliseconds: self.retry_after_in_milliseconds})
raise exceptions.CosmosHttpResponseError(
Expand Down
49 changes: 48 additions & 1 deletion sdk/cosmos/azure-cosmos/tests/test_retry_policy_async.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

import asyncio
import unittest
import uuid

import pytest

from azure.cosmos import documents

import azure.cosmos._retry_options as retry_options
import azure.cosmos.exceptions as exceptions
import test_config
Expand All @@ -15,6 +17,8 @@
from azure.cosmos.aio import DatabaseProxy, ContainerProxy
import azure.cosmos.aio._retry_utility_async as _retry_utility
from azure.cosmos._retry_options import RetryOptions
from _fault_injection_transport_async import FaultInjectionTransportAsync


class ConnectionMode:
"""Represents the connection mode to be used by the client."""
Expand Down Expand Up @@ -49,6 +53,18 @@ def __init__(self) -> None:
self.ConnectionRetryConfiguration: Optional["ConnectionRetryPolicy"] = None
self.ResponsePayloadOnWriteDisabled: bool = False

async def setup_method_with_custom_transport(
custom_transport,
**kwargs):
connection_policy = ConnectionPolicy()
connection_retry_policy = test_config.MockConnectionRetryPolicyAsync(resource_type="docs")
connection_policy.ConnectionRetryConfiguration = connection_retry_policy
client = CosmosClient(test_config.TestConfig.host, test_config.TestConfig.masterKey,
transport=custom_transport, connection_policy=connection_policy, **kwargs)
db = client.get_database_client(test_config.TestConfig.TEST_DATABASE_ID)
container = db.get_container_client(test_config.TestConfig.TEST_SINGLE_PARTITION_CONTAINER_ID)
return {"client": client, "db": db, "col": container, "retry_policy": connection_retry_policy}

@pytest.mark.cosmosEmulator
class TestRetryPolicyAsync(unittest.IsolatedAsyncioTestCase):
created_database: DatabaseProxy = None
Expand Down Expand Up @@ -490,6 +506,37 @@ async def test_resource_throttle_and_connection_retry_total_retry_with_max_backo
finally:
_retry_utility.ExecuteFunctionAsync = self.original_execute_function

async def test_patch_replace_no_retry_async(self):
doc = {'id': str(uuid.uuid4()),
'pk': str(uuid.uuid4()),
'name': 'sample document',
'key': 'value'}
custom_transport = FaultInjectionTransportAsync()
predicate = lambda r: (FaultInjectionTransportAsync.predicate_is_operation_type(r, documents._OperationType.Patch)
or FaultInjectionTransportAsync.predicate_is_operation_type(r, documents._OperationType.Replace))
custom_transport.add_fault(predicate, lambda r: asyncio.create_task(FaultInjectionTransportAsync.error_after_delay(
0,
exceptions.CosmosHttpResponseError(
status_code=502,
message="Some random reverse proxy error."))))

initialized_objects = await setup_method_with_custom_transport(
custom_transport,
)
container = initialized_objects["col"]
connection_retry_policy = initialized_objects["retry_policy"]
await container.create_item(body=doc)
operations = [{"op": "incr", "path": "/company", "value": 3}]
with self.assertRaises(exceptions.CosmosHttpResponseError):
await container.patch_item(item=doc['id'], partition_key=doc['pk'], patch_operations=operations)
assert connection_retry_policy.counter == 0
with self.assertRaises(exceptions.CosmosHttpResponseError):
doc['name'] = "something else"
await container.replace_item(item=doc['id'], body=doc)
assert connection_retry_policy.counter == 0
# Cleanup
await initialized_objects["client"].close()

async def _MockExecuteFunction(self, function, *args, **kwargs):
response = test_config.FakeResponse({HttpHeaders.RetryAfterInMilliseconds: self.retry_after_in_milliseconds})
raise exceptions.CosmosHttpResponseError(
Expand Down
Loading