-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathfuture_group.dart
More file actions
133 lines (113 loc) · 4.64 KB
/
future_group.dart
File metadata and controls
133 lines (113 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'cancelable_operation.dart';
/// Sentinel stored in a [FutureGroup] result slot when the corresponding member
/// was canceled instead of completing with a value.
///
/// Slots holding this object are skipped when the group's result list is
/// built.
final Object _canceledResult = Object();
/// A collection of futures that waits until all added [Future]s complete.
///
/// Futures are added to the group with [add] or [addCancelable]. Once you're
/// finished adding futures, signal that by calling [close]. Then, once all
/// added futures have completed, [future] will complete with a list of values
/// from the futures in the group, in the order they were added.
///
/// If any added future completes with an error, [future] will emit that error
/// right away, regardless of the state of other futures in the group. Results
/// of futures that complete after that point are ignored. Note that an error
/// does not change [isClosed]; only [close] does.
///
/// This is similar to [Future.wait] with `eagerError` set to `true`, except
/// that a [FutureGroup] can have futures added gradually over time rather than
/// needing them all at once.
class FutureGroup<T> implements Sink<Future<T>> {
  /// The number of futures that have yet to complete.
  var _pending = 0;

  /// Whether the group is closed, meaning that no more futures may be added.
  bool get isClosed => _closed;
  var _closed = false;

  /// Whether an [onIdle] event is being delivered right now.
  ///
  /// [onIdle] is backed by a synchronous broadcast controller, so listeners run
  /// inside the controller's `add` call and may call back into this group.
  /// Such a controller must not be closed before that `add` call has returned,
  /// so [close] checks this flag and leaves the final steps to [_add].
  var _emittingIdle = false;

  /// The future that fires once [close] has been called and all futures in the
  /// group have completed.
  ///
  /// This will also complete with an error if any of the futures in the group
  /// fails, regardless of whether [close] was called.
  Future<List<T>> get future => _completer.future;
  final _completer = Completer<List<T>>();

  /// Whether this group contains no pending futures.
  ///
  /// A [FutureGroup] is idle when it contains no futures, which is the case for
  /// a newly created group or one where all added futures have been removed or
  /// completed.
  bool get isIdle => _pending == 0;

  /// A broadcast stream that emits an event whenever this group becomes idle.
  ///
  /// A [FutureGroup] is idle when it contains no futures, which is the case for
  /// a newly created group or one where all added futures have been removed or
  /// completed.
  ///
  /// This stream will close when this group is idle *and* [close] has been
  /// called.
  ///
  /// Listeners are invoked synchronously when the group becomes idle, so a
  /// listener may make the group active again (by adding a future) before
  /// other code observes the idle state.
  Stream get onIdle =>
      (_onIdleController ??= StreamController.broadcast(sync: true)).stream;
  StreamController? _onIdleController;

  /// The values emitted by the futures that have been added to the group, in
  /// the order they were added.
  ///
  /// This is type `Object?` rather than `T?` so it can contain
  /// [_canceledResult]. The slots for futures that haven't completed yet are
  /// `null`.
  final _values = <Object?>[];

  /// Wait for [task] to complete.
  ///
  /// Throws a [StateError] if the group is already closed.
  @override
  void add(Future<T> task) => _add(task);

  /// Wait for [task] to complete.
  ///
  /// If [task] is canceled, it's removed from the group without adding a value
  /// to [future].
  ///
  /// Throws a [StateError] if the group is already closed.
  void addCancelable(CancelableOperation<T> task) {
    _add(task
        .then((value) => value, onCancel: () => _canceledResult)
        .valueOrCancellation());
  }

  /// Registers [task] as a member of the group.
  ///
  /// [task] completes either with a `T` or with [_canceledResult].
  void _add(Future<Object?> task) {
    if (_closed) throw StateError('The FutureGroup is closed.');

    // Ensure that future values are put into [_values] in the same order
    // they're added to the group by pre-allocating a slot for them and
    // recording its index.
    var index = _values.length;
    _values.add(null);

    _pending++;
    task.then((value) {
      // Once [future] has completed (with values or an error) later results
      // are ignored.
      if (_completer.isCompleted) return null;

      _pending--;
      _values[index] = value;

      if (_pending != 0) return null;
      _emitIdle();

      // Idle listeners ran synchronously and may have added new futures or
      // closed the group, so re-check both conditions before finishing.
      if (!_closed || _pending != 0) return null;
      _completeWithValues();
    }).catchError((Object error, StackTrace stackTrace) {
      if (_completer.isCompleted) return null;
      _completer.completeError(error, stackTrace);
    });
  }

  /// Signals to the group that the caller is done adding futures, and so
  /// [future] should fire once all added futures have completed.
  ///
  /// If the group is already idle, this closes [onIdle] and completes [future]
  /// right away. Calling this more than once has no further effect.
  @override
  void close() {
    _closed = true;
    if (_pending != 0) return;

    // Called from inside an [onIdle] listener: the completion callback in
    // [_add] finishes the group as soon as the event has been delivered.
    if (_emittingIdle) return;

    _completeWithValues();
  }

  /// Delivers an idle event to [onIdle] listeners, if anyone has ever asked for
  /// the stream.
  void _emitIdle() {
    final controller = _onIdleController;
    if (controller == null) return;

    _emittingIdle = true;
    try {
      controller.add(null);
    } finally {
      _emittingIdle = false;
    }
  }

  /// Closes [onIdle] and completes [future] with the collected values, skipping
  /// the slots of canceled members.
  ///
  /// Does nothing if [future] has already completed.
  void _completeWithValues() {
    if (_completer.isCompleted) return;

    _onIdleController?.close();
    _completer.complete([
      for (final value in _values)
        // Identity comparison so a value's own `==` can't hide a real result.
        if (!identical(value, _canceledResult)) value as T
    ]);
  }
}