Skip to content

Commit 0a0364d

Browse files
committed
add proxy example
1 parent 3ce6832 commit 0a0364d

2 files changed

Lines changed: 274 additions & 0 deletions

File tree

examples/proxy/index.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
const { Pool, Client } = require('../../')
2+
const http = require('http')
3+
const proxy = require('./proxy')
4+
5+
const client = new Pool('http://localhost:4001', {
6+
connections: 256,
7+
pipelining: 1
8+
})
9+
10+
Promise.all([
11+
new Promise(resolve => {
12+
// Proxy
13+
http.createServer((req, res) => {
14+
proxy({ req, res }, client)
15+
}).listen(4000, resolve)
16+
}),
17+
new Promise(resolve => {
18+
// Upstream
19+
http.createServer((req, res) => {
20+
res.end('hello world')
21+
}).listen(4001, resolve)
22+
})
23+
]).then(async () => {
24+
const client = new Client('http://localhost:4000')
25+
const { body } = await client.request({
26+
method: 'GET',
27+
path: '/'
28+
})
29+
30+
for await (const chunk of body) {
31+
console.log(String(chunk))
32+
}
33+
})
34+
35+
http.createServer((req, res) => {
36+
res.end('hello world')
37+
}).listen(3001)

examples/proxy/proxy.js

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
const net = require('net')
2+
const { pipeline } = require('stream')
3+
4+
module.exports = async function proxy (ctx, client) {
5+
const { req, socket } = ctx
6+
7+
// TODO: requestTimeout?
8+
9+
if (socket) {
10+
const handler = new WSHandler(ctx)
11+
client.dispatch({
12+
method: req.method,
13+
path: req.originalUrl || req.url,
14+
headers: getRequestHeaders(ctx),
15+
upgrade: 'Websocket'
16+
}, handler)
17+
return handler.promise
18+
} else {
19+
const handler = new HTTPHandler(ctx)
20+
client.dispatch({
21+
method: req.method,
22+
path: req.originalUrl || req.url,
23+
headers: getRequestHeaders(ctx),
24+
body: req
25+
}, handler)
26+
return handler.promise
27+
}
28+
}
29+
30+
class WSHandler {
31+
constructor ({ socket, head }) {
32+
setupSocket(socket)
33+
34+
this.socket = socket
35+
this.head = head
36+
this.promise = new Promise((resolve, reject) => {
37+
this.callback = err => err ? reject(err) : resolve()
38+
})
39+
}
40+
41+
onConnect (abort) {
42+
if (this.socket.destroyed) {
43+
abort()
44+
} else {
45+
this.socket.on('close', abort)
46+
}
47+
}
48+
49+
onUpgrade (statusCode, headers, socket) {
50+
if (this.head && this.head.length) {
51+
socket.unshift(this.head)
52+
}
53+
54+
headers = getResponseHeaders(headers)
55+
56+
setupSocket(socket)
57+
58+
let head = 'HTTP/1.1 101 Switching Protocols\r\nconnection: upgrade\r\nupgrade: websocket'
59+
60+
for (let n = 0; n < headers.length; n += 2) {
61+
const key = headers[n + 0]
62+
const val = headers[n + 1]
63+
64+
if (!Array.isArray(val)) {
65+
head += `\r\n${key}: ${val}`
66+
} else {
67+
for (let i = 0; i < val.length; i++) {
68+
head += `\r\n${key}: ${val[i]}`
69+
}
70+
}
71+
}
72+
head += '\r\n\r\n'
73+
74+
this.socket.write(head)
75+
76+
pipeline(socket, this.socket, socket, this.callback)
77+
}
78+
79+
onError (err) {
80+
this.callback(err)
81+
}
82+
}
83+
84+
class HTTPHandler {
85+
constructor ({ req, res }) {
86+
this.req = req
87+
this.res = res
88+
this.promise = new Promise((resolve, reject) => {
89+
this.callback = err => err ? reject(err) : resolve()
90+
})
91+
}
92+
93+
onConnect (abort) {
94+
if (this.req.aborted) {
95+
abort()
96+
} else {
97+
this.res.on('close', abort)
98+
}
99+
}
100+
101+
onHeaders (statusCode, headers, resume) {
102+
if (statusCode < 200) {
103+
return
104+
}
105+
106+
this.res.on('drain', resume)
107+
108+
headers = getResponseHeaders(headers)
109+
110+
// TODO (perf): res.writeHead should support Array and/or string.
111+
const obj = {}
112+
for (var i = 0; i < headers.length; i += 2) {
113+
var key = headers[i]
114+
var val = obj[key]
115+
if (!val) {
116+
obj[key] = headers[i + 1]
117+
} else {
118+
if (!Array.isArray(val)) {
119+
val = [val]
120+
obj[key] = val
121+
}
122+
val.push(headers[i + 1])
123+
}
124+
}
125+
126+
this.res.writeHead(statusCode, obj)
127+
}
128+
129+
onData (chunk) {
130+
return this.res.write(chunk)
131+
}
132+
133+
onComplete () {
134+
this.res.end()
135+
this.callback()
136+
}
137+
138+
onError (err) {
139+
this.callback(err)
140+
}
141+
}
142+
143+
const HOP_EXPR = /^(te|via|host|upgrade|trailers|forwarded|connection|keep-alive|http2-settings|transfer-encoding|proxy-connection|proxy-authenticate|proxy-authorization)$/i
144+
145+
function getRequestHeaders ({ req, proxyName }) {
146+
const result = []
147+
148+
for (const [key, value] of Object.entries(req.headers)) {
149+
if (key.charAt(0) !== ':' && !HOP_EXPR.test(key)) {
150+
result.push(key, value)
151+
}
152+
}
153+
154+
// TODO(fix): <host> [ ":" <port> ] vs <pseudonym>
155+
// See, https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Via.
156+
let via = req.headers.via || ''
157+
if (proxyName) {
158+
if (via) {
159+
for (const name of via.split(',')) {
160+
if (name.endsWith(proxyName)) {
161+
const err = new Error('loop detected')
162+
err.statusCode = 508
163+
throw err
164+
}
165+
}
166+
via += ','
167+
}
168+
169+
result.push('via', `${via}${req.httpVersion} ${proxyName}`)
170+
} else if (via) {
171+
result.push('via', via)
172+
}
173+
174+
const forwarded = [
175+
`by=${printIp(req.socket.localAddress, req.socket.localPort)}`,
176+
`for=${printIp(req.socket.remoteAddress, req.socket.remotePort)}`,
177+
`proto=${req.socket.encrypted ? 'https' : 'http'}`,
178+
`host=${printIp(req.headers[':authority'] || req.headers.host || '')}`
179+
].join(';')
180+
181+
if (req.headers.forwarded) {
182+
result.push('forwarded', `, ${forwarded}`)
183+
} else {
184+
result.push('forwarded', `${forwarded}`)
185+
}
186+
187+
return result
188+
}
189+
190+
function getResponseHeaders (headers) {
191+
let result = []
192+
let remove
193+
for (let n = 0; n < headers.length; n += 2) {
194+
const key = headers[n + 0]
195+
if (/^connection$/i.test(key)) {
196+
const val = headers[n + 1]
197+
if (/^(connection|keep-alive|upgrade)$/i.test(val)) {
198+
remove = val.split(',')
199+
}
200+
}
201+
}
202+
203+
for (let n = 0; n < headers.length; n += 2) {
204+
const key = headers[n + 0]
205+
const val = headers[n + 1]
206+
207+
if (!HOP_EXPR.test(key) && (!remove || !remove.includes(key))) {
208+
if (!result) {
209+
result = []
210+
}
211+
result.push(key, val)
212+
}
213+
}
214+
215+
return result || headers
216+
}
217+
218+
function setupSocket (socket) {
219+
socket.setTimeout(0)
220+
socket.setNoDelay(true)
221+
socket.setKeepAlive(true, 0)
222+
}
223+
224+
function printIp (address, port) {
225+
const isIPv6 = net.isIPv6(address)
226+
let str = `${address}`
227+
if (isIPv6) {
228+
str = `[${str}]`
229+
}
230+
if (port) {
231+
str = `${str}:${port}`
232+
}
233+
if (isIPv6 || port) {
234+
str = `"${str}"`
235+
}
236+
return str
237+
}

0 commit comments

Comments
 (0)