Skip to content

Commit c3218b3

Browse files
committed
add proxy example
1 parent 3ce6832 commit c3218b3

2 files changed

Lines changed: 304 additions & 0 deletions

File tree

examples/proxy/index.js

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
const { Pool, Client } = require('../../')
2+
const http = require('http')
3+
const proxy = require('./proxy')
4+
5+
const client = new Pool('http://localhost:4001', {
6+
connections: 256,
7+
pipelining: 1
8+
})
9+
10+
Promise.all([
11+
new Promise(resolve => {
12+
// Proxy
13+
http.createServer(async (req, res) => {
14+
try {
15+
await proxy({ req, res, proxyName: 'example' }, client)
16+
} catch (err) {
17+
if (res.headersSent) {
18+
res.destroy(err)
19+
} else {
20+
for (const name of res.getHeaderNames()) {
21+
res.removeHeader(name)
22+
}
23+
res.writeHead(err.statusCode || 500)
24+
res.end()
25+
}
26+
}
27+
}).listen(4000, resolve)
28+
}),
29+
new Promise(resolve => {
30+
// Upstream
31+
http.createServer((req, res) => {
32+
res.end('hello world')
33+
}).listen(4001, resolve)
34+
})
35+
]).then(async () => {
36+
const client = new Client('http://localhost:4000')
37+
const { body } = await client.request({
38+
method: 'GET',
39+
path: '/'
40+
})
41+
42+
for await (const chunk of body) {
43+
console.log(String(chunk))
44+
}
45+
})
46+
47+
// TODO: Add websocket example.

