Skip to content

Commit 190686c

Browse files
committed
Source ModuleApi from hs and use defer_to_thread
We also make `store_file` and `fetch` async, as they are async in the base class. This also simplifies the implementation. We could go through and convert the whole module from deferreds to async, but that should be done separately.
1 parent 5bdb5d9 commit 190686c

1 file changed

Lines changed: 21 additions & 30 deletions

File tree

s3_storage_provider.py

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
import boto3
2424
import botocore
2525

26-
from twisted.internet import defer, reactor, threads
26+
from twisted.internet import defer, reactor
2727
from twisted.python.failure import Failure
28-
from twisted.python.threadpool import ThreadPool
2928

30-
from synapse.module_api import run_in_background
29+
from synapse.logging.context import make_deferred_yieldable
30+
from synapse.module_api import ModuleApi
3131
from synapse.rest.media.v1._base import Responder
3232
from synapse.rest.media.v1.storage_provider import StorageProvider
3333

@@ -54,6 +54,7 @@ class S3StorageProviderBackend(StorageProvider):
5454
"""
5555

5656
def __init__(self, hs, config):
57+
self._module_api: ModuleApi = hs.get_module_api()
5758
self.cache_directory = hs.config.media.media_store_path
5859
self.bucket = config["bucket"]
5960
self.prefix = config["prefix"]
@@ -82,15 +83,6 @@ def __init__(self, hs, config):
8283
self._s3_client_lock = threading.Lock()
8384

8485
threadpool_size = config.get("threadpool_size", 40)
85-
self._s3_pool = ThreadPool(name="s3-pool", maxthreads=threadpool_size)
86-
self._s3_pool.start()
87-
88-
# Manually stop the thread pool on shutdown. If we don't do this then
89-
# stopping Synapse takes an extra ~30s as Python waits for the threads
90-
# to exit.
91-
reactor.addSystemEventTrigger(
92-
"during", "shutdown", self._s3_pool.stop,
93-
)
9486

9587
def _get_s3_client(self):
9688
# this method is designed to be thread-safe, so that we can share a
@@ -112,32 +104,31 @@ def _get_s3_client(self):
112104
self._s3_client = s3 = b3_session.client("s3", **self.api_kwargs)
113105
return s3
114106

115-
def store_file(self, path, file_info):
107+
async def store_file(self, path, file_info):
116108
"""See StorageProvider.store_file"""
117109

118-
def _store_file():
119-
self._get_s3_client().upload_file(
120-
Filename=os.path.join(self.cache_directory, path),
121-
Bucket=self.bucket,
122-
Key=self.prefix + path,
123-
ExtraArgs=self.extra_args,
124-
)
125-
126-
return run_in_background(
127-
threads.deferToThreadPool, reactor, self._s3_pool, _store_file
110+
return await self._module_api.defer_to_thread(
111+
self._get_s3_client().upload_file,
112+
Filename=os.path.join(self.cache_directory, path),
113+
Bucket=self.bucket,
114+
Key=self.prefix + path,
115+
ExtraArgs=self.extra_args,
128116
)
129117

130-
def fetch(self, path, file_info):
118+
async def fetch(self, path, file_info):
131119
"""See StorageProvider.fetch"""
132120
d = defer.Deferred()
133121

134-
def _get_file():
135-
s3_download_task(
136-
self._get_s3_client(), self.bucket, self.prefix + path, self.extra_args, d
137-
)
122+
await self._module_api.defer_to_thread(
123+
s3_download_task,
124+
self._get_s3_client(),
125+
self.bucket,
126+
self.prefix + path,
127+
self.extra_args,
128+
d,
129+
)
138130

139-
run_in_background(self._s3_pool.callInThread, _get_file)
140-
return d
131+
return await make_deferred_yieldable(d)
141132

142133
@staticmethod
143134
def parse_config(config):

0 commit comments

Comments
 (0)