Skip to content

Commit 4eb24c2

Browse files
committed
Test solution with add done callback
1 parent 4bf3577 commit 4eb24c2

4 files changed

Lines changed: 82 additions & 28 deletions

File tree

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
import logging
2323
from collections import OrderedDict
24+
from functools import partial
2425
from typing import Callable, MutableMapping
2526

2627
import grpc
27-
2828
from opentelemetry import context, trace
2929
from opentelemetry.instrumentation.grpc import grpcext
3030
from opentelemetry.instrumentation.grpc._utilities import RpcInfo
@@ -72,12 +72,11 @@ def _safe_invoke(function: Callable, *args):
7272
"Error when invoking function '%s'", function_name, exc_info=ex
7373
)
7474

75-
7675
class OpenTelemetryClientInterceptor(
7776
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
7877
):
7978
def __init__(
80-
self, tracer, filter_=None, request_hook=None, response_hook=None
79+
self, tracer, filter_=None, request_hook=None, response_hook=None
8180
):
8281
self._tracer = tracer
8382
self._filter = filter_
@@ -131,10 +130,10 @@ def _intercept(self, request, metadata, client_info, invoker):
131130
else:
132131
mutable_metadata = OrderedDict(metadata)
133132
with self._start_span(
134-
client_info.full_method,
135-
end_on_exit=False,
136-
record_exception=False,
137-
set_status_on_exception=False,
133+
client_info.full_method,
134+
end_on_exit=False,
135+
record_exception=False,
136+
set_status_on_exception=False,
138137
) as span:
139138
result = None
140139
try:
@@ -188,14 +187,17 @@ def intercept_unary(self, request, metadata, client_info, invoker):
188187
# the span across the generated responses and detect any errors, we wrap
189188
# the result in a new generator that yields the response values.
190189
def _intercept_server_stream(
191-
self, request_or_iterator, metadata, client_info, invoker
190+
self, request_or_iterator, metadata, client_info, invoker
192191
):
193192
if not metadata:
194193
mutable_metadata = OrderedDict()
195194
else:
196195
mutable_metadata = OrderedDict(metadata)
197196

198-
with self._start_span(client_info.full_method) as span:
197+
with self._start_span(
198+
client_info.full_method,
199+
end_on_exit=False
200+
) as span:
199201
inject(mutable_metadata, setter=_carrier_setter)
200202
metadata = tuple(mutable_metadata.items())
201203
rpc_info = RpcInfo(
@@ -207,17 +209,29 @@ def _intercept_server_stream(
207209
if client_info.is_client_stream:
208210
rpc_info.request = request_or_iterator
209211

210-
try:
211-
yield from invoker(request_or_iterator, metadata)
212-
except grpc.RpcError as err:
213-
span.set_status(Status(StatusCode.ERROR))
214-
span.set_attribute(
215-
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0]
216-
)
217-
raise err
212+
stream = invoker(request_or_iterator, metadata)
213+
214+
def done_callback(future, span_):
215+
try:
216+
future.result()
217+
except grpc.FutureCancelledError:
218+
span_.set_status(Status(StatusCode.OK))
219+
span_.set_attribute(
220+
SpanAttributes.RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0]
221+
)
222+
except grpc.RpcError as err:
223+
span_.set_status(Status(StatusCode.ERROR))
224+
span_.set_attribute(
225+
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0]
226+
)
227+
finally:
228+
span_.end()
229+
230+
stream.add_done_callback(partial(done_callback, span_=span))
231+
return stream
218232

219233
def intercept_stream(
220-
self, request_or_iterator, metadata, client_info, invoker
234+
self, request_or_iterator, metadata, client_info, invoker
221235
):
222236
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
223237
return invoker(request_or_iterator, metadata)

instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@ def request_messages():
4343
stub.ClientStreamingMethod(request_messages())
4444

4545

46-
def server_streaming_method(stub, error=False):
46+
def server_streaming_method(stub, error=False, serialize=True):
4747
request = Request(
4848
client_id=CLIENT_ID, request_data="error" if error else "data"
4949
)
5050
response_iterator = stub.ServerStreamingMethod(request)
51-
list(response_iterator)
51+
if serialize:
52+
list(response_iterator)
53+
return response_iterator
5254

