-
Notifications
You must be signed in to change notification settings - Fork 931
grpc: allow the user to cancel stream v2 #3823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
62c24e7
a2cf54a
641beb7
58683c1
ce09c31
9e59ba6
9bdf76a
979f0c6
fa040d1
4225154
aabae95
8e23a2c
da21d03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,10 +21,10 @@ | |
|
|
||
| import logging | ||
| from collections import OrderedDict | ||
| from functools import partial | ||
| from typing import Callable, MutableMapping | ||
|
|
||
| import grpc | ||
|
|
||
| from opentelemetry import trace | ||
| from opentelemetry.instrumentation.grpc import grpcext | ||
| from opentelemetry.instrumentation.grpc._utilities import RpcInfo | ||
|
|
@@ -77,12 +77,11 @@ def _safe_invoke(function: Callable, *args): | |
| "Error when invoking function '%s'", function_name, exc_info=ex | ||
| ) | ||
|
|
||
|
|
||
| class OpenTelemetryClientInterceptor( | ||
| grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor | ||
| ): | ||
| def __init__( | ||
| self, tracer, filter_=None, request_hook=None, response_hook=None | ||
| self, tracer, filter_=None, request_hook=None, response_hook=None | ||
| ): | ||
| self._tracer = tracer | ||
| self._filter = filter_ | ||
|
|
@@ -136,10 +135,10 @@ def _intercept(self, request, metadata, client_info, invoker): | |
| else: | ||
| mutable_metadata = OrderedDict(metadata) | ||
| with self._start_span( | ||
| client_info.full_method, | ||
| end_on_exit=False, | ||
| record_exception=False, | ||
| set_status_on_exception=False, | ||
| client_info.full_method, | ||
| end_on_exit=False, | ||
| record_exception=False, | ||
| set_status_on_exception=False, | ||
| ) as span: | ||
| result = None | ||
| try: | ||
|
|
@@ -193,14 +192,17 @@ def intercept_unary(self, request, metadata, client_info, invoker): | |
| # the span across the generated responses and detect any errors, we wrap | ||
| # the result in a new generator that yields the response values. | ||
| def _intercept_server_stream( | ||
| self, request_or_iterator, metadata, client_info, invoker | ||
| self, request_or_iterator, metadata, client_info, invoker | ||
| ): | ||
| if not metadata: | ||
| mutable_metadata = OrderedDict() | ||
| else: | ||
| mutable_metadata = OrderedDict(metadata) | ||
|
|
||
| with self._start_span(client_info.full_method) as span: | ||
| with self._start_span( | ||
| client_info.full_method, | ||
| end_on_exit=False | ||
| ) as span: | ||
| inject(mutable_metadata, setter=_carrier_setter) | ||
| metadata = tuple(mutable_metadata.items()) | ||
| rpc_info = RpcInfo( | ||
|
|
@@ -212,15 +214,29 @@ def _intercept_server_stream( | |
| if client_info.is_client_stream: | ||
| rpc_info.request = request_or_iterator | ||
|
|
||
| try: | ||
| yield from invoker(request_or_iterator, metadata) | ||
| except grpc.RpcError as err: | ||
| span.set_status(Status(StatusCode.ERROR)) | ||
| span.set_attribute(RPC_GRPC_STATUS_CODE, err.code().value[0]) | ||
| raise err | ||
| stream = invoker(request_or_iterator, metadata) | ||
|
|
||
| def done_callback(future, span_): | ||
| try: | ||
| future.result() | ||
| except grpc.FutureCancelledError: | ||
| span_.set_status(Status(StatusCode.OK)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think span status as OK makes sense since the choices are unset, ok, or error. Would be good to get others' thoughts on this. The important part is this follows semconv for rpc status code.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't believe that canceling a span from the server side should be considered an error, although it may make sense to look at what another language does. |
||
| span_.set_attribute( | ||
| RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0] | ||
| ) | ||
| except grpc.RpcError as err: | ||
| span_.set_status(Status(StatusCode.ERROR)) | ||
| span_.set_attribute( | ||
| RPC_GRPC_STATUS_CODE, err.code().value[0] | ||
| ) | ||
| finally: | ||
| span_.end() | ||
|
|
||
| stream.add_done_callback(partial(done_callback, span_=span)) | ||
| return stream | ||
|
|
||
| def intercept_stream( | ||
| self, request_or_iterator, metadata, client_info, invoker | ||
| self, request_or_iterator, metadata, client_info, invoker | ||
| ): | ||
| if not is_instrumentation_enabled(): | ||
| return invoker(request_or_iterator, metadata) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -12,7 +12,10 @@ | |||||
| # See the License for the specific language governing permissions and | ||||||
| # limitations under the License. | ||||||
| # pylint:disable=cyclic-import | ||||||
| import logging | ||||||
|
|
||||||
| import threading | ||||||
| import time | ||||||
| from unittest import mock | ||||||
|
|
||||||
| import grpc | ||||||
|
|
@@ -61,32 +64,32 @@ def __init__(self): | |||||
| pass | ||||||
|
|
||||||
| def intercept_unary_unary( | ||||||
| self, continuation, client_call_details, request | ||||||
| self, continuation, client_call_details, request | ||||||
| ): | ||||||
| return self._intercept_call(continuation, client_call_details, request) | ||||||
|
|
||||||
| def intercept_unary_stream( | ||||||
| self, continuation, client_call_details, request | ||||||
| self, continuation, client_call_details, request | ||||||
| ): | ||||||
| return self._intercept_call(continuation, client_call_details, request) | ||||||
|
|
||||||
| def intercept_stream_unary( | ||||||
| self, continuation, client_call_details, request_iterator | ||||||
| self, continuation, client_call_details, request_iterator | ||||||
| ): | ||||||
| return self._intercept_call( | ||||||
| continuation, client_call_details, request_iterator | ||||||
| ) | ||||||
|
|
||||||
| def intercept_stream_stream( | ||||||
| self, continuation, client_call_details, request_iterator | ||||||
| self, continuation, client_call_details, request_iterator | ||||||
| ): | ||||||
| return self._intercept_call( | ||||||
| continuation, client_call_details, request_iterator | ||||||
| ) | ||||||
|
|
||||||
| @staticmethod | ||||||
| def _intercept_call( | ||||||
| continuation, client_call_details, request_or_iterator | ||||||
| continuation, client_call_details, request_or_iterator | ||||||
| ): | ||||||
| return continuation(client_call_details, request_or_iterator) | ||||||
|
|
||||||
|
|
@@ -171,6 +174,40 @@ def test_unary_stream(self): | |||||
| }, | ||||||
| ) | ||||||
|
|
||||||
| def test_unary_stream_can_be_cancel(self): | ||||||
| done = threading.Event() | ||||||
| responses = server_streaming_method(self._stub, serialize=False) | ||||||
| responses.add_done_callback(lambda: done.set()) | ||||||
| for i, _ in enumerate(responses): | ||||||
| if i == 1: | ||||||
| responses.cancel() | ||||||
| break | ||||||
| self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) | ||||||
| done.wait(5) | ||||||
| spans = self.memory_exporter.get_finished_spans() | ||||||
| self.assertEqual(len(spans), 1) | ||||||
| span = spans[0] | ||||||
|
|
||||||
| self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") | ||||||
| self.assertIs(span.kind, trace.SpanKind.CLIENT) | ||||||
|
|
||||||
| # Check version and name in span's instrumentation info | ||||||
| self.assertEqualSpanInstrumentationScope( | ||||||
| span, opentelemetry.instrumentation.grpc | ||||||
| ) | ||||||
|
|
||||||
| self.assertSpanHasAttributes( | ||||||
| span, | ||||||
| { | ||||||
| RPC_METHOD: "ServerStreamingMethod", | ||||||
| RPC_SERVICE: "GRPCTestServer", | ||||||
| RPC_SYSTEM: "grpc", | ||||||
| RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ | ||||||
| 0 | ||||||
| ], | ||||||
| }, | ||||||
| ) | ||||||
|
|
||||||
| def test_stream_unary(self): | ||||||
| client_streaming_method(self._stub) | ||||||
| spans = self.memory_exporter.get_finished_spans() | ||||||
|
|
@@ -221,6 +258,41 @@ def test_stream_stream(self): | |||||
| }, | ||||||
| ) | ||||||
|
|
||||||
| def test_stream_stream_can_be_cancel(self): | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit:
Suggested change
|
||||||
| done = threading.Event() | ||||||
| responses = bidirectional_streaming_method(self._stub, serialize=False) | ||||||
| responses.add_done_callback(lambda: done.set()) | ||||||
| for i, _ in enumerate(responses): | ||||||
| if i == 1: | ||||||
| responses.cancel() | ||||||
| break | ||||||
| self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) | ||||||
| done.wait(5) | ||||||
| spans = self.memory_exporter.get_finished_spans() | ||||||
| self.assertEqual(len(spans), 1) | ||||||
| span = spans[0] | ||||||
|
|
||||||
| self.assertEqual(span.name, "/GRPCTestServer/BidirectionalStreamingMethod") | ||||||
| self.assertIs(span.kind, trace.SpanKind.CLIENT) | ||||||
|
|
||||||
| # Check version and name in span's instrumentation info | ||||||
| self.assertEqualSpanInstrumentationScope( | ||||||
| span, opentelemetry.instrumentation.grpc | ||||||
| ) | ||||||
|
|
||||||
| self.assertSpanHasAttributes( | ||||||
| span, | ||||||
| { | ||||||
| RPC_METHOD: "BidirectionalStreamingMethod", | ||||||
| RPC_SERVICE: "GRPCTestServer", | ||||||
| RPC_SYSTEM: "grpc", | ||||||
| RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ | ||||||
| 0 | ||||||
| ], | ||||||
| }, | ||||||
| ) | ||||||
|
|
||||||
|
|
||||||
| def test_error_simple(self): | ||||||
| with self.assertRaises(grpc.RpcError): | ||||||
| simple_method(self._stub, error=True) | ||||||
|
|
@@ -296,7 +368,7 @@ def invoker(_request, _metadata): | |||||
| self.assertEqual(span_end_mock.call_count, 1) | ||||||
|
|
||||||
| def test_client_interceptor_trace_context_propagation( | ||||||
| self, | ||||||
| self, | ||||||
| ): # pylint: disable=no-self-use | ||||||
| """ensure that client interceptor correctly inject trace context into all outgoing requests.""" | ||||||
| previous_propagator = get_global_textmap() | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.