-
-
Notifications
You must be signed in to change notification settings - Fork 679
Expand file tree
/
Copy pathNewMembershipManagerActionScheduler.ts
More file actions
131 lines (119 loc) · 5.39 KB
/
NewMembershipManagerActionScheduler.ts
File metadata and controls
131 lines (119 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import { logger as rootLogger } from "../logger.ts";
import { type EmptyObject } from "../matrix.ts";
import { sleep } from "../utils.ts";
import { MembershipActionType } from "./NewMembershipManager.ts";
// Module-wide logger for this file: obtained from the root logger via `getChild("MatrixRTCSession")`.
const logger = rootLogger.getChild("MatrixRTCSession");
/**
* A single entry in the queue of the {@link ActionScheduler}.
* @internal
*/
export interface Action {
/**
* When this action should be executed.
* The scheduler compares this value against `Date.now()`, so it is a timestamp in milliseconds.
*/
ts: number;
/**
* The state of the different loops.
* Can also be thought of as the type of the action; this is the value handed to the scheduler's handler callback.
*/
type: MembershipActionType;
}
/**
* Describes how the scheduler's action queue should change.
* It is either returned by the handler callback or passed to the scheduler's wakeup function.
* An object with neither `replace` nor `insert` requests no change to the queue.
* @internal
*/
export type ActionUpdate =
| {
/** Replace all existing scheduled actions with this new array */
replace: Action[];
}
| {
/** Add these actions to the existing scheduled actions */
insert: Action[];
}
| EmptyObject;
/**
 * This scheduler tracks the state of the current membership participation
 * and runs one central timer that wakes up a handler callback with the correct action + state
 * whenever necessary.
 *
 * It can also be awakened whenever a new action is added which is
 * earlier than the current "next awake".
 * @internal
 */
export class ActionScheduler {
    /** `true` while the loop started by `startWithJoin()` is active. */
    public running = false;

    public constructor(
        /**
         * This is the callback called for each scheduled action once it is due.
         * The `ActionUpdate` it resolves with is applied to the queue of scheduled actions.
         */
        private membershipLoopHandler: (type: MembershipActionType) => Promise<ActionUpdate>,
    ) {}

    /** Wakeup implementation used whenever the loop is not running: it only reports the misuse. */
    private readonly wakeupWhileStopped = (): void => {
        logger.error("Cannot call wakeup before calling `startWithJoin()`");
    };

    // function for the wakeup mechanism (in case we add an action externally and need to leave the current sleep)
    // It is swapped for a per-iteration closure while the loop runs and restored once the loop exits.
    private wakeup: (update: ActionUpdate) => void = this.wakeupWhileStopped;

    private _actions: Action[] = [];

    /** The currently scheduled actions (the live internal queue, not a copy). */
    public get actions(): Action[] {
        return this._actions;
    }

    /**
     * This starts the main loop of the membership manager that handles event sending, delayed event sending and delayed event restarting.
     * The loop always starts with a single `MembershipActionType.SendDelayedEvent` action that is due immediately.
     * Calling this while the loop is already running only logs an error and returns.
     * @returns Promise that resolves once all actions have run and no more are scheduled.
     * @throws This throws an error if one of the actions throws.
     * In most other error cases the manager will try to handle any server errors by itself.
     */
    public async startWithJoin(): Promise<void> {
        if (this.running) {
            logger.error("Cannot call startWithJoin() on NewMembershipActionScheduler while already running");
            return;
        }
        this.running = true;
        this._actions = [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }];
        try {
            while (this._actions.length > 0) {
                // Sort so next (smallest ts) action is at the beginning
                this._actions.sort((a, b) => a.ts - b.ts);
                const nextAction = this._actions[0];
                let wakeupUpdate: ActionUpdate | undefined = undefined;

                // while we await for the next action, wakeup has to resolve the wakeupPromise
                const wakeupPromise = new Promise<void>((resolve) => {
                    this.wakeup = (update: ActionUpdate): void => {
                        wakeupUpdate = update;
                        resolve();
                    };
                });

                // Only sleep if the action is not due yet; an external wakeup ends the sleep early.
                const delay = nextAction.ts - Date.now();
                if (delay > 0) await Promise.race([wakeupPromise, sleep(delay)]);

                let handlerResult: ActionUpdate = {};
                if (!wakeupUpdate) {
                    logger.debug(
                        `Current MembershipManager processing: ${nextAction.type}\nQueue:`,
                        this._actions,
                        `\nDate.now: "${Date.now()}"`,
                    );
                    try {
                        // `this.wakeup` can also be called and sets the `wakeupUpdate` object while we are in the handler.
                        handlerResult = await this.membershipLoopHandler(nextAction.type);
                    } catch (e) {
                        throw new Error(`The MembershipManager shut down because of the end condition: ${e}`);
                    }
                }

                // remove the processed action only after we are done processing
                // NOTE(review): when a wakeup interrupted the sleep, the head action is removed without having
                // been handled. All wakeups issued by this class are `replace` updates, which discard the queue anyway.
                this._actions.splice(0, 1);

                // The wakeupUpdate always wins since that is a direct external update.
                const actionUpdate = wakeupUpdate ?? handlerResult;
                if ("replace" in actionUpdate) {
                    // Copy so that sorting/splicing the queue never mutates the array owned by the caller.
                    this._actions = [...actionUpdate.replace];
                } else if ("insert" in actionUpdate) {
                    this._actions.push(...actionUpdate.insert);
                }
            }
        } finally {
            // Whether the queue drained or an action threw, the loop is over. After an error we cannot recover
            // from here and the consumer of the MatrixRTCSession class needs to manually rejoin.
            this.running = false;
            // Drop the closure of the last iteration so that late wakeups are reported instead of silently ignored.
            this.wakeup = this.wakeupWhileStopped;
        }
        logger.debug("Leave MembershipManager ActionScheduler loop (no more actions)");
    }

    /**
     * Wakes the running loop and replaces all scheduled actions with a single, immediately due
     * `MembershipActionType.SendDelayedEvent` action.
     * If the loop is not running this only logs an error.
     */
    public initiateJoin(): void {
        this.wakeup({ replace: [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }] });
    }

    /**
     * Wakes the running loop and replaces all scheduled actions with a single, immediately due
     * `MembershipActionType.SendScheduledDelayedLeaveEvent` action.
     * If the loop is not running this only logs an error.
     */
    public initiateLeave(): void {
        this.wakeup({ replace: [{ ts: Date.now(), type: MembershipActionType.SendScheduledDelayedLeaveEvent }] });
    }
}