-
-
Notifications
You must be signed in to change notification settings - Fork 35.4k
Expand file tree
/
Copy pathagent.js
More file actions
135 lines (112 loc) · 3.76 KB
/
agent.js
File metadata and controls
135 lines (112 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
'use strict'
const { InvalidArgumentError } = require('../core/errors')
const { kClients, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
const util = require('../core/util')
const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')
const kOnDrain = Symbol('onDrain')
const kFactory = Symbol('factory')
const kOptions = Symbol('options')
function defaultFactory (origin, opts) {
return opts && opts.connections === 1
? new Client(origin, opts)
: new Pool(origin, opts)
}
class Agent extends DispatcherBase {
constructor ({ factory = defaultFactory, connect, ...options } = {}) {
if (typeof factory !== 'function') {
throw new InvalidArgumentError('factory must be a function.')
}
if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
throw new InvalidArgumentError('connect must be a function or an object')
}
super()
if (connect && typeof connect !== 'function') {
connect = { ...connect }
}
this[kOptions] = { ...util.deepClone(options), connect }
this[kFactory] = factory
this[kClients] = new Map()
this[kOnDrain] = (origin, targets) => {
this.emit('drain', origin, [this, ...targets])
}
this[kOnConnect] = (origin, targets) => {
const result = this[kClients].get(origin)
if (result) {
result.count += 1
}
this.emit('connect', origin, [this, ...targets])
}
this[kOnDisconnect] = (origin, targets, err) => {
const result = this[kClients].get(origin)
if (result) {
result.count -= 1
if (result.count <= 0) {
this[kClients].delete(origin)
result.dispatcher.destroy()
}
}
this.emit('disconnect', origin, [this, ...targets], err)
}
this[kOnConnectionError] = (origin, targets, err) => {
// TODO: should this decrement result.count here?
this.emit('connectionError', origin, [this, ...targets], err)
}
}
get [kRunning] () {
let ret = 0
for (const { dispatcher } of this[kClients].values()) {
ret += dispatcher[kRunning]
}
return ret
}
[kDispatch] (opts, handler) {
let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
} else {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}
const result = this[kClients].get(key)
let dispatcher = result && result.dispatcher
if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])
this[kClients].set(key, { count: 0, dispatcher })
}
return dispatcher.dispatch(opts, handler)
}
async [kClose] () {
const closePromises = []
for (const { dispatcher } of this[kClients].values()) {
closePromises.push(dispatcher.close())
}
this[kClients].clear()
await Promise.all(closePromises)
}
async [kDestroy] (err) {
const destroyPromises = []
for (const { dispatcher } of this[kClients].values()) {
destroyPromises.push(dispatcher.destroy(err))
}
this[kClients].clear()
await Promise.all(destroyPromises)
}
get stats () {
const allClientStats = {}
for (const { dispatcher } of this[kClients].values()) {
if (dispatcher.stats) {
allClientStats[dispatcher[kUrl].origin] = dispatcher.stats
}
}
return allClientStats
}
}
module.exports = Agent