Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit a559c8b

Browse files
authored
Respect the @cancellable flag for ReplicationEndpoints (#12700)
While `ReplicationEndpoint`s register themselves via `JsonResource`, they pass a method that calls the handler, instead of the handler itself, to `register_paths`. As a result, `JsonResource` will not correctly pick up the `@cancellable` flag and we have to apply it ourselves. Signed-off-by: Sean Quah <seanq@element.io>
1 parent 9d8e380 commit a559c8b

4 files changed

Lines changed: 139 additions & 2 deletions

File tree

changelog.d/12700.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Respect the `@cancellable` flag for `ReplicationEndpoint`s.

synapse/replication/http/_base.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626

2727
from synapse.api.errors import HttpResponseException, SynapseError
2828
from synapse.http import RequestTimedOutError
29-
from synapse.http.server import HttpServer
29+
from synapse.http.server import HttpServer, is_method_cancellable
30+
from synapse.http.site import SynapseRequest
3031
from synapse.logging import opentracing
3132
from synapse.logging.opentracing import trace
3233
from synapse.types import JsonDict
@@ -310,6 +311,12 @@ def register(self, http_server: HttpServer) -> None:
310311
url_args = list(self.PATH_ARGS)
311312
method = self.METHOD
312313

314+
if self.CACHE and is_method_cancellable(self._handle_request):
315+
raise Exception(
316+
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
317+
"is set. The cancellable flag would have no effect."
318+
)
319+
313320
if self.CACHE:
314321
url_args.append("txn_id")
315322

@@ -324,7 +331,7 @@ def register(self, http_server: HttpServer) -> None:
324331
)
325332

326333
async def _check_auth_and_handle(
327-
self, request: Request, **kwargs: Any
334+
self, request: SynapseRequest, **kwargs: Any
328335
) -> Tuple[int, JsonDict]:
329336
"""Called on new incoming requests when caching is enabled. Checks
330337
if there is a cached response for the request and returns that,
@@ -340,8 +347,18 @@ async def _check_auth_and_handle(
340347
if self.CACHE:
341348
txn_id = kwargs.pop("txn_id")
342349

350+
# We ignore the `@cancellable` flag, since cancellation wouldn't interupt
351+
# `_handle_request` and `ResponseCache` does not handle cancellation
352+
# correctly yet. In particular, there may be issues to do with logging
353+
# context lifetimes.
354+
343355
return await self.response_cache.wrap(
344356
txn_id, self._handle_request, request, **kwargs
345357
)
346358

359+
# The `@cancellable` decorator may be applied to `_handle_request`. But we
360+
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
361+
# so we have to set up the cancellable flag ourselves.
362+
request.is_render_cancellable = is_method_cancellable(self._handle_request)
363+
347364
return await self._handle_request(request, **kwargs)

tests/replication/http/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from http import HTTPStatus
16+
from typing import Tuple
17+
18+
from twisted.web.server import Request
19+
20+
from synapse.api.errors import Codes
21+
from synapse.http.server import JsonResource, cancellable
22+
from synapse.replication.http import REPLICATION_PREFIX
23+
from synapse.replication.http._base import ReplicationEndpoint
24+
from synapse.server import HomeServer
25+
from synapse.types import JsonDict
26+
27+
from tests import unittest
28+
from tests.http.server._base import EndpointCancellationTestHelperMixin
29+
30+
31+
class CancellableReplicationEndpoint(ReplicationEndpoint):
32+
NAME = "cancellable_sleep"
33+
PATH_ARGS = ()
34+
CACHE = False
35+
36+
def __init__(self, hs: HomeServer):
37+
super().__init__(hs)
38+
self.clock = hs.get_clock()
39+
40+
@staticmethod
41+
async def _serialize_payload() -> JsonDict:
42+
return {}
43+
44+
@cancellable
45+
async def _handle_request( # type: ignore[override]
46+
self, request: Request
47+
) -> Tuple[int, JsonDict]:
48+
await self.clock.sleep(1.0)
49+
return HTTPStatus.OK, {"result": True}
50+
51+
52+
class UncancellableReplicationEndpoint(ReplicationEndpoint):
53+
NAME = "uncancellable_sleep"
54+
PATH_ARGS = ()
55+
CACHE = False
56+
57+
def __init__(self, hs: HomeServer):
58+
super().__init__(hs)
59+
self.clock = hs.get_clock()
60+
61+
@staticmethod
62+
async def _serialize_payload() -> JsonDict:
63+
return {}
64+
65+
async def _handle_request( # type: ignore[override]
66+
self, request: Request
67+
) -> Tuple[int, JsonDict]:
68+
await self.clock.sleep(1.0)
69+
return HTTPStatus.OK, {"result": True}
70+
71+
72+
class ReplicationEndpointCancellationTestCase(
73+
unittest.HomeserverTestCase, EndpointCancellationTestHelperMixin
74+
):
75+
"""Tests for `ReplicationEndpoint` cancellation."""
76+
77+
def create_test_resource(self):
78+
"""Overrides `HomeserverTestCase.create_test_resource`."""
79+
resource = JsonResource(self.hs)
80+
81+
CancellableReplicationEndpoint(self.hs).register(resource)
82+
UncancellableReplicationEndpoint(self.hs).register(resource)
83+
84+
return resource
85+
86+
def test_cancellable_disconnect(self) -> None:
87+
"""Test that handlers with the `@cancellable` flag can be cancelled."""
88+
path = f"{REPLICATION_PREFIX}/{CancellableReplicationEndpoint.NAME}/"
89+
channel = self.make_request("POST", path, await_result=False)
90+
self._test_disconnect(
91+
self.reactor,
92+
channel,
93+
expect_cancellation=True,
94+
expected_body={"error": "Request cancelled", "errcode": Codes.UNKNOWN},
95+
)
96+
97+
def test_uncancellable_disconnect(self) -> None:
98+
"""Test that handlers without the `@cancellable` flag cannot be cancelled."""
99+
path = f"{REPLICATION_PREFIX}/{UncancellableReplicationEndpoint.NAME}/"
100+
channel = self.make_request("POST", path, await_result=False)
101+
self._test_disconnect(
102+
self.reactor,
103+
channel,
104+
expect_cancellation=False,
105+
expected_body={"result": True},
106+
)

0 commit comments

Comments
 (0)