3232)
3333from synapse .metrics .background_process_metrics import run_as_background_process
3434from synapse .storage ._base import SQLBaseStore , db_to_json , make_in_list_sql_clause
35- from synapse .storage .database import Database , LoggingTransaction
35+ from synapse .storage .database import (
36+ Database ,
37+ LoggingTransaction ,
38+ make_tuple_comparison_clause ,
39+ )
3640from synapse .types import Collection , get_verify_key_from_cross_signing_key
3741from synapse .util .caches .descriptors import (
3842 Cache ,
4953 "drop_device_list_streams_non_unique_indexes"
5054)
5155
56+ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
57+
5258
5359class DeviceWorkerStore (SQLBaseStore ):
5460 def get_device (self , user_id , device_id ):
@@ -714,6 +720,11 @@ def __init__(self, database: Database, db_conn, hs):
714720 self ._drop_device_list_streams_non_unique_indexes ,
715721 )
716722
723+ # clear out duplicate device list outbound pokes
724+ self .db .updates .register_background_update_handler (
725+ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES , self ._remove_duplicate_outbound_pokes ,
726+ )
727+
717728 @defer .inlineCallbacks
718729 def _drop_device_list_streams_non_unique_indexes (self , progress , batch_size ):
719730 def f (conn ):
@@ -728,6 +739,66 @@ def f(conn):
728739 )
729740 return 1
730741
    async def _remove_duplicate_outbound_pokes(
        self, progress: dict, batch_size: int
    ) -> int:
        """Background update handler: deduplicate device_lists_outbound_pokes.

        For some reason, we have accumulated duplicate entries in
        device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes
        less efficient.

        For each duplicate key, we delete all the existing rows and put one back
        (keeping the greatest `ts` seen for that key, with `sent` reset to False).

        Args:
            progress: persisted progress dict; may contain "last_row", the key
                tuple of the last duplicate processed in a previous batch.
            batch_size: maximum number of duplicate key tuples to handle per run.

        Returns:
            The number of duplicate key tuples processed this batch; 0 signals
            completion and ends the background update.
        """

        # The tuple of columns that uniquely identifies a poke for dedup purposes.
        KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]

        # Resume from where the previous batch left off; the zero/empty-string
        # default sorts before any real row, so the first batch starts at the top.
        last_row = progress.get(
            "last_row",
            {"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
        )

        def _txn(txn):
            # Build an engine-appropriate "(a, b, c, d) > (?, ?, ?, ?)" clause so
            # we only scan key tuples after the last one we processed.
            clause, args = make_tuple_comparison_clause(
                self.db.engine, [(x, last_row[x]) for x in KEY_COLS]
            )
            # Find the next batch of key tuples that appear more than once,
            # remembering the greatest ts for each so the surviving row keeps it.
            sql = """
                SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
                FROM device_lists_outbound_pokes
                WHERE %s
                GROUP BY %s
                HAVING count(*) > 1
                ORDER BY %s
                LIMIT ?
                """ % (
                clause,  # WHERE
                ",".join(KEY_COLS),  # GROUP BY
                ",".join(KEY_COLS),  # ORDER BY
            )
            txn.execute(sql, args + [batch_size])
            rows = self.db.cursor_to_dict(txn)

            row = None
            for row in rows:
                # Remove every duplicate for this key...
                self.db.simple_delete_txn(
                    txn, "device_lists_outbound_pokes", {x: row[x] for x in KEY_COLS},
                )

                # ...then reinsert a single row. `sent` is reset to False —
                # presumably so the poke is retransmitted; confirm against the
                # device-list federation sender.
                row["sent"] = False
                self.db.simple_insert_txn(
                    txn, "device_lists_outbound_pokes", row,
                )

            # `row` is the last duplicate handled (loop variable survives the
            # loop); record it so the next batch resumes after it. If there were
            # no rows, leave progress untouched.
            if row:
                self.db.updates._background_update_progress_txn(
                    txn, BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, {"last_row": row},
                )

            return len(rows)

        rows = await self.db.runInteraction(BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, _txn)

        # An empty batch means there are no duplicates left: finish the update.
        if not rows:
            await self.db.updates._end_background_update(
                BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES
            )

        return rows
731802
732803class DeviceStore (DeviceWorkerStore , DeviceBackgroundUpdateStore ):
733804 def __init__ (self , database : Database , db_conn , hs ):
0 commit comments