Skip to content

Commit 419d498

Browse files
committed
feat: webhook outbound
1 parent e6207e3 commit 419d498

File tree

5 files changed

+267
-0
lines changed

5 files changed

+267
-0
lines changed

infra/conf/webhook.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package conf
2+
3+
import (
4+
"github.com/xtls/xray-core/proxy/webhook"
5+
"google.golang.org/protobuf/proto"
6+
)
7+
8+
type WebhookConfig struct {
9+
URL string `json:"url"`
10+
Deduplication uint32 `json:"deduplication"`
11+
}
12+
13+
func (c *WebhookConfig) Build() (proto.Message, error) {
14+
return &webhook.Config{
15+
Url: c.URL,
16+
Deduplication: c.Deduplication,
17+
}, nil
18+
}

infra/conf/xray.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var (
5252
"hysteria": func() interface{} { return new(HysteriaClientConfig) },
5353
"dns": func() interface{} { return new(DNSOutboundConfig) },
5454
"wireguard": func() interface{} { return &WireGuardConfig{IsClient: true} },
55+
"webhook": func() interface{} { return new(WebhookConfig) },
5556
}, "protocol", "settings")
5657
)
5758

main/distro/all/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
_ "github.com/xtls/xray-core/proxy/vless/outbound"
4848
_ "github.com/xtls/xray-core/proxy/vmess/inbound"
4949
_ "github.com/xtls/xray-core/proxy/vmess/outbound"
50+
_ "github.com/xtls/xray-core/proxy/webhook"
5051
_ "github.com/xtls/xray-core/proxy/wireguard"
5152

5253
// Transports

proxy/webhook/config.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
syntax = "proto3";
2+
3+
package xray.proxy.webhook;
4+
option csharp_namespace = "Xray.Proxy.Webhook";
5+
option go_package = "github.com/xtls/xray-core/proxy/webhook";
6+
option java_package = "com.xray.proxy.webhook";
7+
option java_multiple_files = true;
8+
9+
message Config {
10+
string url = 1;
11+
uint32 deduplication = 2;
12+
}

