2626
2727from twisted .internet import defer , reactor
2828from twisted .python .failure import Failure
29+ from twisted .python .threadpool import ThreadPool
2930
3031from synapse .logging .context import make_deferred_yieldable
3132from synapse .module_api import ModuleApi
@@ -89,6 +90,15 @@ def __init__(self, hs, config):
8990 self ._s3_client_lock = threading .Lock ()
9091
9192 threadpool_size = config .get ("threadpool_size" , 40 )
93+ self ._s3_pool = ThreadPool (name = "s3-pool" , maxthreads = threadpool_size )
94+ self ._s3_pool .start ()
95+
96+ # Manually stop the thread pool on shutdown. If we don't do this then
97+ # stopping Synapse takes an extra ~30s as Python waits for the threads
98+ # to exit.
99+ reactor .addSystemEventTrigger (
100+ "during" , "shutdown" , self ._s3_pool .stop ,
101+ )
92102
93103 def _get_s3_client (self ):
94104 # this method is designed to be thread-safe, so that we can share a
@@ -113,7 +123,8 @@ def _get_s3_client(self):
113123 async def store_file (self , path , file_info ):
114124 """See StorageProvider.store_file"""
115125
116- return await self ._module_api .defer_to_thread (
126+ return await self ._module_api .defer_to_threadpool (
127+ self ._s3_pool ,
117128 self ._get_s3_client ().upload_file ,
118129 Filename = os .path .join (self .cache_directory , path ),
119130 Bucket = self .bucket ,
@@ -125,7 +136,8 @@ async def fetch(self, path, file_info):
125136 """See StorageProvider.fetch"""
126137 d = defer .Deferred ()
127138
128- await self ._module_api .defer_to_thread (
139+ await self ._module_api .defer_to_threadpool (
140+ self ._s3_pool ,
129141 s3_download_task ,
130142 self ._get_s3_client (),
131143 self .bucket ,
0 commit comments