Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
ae3164f
Add client cache to hold container properties
bambriz Apr 20, 2024
99c1e95
Added tests for container properties cache
bambriz Apr 20, 2024
991e351
Update _cosmos_client_connection_async.py
bambriz Apr 20, 2024
561e3ce
Update _cosmos_client_connection_async.py
bambriz Apr 20, 2024
b3c339c
Fix typos
bambriz Apr 21, 2024
746b104
Update container.py
bambriz Apr 21, 2024
266c0af
Update _container.py
bambriz Apr 21, 2024
989b824
Update cache test
bambriz Apr 21, 2024
28f4874
Update test_container_properties_cache.py
bambriz Apr 22, 2024
4cad0de
Merge remote-tracking branch 'upstream/main' into container-cache
bambriz Apr 22, 2024
db76ce4
update tests
bambriz Apr 22, 2024
b41a368
Update test_container_properties_cache.py
bambriz Apr 22, 2024
edbfad1
Update tests
bambriz Apr 23, 2024
dbc1f38
tests updates
bambriz Apr 23, 2024
8e00fcd
Mypy fixes
bambriz Apr 23, 2024
cce2ec9
Merge remote-tracking branch 'upstream/main' into container-cache
bambriz Apr 23, 2024
a6f23ff
mypy fix
bambriz Apr 23, 2024
2e3154b
Update
bambriz Apr 26, 2024
c27b782
Cache update
bambriz Apr 29, 2024
f904170
test update
bambriz Apr 29, 2024
ef5615c
Merge remote-tracking branch 'upstream/main' into container-cache
bambriz Apr 29, 2024
9740468
Update CHANGELOG.md
bambriz Apr 29, 2024
f435151
Merge remote-tracking branch 'upstream/main' into container-cache
bambriz Apr 30, 2024
5a0d5c8
Merge remote-tracking branch 'upstream/main' into container-cache
bambriz Apr 30, 2024
c290772
added set_properties_cache method
bambriz May 1, 2024
7698d79
Update _base.py
bambriz May 1, 2024
4d643b3
Update _cosmos_client_connection.py
bambriz May 1, 2024
20893a7
MyPy fixes
bambriz May 2, 2024
e6832f1
Fix mypy issue
bambriz May 2, 2024
0b6d113
Update _cosmos_client_connection.py
bambriz May 2, 2024
9e9dc9a
Update _cosmos_client_connection.py
bambriz May 2, 2024
30c7443
Update _cosmos_client_connection_async.py
bambriz May 3, 2024
870960e
Update _cosmos_client_connection.py
bambriz May 3, 2024
468d912
Update container cache
bambriz May 8, 2024
05b9e37
Update sdk/cosmos/azure-cosmos/CHANGELOG.md
bambriz May 8, 2024
3c190d6
Additional pylint updates
bambriz May 9, 2024
4a22ad4
more pylint fixes
bambriz May 9, 2024
438f7cd
pylint fix
bambriz May 9, 2024
0b0c8b3
Update test_container_properties_cache.py
bambriz May 10, 2024
d384380
Container Cache updates
bambriz May 21, 2024
1a79bd7
Merge remote-tracking branch 'upstream/main' into container-cache
bambriz May 21, 2024
4c2db18
Update CHANGELOG.md
bambriz May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731)

#### Other Changes

