Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
### 4.14.1 (Unreleased)

#### Features Added
* Added merge support. See [PR 42924](https://github.com/Azure/azure-sdk-for-python/pull/42924).

#### Breaking Changes

#### Bugs Fixed
* Fixed bug where customer provided excluded region was not always being honored during certain transient failures. See [PR 43602](https://github.com/Azure/azure-sdk-for-python/pull/43602)
* Fixed bug where queries using `feed_range` and `continuation` options would not work as expected. See [PR 43700](https://github.com/Azure/azure-sdk-for-python/pull/43700).

#### Other Changes
* Enhanced logging to ensure when a region is marked unavailable we have the proper context. See [PR 43602](https://github.com/Azure/azure-sdk-for-python/pull/43602)
Expand Down Expand Up @@ -40,7 +42,6 @@ This version and all future versions will require Python 3.9+.
#### Features Added
* Added read_items API to provide an efficient method for retrieving multiple items in a single request. See [PR 42167](https://github.com/Azure/azure-sdk-for-python/pull/42167).
* Added ability to replace a container's indexing policy if a vector embedding policy was present. See [PR 42810](https://github.com/Azure/azure-sdk-for-python/pull/42810).
* Added merge support. See [PR 42924](https://github.com/Azure/azure-sdk-for-python/pull/42924).

#### Bugs Fixed
* Improved the resilience of Database Account Read metadata operation against short-lived network issues by increasing number of retries. See [PR 42525](https://github.com/Azure/azure-sdk-for-python/pull/42525).
Expand Down
14 changes: 14 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,3 +945,17 @@ def _build_properties_cache(properties: dict[str, Any], container_link: str) ->
"_self": properties.get("_self", None), "_rid": properties.get("_rid", None),
"partitionKey": properties.get("partitionKey", None), "container_link": container_link
}

def format_pk_range_options(query_options: Mapping[str, Any]) -> dict[str, Any]:
"""Formats the partition key range options to be used internally from the query ones.

:param dict query_options: The query options being used.
:return: The relevant partition key range options.
:rtype: dict
"""
pk_range_options: dict[str, Any] = {}
if "containerRID" in query_options:
pk_range_options["containerRID"] = query_options["containerRID"]
Comment thread
simorenoh marked this conversation as resolved.
Outdated
if "excludedLocations" in query_options:
pk_range_options["excludedLocations"] = query_options["excludedLocations"]
return pk_range_options
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from typing_extensions import Literal

from azure.cosmos import http_constants
from azure.cosmos._base import format_pk_range_options
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \
ChangeFeedStartFromETagAndFeedRange
from azure.cosmos._change_feed.composite_continuation_token import CompositeContinuationToken
Expand Down Expand Up @@ -286,11 +287,12 @@ def populate_request_headers(
self.set_start_from_request_headers(request_headers)

# based on the feed range to find the overlapping partition key range id
pk_range_options = format_pk_range_options(feed_options)
over_lapping_ranges = \
routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range],
feed_options)
pk_range_options)

self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers)

Expand All @@ -305,11 +307,12 @@ async def populate_request_headers_async(
self.set_start_from_request_headers(request_headers)

# based on the feed range to find the overlapping partition key range id
pk_range_options = format_pk_range_options(feed_options)
over_lapping_ranges = \
await async_routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range],
feed_options)
pk_range_options)

self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3259,9 +3259,9 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]:

# If feed_range_epk exist, query with the range
if feed_range_epk is not None:
last_response_headers = CaseInsensitiveDict()
pk_range_options = base.format_pk_range_options(query_options=options)
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feed_range_epk],
Comment thread
simorenoh marked this conversation as resolved.
options)
pk_range_options)
# It is possible to get more than one over lapping range. We need to get the query results for each one
results: dict[str, Any] = {}
# For each over lapping range we will take a sub range of the feed range EPK that overlaps with the over
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

from azure.cosmos._base import format_pk_range_options
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio import document_producer
from azure.cosmos._execution_context.hybrid_search_aggregator import _retrieve_component_scores, _rewrite_query_infos, \
Expand Down Expand Up @@ -290,8 +290,9 @@ async def _get_target_partition_key_range(self, target_all_ranges):
if target_all_ranges:
return [item async for item in self._client._ReadPartitionKeyRanges(collection_link=self._resource_link)]
query_ranges = self._partitioned_query_ex_info.get_query_ranges()
pk_range_options = format_pk_range_options(self._options)
return await self._routing_provider.get_overlapping_ranges(
self._resource_link,
[routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges],
self._options
pk_range_options
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

from azure.cosmos._base import format_pk_range_options
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio import document_producer, _queue_async_helper
from azure.cosmos._routing import routing_range
Expand Down Expand Up @@ -162,12 +162,12 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range
)

