Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19668.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a way to re-sign local events with a new signing key.
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,4 @@ The following JSON body parameters are available:
- `job_name` - A string which job to run. Valid values are:
- `populate_stats_process_rooms` - Recalculate the stats for all rooms.
- `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync.
- `event_resign` - Re-sign all locally-sent events with the current signing key. This is useful after rotating the server's signing key to ensure all historical events are signed with the new key.
55 changes: 53 additions & 2 deletions synapse/crypto/event_signing.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@

from canonicaljson import encode_canonical_json
from signedjson.sign import sign_json
from signedjson.types import SigningKey
from signedjson.types import SigningKey, VerifyKey
from unpaddedbase64 import decode_base64, encode_base64

from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.events.utils import prune_event, prune_event_dict
from synapse.logging.opentracing import trace
from synapse.types import JsonDict
from synapse.types import JsonDict, UserID

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -192,3 +192,54 @@ def add_hashes_and_signatures(
event_dict["signatures"] = compute_event_signature(
room_version, event_dict, signature_name=signature_name, signing_key=signing_key
)


def resign_event(
    ev: EventBase,
    server_name: str,
    signing_key: SigningKey,
    time_now: int | None = None,
) -> JsonDict:
    """Return the event's PDU JSON, re-signed with the given signing key.

    Any signatures previously attributed to `server_name` are discarded and
    replaced by a fresh signature computed with `signing_key`. Events that
    were never signed by `server_name` are signed anyway.

    Note: existing signatures by `server_name` are NOT re-validated here, so
    callers should only pass events already known to be valid, e.g. those
    loaded from the event_json table in the database.

    Args:
        ev: The event to re-sign.
        server_name: The name to attribute the new signature to.
        signing_key: The key to sign with.
        time_now: Timestamp to stamp into the PDU JSON, if any.

    Returns:
        The event's PDU JSON dict carrying the fresh signature.
    """
    pdu_json = ev.get_pdu_json(time_now=time_now)

    # Throw away whatever this server signed with previously.
    signatures = pdu_json["signatures"]
    signatures.pop(server_name, None)

    fresh_signature = compute_event_signature(
        ev.room_version,
        pdu_json,
        server_name,
        signing_key,
    )
    signatures.update(fresh_signature)
    return pdu_json


def event_needs_resigning(
    ev: EventBase, server_name: str, verify_key: VerifyKey
) -> bool:
    """Check if this event needs re-signing.

    This returns True if all of the following are True:
    - the event `sender` domain matches the `server_name` provided.
    - the event has not been already signed with this `verify_key`.

    Args:
        ev: The event to inspect.
        server_name: The local server's name.
        verify_key: The verify key corresponding to the current signing key.

    Returns:
        True if the event should be re-signed with the current key.
    """
    sender = UserID.from_string(ev.sender)
    if sender.domain != server_name:
        # Only locally-sent events are ever re-signed.
        return False

    # Signature key IDs have the form "<algorithm>:<version>".
    want_key_id = verify_key.alg + ":" + verify_key.version

    # Use a presence test rather than truthiness of the signature value:
    # we care whether an entry for the current key exists at all, not
    # whether its (opaque) value happens to be truthy.
    return want_key_id not in ev.signatures.get(server_name, {})
2 changes: 2 additions & 0 deletions synapse/rest/admin/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
"populate_user_directory_process_users",
),
]
elif job_name == "event_resign":
jobs = [("event_resign", "{}", "")]
else:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")

Expand Down
129 changes: 126 additions & 3 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from typing import TYPE_CHECKING, cast

import attr
from signedjson.key import get_verify_key

from synapse.api.constants import (
MAX_DEPTH,
Expand All @@ -31,6 +32,10 @@
RelationTypes,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import (
event_needs_resigning,
resign_event,
)
from synapse.events import EventBase, make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
Expand All @@ -39,6 +44,7 @@
LoggingTransaction,
make_tuple_comparison_clause,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events import (
SLIDING_SYNC_RELEVANT_STATE_SET,
PersistEventsStore,
Expand All @@ -48,6 +54,7 @@
)
from synapse.storage.databases.main.events_worker import (
DatabaseCorruptionError,
EventRedactBehaviour,
InvalidEventError,
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
Expand Down Expand Up @@ -112,7 +119,9 @@ class _JoinedRoomStreamOrderingUpdate:
most_recent_bump_stamp: int | None


class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
class EventsBackgroundUpdatesStore(
StreamWorkerStore, StateDeltasStore, CacheInvalidationWorkerStore, SQLBaseStore
):
def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -346,6 +355,11 @@ def __init__(
_BackgroundUpdates.FIXUP_MAX_DEPTH_CAP, self.fixup_max_depth_cap_bg_update
)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENT_RESIGN,
self._resign_events,
)

# We want this to run on the main database at startup before we start processing
# events.
#
Expand Down Expand Up @@ -1370,7 +1384,7 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
)

# Iterate the parent IDs and invalidate caches.
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
self._invalidate_cache_and_stream_bulk(
txn,
self.get_relations_for_event, # type: ignore[attr-defined]
{
Expand All @@ -1381,7 +1395,7 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
for r in relations_to_insert
},
)
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
self._invalidate_cache_and_stream_bulk(
txn,
self.get_thread_summary, # type: ignore[attr-defined]
{(r[1],) for r in relations_to_insert},
Expand Down Expand Up @@ -2713,6 +2727,115 @@ def redo_max_depth_bg_update_txn(txn: LoggingTransaction) -> tuple[bool, int]:

return num_rooms

async def _resign_events(self, progress: dict, batch_size: int) -> int:
    """Background update: re-sign events signed with a key other than the
    current signing key.

    Walks locally-sent rows of the events table in stream-ordering order.
    For each batch, any event lacking a signature by the current key is
    re-signed, and the fresh signatures plus the progress marker are
    persisted in one transaction.

    Args:
        progress: Background-update progress dict; reads "last_stream_pos".
        batch_size: Maximum number of events to examine this round.

    Returns:
        The number of events examined (0 once the update completes).
    """

    def _fetch_next_events_txn(
        txn: LoggingTransaction,
    ) -> tuple[list[str], int]:
        # Fetch the next batch of candidate events, returning their IDs
        # and the highest stream position seen. An empty ID list signals
        # that the background update has finished.
        #
        # Events with negative stream orderings exist, but those always
        # come from backfilling over federation — locally sent events
        # never have one, so starting from 0 is safe.
        prev_pos: int = progress.get("last_stream_pos", 0)
        txn.execute(
            """
            SELECT event_id, stream_ordering FROM events
            WHERE stream_ordering > ? AND sender LIKE ?
            ORDER BY stream_ordering ASC LIMIT ?
            """,
            (
                prev_pos,
                f"%:{self.hs.hostname}",
                batch_size,
            ),
        )
        rows: list[tuple[str, int]] = txn.fetchall()
        if not rows:
            return [], prev_pos
        # Rows are ordered ascending, so the last one carries the
        # highest stream position of this batch.
        return [row[0] for row in rows], rows[-1][1]

    next_event_ids, max_stream_pos = await self.db_pool.runInteraction(
        "_resign_events._fetch_next_events",
        _fetch_next_events_txn,
    )
    logger.debug(
        "Resign[num_checking=%d,sp=%d]", len(next_event_ids), max_stream_pos
    )

    if not next_event_ids:
        # Nothing left to examine; mark the update as done.
        await self.db_pool.updates._end_background_update(
            _BackgroundUpdates.EVENT_RESIGN
        )
        return 0

    candidate_events = await self.get_events_as_list(
        next_event_ids,
        redact_behaviour=EventRedactBehaviour.as_is,
    )
    verify_key = get_verify_key(self.hs.signing_key)

    # Pair each event needing a new signature with its re-signed dict.
    resigned_events: list[tuple[str, JsonDict]] = [
        (ev.event_id, resign_event(ev, self.hs.hostname, self.hs.signing_key))
        for ev in candidate_events
        if event_needs_resigning(ev, self.hs.hostname, verify_key)
    ]

    def _write_events_txn(
        txn: LoggingTransaction,
        events_to_write: list[tuple[str, JsonDict]],
        max_stream_pos: int,
    ) -> None:
        # The new signatures and the stream-position progress must land
        # atomically: if the position advanced first and we then crashed,
        # the skipped events would never be re-signed at all.
        if events_to_write:
            self.db_pool.simple_update_many_txn(
                txn,
                "event_json",
                key_names=["event_id"],
                key_values=[[event_id] for event_id, _ in events_to_write],
                value_names=["json"],
                value_values=[
                    [json_encoder.encode(event_dict)]
                    for _, event_dict in events_to_write
                ],
            )
        # Record progress even when this batch needed no re-signing.
        self.db_pool.updates._background_update_progress_txn(
            txn,
            _BackgroundUpdates.EVENT_RESIGN,
            progress={"last_stream_pos": max_stream_pos},
        )

        # Evict each re-signed event from the event cache (locally and on
        # other workers) so the new signatures get picked up.
        for event_id, _ in events_to_write:
            self.invalidate_get_event_cache_after_txn(txn, event_id)
            self._send_invalidation_to_replication(
                txn, "_get_event_cache", (event_id,)
            )

    await self.db_pool.runInteraction(
        "_resign_events._write_events_txn",
        _write_events_txn,
        resigned_events,
        max_stream_pos,
    )

    # Report how many events we examined — not how many we re-signed — so
    # the background updater knows we are still churning through them.
    return len(next_event_ids)


def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
Expand Down
2 changes: 2 additions & 0 deletions synapse/types/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,5 @@ class _BackgroundUpdates:
FIXUP_MAX_DEPTH_CAP = "fixup_max_depth_cap"

REDACTIONS_RECHECK_BG_UPDATE = "redactions_recheck"

EVENT_RESIGN = "event_resign"
Loading
Loading