Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 24 additions & 75 deletions src/matrixrtc/NewMembershipManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,22 @@ export interface IMembershipManager {
/* MembershipActionTypes:
┌─────────────────────┐
│SendFirstDelayedEvent│
| SendDelayedEvent |
└─────────────────────┘
┌─────────────┐
┌────────────│SendJoinEvent│────────────┐
│ └─────────────┘ │
│ ┌─────┐ ┌──────┐ │ ┌──────┐
▼ ▼ │ │ ▼ ▼ ▼ │
┌────────────┐ │ │ ┌───────────────────┐ │
│UpdateExpiry│ │ │ │RestartDelayedEvent│ │
└────────────┘ │ │ └───────────────────┘ │
│ │ │ │ │ │
└─────┘ └──────┘ │ │
│ │
┌────────────────────┐ │ │
│SendMainDelayedEvent│◄───────┘ │
└───────────────────┬┘ │
│ │
└─────────────────────┘
STOP ALL ABOVE
│ ┌─────┐ ┌──────┐ │
▼ ▼ │ │ ▼ ▼
┌────────────┐ │ │ ┌───────────────────┐
│UpdateExpiry│ │ │ │RestartDelayedEvent│
└────────────┘ │ │ └───────────────────┘
│ │ │ │
└─────┘ └──────┘

STOP ALL ABOVE
┌───────────────────────────────┐
│ SendScheduledDelayedLeaveEvent│
Expand All @@ -110,22 +105,19 @@ export interface IMembershipManager {
* @internal
*/
export enum MembershipActionType {
SendFirstDelayedEvent = "SendFirstDelayedEvent",
SendDelayedEvent = "SendDelayedEvent",
// -> MembershipActionType.SendJoinEvent if successful
// -> DelayedLeaveActionType.SendFirstDelayedEvent on error, retry sending the first delayed event.
SendJoinEvent = "SendJoinEvent",
// -> MembershipActionType.SendJoinEvent if we run into a rate limit and need to retry
// -> MembershipActionType.Update if we successfully send the join event then schedule the expire event update
// -> DelayedLeaveActionType.RestartDelayedEvent to recheck the delayed event
// -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event
RestartDelayedEvent = "RestartDelayedEvent",
// -> DelayedLeaveActionType.SendMainDelayedEvent on missing delay id but there is a rtc state event
// -> DelayedLeaveActionType.SendFirstDelayedEvent on missing delay id and there is no state event
// -> DelayedLeaveActionType.RestartDelayedEvent on success we schedule the next restart
UpdateExpiry = "UpdateExpiry",
// -> MembershipActionType.Update if the timeout has passed so the next update is required.
SendMainDelayedEvent = "SendMainDelayedEvent",
// -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event
// -> DelayedLeaveActionType.SendMainDelayedEvent on error try again
SendScheduledDelayedLeaveEvent = "SendScheduledDelayedLeaveEvent",
// -> MembershipActionType.SendLeaveEvent on failiour (not found) we need to send the leave manually and cannot use the scheduled delayed event
// -> DelayedLeaveActionType.SendScheduledDelayedLeaveEvent on error we try again.
Expand Down Expand Up @@ -244,7 +236,7 @@ export class MembershipManager implements IMembershipManager {
// If one of these actions are scheduled or are getting inserted in the next iteration, we should already
// take care of our missing membership.
const sendingMembershipActions = [
MembershipActionType.SendFirstDelayedEvent,
MembershipActionType.SendDelayedEvent,
MembershipActionType.SendJoinEvent,
];
logger.warn("Missing own membership: force re-join");
Expand Down Expand Up @@ -381,7 +373,7 @@ export class MembershipManager implements IMembershipManager {
private async membershipLoopHandler(type: MembershipActionType): Promise<ActionUpdate> {
this.oldStatus = this.status;
switch (type) {
case MembershipActionType.SendFirstDelayedEvent: {
case MembershipActionType.SendDelayedEvent: {
// Before we start we check if we come from a state where we have a delay id.
if (!this.state.delayId) {
return this.sendFirstDelayedLeaveEvent(); // Normal case without any previous delayed id.
Expand All @@ -399,17 +391,10 @@ export class MembershipManager implements IMembershipManager {
case MembershipActionType.RestartDelayedEvent: {
if (!this.state.delayId) {
// Delay id got reset. This action was used to check if the hs canceled the delayed event when the join state got sent.
return createInsertActionUpdate(
this.state.hasMemberStateEvent
? MembershipActionType.SendMainDelayedEvent
: MembershipActionType.SendFirstDelayedEvent,
);
return createInsertActionUpdate(MembershipActionType.SendDelayedEvent);
}
return this.restartDelayedEvent(this.state.delayId);
}
case MembershipActionType.SendMainDelayedEvent: {
return this.sendMainDelayedEvent();
}
case MembershipActionType.SendScheduledDelayedLeaveEvent: {
// We are already good
if (!this.state.hasMemberStateEvent) {
Expand Down Expand Up @@ -453,13 +438,13 @@ export class MembershipManager implements IMembershipManager {
)
.then((response) => {
// On success we reset retries and set delayId.
this.state.rateLimitRetries.set(MembershipActionType.SendFirstDelayedEvent, 0);
this.state.networkErrorRetries.set(MembershipActionType.SendFirstDelayedEvent, 0);
this.state.rateLimitRetries.set(MembershipActionType.SendDelayedEvent, 0);
this.state.networkErrorRetries.set(MembershipActionType.SendDelayedEvent, 0);
this.state.delayId = response.delay_id;
return createInsertActionUpdate(MembershipActionType.SendJoinEvent);
})
.catch((e) => {
const repeatActionType = MembershipActionType.SendFirstDelayedEvent;
const repeatActionType = MembershipActionType.SendDelayedEvent;
if (this.manageMaxDelayExceededSituation(e)) {
return createInsertActionUpdate(repeatActionType);
}
Expand All @@ -483,11 +468,11 @@ export class MembershipManager implements IMembershipManager {
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Cancel)
.then(() => {
this.state.delayId = undefined;
this.resetRateLimitCounter(MembershipActionType.SendFirstDelayedEvent);
return createReplaceActionUpdate(MembershipActionType.SendFirstDelayedEvent);
this.resetRateLimitCounter(MembershipActionType.SendDelayedEvent);
return createReplaceActionUpdate(MembershipActionType.SendDelayedEvent);
})
.catch((e) => {
const repeatActionType = MembershipActionType.SendFirstDelayedEvent;
const repeatActionType = MembershipActionType.SendDelayedEvent;
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
if (update) return update;

Expand Down Expand Up @@ -526,7 +511,7 @@ export class MembershipManager implements IMembershipManager {
const repeatActionType = MembershipActionType.RestartDelayedEvent;
if (this.isNotFoundError(e)) {
this.state.delayId = undefined;
return createInsertActionUpdate(MembershipActionType.SendMainDelayedEvent);
return createInsertActionUpdate(MembershipActionType.SendDelayedEvent);
}
// If the HS does not support delayed events we wont reschedule.
if (this.isUnsupportedDelayedEndpoint(e)) return {};
Expand All @@ -540,40 +525,6 @@ export class MembershipManager implements IMembershipManager {
});
}

private async sendMainDelayedEvent(): Promise<ActionUpdate> {
return await this.client
._unstable_sendDelayedStateEvent(
this.room.roomId,
{
delay: this.membershipServerSideExpiryTimeout,
},
EventType.GroupCallMemberPrefix,
{}, // leave event
this.stateKey,
)
.then((response) => {
this.state.delayId = response.delay_id;
this.resetRateLimitCounter(MembershipActionType.SendMainDelayedEvent);
return createInsertActionUpdate(
MembershipActionType.RestartDelayedEvent,
this.membershipKeepAlivePeriod,
);
})
.catch((e) => {
const repeatActionType = MembershipActionType.SendMainDelayedEvent;
// Don't do any other delayed event work if its not supported.
if (this.isUnsupportedDelayedEndpoint(e)) return {};

if (this.manageMaxDelayExceededSituation(e)) {
return createInsertActionUpdate(repeatActionType);
}
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
if (update) return update;

throw Error("Could not send delayed event, even though delayed events are supported. " + e);
});
}

private async sendScheduledDelayedLeaveEventOrFallbackToSendLeaveEvent(delayId: string): Promise<ActionUpdate> {
return await this.client
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Send)
Expand Down Expand Up @@ -875,9 +826,8 @@ export class MembershipManager implements IMembershipManager {
if (actions.length === 1) {
const { type } = actions[0];
switch (type) {
case MembershipActionType.SendFirstDelayedEvent:
case MembershipActionType.SendDelayedEvent:
case MembershipActionType.SendJoinEvent:
case MembershipActionType.SendMainDelayedEvent:
return Status.Connecting;
case MembershipActionType.UpdateExpiry: // where no delayed events
return Status.Connected;
Expand All @@ -891,8 +841,7 @@ export class MembershipManager implements IMembershipManager {
const types = actions.map((a) => a.type);
// normal state for connected with delayed events
if (
(types.includes(MembershipActionType.RestartDelayedEvent) ||
types.includes(MembershipActionType.SendMainDelayedEvent)) &&
types.includes(MembershipActionType.RestartDelayedEvent) &&
types.includes(MembershipActionType.UpdateExpiry)
) {
return Status.Connected;
Expand Down
4 changes: 2 additions & 2 deletions src/matrixrtc/NewMembershipManagerActionScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class ActionScheduler {
return;
}
this.running = true;
this._actions = [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }];
this._actions = [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }];
try {
while (this._actions.length > 0) {
// Sort so next (smallest ts) action is at the beginning
Expand Down Expand Up @@ -123,7 +123,7 @@ export class ActionScheduler {
}

public initiateJoin(): void {
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }] });
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }] });
}
public initiateLeave(): void {
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendScheduledDelayedLeaveEvent }] });
Expand Down