Skip to content

Commit 645e3f8

Browse files
authored
fix: Allow queue to recover from some errors (#36256)
1 parent 1bdb7d1 commit 645e3f8

File tree

2 files changed

+44
-24
lines changed

2 files changed

+44
-24
lines changed

.changeset/thirty-needles-speak.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@rocket.chat/meteor": patch
3+
---
4+
5+
Fixes an issue that caused the queue worker to stop processing if something failed when checking MAC limits or while fetching the list of queues

apps/meteor/server/services/omnichannel/queue.ts

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ export class OmnichannelQueue implements IOmnichannelQueue {
2727

2828
private running = false;
2929

30+
private errorDelay = 10 * 1000; // 10 seconds
31+
3032
private delay() {
3133
const timeout = settings.get<number>('Omnichannel_queue_delay_timeout') ?? 5;
3234
return timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000;
@@ -79,28 +81,38 @@ export class OmnichannelQueue implements IOmnichannelQueue {
7981
}
8082

8183
private async execute() {
82-
if (!this.running) {
83-
queueLogger.debug('Queue stopped. Cannot execute');
84-
return;
85-
}
84+
try {
85+
if (!this.running) {
86+
queueLogger.debug('Queue stopped. Cannot execute');
87+
return;
88+
}
8689

87-
if (await License.shouldPreventAction('monthlyActiveContacts', 1)) {
88-
queueLogger.debug('MAC limit reached. Queue wont execute');
89-
this.running = false;
90-
return;
91-
}
90+
if (await License.shouldPreventAction('monthlyActiveContacts', 1)) {
91+
queueLogger.debug('MAC limit reached. Queue wont execute');
92+
this.running = false;
93+
return;
94+
}
95+
96+
// We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle
97+
// And we get tracing :)
98+
const queues = await this.getActiveQueues();
99+
for await (const queue of queues) {
100+
await tracerSpan(
101+
'omnichannel.queue',
102+
{ attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true },
103+
() => this.checkQueue(queue),
104+
);
105+
}
92106

93-
// We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle
94-
// And we get tracing :)
95-
const queues = await this.getActiveQueues();
96-
for await (const queue of queues) {
97-
await tracerSpan(
98-
'omnichannel.queue',
99-
{ attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true },
100-
() => this.checkQueue(queue),
101-
);
107+
this.scheduleExecution();
108+
} catch (e) {
109+
queueLogger.error({
110+
msg: 'Queue Worker Error. Rescheduling with extra delay',
111+
extraDelay: this.errorDelay,
112+
err: e,
113+
});
114+
this.scheduleExecution(this.errorDelay);
102115
}
103-
this.scheduleExecution();
104116
}
105117

106118
private async checkQueue(queue: string | null) {
@@ -136,15 +148,18 @@ export class OmnichannelQueue implements IOmnichannelQueue {
136148
}
137149
}
138150

139-
private scheduleExecution(): void {
151+
private scheduleExecution(extraDelay?: number): void {
140152
if (this.timeoutHandler !== null) {
141153
return;
142154
}
143155

144-
this.timeoutHandler = setTimeout(() => {
145-
this.timeoutHandler = null;
146-
return this.execute();
147-
}, this.delay());
156+
this.timeoutHandler = setTimeout(
157+
() => {
158+
this.timeoutHandler = null;
159+
return this.execute();
160+
},
161+
this.delay() + (extraDelay || 0),
162+
);
148163
}
149164

150165
async shouldStart() {

0 commit comments

Comments
 (0)