forked from jestjs/jest
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFifoQueue.ts
More file actions
115 lines (93 loc) · 2.97 KB
/
FifoQueue.ts
File metadata and controls
115 lines (93 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
import type {QueueChildMessage, TaskQueue} from './types';
type WorkerQueueValue = {
task: QueueChildMessage;
/**
* The task that was at the top of the shared queue at the time this
* worker specific task was enqueued. Required to maintain FIFO ordering
* across queues. The worker specific task should only be dequeued if the
* previous shared task is null or has been processed.
*/
previousSharedTask: QueueChildMessage | null;
};
/**
* First-in, First-out task queue that manages a dedicated pool
* for each worker as well as a shared queue. The FIFO ordering is guaranteed
* across the worker specific and shared queue.
*/
export default class FifoQueue implements TaskQueue {
private _workerQueues: Array<
InternalQueue<WorkerQueueValue> | undefined
> = [];
private _sharedQueue = new InternalQueue<QueueChildMessage>();
enqueue(task: QueueChildMessage, workerId?: number): void {
if (workerId == null) {
this._sharedQueue.enqueue(task);
return;
}
let workerQueue = this._workerQueues[workerId];
if (workerQueue == null) {
workerQueue = this._workerQueues[
workerId
] = new InternalQueue<WorkerQueueValue>();
}
const sharedTop = this._sharedQueue.peekLast();
const item = {previousSharedTask: sharedTop, task};
workerQueue.enqueue(item);
}
dequeue(workerId: number): QueueChildMessage | null {
const workerTop = this._workerQueues[workerId]?.peek();
const sharedTaskIsProcessed =
workerTop?.previousSharedTask?.request[1] ?? true;
// Process the top task from the shared queue if
// - there's no task in the worker specific queue or
// - if the non-worker-specific task after which this worker specifif task
// hasn been queued wasn't processed yet
if (workerTop != null && sharedTaskIsProcessed) {
return this._workerQueues[workerId]?.dequeue()?.task ?? null;
}
return this._sharedQueue.dequeue();
}
}
type QueueItem<TValue> = {
value: TValue;
next: QueueItem<TValue> | null;
};
/**
* FIFO queue for a single worker / shared queue.
*/
class InternalQueue<TValue> {
private _head: QueueItem<TValue> | null = null;
private _last: QueueItem<TValue> | null = null;
enqueue(value: TValue): void {
const item = {next: null, value};
if (this._last == null) {
this._head = item;
} else {
this._last.next = item;
}
this._last = item;
}
dequeue(): TValue | null {
if (this._head == null) {
return null;
}
const item = this._head;
this._head = item.next;
if (this._head == null) {
this._last = null;
}
return item.value;
}
peek(): TValue | null {
return this._head?.value ?? null;
}
peekLast(): TValue | null {
return this._last?.value ?? null;
}
}