1 change: 1 addition & 0 deletions changelog.d/19668.feature
@@ -0,0 +1 @@
Add a way to re-sign local events with a new signing key.
3 changes: 3 additions & 0 deletions docs/usage/administration/admin_api/background_updates.md
@@ -107,3 +107,6 @@ The following JSON body parameters are available:
- `job_name` - A string specifying which job to run. Valid values are:
- `populate_stats_process_rooms` - Recalculate the stats for all rooms.
- `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync.
- `event_resign` - Re-sign all locally-sent events with the current signing key. This is useful after rotating the server's signing key, to ensure all historical events are signed with the new key (a usage sketch follows this list). Optional additional parameters:
- `old_key` - Only re-sign events whose signature verifies against this key. Format: `"ed25519:key_id base64_public_key"` (e.g. `"ed25519:my_old_key XGX0JRS2Af3be3knz2fBiRbApjm2Dh61gXDJA8kcJNI"`).
- `before_ts` - Only re-sign events with a `received_ts` less than this value (milliseconds since the epoch).
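
As a usage sketch (not part of this diff), the job can be started via the `start_job` admin API endpoint documented above; the homeserver URL and access token below are placeholders:

```python
# Sketch: start the `event_resign` background update via the admin API.
import requests

resp = requests.post(
    "https://homeserver.example/_synapse/admin/v1/background_updates/start_job",
    headers={"Authorization": "Bearer <admin_access_token>"},
    json={
        "job_name": "event_resign",
        # Optional: only re-sign events previously signed with this key...
        "old_key": "ed25519:my_old_key XGX0JRS2Af3be3knz2fBiRbApjm2Dh61gXDJA8kcJNI",
        # ...and only events received before this time (ms since the epoch).
        "before_ts": 1700000000000,
    },
)
resp.raise_for_status()
```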
55 changes: 53 additions & 2 deletions synapse/crypto/event_signing.py
@@ -27,15 +27,15 @@

from canonicaljson import encode_canonical_json
from signedjson.sign import sign_json
from signedjson.types import SigningKey
from signedjson.types import SigningKey, VerifyKey
from unpaddedbase64 import decode_base64, encode_base64

from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.events.utils import prune_event, prune_event_dict
from synapse.logging.opentracing import trace
from synapse.types import JsonDict
from synapse.types import JsonDict, UserID

logger = logging.getLogger(__name__)

@@ -192,3 +192,54 @@ def add_hashes_and_signatures(
event_dict["signatures"] = compute_event_signature(
room_version, event_dict, signature_name=signature_name, signing_key=signing_key
)


def resign_event(
ev: EventBase,
server_name: str,
signing_key: SigningKey,
time_now: int | None = None,
) -> JsonDict:
"""Re-sign the provided event with the given signing key. Any existing signatures on the event
for this server_name are removed.

The event is re-signed even if it carries no existing signature from this server_name. Existing
signatures are not verified before being replaced, so only events known to have valid signatures
should be passed into this function, e.g. events loaded from the event_json table in the database.
"""
event_dict = ev.get_pdu_json(time_now=time_now)
event_dict["signatures"].pop(
server_name, None
) # remove existing signatures for this server_name
event_dict["signatures"].update(
compute_event_signature(
ev.room_version,
event_dict,
server_name,
signing_key,
)
)
return event_dict


def event_needs_resigning(
ev: EventBase, server_name: str, verify_key: VerifyKey
) -> bool:
"""Check if this event needs re-signing.

This returns True if all of the following hold:
- the event `sender` domain matches the provided `server_name`.
- the event has not already been signed with this `verify_key`.
"""
sender = UserID.from_string(ev.sender)
if sender.domain != server_name:
return False
want_key_id = verify_key.alg + ":" + verify_key.version
signed_with_current_key_id = ev.signatures.get(server_name, {}).get(
want_key_id, None
)
if signed_with_current_key_id:
return False

return True
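
Taken together, these two helpers implement the key-rotation flow: check whether a locally-sent event still lacks a signature from the new key, and re-sign it if so. A minimal sketch under assumptions (`events` and `server_name` come from the caller; the key version `new_key_v1` is hypothetical):

```python
from signedjson.key import generate_signing_key, get_verify_key

# Hypothetical new key; in Synapse this comes from hs.signing_key.
new_signing_key = generate_signing_key("new_key_v1")
new_verify_key = get_verify_key(new_signing_key)

for ev in events:  # events: list[EventBase], loaded elsewhere
    if event_needs_resigning(ev, server_name, new_verify_key):
        # Replace this server's signatures with ones from the new key.
        new_json = resign_event(ev, server_name, new_signing_key)
        # ...persist new_json back to the event_json table.
```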
19 changes: 19 additions & 0 deletions synapse/rest/admin/background_updates.py
@@ -18,6 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
import json
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING
@@ -150,6 +151,24 @@ async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
"populate_user_directory_process_users",
),
]
elif job_name == "event_resign":
old_key = body.get("old_key")
if old_key is not None and not isinstance(old_key, str):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"'old_key' must be a string",
)
before_ts = body.get("before_ts")
if before_ts is not None and not isinstance(before_ts, int):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"'before_ts' must be an integer",
)
progress = {
"old_key": old_key,
"before_ts": before_ts,
}
jobs = [("event_resign", json.dumps(progress), "")]
else:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")

193 changes: 190 additions & 3 deletions synapse/storage/databases/main/events_bg_updates.py
@@ -23,6 +23,8 @@
from typing import TYPE_CHECKING, cast

import attr
from signedjson.key import decode_verify_key_base64, get_verify_key
from signedjson.sign import SignatureVerifyException, verify_signed_json

from synapse.api.constants import (
MAX_DEPTH,
@@ -31,14 +33,20 @@
RelationTypes,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import (
event_needs_resigning,
resign_event,
)
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_tuple_comparison_clause,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events import (
SLIDING_SYNC_RELEVANT_STATE_SET,
PersistEventsStore,
@@ -48,6 +56,7 @@
)
from synapse.storage.databases.main.events_worker import (
DatabaseCorruptionError,
EventRedactBehaviour,
InvalidEventError,
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
@@ -112,7 +121,9 @@ class _JoinedRoomStreamOrderingUpdate:
most_recent_bump_stamp: int | None


class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
class EventsBackgroundUpdatesStore(
StreamWorkerStore, StateDeltasStore, CacheInvalidationWorkerStore, SQLBaseStore
):
def __init__(
self,
database: DatabasePool,
@@ -346,6 +357,11 @@ def __init__(
_BackgroundUpdates.FIXUP_MAX_DEPTH_CAP, self.fixup_max_depth_cap_bg_update
)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENT_RESIGN,
self._resign_events,
)

# We want this to run on the main database at startup before we start processing
# events.
#
@@ -1370,7 +1386,7 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
)

# Iterate the parent IDs and invalidate caches.
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
self._invalidate_cache_and_stream_bulk(
txn,
self.get_relations_for_event, # type: ignore[attr-defined]
{
@@ -1381,7 +1397,7 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
for r in relations_to_insert
},
)
self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
self._invalidate_cache_and_stream_bulk(
txn,
self.get_thread_summary, # type: ignore[attr-defined]
{(r[1],) for r in relations_to_insert},
@@ -2713,6 +2729,177 @@ def redo_max_depth_bg_update_txn(txn: LoggingTransaction) -> tuple[bool, int]:

return num_rooms

async def _resign_events(self, progress: dict, batch_size: int) -> int:
"""Retroactively re-sign events signed with a different key than the
current signing key.

Optional progress parameters:
old_key: If set, only re-sign events whose signature can be
verified with this key. Format: "algorithm:key_id base64key"
(e.g. "ed25519:my_old_key XGX0JRS2Af3be3k...").
before_ts: If set, only re-sign events with a received_ts less
than this value (milliseconds since epoch).
"""

# Read optional filter parameters from progress. These are set once
# when the job is created and preserved across batches.
old_key_str: str | None = progress.get("old_key")
before_ts: int | None = progress.get("before_ts")

# Parse the old verify key if provided.
old_verify_key = None
if old_key_str is not None:
parts = old_key_str.split(" ", 1)
if len(parts) == 2:
key_id, key_base64 = parts
alg, _, version = key_id.partition(":")
old_verify_key = decode_verify_key_base64(alg, version, key_base64)
else:
raise ValueError(
f"Invalid old_key format: expected 'algorithm:version base64key', got {old_key_str!r}"
)

# Load the next set of candidate events to re-sign.
# Returns the event IDs and the highest stream position for those events.
# If no event IDs are returned, this signals the background update is complete.
def _fetch_next_events_txn(
txn: LoggingTransaction,
) -> tuple[list[str], int]:
# Start from the minimum 32-bit integer to ensure we cover events
# with negative stream orderings (e.g. from backfill).
last_stream_pos: int = progress.get("last_stream_pos", -(1 << 31))

sql = """
SELECT event_id, stream_ordering FROM events
WHERE stream_ordering > ? AND sender LIKE ?
"""
args: list[object] = [
last_stream_pos,
f"%:{self.hs.hostname}",
]

if before_ts is not None:
sql += " AND received_ts < ?"
args.append(before_ts)

sql += " ORDER BY stream_ordering ASC LIMIT ?"
args.append(batch_size)

txn.execute(sql, args)
event_rows: list[tuple[str, int]] = txn.fetchall()
if not event_rows:
return [], last_stream_pos

last_stream_pos = event_rows[-1][1]
return [row[0] for row in event_rows], last_stream_pos

next_event_ids, max_stream_pos = await self.db_pool.runInteraction(
"_resign_events._fetch_next_events",
_fetch_next_events_txn,
)
logger.debug(
"Resign[num_checking=%d,sp=%d]", len(next_event_ids), max_stream_pos
)

if not next_event_ids:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.EVENT_RESIGN
)
return 0

next_events = await self.get_events_as_list(
next_event_ids,
redact_behaviour=EventRedactBehaviour.as_is,
)
verify_key = get_verify_key(self.hs.signing_key)

# Re-sign any events that need it.
# A list of event IDs and their newly signed event dicts.
resigned_events: list[tuple[str, JsonDict]] = []
for event in next_events:
if not event_needs_resigning(event, self.hs.hostname, verify_key):
continue

# If old_key is set, only re-sign events whose signature verifies
# with the provided old key.
if old_verify_key is not None:
old_key_id = f"{old_verify_key.alg}:{old_verify_key.version}"
server_sigs = event.signatures.get(self.hs.hostname, {})
if old_key_id not in server_sigs:
# Event wasn't signed with this key ID at all, skip.
continue

# Verify the signature is genuinely from this key. We prune
# first since signatures are computed over the redacted form.
pruned = prune_event_dict(event.room_version, event.get_pdu_json())
try:
verify_signed_json(pruned, self.hs.hostname, old_verify_key)
except SignatureVerifyException:
# The key ID matched, but the signature does not verify against
# the public key we were given; log and skip this event.
logger.warning(
"Event %s has a signature for key %s that does not "
"verify; skipping",
event.event_id,
old_key_id,
)
continue

event_dict = resign_event(event, self.hs.hostname, self.hs.signing_key)
resigned_events.append((event.event_id, event_dict))

# Write the new signatures and the updated stream position in a single
# transaction; otherwise we could advance the position, crash before
# writing the signatures, and never re-sign those events.
def _write_events_txn(
txn: LoggingTransaction,
events_to_write: list[tuple[str, JsonDict]],
max_stream_pos: int,
) -> None:
if events_to_write:
self.db_pool.simple_update_many_txn(
txn,
"event_json",
key_names=["event_id"],
key_values=[[event_id] for event_id, _ in events_to_write],
value_names=["json"],
value_values=[
[json_encoder.encode(event_dict)]
for _, event_dict in events_to_write
],
)
# Always update the progress even if we re-sign nothing.
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.EVENT_RESIGN,
progress={
"last_stream_pos": max_stream_pos,
"old_key": old_key_str,
"before_ts": before_ts,
},
)

# Invalidate the event cache for re-signed events so that other
# workers also pick up the new signatures.
for event_id, _ in events_to_write:
self.invalidate_get_event_cache_after_txn(txn, event_id)
self._send_invalidation_to_replication(
txn, "_get_event_cache", (event_id,)
)

await self.db_pool.runInteraction(
"_resign_events._write_events_txn",
_write_events_txn,
resigned_events,
max_stream_pos,
)

logger.info("Re-signed %d events", len(resigned_events))

# Report the full batch size even if nothing was re-signed, so the
# background updater knows we are still churning through the events.
return len(next_event_ids)
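
One subtlety in the verification step above: Matrix event signatures are computed over the redacted (pruned) form of an event, so the old signature has to be checked against the pruned event dict rather than the full PDU. A minimal sketch of that step in isolation, assuming `event` is an `EventBase`, `server_name` is this server's name, and `old_key_str` uses the documented `algorithm:version base64key` format:

```python
from signedjson.key import decode_verify_key_base64
from signedjson.sign import SignatureVerifyException, verify_signed_json

from synapse.events.utils import prune_event_dict

# Parse e.g. "ed25519:my_old_key <base64>" into a VerifyKey.
key_id, key_base64 = old_key_str.split(" ", 1)
alg, _, version = key_id.partition(":")
old_verify_key = decode_verify_key_base64(alg, version, key_base64)

# Signatures cover the redacted form, so prune before verifying.
pruned = prune_event_dict(event.room_version, event.get_pdu_json())
try:
    verify_signed_json(pruned, server_name, old_verify_key)
except SignatureVerifyException:
    pass  # not signed by this key; the background update skips such events
```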


def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
2 changes: 2 additions & 0 deletions synapse/types/storage/__init__.py
@@ -66,3 +66,5 @@ class _BackgroundUpdates:
FIXUP_MAX_DEPTH_CAP = "fixup_max_depth_cap"

REDACTIONS_RECHECK_BG_UPDATE = "redactions_recheck"

EVENT_RESIGN = "event_resign"