Expand Down
7 changes: 7 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,3 +853,10 @@ def _format_batch_operations(
final_operations.append(operation)

return final_operations


def _set_properties_cache(properties: Dict[str, Any]) -> Dict[str, Any]:
return {
"_self": properties.get("_self", None), "_rid": properties.get("_rid", None),
"partitionKey": properties.get("partitionKey", None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)

from . import _base as base
from ._base import _set_properties_cache
from . import documents
from .documents import ConnectionPolicy, DatabaseAccount
from ._constants import _Constants as Constants
Expand Down Expand Up @@ -144,7 +145,7 @@ def __init__(

self.connection_policy = connection_policy or ConnectionPolicy()
self.partition_resolvers: Dict[str, RangePartitionResolver] = {}
self.partition_key_definition_cache: Dict[str, Any] = {}
self.__container_properties_cache: Dict[str, Dict[str, Any]] = {}
self.default_headers: Dict[str, Any] = {
http_constants.HttpHeaders.CacheControl: "no-cache",
http_constants.HttpHeaders.Version: http_constants.Versions.CurrentVersion,
Expand Down Expand Up @@ -231,6 +232,26 @@ def __init__(
self.session: Optional[_session.Session] = None
self._set_client_consistency_level(database_account, consistency_level)

@property
def _container_properties_cache(self) -> Dict[str, Dict[str, Any]]:
"""Gets the container properties cache from the client.
:returns: the container properties cache for the client.
:rtype: Dict[str, Dict[str, Any]]"""
return self.__container_properties_cache

def _set_container_properties_cache(self, container_link: str, properties: Optional[Dict[str, Any]]) -> None:
"""Sets the container properties cache for the specified container.

This will only update the properties cache for a specified container.
:param container_link: The container link will be used as the key to cache the container properties.
:type container_link: str
:param properties: These are the container properties to cache.
:type properties: Optional[Dict[str, Any]]"""
if properties:
self.__container_properties_cache[container_link] = properties
else:
self.__container_properties_cache[container_link] = {}

def _set_client_consistency_level(
self,
database_account: DatabaseAccount,
Expand Down Expand Up @@ -1262,7 +1283,6 @@ def CreateItem(

if base.IsItemContainerLink(database_or_container_link):
options = self._AddPartitionKey(database_or_container_link, document, options)

return self.Create(document, path, "docs", collection_id, None, options, **kwargs)

def UpsertItem(
Expand Down Expand Up @@ -3280,13 +3300,14 @@ def _UpdateSessionIfRequired(
self.session.update_session(response_result, response_headers)

def _get_partition_key_definition(self, collection_link: str) -> Optional[Dict[str, Any]]:
partition_key_definition = None
partition_key_definition: Optional[Dict[str, Any]]
# If the document collection link is present in the cache, then use the cached partitionkey definition
if collection_link in self.partition_key_definition_cache:
partition_key_definition = self.partition_key_definition_cache.get(collection_link)
if collection_link in self.__container_properties_cache:
cached_container: Dict[str, Any] = self.__container_properties_cache.get(collection_link, {})
partition_key_definition = cached_container.get("partitionKey")
# Else read the collection from backend and add it to the cache
else:
collection = self.ReadContainer(collection_link)
partition_key_definition = collection.get("partitionKey")
self.partition_key_definition_cache[collection_link] = partition_key_definition
container = self.ReadContainer(collection_link)
partition_key_definition = container.get("partitionKey")
self.__container_properties_cache[collection_link] = _set_properties_cache(container)
return partition_key_definition
23 changes: 12 additions & 11 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
validate_cache_staleness_value,
_deserialize_throughput,
_replace_throughput,
GenerateGuidId
GenerateGuidId,
_set_properties_cache
)
from ..exceptions import CosmosResourceNotFoundError
from ..http_constants import StatusCodes
Expand Down Expand Up @@ -81,19 +82,21 @@ def __init__(
) -> None:
self.client_connection = client_connection
self.id = id
self._properties = properties
self.database_link = database_link
self.container_link = "{}/colls/{}".format(database_link, self.id)
self._is_system_key: Optional[bool] = None
self._scripts: Optional[ScriptsProxy] = None
if properties:
self.client_connection._set_container_properties_cache(self.container_link,
_set_properties_cache(properties)) # pylint: disable=protected-access, line-too-long

def __repr__(self) -> str:
return "<ContainerProxy [{}]>".format(self.container_link)[:1024]

async def _get_properties(self) -> Dict[str, Any]:
if self._properties is None:
self._properties = await self.read()
return self._properties
if self.container_link not in self.client_connection._container_properties_cache: # pylint: disable=protected-access, line-too-long
await self.read()
return self.client_connection._container_properties_cache[self.container_link] # pylint: disable=protected-access, line-too-long

@property
async def is_system_key(self) -> bool:
Expand Down Expand Up @@ -167,12 +170,10 @@ async def read(
request_options["populatePartitionKeyRangeStatistics"] = populate_partition_key_range_statistics
if populate_quota_info is not None:
request_options["populateQuotaInfo"] = populate_quota_info

collection_link = self.container_link
self._properties = await self.client_connection.ReadContainer(
collection_link, options=request_options, **kwargs
)
return self._properties
container = await self.client_connection.ReadContainer(self.container_link, options=request_options, **kwargs)
# Only cache Container Properties that will not change in the lifetime of the container
self.client_connection._set_container_properties_cache(self.container_link, _set_properties_cache(container)) # pylint: disable=protected-access, line-too-long
return container

@distributed_trace_async
async def create_item(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
ProxyPolicy)

from .. import _base as base
from .._base import _set_properties_cache
from .. import documents
from .._routing import routing_range
from ..documents import ConnectionPolicy, DatabaseAccount
Expand Down Expand Up @@ -145,7 +146,7 @@ def __init__(

self.connection_policy = connection_policy or ConnectionPolicy()
self.partition_resolvers: Dict[str, RangePartitionResolver] = {}
self.partition_key_definition_cache: Dict[str, Any] = {}
self.__container_properties_cache: Dict[str, Dict[str, Any]] = {}
self.default_headers: Dict[str, Any] = {
http_constants.HttpHeaders.CacheControl: "no-cache",
http_constants.HttpHeaders.Version: http_constants.Versions.CurrentVersion,
Expand Down Expand Up @@ -227,6 +228,26 @@ def __init__(
# Routing map provider
self._routing_map_provider: SmartRoutingMapProvider = SmartRoutingMapProvider(self)

@property
def _container_properties_cache(self) -> Dict[str, Dict[str, Any]]:
"""Gets the container properties cache from the client.
:returns: the container properties cache for the client.
:rtype: Dict[str, Dict[str, Any]]"""
return self.__container_properties_cache

def _set_container_properties_cache(self, container_link: str, properties: Optional[Dict[str, Any]]) -> None:
"""Sets the container properties cache for the specified container.

This will only update the properties cache for a specified container.
:param container_link: The container link will be used as the key to cache the container properties.
:type container_link: str
:param properties: These are the container properties to cache.
:type properties: Optional[Dict[str, Any]]"""
if properties:
self.__container_properties_cache[container_link] = properties
else:
self.__container_properties_cache[container_link] = {}

@property
def _Session(self) -> Optional[_session.Session]:
"""Gets the session object from the client.
Expand Down Expand Up @@ -3035,13 +3056,14 @@ async def _AddPartitionKey(self, collection_link, document, options):
# TODO: Refresh the cache if partition is extracted automatically and we get a 400.1001

# If the document collection link is present in the cache, then use the cached partitionkey definition
if collection_link in self.partition_key_definition_cache:
partitionKeyDefinition = self.partition_key_definition_cache.get(collection_link)
if collection_link in self.__container_properties_cache:
cached_container = self.__container_properties_cache.get(collection_link)
partitionKeyDefinition = cached_container.get("partitionKey")
# Else read the collection from backend and add it to the cache
else:
collection = await self.ReadContainer(collection_link)
partitionKeyDefinition = collection.get("partitionKey")
self.partition_key_definition_cache[collection_link] = partitionKeyDefinition
container = await self.ReadContainer(collection_link)
partitionKeyDefinition = container.get("partitionKey")
self.__container_properties_cache[collection_link] = _set_properties_cache(container)

# If the collection doesn't have a partition key definition, skip it as it's a legacy collection
if partitionKeyDefinition:
Expand Down
22 changes: 12 additions & 10 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
validate_cache_staleness_value,
_deserialize_throughput,
_replace_throughput,
GenerateGuidId
GenerateGuidId,
_set_properties_cache
)
from .exceptions import CosmosResourceNotFoundError
from .http_constants import StatusCodes
Expand Down Expand Up @@ -83,17 +84,19 @@ def __init__(
self.id = id
self.container_link = "{}/colls/{}".format(database_link, self.id)
self.client_connection = client_connection
self._properties = properties
self._is_system_key: Optional[bool] = None
self._scripts: Optional[ScriptsProxy] = None
if properties:
self.client_connection._set_container_properties_cache(self.container_link,
_set_properties_cache(properties)) # pylint: disable=protected-access, line-too-long

def __repr__(self) -> str:
return "<ContainerProxy [{}]>".format(self.container_link)[:1024]

def _get_properties(self) -> Dict[str, Any]:
if self._properties is None:
self._properties = self.read()
return self._properties
if self.container_link not in self.client_connection._container_properties_cache: # pylint: disable=protected-access, line-too-long
self.read()
return self.client_connection._container_properties_cache[self.container_link] # pylint: disable=protected-access, line-too-long

@property
def is_system_key(self) -> bool:
Expand Down Expand Up @@ -173,11 +176,10 @@ def read( # pylint:disable=docstring-missing-param
request_options["populatePartitionKeyRangeStatistics"] = populate_partition_key_range_statistics
if populate_quota_info is not None:
request_options["populateQuotaInfo"] = populate_quota_info
collection_link = self.container_link
self._properties = self.client_connection.ReadContainer(
collection_link, options=request_options, **kwargs
)
return self._properties
container = self.client_connection.ReadContainer(self.container_link, options=request_options, **kwargs)
# Only cache Container Properties that will not change in the lifetime of the container
self.client_connection._set_container_properties_cache(self.container_link, _set_properties_cache(container)) # pylint: disable=protected-access, line-too-long
return container

@distributed_trace
def read_item( # pylint:disable=docstring-missing-param
Expand Down
28 changes: 28 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,34 @@
print(json.dumps(properties, indent=True))
# [END get_database_properties]

# Retrieve the properties of a container
# [START get_container_properties]
# Get properties will return a cache of two container properties: RID and the Partition Key Definition (This will not consume RUs)
properties = container._get_properties()

# Print _rid and partitionKey
print("Resource ID: ", properties.get('_rid'))
print("Partition Key: ", properties.get('partitionKey'))

# Read the container to get the latests of all the Container Properties. (This will make a backend requests and will consume RUs)
container_properties = container.read()

# Print each property one by one if they are currently in the container properties
print("indexingPolicy: ", container_properties.get("indexingPolicy"))
print("etag: ", container_properties.get('_etag'))
print("lastModified: ", container_properties.get('lastModified'))
print("defaultTtl: ", container_properties.get('defaultTtl'))
print("uniqueKeyPolicy: ", container_properties.get('uniqueKeyPolicy'))
print("conflictResolutionPolicy: ", container_properties.get('conflictResolutionPolicy'))
print("changeFeedPolicy: ", container_properties.get('changeFeedPolicy'))
print("geospatialConfig: ", container_properties.get('geospatialConfig'))

# Print remaining properties if they are in the current container properties
for key, value in container_properties.items():
if key not in ['_rid', 'partitionKey', 'indexingPolicy', '_etag', 'lastModified', 'defaultTtl', 'uniqueKeyPolicy', 'conflictResolutionPolicy', 'changeFeedPolicy', 'geospatialConfig']:
print(f"{key}: {value}")
# [END get_container_properties]

# Modify the properties of an existing container
# This example sets the default time to live (TTL) for items in the
# container to 3600 seconds (1 hour). An item in container is deleted
Expand Down
29 changes: 29 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,35 @@ async def examples_async():
print(json.dumps(properties, indent=True))
# [END get_database_properties]

# Retrieve the properties of a container
# [START get_container_properties]
# Get properties will return a cache of two container properties: RID and the Partition Key Definition (This will not consume RUs)
properties = await container._get_properties()

# Print _rid and partitionKey
print("Resource ID: ", properties.get('_rid'))
print("Partition Key: ", properties.get('partitionKey'))

# Read the container to get the latests of all the Container Properties. (This will make a backend requests and will consume RUs)
container_properties = await container.read()

# Print each property one by one if they are currently in the container properties
print("indexingPolicy: ", container_properties.get("indexingPolicy"))
print("etag: ", container_properties.get('_etag'))
print("lastModified: ", container_properties.get('lastModified'))
print("defaultTtl: ", container_properties.get('defaultTtl'))
print("uniqueKeyPolicy: ", container_properties.get('uniqueKeyPolicy'))
print("conflictResolutionPolicy: ", container_properties.get('conflictResolutionPolicy'))
print("changeFeedPolicy: ", container_properties.get('changeFeedPolicy'))
print("geospatialConfig: ", container_properties.get('geospatialConfig'))

# Print remaining properties if they are in the current container properties
for key, value in container_properties.items():
if key not in ['_rid', 'partitionKey', 'indexingPolicy', '_etag', 'lastModified', 'defaultTtl', 'uniqueKeyPolicy',
'conflictResolutionPolicy', 'changeFeedPolicy', 'geospatialConfig']:
print(f"{key}: {value}")
# [END get_container_properties]

# Modify the properties of an existing container
# This example sets the default time to live (TTL) for items in the
# container to 3600 seconds (1 hour). An item in container is deleted
Expand Down
Loading