|
53 | 53 | from twisted.internet import defer |
54 | 54 |
|
55 | 55 | from synapse.appservice import ApplicationServiceState |
| 56 | +from synapse.metrics.background_process_metrics import run_as_background_process |
56 | 57 | from synapse.util.logcontext import run_in_background |
57 | | -from synapse.util.metrics import Measure |
58 | 58 |
|
59 | 59 | logger = logging.getLogger(__name__) |
60 | 60 |
|
@@ -104,27 +104,34 @@ def __init__(self, txn_ctrl, clock): |
104 | 104 | self.clock = clock |
105 | 105 |
|
106 | 106 | def enqueue(self, service, event): |
107 | | - # if this service isn't being sent something |
108 | 107 | self.queued_events.setdefault(service.id, []).append(event) |
109 | | - run_in_background(self._send_request, service) |
110 | 108 |
|
111 | | - @defer.inlineCallbacks |
112 | | - def _send_request(self, service): |
| 109 | + # start a sender for this appservice if we don't already have one |
| 110 | + |
113 | 111 | if service.id in self.requests_in_flight: |
114 | 112 | return |
115 | 113 |
|
| 114 | + run_as_background_process( |
| 115 | + "as-sender-%s" % (service.id, ), |
| 116 | + self._send_request, service, |
| 117 | + ) |
| 118 | + |
| 119 | + @defer.inlineCallbacks |
| 120 | + def _send_request(self, service): |
| 121 | + # sanity-check: we shouldn't get here if this service already has a sender |
| 122 | + # running. |
| 123 | + assert(service.id not in self.requests_in_flight) |
| 124 | + |
116 | 125 | self.requests_in_flight.add(service.id) |
117 | 126 | try: |
118 | 127 | while True: |
119 | 128 | events = self.queued_events.pop(service.id, []) |
120 | 129 | if not events: |
121 | 130 | return |
122 | | - |
123 | | - with Measure(self.clock, "servicequeuer.send"): |
124 | | - try: |
125 | | - yield self.txn_ctrl.send(service, events) |
126 | | - except Exception: |
127 | | - logger.exception("AS request failed") |
| 131 | + try: |
| 132 | + yield self.txn_ctrl.send(service, events) |
| 133 | + except Exception: |
| 134 | + logger.exception("AS request failed") |
128 | 135 | finally: |
129 | 136 | self.requests_in_flight.discard(service.id) |
130 | 137 |
|
@@ -223,7 +230,12 @@ def __init__(self, clock, store, as_api, service, callback): |
223 | 230 | self.backoff_counter = 1 |
224 | 231 |
|
225 | 232 | def recover(self): |
226 | | - self.clock.call_later((2 ** self.backoff_counter), self.retry) |
| 233 | + def _retry(): |
| 234 | + run_as_background_process( |
| 235 | + "as-recoverer-%s" % (self.service.id,), |
| 236 | + self.retry, |
| 237 | + ) |
| 238 | + self.clock.call_later((2 ** self.backoff_counter), _retry) |
227 | 239 |
|
228 | 240 | def _backoff(self): |
229 | 241 | # cap the backoff to be around 8.5min => (2^9) = 512 secs |
|
0 commit comments