examples/proxy/proxy.js

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
const net = require('net')
2+
const { pipeline } = require('stream')
3+
const createError = require('http-errors')
4+
5+
module.exports = async function proxy (ctx, client) {
6+
const { req, socket } = ctx
7+
8+
if (socket) {
9+
const handler = new WSHandler(ctx)
10+
client.dispatch({
11+
method: req.method,
12+
path: req.url,
13+
headers: getRequestHeaders(ctx),
14+
upgrade: 'Websocket'
15+
}, handler)
16+
return handler.promise
17+
} else {
18+
const handler = new HTTPHandler(ctx)
19+
client.dispatch({
20+
method: req.method,
21+
path: req.url,
22+
headers: getRequestHeaders(ctx),
23+
body: req
24+
}, handler)
25+
return handler.promise
26+
}
27+
}
28+
29+
class HTTPHandler {
30+
constructor (ctx) {
31+
const { req, res } = ctx
32+
33+
this.req = req
34+
this.res = res
35+
this.resume = null
36+
this.abort = null
37+
this.promise = new Promise((resolve, reject) => {
38+
this.callback = err => err ? reject(err) : resolve()
39+
})
40+
}
41+
42+
onConnect (abort) {
43+
if (this.req.aborted) {
44+
abort()
45+
} else {
46+
this.abort = abort
47+
this.res.on('close', abort)
48+
}
49+
}
50+
51+
onHeaders (statusCode, headers, resume) {
52+
if (statusCode < 200) {
53+
return
54+
}
55+
56+
this.resume = resume
57+
this.res.on('drain', resume)
58+
writeHead(this.res, statusCode, getResponseHeaders(headers))
59+
}
60+
61+
onData (chunk) {
62+
return this.res.write(chunk)
63+
}
64+
65+
onComplete () {
66+
this.res.end()
67+
this.callback()
68+
}
69+
70+
onError (err) {
71+
this.res
72+
.off('drain', this.resume)
73+
.off('close', this.abort)
74+
this.callback(err)
75+
}
76+
}
77+
78+
class WSHandler {
79+
constructor (ctx) {
80+
const { socket, head } = ctx
81+
82+
setupSocket(socket)
83+
84+
this.socket = socket
85+
this.head = head
86+
this.promise = new Promise((resolve, reject) => {
87+
this.callback = err => err ? reject(err) : resolve()
88+
})
89+
}
90+
91+
onConnect (abort) {
92+
if (this.socket.destroyed) {
93+
abort()
94+
} else {
95+
this.socket.on('close', abort)
96+
}
97+
}
98+
99+
onUpgrade (statusCode, headers, socket) {
100+
if (this.head && this.head.length) {
101+
socket.unshift(this.head)
102+
}
103+
104+
headers = getResponseHeaders(headers)
105+
106+
setupSocket(socket)
107+
108+
let head = 'HTTP/1.1 101 Switching Protocols\r\nconnection: upgrade\r\nupgrade: websocket'
109+
110+
for (let n = 0; n < headers.length; n += 2) {
111+
const key = headers[n + 0]
112+
const val = headers[n + 1]
113+
114+
if (!Array.isArray(val)) {
115+
head += `\r\n${key}: ${val}`
116+
} else {
117+
for (let i = 0; i < val.length; i++) {
118+
head += `\r\n${key}: ${val[i]}`
119+
}
120+
}
121+
}
122+
head += '\r\n\r\n'
123+
124+
this.socket.write(head)
125+
126+
pipeline(socket, this.socket, socket, this.callback)
127+
}
128+
129+
onError (err) {
130+
this.callback(err)
131+
}
132+
}
133+
134+
const HOP_EXPR = /^(te|via|host|upgrade|trailers|forwarded|connection|keep-alive|http2-settings|transfer-encoding|proxy-connection|proxy-authenticate|proxy-authorization)$/i
135+
136+
function getRequestHeaders (ctx) {
137+
const { req, proxyName } = ctx
138+
const { rawHeaders: headers, socket } = req
139+
140+
const result = []
141+
142+
let via = ''
143+
let forwarded = ''
144+
let host = ''
145+
let authority = ''
146+
147+
for (let n = 0; n < headers.length; n += 2) {
148+
const key = headers[n + 0]
149+
const val = headers[n + 1]
150+
151+
if (!via && /^via$/i.test(key)) {
152+
via = val
153+
} else if (!forwarded && /^forwarded$/.test(key)) {
154+
forwarded = val
155+
} else if (!authority && /^:authority$/i.test(key)) {
156+
authority = val
157+
} else if (!host && /^host$/i.test(key)) {
158+
host = val
159+
} else if (key.charAt(0) !== ':' && !HOP_EXPR.test(key)) {
160+
result.push(key, val)
161+
}
162+
}
163+
164+
// TODO(fix): <host> [ ":" <port> ] vs <pseudonym>
165+
// See, https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Via.
166+
if (proxyName) {
167+
if (via) {
168+
if (via.split(',').some(name => name.endsWith(proxyName))) {
169+
throw new createError.LoopDetected()
170+
}
171+
via += ', '
172+
}
173+
via += `${req.httpVersion} ${proxyName}`
174+
}
175+
176+
if (via) {
177+
result.push('via', via)
178+
}
179+
180+
result.push('forwarded', (forwarded ? forwarded + ', ' : '') + [
181+
`by=${printIp(socket.localAddress, socket.localPort)}`,
182+
`for=${printIp(socket.remoteAddress, socket.remotePort)}`,
183+
`proto=${socket.encrypted ? 'https' : 'http'}`,
184+
`host=${printIp(authority || host || '')}`
185+
].join(';'))
186+
187+
return result
188+
}
189+
190+
function getResponseHeaders (headers) {
191+
let remove
192+
for (let n = 0; n < headers.length; n += 2) {
193+
const key = headers[n + 0]
194+
if (key.length === 10 && /^connection$/i.test(key)) {
195+
const val = headers[n + 1]
196+
if (!HOP_EXPR.test(val)) {
197+
remove = val.split(',')
198+
}
199+
break
200+
}
201+
}
202+
203+
const result = []
204+
for (let n = 0; n < headers.length; n += 2) {
205+
const key = headers[n + 0]
206+
const val = headers[n + 1]
207+
208+
if (!HOP_EXPR.test(key) && (!remove || !remove.includes(key))) {
209+
result.push(key, val)
210+
}
211+
}
212+
213+
return result
214+
}
215+
216+
function setupSocket (socket) {
217+
socket.setTimeout(0)
218+
socket.setNoDelay(true)
219+
socket.setKeepAlive(true, 0)
220+
}
221+
222+
function printIp (address, port) {
223+
const isIPv6 = net.isIPv6(address)
224+
let str = `${address}`
225+
if (isIPv6) {
226+
str = `[${str}]`
227+
}
228+
if (port) {
229+
str = `${str}:${port}`
230+
}
231+
if (isIPv6 || port) {
232+
str = `"${str}"`
233+
}
234+
return str
235+
}
236+
237+
function writeHead (res, statusCode, headers) {
238+
// TODO (perf): res.writeHead should support Array and/or string.
239+
const obj = {}
240+
for (var i = 0; i < headers.length; i += 2) {
241+
var key = headers[i]
242+
var val = obj[key]
243+
if (!val) {
244+
obj[key] = headers[i + 1]
245+
} else {
246+
if (!Array.isArray(val)) {
247+
val = [val]
248+
obj[key] = val
249+
}
250+
val.push(headers[i + 1])
251+
}
252+
}
253+
254+
res.writeHead(statusCode, obj)
255+
256+
return res
257+
}

0 commit comments

Comments
 (0)