|
22 | 22 | import logging |
23 | 23 | import random |
24 | 24 | from http import HTTPStatus |
25 | | -from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple |
| 25 | +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple |
26 | 26 |
|
27 | 27 | from canonicaljson import encode_canonical_json |
28 | 28 |
|
|
55 | 55 | from synapse.event_auth import validate_event_for_room_version |
56 | 56 | from synapse.events import EventBase, relation_from_event |
57 | 57 | from synapse.events.builder import EventBuilder |
58 | | -from synapse.events.snapshot import EventContext, UnpersistedEventContextBase |
| 58 | +from synapse.events.snapshot import ( |
| 59 | + EventContext, |
| 60 | + UnpersistedEventContext, |
| 61 | + UnpersistedEventContextBase, |
| 62 | +) |
59 | 63 | from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field |
60 | 64 | from synapse.events.validator import EventValidator |
61 | 65 | from synapse.handlers.directory import DirectoryHandler |
|
66 | 70 | from synapse.replication.http.send_events import ReplicationSendEventsRestServlet |
67 | 71 | from synapse.storage.databases.main.events_worker import EventRedactBehaviour |
68 | 72 | from synapse.types import ( |
| 73 | + JsonDict, |
69 | 74 | PersistedEventPosition, |
70 | 75 | Requester, |
71 | 76 | RoomAlias, |
@@ -1520,6 +1525,92 @@ async def handle_new_client_event( |
1520 | 1525 |
|
1521 | 1526 | return result |
1522 | 1527 |
|
| 1528 | + async def create_and_send_new_client_events( |
| 1529 | + self, |
| 1530 | + requester: Requester, |
| 1531 | + room_id: str, |
| 1532 | + prev_event_id: str, |
| 1533 | + event_dicts: Sequence[JsonDict], |
| 1534 | + ratelimit: bool = True, |
| 1535 | + ignore_shadow_ban: bool = False, |
| 1536 | + ) -> None: |
| 1537 | + """Helper to create and send a batch of new client events. |
| 1538 | +
|
| 1539 | + This supports sending membership events in very limited circumstances |
| 1540 | + (namely that the event is valid as is and doesn't need federation |
| 1541 | + requests or anything). Callers should prefer to use `update_membership`, |
| 1542 | + which correctly handles membership events in all cases. We allow |
| 1543 | + sending membership events here as its useful when copying e.g. bans |
| 1544 | + between rooms. |
| 1545 | +
|
| 1546 | + All other events and state events are supported. |
| 1547 | +
|
| 1548 | + Args: |
| 1549 | + requester: The requester sending the events. |
| 1550 | + room_id: The room ID to send the events in. |
| 1551 | + prev_event_id: The event ID to use as the previous event for the first |
| 1552 | + of the events, must have already been persisted. |
| 1553 | + event_dicts: A sequence of event dictionaries to create and send. |
| 1554 | + ratelimit: Whether to rate limit this send. |
| 1555 | + ignore_shadow_ban: True if shadow-banned users should be allowed to |
| 1556 | + send these events. |
| 1557 | + """ |
| 1558 | + |
| 1559 | + if not event_dicts: |
| 1560 | + # Nothing to do. |
| 1561 | + return |
| 1562 | + |
| 1563 | + state_groups = await self._storage_controllers.state.get_state_group_for_events( |
| 1564 | + [prev_event_id] |
| 1565 | + ) |
| 1566 | + if prev_event_id not in state_groups: |
| 1567 | + # This should only happen if we got passed a prev event ID that |
| 1568 | + # hasn't been persisted yet. |
| 1569 | + raise Exception("Previous event ID not found ") |
| 1570 | + |
| 1571 | + current_state_group = state_groups[prev_event_id] |
| 1572 | + state_map = await self._storage_controllers.state.get_state_ids_for_group( |
| 1573 | + current_state_group |
| 1574 | + ) |
| 1575 | + |
| 1576 | + events_and_contexts_to_send = [] |
| 1577 | + state_map = dict(state_map) |
| 1578 | + depth = None |
| 1579 | + |
| 1580 | + for event_dict in event_dicts: |
| 1581 | + event, context = await self.create_event( |
| 1582 | + requester=requester, |
| 1583 | + event_dict=event_dict, |
| 1584 | + prev_event_ids=[prev_event_id], |
| 1585 | + depth=depth, |
| 1586 | + # Take a copy to ensure each event gets a unique copy of |
| 1587 | + # state_map since it is modified below. |
| 1588 | + state_map=dict(state_map), |
| 1589 | + for_batch=True, |
| 1590 | + ) |
| 1591 | + events_and_contexts_to_send.append((event, context)) |
| 1592 | + |
| 1593 | + prev_event_id = event.event_id |
| 1594 | + depth = event.depth + 1 |
| 1595 | + if event.is_state(): |
| 1596 | + # If this is a state event, we need to update the state map |
| 1597 | + # so that it can be used for the next event. |
| 1598 | + state_map[(event.type, event.state_key)] = event.event_id |
| 1599 | + |
| 1600 | + datastore = self.hs.get_datastores().state |
| 1601 | + events_and_context = ( |
| 1602 | + await UnpersistedEventContext.batch_persist_unpersisted_contexts( |
| 1603 | + events_and_contexts_to_send, room_id, current_state_group, datastore |
| 1604 | + ) |
| 1605 | + ) |
| 1606 | + |
| 1607 | + await self.handle_new_client_event( |
| 1608 | + requester, |
| 1609 | + events_and_context, |
| 1610 | + ignore_shadow_ban=ignore_shadow_ban, |
| 1611 | + ratelimit=ratelimit, |
| 1612 | + ) |
| 1613 | + |
1523 | 1614 | async def _persist_events( |
1524 | 1615 | self, |
1525 | 1616 | requester: Requester, |
|
0 commit comments