5355

5456
def bidirectional_streaming_method(stub, error=False):

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
# pylint:disable=cyclic-import
15+
from time import sleep
1516

1617
import grpc
1718
from tests.protobuf import ( # pylint: disable=no-name-in-module
@@ -55,32 +56,32 @@ def __init__(self):
5556
pass
5657

5758
def intercept_unary_unary(
58-
self, continuation, client_call_details, request
59+
self, continuation, client_call_details, request
5960
):
6061
return self._intercept_call(continuation, client_call_details, request)
6162

6263
def intercept_unary_stream(
63-
self, continuation, client_call_details, request
64+
self, continuation, client_call_details, request
6465
):
6566
return self._intercept_call(continuation, client_call_details, request)
6667

6768
def intercept_stream_unary(
68-
self, continuation, client_call_details, request_iterator
69+
self, continuation, client_call_details, request_iterator
6970
):
7071
return self._intercept_call(
7172
continuation, client_call_details, request_iterator
7273
)
7374

7475
def intercept_stream_stream(
75-
self, continuation, client_call_details, request_iterator
76+
self, continuation, client_call_details, request_iterator
7677
):
7778
return self._intercept_call(
7879
continuation, client_call_details, request_iterator
7980
)
8081

8182
@staticmethod
8283
def _intercept_call(
83-
continuation, client_call_details, request_or_iterator
84+
continuation, client_call_details, request_or_iterator
8485
):
8586
return continuation(client_call_details, request_or_iterator)
8687

@@ -93,7 +94,9 @@ def setUp(self):
9394
self.server.start()
9495
# use a user defined interceptor along with the opentelemetry client interceptor
9596
interceptors = [Interceptor()]
96-
self.channel = grpc.insecure_channel("localhost:25565")
97+
self.channel = grpc.insecure_channel("localhost:25565", options=[
98+
# (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
99+
])
97100
self.channel = grpc.intercept_channel(self.channel, *interceptors)
98101
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
99102

@@ -169,6 +172,39 @@ def test_unary_stream(self):
169172
},
170173
)
171174

175+
def test_unary_stream_can_be_cancel(self):
176+
responses = server_streaming_method(self._stub)
177+
for i, _ in enumerate(responses):
178+
if i == 1:
179+
responses.cancel()
180+
break
181+
sleep(10)
182+
self.server.stop(None)
183+
self.channel.close()
184+
spans = self.memory_exporter.get_finished_spans()
185+
self.assertEqual(len(spans), 1)
186+
span = spans[0]
187+
188+
self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod")
189+
self.assertIs(span.kind, trace.SpanKind.CLIENT)
190+
191+
# Check version and name in span's instrumentation info
192+
self.assertEqualSpanInstrumentationInfo(
193+
span, opentelemetry.instrumentation.grpc
194+
)
195+
196+
self.assertSpanHasAttributes(
197+
span,
198+
{
199+
SpanAttributes.RPC_METHOD: "ServerStreamingMethod",
200+
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
201+
SpanAttributes.RPC_SYSTEM: "grpc",
202+
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[
203+
0
204+
],
205+
},
206+
)
207+
172208
def test_stream_unary(self):
173209
client_streaming_method(self._stub)
174210
spans = self.memory_exporter.get_finished_spans()
@@ -272,7 +308,7 @@ def test_error_stream_stream(self):
272308
)
273309

274310
def test_client_interceptor_trace_context_propagation(
275-
self,
311+
self,
276312
): # pylint: disable=no-self-use
277313
"""ensure that client interceptor correctly inject trace context into all outgoing requests."""
278314
previous_propagator = get_global_textmap()

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ def setUp(self):
9898
self.server.start()
9999
# use a user defined interceptor along with the opentelemetry client interceptor
100100
interceptors = [Interceptor()]
101-
self.channel = grpc.insecure_channel("localhost:25565")
101+
self.channel = grpc.insecure_channel("localhost:25565",options=[
102+
(grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
103+
])
102104
self.channel = grpc.intercept_channel(self.channel, *interceptors)
103105
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
104106

0 commit comments

Comments
 (0)