async def _get_target_partition_key_range(self):

query_ranges = self._partitioned_query_ex_info.get_query_ranges()
pk_range_options = format_pk_range_options(self._options)
return await self._routing_provider.get_overlapping_ranges(
self._resource_link,
[routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges],
self._options
pk_range_options
)

async def _configure_partition_ranges(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

from azure.cosmos._base import format_pk_range_options
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio.multi_execution_aggregator import _MultiExecutionContextAggregator
from azure.cosmos._execution_context.aio import document_producer
Expand Down Expand Up @@ -107,12 +107,12 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range
)

async def _get_target_partition_key_range(self):

query_ranges = self._partitioned_query_ex_info.get_query_ranges()
pk_range_options = format_pk_range_options(self._options)
return await self._routing_provider.get_overlapping_ranges(
self._resource_link,
[routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges],
self._options
pk_range_options
)

async def _configure_partition_ranges(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""
from typing import Union
from azure.cosmos._base import format_pk_range_options
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context import document_producer
from azure.cosmos._routing import routing_range
Expand Down Expand Up @@ -443,8 +444,9 @@ def _get_target_partition_key_range(self, target_all_ranges):
if target_all_ranges:
return list(self._client._ReadPartitionKeyRanges(collection_link=self._resource_link))
query_ranges = self._partitioned_query_ex_info.get_query_ranges()
pk_range_options = format_pk_range_options(self._options)
return self._routing_provider.get_overlapping_ranges(
self._resource_link,
[routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges],
self._options
pk_range_options
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""

import heapq
from azure.cosmos._base import format_pk_range_options
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context import document_producer
from azure.cosmos._routing import routing_range
Expand Down Expand Up @@ -195,12 +196,12 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range
)

def _get_target_partition_key_range(self):

query_ranges = self._partitioned_query_ex_info.get_query_ranges()
pk_range_options = format_pk_range_options(self._options)
return self._routing_provider.get_overlapping_ranges(
self._resource_link,
[routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges],
self._options
pk_range_options
)

next = __next__ # Python 2 compatibility.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

from azure.cosmos._base import format_pk_range_options
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.multi_execution_aggregator import _MultiExecutionContextAggregator
from azure.cosmos._execution_context import document_producer
Expand Down Expand Up @@ -151,8 +151,9 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range

def _get_target_partition_key_range(self):
query_ranges = self._partitioned_query_ex_info.get_query_ranges()
pk_range_options = format_pk_range_options(self._options)
return self._routing_provider.get_overlapping_ranges(
self._resource_link,
[routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges],
self._options
pk_range_options
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""
from typing import TYPE_CHECKING, Optional

from azure.cosmos._base import format_pk_range_options
from azure.cosmos._constants import _Constants
from azure.cosmos.partition_key import _get_partition_key_from_partition_key_definition
from azure.cosmos._global_partition_endpoint_manager_circuit_breaker_core import \
Expand Down Expand Up @@ -72,8 +73,9 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK
partition_key_value = request.pk_val
# get the partition key range for the given partition key
epk_range = [partition_key._get_epk_range_for_partition_key(partition_key_value)] # pylint: disable=protected-access
pk_range_options = format_pk_range_options(options)
partition_ranges = (self.client._routing_map_provider # pylint: disable=protected-access
.get_overlapping_ranges(container_link, epk_range, options))
.get_overlapping_ranges(container_link, epk_range, pk_range_options))
partition_range = Range.PartitionKeyRangeToRange(partition_ranges[0])
elif HttpHeaders.PartitionKeyRangeID in request.headers:
pk_range_id = request.headers[HttpHeaders.PartitionKeyRangeID]
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_read_items_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ def _partition_items_by_range(self) -> dict[str, list[Tuple[int, str, "Partition
# All items in this list share the same partition key value. Get it from the first item.
pk_value = pk_items[0][2]
epk_range = partition_key._get_epk_range_for_partition_key(pk_value)
pk_range_options = _base.format_pk_range_options(self.options)
overlapping_ranges = self.client._routing_map_provider.get_overlapping_ranges(
collection_rid, [epk_range], self.options
collection_rid, [epk_range], pk_range_options
)
if overlapping_ranges:
range_id = overlapping_ranges[0]["id"]
Expand Down
6 changes: 4 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ def get_session_token(
kind=collection_pk_definition['kind'],
version=collection_pk_definition['version'])
epk_range = partition_key._get_epk_range_for_partition_key(pk_value=pk_value)
pk_range_options = _base.format_pk_range_options(options)
pk_range = routing_map_provider.get_overlapping_ranges(collection_name,
[epk_range],
options)
pk_range_options)
if len(pk_range) > 0:
partition_key_range_id = pk_range[0]['id']
vector_session_token = self._resolve_partition_local_session_token(pk_range, token_dict)
Expand Down Expand Up @@ -210,9 +211,10 @@ async def get_session_token_async(
kind=collection_pk_definition['kind'],
version=collection_pk_definition['version'])
epk_range = partition_key._get_epk_range_for_partition_key(pk_value=pk_value)
pk_range_options = _base.format_pk_range_options(options)
pk_range = await routing_map_provider.get_overlapping_ranges(collection_name,
[epk_range],
options)
pk_range_options)
if len(pk_range) > 0:
partition_key_range_id = pk_range[0]['id']
vector_session_token = self._resolve_partition_local_session_token(pk_range, token_dict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3064,8 +3064,9 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]:
feed_range_epk = partition_key_obj._get_epk_range_for_prefix_partition_key(partition_key_value)

if feed_range_epk is not None:
pk_range_options = base.format_pk_range_options(query_options=options)
over_lapping_ranges = await self._routing_map_provider.get_overlapping_ranges(id_, [feed_range_epk],
options)
pk_range_options)
results: dict[str, Any] = {}
# For each over lapping range we will take a sub range of the feed range EPK that overlaps with the over
# lapping physical partition. The EPK sub range will be one of four:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ async def _partition_items_by_range(self) -> dict[str, list[Tuple[int, str, "Par
for pk_items in items_by_pk_value.values():
pk_value = pk_items[0][2]
epk_range = partition_key._get_epk_range_for_partition_key(pk_value)
pk_range_options = _base.format_pk_range_options(self.options)
overlapping_ranges = await self.client._routing_map_provider.get_overlapping_ranges(
collection_rid, [epk_range], self.options
collection_rid, [epk_range], pk_range_options
)
if overlapping_ranges:
range_id = overlapping_ranges[0]["id"]
Expand Down
38 changes: 38 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/document_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,44 @@ def query_items_cross_partition_with_pagination(container):
print(f' - Pages: {page_count}')
print(f' - Total items: {total_item_count}')

def query_items_with_feed_ranges_and_pagination(container):
print('\n1.5c Querying with Feed Range and Pagination\n')
# We again use max_item_count to control page size
max_items_per_page = 3

# First we fetch the relevant feed ranges for the container - feed ranges represent logical partitions
# or ranges of partition key values, and can be used to query as well
feed_ranges = container.read_feed_ranges()

# For this example, we will just use the first feed range
feed_ranges_list = list(feed_ranges)
query_iterable = container.query_items(
query="SELECT * FROM c",
feed_range=feed_ranges_list[0], # Query specific feed range
max_item_count=max_items_per_page
)

# Iterate through pages and count both pages and total items
total_item_count = 0
page_count = 0

item_pages = query_iterable.by_page()
for page in item_pages:
page_count += 1
items_in_page = list(page)
items_in_current_page = len(items_in_page)
total_item_count += items_in_current_page

print(f'Page {page_count}: Retrieved {items_in_current_page} items (max per page: {max_items_per_page})')

# Process items in this page
for item in items_in_page:
# Do something with each item
pass

print(f'\nTotal pages processed: {page_count}')
print(f'Total items retrieved: {total_item_count}')
print(f'Note: max_item_count limits items PER PAGE, not total results\n')

def replace_item(container, doc_id):
print('\n1.6 Replace an Item\n')
Expand Down
32 changes: 32 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/document_management_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,38 @@ async def query_items_cross_partition_with_pagination(container):
print(f' - Pages: {page_count}')
print(f' - Total items: {total_item_count}')

async def query_items_with_feed_ranges_and_pagination(container):
print('\n1.5c Querying with Feed Range and Pagination\n')
# We again use max_item_count to control page size
max_items_per_page = 3

# First we fetch the relevant feed ranges for the container - feed ranges represent logical partitions
# or ranges of partition key values, and can be used to query as well
feed_ranges = container.read_feed_ranges()

# For this example, we will just use the first feed range
feed_ranges_list = [feed_range async for feed_range in feed_ranges]
query_iterable = container.query_items(
query="SELECT * FROM c",
feed_range=feed_ranges_list[0], # Query specific feed range
max_item_count=max_items_per_page
)

# Iterate through pages and count both pages and total items
total_item_count = 0
page_count = 0

item_pages = query_iterable.by_page()
async for page in item_pages:
page_count += 1
items_in_page = [item async for item in page]
total_item_count += len(items_in_page)

print(f'Page {page_count}: {len(items_in_page)} items from across partitions')

print(f'\nCross-partition query completed:')
print(f' - Pages: {page_count}')
print(f' - Total items: {total_item_count}')

async def replace_item(container, doc_id):
print('\n1.6 Replace an Item\n')
Expand Down
Loading
Loading