proxy/webhook/webhook.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Package webhook is an outbound handler that drops traffic and fires an HTTP
2+
// POST with connection metadata to a configured URL.
3+
//
4+
// URL formats:
5+
//
6+
// https://api.example.com/alert — regular HTTP(S)
7+
// unix:///var/run/webhook.sock — Unix socket, POST to /
8+
// unix:///var/run/webhook.sock:/alert — Unix socket, POST to /alert
9+
package webhook
10+
11+
import (
12+
"bytes"
13+
"context"
14+
"encoding/json"
15+
"net"
16+
"net/http"
17+
"strings"
18+
"sync"
19+
"time"
20+
21+
"github.com/xtls/xray-core/common"
22+
"github.com/xtls/xray-core/common/errors"
23+
"github.com/xtls/xray-core/common/session"
24+
"github.com/xtls/xray-core/transport"
25+
"github.com/xtls/xray-core/transport/internet"
26+
)
27+
28+
const unixScheme = "unix://"
29+
30+
// parseURL splits a webhook URL into an HTTP URL and an optional Unix socket
31+
// path. For regular http/https URLs the input is returned unchanged with an
32+
// empty socketPath. For Unix sockets the format is:
33+
//
34+
// unix:///path/to/socket.sock:/http/path
35+
//
36+
// The :/ separator after the socket path delimits the HTTP request path.
37+
// If omitted, "/" is used.
38+
func parseURL(raw string) (httpURL, socketPath string) {
39+
if !strings.HasPrefix(raw, unixScheme) {
40+
return raw, ""
41+
}
42+
rest := raw[len(unixScheme):]
43+
if idx := strings.Index(rest, ":/"); idx >= 0 {
44+
return "http://localhost" + rest[idx+1:], rest[:idx]
45+
}
46+
return "http://localhost/", rest
47+
}
48+
49+
type event struct {
50+
Email *string `json:"email"`
51+
Level *uint32 `json:"level"`
52+
Protocol *string `json:"protocol"`
53+
Network *string `json:"network"`
54+
Source *string `json:"source"`
55+
Destination *string `json:"destination"`
56+
OriginalTarget *string `json:"originalTarget"`
57+
RouteTarget *string `json:"routeTarget"`
58+
InboundTag *string `json:"inboundTag"`
59+
InboundName *string `json:"inboundName"`
60+
InboundLocal *string `json:"inboundLocal"`
61+
OutboundTag *string `json:"outboundTag"`
62+
Timestamp int64 `json:"ts"`
63+
}
64+
65+
func ptr[T any](v T) *T { return &v }
66+
67+
type Handler struct {
68+
url string
69+
deduplication uint32
70+
client *http.Client
71+
seen sync.Map
72+
done chan struct{}
73+
wg sync.WaitGroup
74+
}
75+
76+
func New(ctx context.Context, config *Config) (*Handler, error) {
77+
if config.Url == "" {
78+
return nil, errors.New("webhook: url must not be empty")
79+
}
80+
httpURL, socketPath := parseURL(config.Url)
81+
h := &Handler{
82+
url: httpURL,
83+
deduplication: config.Deduplication,
84+
client: &http.Client{
85+
Timeout: 5 * time.Second,
86+
},
87+
done: make(chan struct{}),
88+
}
89+
if socketPath != "" {
90+
h.client.Transport = &http.Transport{
91+
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
92+
var d net.Dialer
93+
return d.DialContext(ctx, "unix", socketPath)
94+
},
95+
}
96+
}
97+
if h.deduplication > 0 {
98+
go h.cleanupLoop()
99+
}
100+
return h, nil
101+
}
102+
103+
func (h *Handler) Close() error {
104+
select {
105+
case <-h.done:
106+
default:
107+
close(h.done)
108+
}
109+
h.wg.Wait()
110+
return nil
111+
}
112+
113+
func (h *Handler) cleanupLoop() {
114+
ttl := time.Duration(h.deduplication) * time.Second
115+
ticker := time.NewTicker(ttl)
116+
defer ticker.Stop()
117+
for {
118+
select {
119+
case <-h.done:
120+
return
121+
case <-ticker.C:
122+
now := time.Now()
123+
h.seen.Range(func(key, value any) bool {
124+
if now.Sub(value.(time.Time)) >= ttl {
125+
h.seen.Delete(key)
126+
}
127+
return true
128+
})
129+
}
130+
}
131+
}
132+
133+
func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
134+
outbounds := session.OutboundsFromContext(ctx)
135+
ob := outbounds[len(outbounds)-1]
136+
ob.Name = "webhook"
137+
138+
ev := event{
139+
Timestamp: time.Now().Unix(),
140+
Network: ptr(ob.Target.Network.SystemString()),
141+
Destination: ptr(ob.Target.String()),
142+
OutboundTag: ptr(ob.Tag),
143+
}
144+
if ob.OriginalTarget.Address != nil {
145+
ev.OriginalTarget = ptr(ob.OriginalTarget.String())
146+
}
147+
if ob.RouteTarget.Address != nil {
148+
ev.RouteTarget = ptr(ob.RouteTarget.String())
149+
}
150+
151+
if ib := session.InboundFromContext(ctx); ib != nil {
152+
ev.Source = ptr(ib.Source.String())
153+
ev.InboundTag = ptr(ib.Tag)
154+
ev.InboundName = ptr(ib.Name)
155+
if ib.Local.Address != nil {
156+
ev.InboundLocal = ptr(ib.Local.String())
157+
}
158+
if ib.User != nil {
159+
ev.Email = ptr(ib.User.Email)
160+
ev.Level = ptr(ib.User.Level)
161+
}
162+
}
163+
164+
if ct := session.ContentFromContext(ctx); ct != nil {
165+
ev.Protocol = ptr(ct.Protocol)
166+
}
167+
168+
email := ""
169+
if ev.Email != nil {
170+
email = *ev.Email
171+
}
172+
if !h.isDuplicate(email) {
173+
select {
174+
case <-h.done:
175+
default:
176+
h.wg.Add(1)
177+
go func() {
178+
defer h.wg.Done()
179+
h.post(ctx, &ev)
180+
}()
181+
}
182+
}
183+
184+
common.Interrupt(link.Writer)
185+
return nil
186+
}
187+
188+
func (h *Handler) isDuplicate(email string) bool {
189+
if h.deduplication == 0 || email == "" {
190+
return false
191+
}
192+
ttl := time.Duration(h.deduplication) * time.Second
193+
now := time.Now()
194+
if v, loaded := h.seen.LoadOrStore(email, now); loaded {
195+
if now.Sub(v.(time.Time)) < ttl {
196+
return true
197+
}
198+
h.seen.Store(email, now)
199+
}
200+
return false
201+
}
202+
203+
func (h *Handler) post(ctx context.Context, ev *event) {
204+
body, err := json.Marshal(ev)
205+
if err != nil {
206+
errors.LogWarning(ctx, "webhook: marshal failed: ", err)
207+
return
208+
}
209+
210+
postCtx, cancel := context.WithTimeout(context.Background(), h.client.Timeout)
211+
defer cancel()
212+
213+
req, err := http.NewRequestWithContext(postCtx, http.MethodPost, h.url, bytes.NewReader(body))
214+
if err != nil {
215+
errors.LogWarning(ctx, "webhook: request build failed: ", err)
216+
return
217+
}
218+
req.Header.Set("Content-Type", "application/json")
219+
220+
resp, err := h.client.Do(req)
221+
if err != nil {
222+
errors.LogInfo(ctx, "webhook: POST failed: ", err)
223+
return
224+
}
225+
resp.Body.Close()
226+
if resp.StatusCode >= 400 {
227+
errors.LogWarning(ctx, "webhook: POST returned status ", resp.StatusCode)
228+
}
229+
}
230+
231+
func init() {
232+
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
233+
return New(ctx, config.(*Config))
234+
}))
235+
}

0 commit comments

Comments
 (0)