Skip to content

Commit f913231

Browse files
committed
add proxy example
1 parent 3ce6832 commit f913231

2 files changed

Lines changed: 311 additions & 0 deletions

File tree

examples/proxy/index.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
const { Pool, Client } = require('../../')
2+
const http = require('http')
3+
const proxy = require('./proxy')
4+
5+
const pool = new Pool('http://localhost:4001', {
6+
connections: 256,
7+
pipelining: 1
8+
})
9+
10+
async function run () {
11+
await Promise.all([
12+
new Promise(resolve => {
13+
// Proxy
14+
http.createServer((req, res) => {
15+
proxy({ req, res, proxyName: 'example' }, pool).catch(err => {
16+
if (res.headersSent) {
17+
res.destroy(err)
18+
} else {
19+
for (const name of res.getHeaderNames()) {
20+
res.removeHeader(name)
21+
}
22+
res.writeHead(err.statusCode || 500)
23+
res.end()
24+
}
25+
})
26+
}).listen(4000, resolve)
27+
}),
28+
new Promise(resolve => {
29+
// Upstream
30+
http.createServer((req, res) => {
31+
res.end('hello world')
32+
}).listen(4001, resolve)
33+
})
34+
])
35+
36+
const client = new Client('http://localhost:4000')
37+
const { body } = await client.request({
38+
method: 'GET',
39+
path: '/'
40+
})
41+
42+
for await (const chunk of body) {
43+
console.log(String(chunk))
44+
}
45+
}
46+
47+
run()
48+
49+
// TODO: Add websocket example.

