Skip to content

Commit f533712

Browse files
authored
add proxy example (#425)
1 parent 3ce6832 commit f533712

2 files changed

Lines changed: 325 additions & 0 deletions

File tree

examples/proxy/index.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
const { Pool, Client } = require('../../')
2+
const http = require('http')
3+
const proxy = require('./proxy')
4+
5+
const pool = new Pool('http://localhost:4001', {
6+
connections: 256,
7+
pipelining: 1
8+
})
9+
10+
async function run () {
11+
await Promise.all([
12+
new Promise(resolve => {
13+
// Proxy
14+
http.createServer((req, res) => {
15+
proxy({ req, res, proxyName: 'example' }, pool).catch(err => {
16+
if (res.headersSent) {
17+
res.destroy(err)
18+
} else {
19+
for (const name of res.getHeaderNames()) {
20+
res.removeHeader(name)
21+
}
22+
res.statusCode = err.statusCode || 500
23+
res.end()
24+
}
25+
})
26+
}).listen(4000, resolve)
27+
}),
28+
new Promise(resolve => {
29+
// Upstream
30+
http.createServer((req, res) => {
31+
res.end('hello world')
32+
}).listen(4001, resolve)
33+
})
34+
])
35+
36+
const client = new Client('http://localhost:4000')
37+
const { body } = await client.request({
38+
method: 'GET',
39+
path: '/'
40+
})
41+
42+
for await (const chunk of body) {
43+
console.log(String(chunk))
44+
}
45+
}
46+
47+
run()
48+
49+
// TODO: Add websocket example.

examples/proxy/proxy.js

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
const net = require('net')
2+
const { pipeline } = require('stream')
3+
const createError = require('http-errors')
4+
5+
module.exports = async function proxy (ctx, client) {
6+
const { req, socket, proxyName } = ctx
7+
8+
const headers = getHeaders({
9+
headers: req.rawHeaders,
10+
httpVersion: req.httpVersion,
11+
socket: req.socket,
12+
proxyName
13+
})
14+
15+
if (socket) {
16+
const handler = new WSHandler(ctx)
17+
client.dispatch({
18+
method: req.method,
19+
path: req.url,
20+
headers,
21+
upgrade: 'Websocket'
22+
}, handler)
23+
return handler.promise
24+
} else {
25+
const handler = new HTTPHandler(ctx)
26+
client.dispatch({
27+
method: req.method,
28+
path: req.url,
29+
headers,
30+
body: req
31+
}, handler)
32+
return handler.promise
33+
}
34+
}
35+
36+
class HTTPHandler {
37+
constructor (ctx) {
38+
const { req, res, proxyName } = ctx
39+
40+
this.proxyName = proxyName
41+
this.req = req
42+
this.res = res
43+
this.resume = null
44+
this.abort = null
45+
this.promise = new Promise((resolve, reject) => {
46+
this.callback = err => err ? reject(err) : resolve()
47+
})
48+
}
49+
50+
onConnect (abort) {
51+
if (this.req.aborted) {
52+
abort()
53+
} else {
54+
this.abort = abort
55+
this.res.on('close', abort)
56+
}
57+
}
58+
59+
onHeaders (statusCode, headers, resume) {
60+
if (statusCode < 200) {
61+
return
62+
}
63+
64+
this.resume = resume
65+
this.res.on('drain', resume)
66+
writeHead(this.res, statusCode, getHeaders({
67+
headers,
68+
proxyName: this.proxyName,
69+
httpVersion: this.httpVersion
70+
}))
71+
}
72+
73+
onData (chunk) {
74+
return this.res.write(chunk)
75+
}
76+
77+
onComplete () {
78+
this.res.end()
79+
this.callback()
80+
}
81+
82+
onError (err) {
83+
this.res.off('close', this.abort)
84+
this.res.off('drain', this.resume)
85+
this.callback(err)
86+
}
87+
}
88+
89+
class WSHandler {
90+
constructor (ctx) {
91+
const { req, socket, proxyName, head } = ctx
92+
93+
setupSocket(socket)
94+
95+
this.proxyName = proxyName
96+
this.httpVersion = req.httpVersion
97+
this.socket = socket
98+
this.head = head
99+
this.abort = null
100+
this.promise = new Promise((resolve, reject) => {
101+
this.callback = err => err ? reject(err) : resolve()
102+
})
103+
}
104+
105+
onConnect (abort) {
106+
if (this.socket.destroyed) {
107+
abort()
108+
} else {
109+
this.abort = abort
110+
this.socket.on('close', abort)
111+
}
112+
}
113+
114+
onUpgrade (statusCode, headers, socket) {
115+
// TODO: Check statusCode?
116+
117+
if (this.head && this.head.length) {
118+
socket.unshift(this.head)
119+
}
120+
121+
setupSocket(socket)
122+
123+
let head = 'HTTP/1.1 101 Switching Protocols\r\nconnection: upgrade\r\nupgrade: websocket'
124+
125+
headers = getHeaders({
126+
headers,
127+
proxyName: this.proxyName,
128+
httpVersion: this.httpVersion
129+
})
130+
131+
for (let n = 0; n < headers.length; n += 2) {
132+
const key = headers[n + 0]
133+
const val = headers[n + 1]
134+
135+
head += `\r\n${key}: ${val}`
136+
}
137+
head += '\r\n\r\n'
138+
139+
this.socket.write(head)
140+
141+
pipeline(socket, this.socket, socket, this.callback)
142+
}
143+
144+
onError (err) {
145+
this.socket.off('close', this.abort)
146+
this.callback(err)
147+
}
148+
}
149+
150+
// This expression matches hop-by-hop headers.
151+
// These headers are meaningful only for a single transport-level connection,
152+
// and must not be retransmitted by proxies or cached.
153+
const HOP_EXPR = /^(te|host|upgrade|trailers|connection|keep-alive|http2-settings|transfer-encoding|proxy-connection|proxy-authenticate|proxy-authorization)$/i
154+
155+
// Removes hop-by-hop and pseudo headers.
156+
// Updates via and forwarded headers.
157+
// Only hop-by-hop headers may be set using the Connection general header.
158+
function getHeaders ({
159+
headers,
160+
proxyName,
161+
httpVersion,
162+
socket
163+
}) {
164+
let via = ''
165+
let forwarded = ''
166+
let host = ''
167+
let authority = ''
168+
let connection = ''
169+
170+
for (let n = 0; n < headers.length; n += 2) {
171+
const key = headers[n + 0]
172+
const val = headers[n + 1]
173+
174+
if (!via && key.length === 3 && key.toLowerCase() === 'via') {
175+
via = val
176+
} else if (!host && key.length === 4 && key.toLowerCase() === 'host') {
177+
host = val
178+
} else if (!forwarded && key.length === 9 && key.toLowerCase() === 'forwarded') {
179+
forwarded = val
180+
} else if (!connection && key.length === 10 && key.toLowerCase() === 'connection') {
181+
connection = val
182+
} else if (!authority && key.length === 10 && key === ':authority') {
183+
authority = val
184+
}
185+
}
186+
187+
let remove
188+
if (connection && !HOP_EXPR.test(connection)) {
189+
remove = connection.split(/,\s*/)
190+
}
191+
192+
const result = []
193+
for (let n = 0; n < headers.length; n += 2) {
194+
const key = headers[n + 0]
195+
const val = headers[n + 1]
196+
197+
if (
198+
key.charAt(0) !== ':' &&
199+
!HOP_EXPR.test(key) &&
200+
(!remove || !remove.includes(key))
201+
) {
202+
result.push(key, val)
203+
}
204+
}
205+
206+
if (socket) {
207+
result.push('forwarded', (forwarded ? forwarded + ', ' : '') + [
208+
`by=${printIp(socket.localAddress, socket.localPort)}`,
209+
`for=${printIp(socket.remoteAddress, socket.remotePort)}`,
210+
`proto=${socket.encrypted ? 'https' : 'http'}`,
211+
`host=${printIp(authority || host || '')}`
212+
].join(';'))
213+
} else if (forwarded) {
214+
// The forwarded header should not be included in response.
215+
throw new createError.BadGateway()
216+
}
217+
218+
if (proxyName) {
219+
if (via) {
220+
if (via.split(',').some(name => name.endsWith(proxyName))) {
221+
throw new createError.LoopDetected()
222+
}
223+
via += ', '
224+
}
225+
via += `${httpVersion} ${proxyName}`
226+
}
227+
228+
if (via) {
229+
result.push('via', via)
230+
}
231+
232+
return result
233+
}
234+
235+
function setupSocket (socket) {
236+
socket.setTimeout(0)
237+
socket.setNoDelay(true)
238+
socket.setKeepAlive(true, 0)
239+
}
240+
241+
function printIp (address, port) {
242+
const isIPv6 = net.isIPv6(address)
243+
let str = `${address}`
244+
if (isIPv6) {
245+
str = `[${str}]`
246+
}
247+
if (port) {
248+
str = `${str}:${port}`
249+
}
250+
if (isIPv6 || port) {
251+
str = `"${str}"`
252+
}
253+
return str
254+
}
255+
256+
function writeHead (res, statusCode, headers) {
257+
// TODO (perf): res.writeHead should support Array and/or string.
258+
const obj = {}
259+
for (var i = 0; i < headers.length; i += 2) {
260+
var key = headers[i]
261+
var val = obj[key]
262+
if (!val) {
263+
obj[key] = headers[i + 1]
264+
} else {
265+
if (!Array.isArray(val)) {
266+
val = [val]
267+
obj[key] = val
268+
}
269+
val.push(headers[i + 1])
270+
}
271+
}
272+
273+
res.writeHead(statusCode, obj)
274+
275+
return res
276+
}

0 commit comments

Comments
 (0)