examples/proxy/proxy.js

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
const net = require('net')
2+
const { pipeline } = require('stream')
3+
const createError = require('http-errors')
4+
5+
module.exports = async function proxy (ctx, client) {
6+
const { req, socket } = ctx
7+
8+
if (socket) {
9+
const handler = new WSHandler(ctx)
10+
client.dispatch({
11+
method: req.method,
12+
path: req.url,
13+
headers: getHeaders(ctx),
14+
upgrade: 'Websocket'
15+
}, handler)
16+
return handler.promise
17+
} else {
18+
const handler = new HTTPHandler(ctx)
19+
client.dispatch({
20+
method: req.method,
21+
path: req.url,
22+
headers: getHeaders(ctx),
23+
body: req
24+
}, handler)
25+
return handler.promise
26+
}
27+
}
28+
29+
class HTTPHandler {
30+
constructor (ctx) {
31+
const { req, res } = ctx
32+
33+
this.req = req
34+
this.res = res
35+
this.resume = null
36+
this.abort = null
37+
this.promise = new Promise((resolve, reject) => {
38+
this.callback = err => err ? reject(err) : resolve()
39+
})
40+
}
41+
42+
onConnect (abort) {
43+
if (this.req.aborted) {
44+
abort()
45+
} else {
46+
this.abort = abort
47+
this.res.on('close', abort)
48+
}
49+
}
50+
51+
onHeaders (statusCode, headers, resume) {
52+
if (statusCode < 200) {
53+
return
54+
}
55+
56+
this.resume = resume
57+
this.res.on('drain', resume)
58+
writeHead(this.res, statusCode, getHeaders({ headers }))
59+
}
60+
61+
onData (chunk) {
62+
return this.res.write(chunk)
63+
}
64+
65+
onComplete () {
66+
this.res.end()
67+
this.callback()
68+
}
69+
70+
onError (err) {
71+
this.res.off('close', this.abort)
72+
this.res.off('drain', this.resume)
73+
this.callback(err)
74+
}
75+
}
76+
77+
class WSHandler {
78+
constructor (ctx) {
79+
const { socket, head } = ctx
80+
81+
setupSocket(socket)
82+
83+
this.socket = socket
84+
this.head = head
85+
this.abort = null
86+
this.promise = new Promise((resolve, reject) => {
87+
this.callback = err => err ? reject(err) : resolve()
88+
})
89+
}
90+
91+
onConnect (abort) {
92+
if (this.socket.destroyed) {
93+
abort()
94+
} else {
95+
this.abort = abort
96+
this.socket.on('close', abort)
97+
}
98+
}
99+
100+
onUpgrade (statusCode, headers, socket) {
101+
if (this.head && this.head.length) {
102+
socket.unshift(this.head)
103+
}
104+
105+
setupSocket(socket)
106+
107+
let head = 'HTTP/1.1 101 Switching Protocols\r\nconnection: upgrade\r\nupgrade: websocket'
108+
109+
headers = getHeaders({ headers })
110+
for (let n = 0; n < headers.length; n += 2) {
111+
const key = headers[n + 0]
112+
const val = headers[n + 1]
113+
114+
if (!Array.isArray(val)) {
115+
head += `\r\n${key}: ${val}`
116+
} else {
117+
for (let i = 0; i < val.length; i++) {
118+
head += `\r\n${key}: ${val[i]}`
119+
}
120+
}
121+
}
122+
head += '\r\n\r\n'
123+
124+
this.socket.write(head)
125+
126+
pipeline(socket, this.socket, socket, this.callback)
127+
}
128+
129+
onError (err) {
130+
this.socket.off('close', this.abort)
131+
this.callback(err)
132+
}
133+
}
134+
135+
// This expression matches hop-by-hop headers.
136+
// These headers are meaningful only for a single transport-level connection,
137+
// and must not be retransmitted by proxies or cached.
138+
const HOP_EXPR = /^(te|host|upgrade|trailers|connection|keep-alive|http2-settings|transfer-encoding|proxy-connection|proxy-authenticate|proxy-authorization)$/i
139+
140+
// Removes hop-by-hop and pseudo headers.
141+
// Updates via and forwarded headers.
142+
// Only hop-by-hop headers may be set using the Connection general header.
143+
function getHeaders (ctx) {
144+
const {
145+
proxyName,
146+
req,
147+
headers = req ? req.rawHeaders : undefined
148+
} = ctx
149+
150+
let via = ''
151+
let forwarded = ''
152+
let host = ''
153+
let authority = ''
154+
let connection = ''
155+
156+
for (let n = 0; n < headers.length; n += 2) {
157+
const key = headers[n + 0]
158+
const val = headers[n + 1]
159+
160+
if (!via && key.length === 3 && key.toLowerCase() === 'via') {
161+
via = val
162+
} else if (!host && key.length === 4 && key.toLowerCase() === 'host') {
163+
host = val
164+
} else if (!forwarded && key.length === 9 && key.toLowerCase() === 'forwarded') {
165+
forwarded = val
166+
} else if (!connection && key.length === 10 && key.toLowerCase() === 'connection') {
167+
connection = val
168+
} else if (!authority && key.length === 10 && key === ':authority') {
169+
authority = val
170+
}
171+
}
172+
173+
let remove
174+
if (connection && !HOP_EXPR.test(connection)) {
175+
remove = connection.split(/,\s*/)
176+
}
177+
178+
const result = []
179+
for (let n = 0; n < headers.length; n += 2) {
180+
const key = headers[n + 0]
181+
const val = headers[n + 1]
182+
183+
if (
184+
key.charAt(0) !== ':' &&
185+
!HOP_EXPR.test(key) &&
186+
(!remove || !remove.includes(key))
187+
) {
188+
result.push(key, val)
189+
}
190+
}
191+
192+
if (req) {
193+
const { socket, httpVersion } = req
194+
195+
result.push('forwarded', (forwarded ? forwarded + ', ' : '') + [
196+
`by=${printIp(socket.localAddress, socket.localPort)}`,
197+
`for=${printIp(socket.remoteAddress, socket.remotePort)}`,
198+
`proto=${socket.encrypted ? 'https' : 'http'}`,
199+
`host=${printIp(authority || host || '')}`
200+
].join(';'))
201+
202+
// TODO (fix): Should also apply to responses.
203+
if (proxyName) {
204+
if (via) {
205+
if (via.split(',').some(name => name.endsWith(proxyName))) {
206+
throw new createError.LoopDetected()
207+
}
208+
via += ', '
209+
}
210+
via += `${httpVersion} ${proxyName}`
211+
}
212+
}
213+
214+
if (via) {
215+
result.push('via', via)
216+
}
217+
218+
return result
219+
}
220+
221+
function setupSocket (socket) {
222+
socket.setTimeout(0)
223+
socket.setNoDelay(true)
224+
socket.setKeepAlive(true, 0)
225+
}
226+
227+
function printIp (address, port) {
228+
const isIPv6 = net.isIPv6(address)
229+
let str = `${address}`
230+
if (isIPv6) {
231+
str = `[${str}]`
232+
}
233+
if (port) {
234+
str = `${str}:${port}`
235+
}
236+
if (isIPv6 || port) {
237+
str = `"${str}"`
238+
}
239+
return str
240+
}
241+
242+
function writeHead (res, statusCode, headers) {
243+
// TODO (perf): res.writeHead should support Array and/or string.
244+
const obj = {}
245+
for (var i = 0; i < headers.length; i += 2) {
246+
var key = headers[i]
247+
var val = obj[key]
248+
if (!val) {
249+
obj[key] = headers[i + 1]
250+
} else {
251+
if (!Array.isArray(val)) {
252+
val = [val]
253+
obj[key] = val
254+
}
255+
val.push(headers[i + 1])
256+
}
257+
}
258+
259+
res.writeHead(statusCode, obj)
260+
261+
return res
262+
}

0 commit comments

Comments
 (0)