Skip to content

Commit 968b70a

Browse files
authored
Merge pull request #2378 from bbockelm/universal_broker
Allow connection broker to be used for director->cache communications
2 parents 11d8152 + d136d4e commit 968b70a

21 files changed

Lines changed: 648 additions & 112 deletions

broker/broker_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func Setup(t *testing.T, ctx context.Context, egrp *errgroup.Group) {
110110
})
111111
require.NoError(t, err)
112112

113+
namespaceKeys = nil
113114
LaunchNamespaceKeyMaintenance(ctx, egrp)
114115
}
115116

@@ -193,7 +194,11 @@ func TestBroker(t *testing.T) {
193194
viper.Set("Federation.RegistryUrl", param.Server_ExternalWebUrl.GetString())
194195
listenerChan := make(chan any)
195196
ctxQuick, deadlineCancel := context.WithTimeout(ctx, 5*time.Second) // Have shorter timeout for this handshake
196-
err = LaunchRequestMonitor(ctxQuick, egrp, listenerChan)
197+
198+
externalWebUrl, err := url.Parse(param.Server_ExternalWebUrl.GetString())
199+
require.NoError(t, err)
200+
201+
err = LaunchRequestMonitor(ctxQuick, egrp, server_structs.CacheType, externalWebUrl.Hostname(), listenerChan)
197202
require.NoError(t, err)
198203

199204
// Initiate the callback using the cache-based routines.
@@ -203,9 +208,9 @@ func TestBroker(t *testing.T) {
203208
brokerUrl.Path = "/api/v1.0/broker/reverse"
204209
query := brokerUrl.Query()
205210
query.Set("origin", param.Server_Hostname.GetString())
206-
query.Set("prefix", "/foo")
211+
query.Set("prefix", "/caches/"+externalWebUrl.Hostname())
207212
brokerUrl.RawQuery = query.Encode()
208-
clientConn, err := ConnectToOrigin(ctxQuick, brokerUrl.String(), "/foo", param.Server_Hostname.GetString())
213+
clientConn, err := ConnectToService(ctxQuick, brokerUrl.String(), "/caches/"+externalWebUrl.Hostname(), param.Server_Hostname.GetString())
209214
require.NoError(t, err)
210215
log.Debugf("Cache got reversed client connection with cache side %s and origin side %s", clientConn.LocalAddr(), clientConn.RemoteAddr())
211216

broker/client.go

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func generateRequestId() string {
157157
}
158158

159159
// Given a service's broker URL, return a connected socket to that service (an origin or a cache)
160-
func ConnectToOrigin(ctx context.Context, brokerUrl, prefix, originName string) (conn net.Conn, err error) {
160+
func ConnectToService(ctx context.Context, brokerUrl, prefix, originName string) (conn net.Conn, err error) {
161161

162162
// Ensure we have a local CA for signing an origin host certificate.
163163
if err = config.GenerateCACert(); err != nil {
@@ -226,7 +226,7 @@ func ConnectToOrigin(ctx context.Context, brokerUrl, prefix, originName string)
226226
// Create a cloned transport which disables HTTP/2 (as that TCP string can't
227227
// be hijacked which we will need to do below). The clone ensures that we're
228228
// not going to be reusing TCP connections.
229-
tr := config.GetTransport().Clone()
229+
tr := config.GetBasicTransport().Clone()
230230
tr.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
231231
client := &http.Client{Transport: tr}
232232

@@ -385,7 +385,7 @@ func ConnectToOrigin(ctx context.Context, brokerUrl, prefix, originName string)
385385
//
386386
// The TCP socket used for the callback will be converted to a one-shot listener
387387
// and reused with the origin as the "server".
388-
func doCallback(ctx context.Context, brokerResp reversalRequest) (listener net.Listener, err error) {
388+
func doCallback(ctx context.Context, sType server_structs.ServerType, brokerResp reversalRequest) (listener net.Listener, err error) {
389389
log.Debugln("Origin starting callback to cache at", brokerResp.CallbackUrl)
390390

391391
privateKey, err := privateKeyFromBytes(brokerResp.PrivateKey)
@@ -415,7 +415,18 @@ func doCallback(ctx context.Context, brokerResp reversalRequest) (listener net.L
415415
}
416416
cacheAud.Path = ""
417417

418-
token, err := createToken(param.Origin_FederationPrefix.GetString(), param.Server_Hostname.GetString(), cacheAud.String(), token_scopes.Broker_Callback)
418+
servicePrefix := ""
419+
url, err := url.Parse(param.Server_ExternalWebUrl.GetString())
420+
if err != nil {
421+
err = errors.Wrap(err, "failure when parsing the external web URL")
422+
return
423+
}
424+
if sType.IsEnabled(server_structs.CacheType) {
425+
servicePrefix = server_structs.GetCacheNs(url.Hostname())
426+
} else {
427+
servicePrefix = server_structs.GetOriginNs(url.Host)
428+
}
429+
token, err := createToken(servicePrefix, url.Hostname(), cacheAud.String(), token_scopes.Broker_Callback)
419430
if err != nil {
420431
err = errors.Wrap(err, "failure when constructing the cache callback token")
421432
return
@@ -550,32 +561,59 @@ func doCallback(ctx context.Context, brokerResp reversalRequest) (listener net.L
550561
// TLS listener where you can invoke "Accept" once before it automatically
551562
// closes itself. It is the result of a successful connection reversal to
552563
// a cache.
553-
func LaunchRequestMonitor(ctx context.Context, egrp *errgroup.Group, resultChan chan any) (err error) {
564+
//
565+
// The request monitor is used by the "private service" (the service behind the
566+
// firewall) to know when to setup connections requested by the "public service"
567+
// (e.g., a cache).
568+
func LaunchRequestMonitor(ctx context.Context, egrp *errgroup.Group, sType server_structs.ServerType, privateName string, resultChan chan any) (err error) {
554569
fedInfo, err := config.GetFederation(ctx)
555570
if err != nil {
556571
return err
557572
}
558573

574+
prefix := ""
575+
if sType.IsEnabled(server_structs.CacheType) {
576+
prefix = server_structs.GetCacheNs(privateName)
577+
} else {
578+
prefix = server_structs.GetOriginNs(privateName)
579+
}
580+
559581
brokerUrl := fedInfo.BrokerEndpoint
560582
if brokerUrl == "" {
561583
return errors.New("Broker service is not set or discovered; cannot enable broker functionality. Try setting Federation.BrokerUrl")
562584
}
563585
brokerEndpoint := brokerUrl + "/api/v1.0/broker/retrieve"
564-
originUrl, err := url.Parse(param.Server_ExternalWebUrl.GetString())
565-
if err != nil {
566-
return
567-
}
568586
oReq := originRequest{
569-
Origin: originUrl.Hostname(),
570-
Prefix: param.Origin_FederationPrefix.GetString(),
587+
Origin: privateName,
588+
Prefix: prefix,
571589
}
572590
req, err := json.Marshal(&oReq)
573591
if err != nil {
574592
return
575593
}
576594
reqReader := bytes.NewReader(req)
577595

596+
// Create a token that will be used to retrieve requests from the broker;
597+
// this is done before the goroutine starts to guarantee that we are looking up
598+
// the Viper config from a single-threaded context. Otherwise, during startup,
599+
// we may have concurrent read and write operations to the Viper config, which
600+
// can lead to a panic.
601+
brokerAud, err := url.Parse(fedInfo.BrokerEndpoint)
602+
if err != nil {
603+
log.Errorln("Failure when parsing broker URL:", err)
604+
return
605+
}
606+
brokerAud.Path = ""
607+
token, err := createToken(prefix, param.Server_Hostname.GetString(), brokerAud.String(), token_scopes.Broker_Retrieve)
608+
if err != nil {
609+
log.Errorln("Failure when constructing the broker retrieve token:", err)
610+
return
611+
}
612+
613+
client := &http.Client{Transport: config.GetBasicTransport()}
614+
578615
egrp.Go(func() (err error) {
616+
firstLoop := true
579617
for {
580618
sleepDuration := time.Second + time.Duration(mrand.Intn(500))*time.Microsecond
581619
select {
@@ -595,25 +633,21 @@ func LaunchRequestMonitor(ctx context.Context, egrp *errgroup.Group, resultChan
595633
req.Header.Set("Content-Type", "application/json")
596634
req.Header.Set("User-Agent", "pelican-origin/"+config.GetVersion())
597635

598-
brokerAud, err := url.Parse(fedInfo.BrokerEndpoint)
599-
if err != nil {
600-
log.Errorln("Failure when parsing broker URL:", err)
601-
break
602-
}
603-
brokerAud.Path = ""
604-
605-
token, err := createToken(param.Origin_FederationPrefix.GetString(), param.Server_Hostname.GetString(), brokerAud.String(), token_scopes.Broker_Retrieve)
606-
if err != nil {
607-
log.Errorln("Failure when constructing the broker retrieve token:", err)
608-
break
636+
if !firstLoop {
637+
token, err = createToken(prefix, param.Server_Hostname.GetString(), brokerAud.String(), token_scopes.Broker_Retrieve)
638+
if err != nil {
639+
log.Errorln("Failure when constructing the broker retrieve token:", err)
640+
break
641+
}
609642
}
643+
firstLoop = false
610644
req.Header.Set("Authorization", "Bearer "+token)
611645

612-
tr := config.GetTransport()
613-
client := &http.Client{Transport: tr}
614-
615646
resp, err := client.Do(req)
616647
if err != nil {
648+
if errors.Is(err, context.Canceled) {
649+
break
650+
}
617651
log.Errorln("Failure when invoking the broker URL for retrieving requests", err)
618652
break
619653
}
@@ -642,7 +676,7 @@ func LaunchRequestMonitor(ctx context.Context, egrp *errgroup.Group, resultChan
642676
}
643677

644678
if brokerResp.Status == server_structs.RespOK {
645-
listener, err := doCallback(ctx, brokerResp.Request)
679+
listener, err := doCallback(ctx, sType, brokerResp.Request)
646680
if err != nil {
647681
if errors.Is(err, context.Canceled) {
648682
log.Debugln("Shutdown encountered while processing callback")

broker/dialer.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/***************************************************************
2+
*
3+
* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"); you
6+
* may not use this file except in compliance with the License. You may
7+
* obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
***************************************************************/
18+
19+
// This file contains methods for a dialer that can use the broker
20+
// functionality to connect to a remote service.
21+
22+
package broker
23+
24+
import (
25+
"context"
26+
"net"
27+
"strings"
28+
29+
"github.com/jellydator/ttlcache/v3"
30+
"golang.org/x/sync/errgroup"
31+
32+
"github.com/pelicanplatform/pelican/param"
33+
"github.com/pelicanplatform/pelican/server_structs"
34+
)
35+
36+
type (
37+
brokerPrefixInfo struct {
38+
ServerType server_structs.ServerType
39+
BrokerUrl string
40+
}
41+
42+
// BrokerDialer is a dialer that can use the broker
43+
// functionality to connect to a remote service.
44+
BrokerDialer struct {
45+
dialerContext func(ctx context.Context, network, addr string) (net.Conn, error)
46+
// Map from service name to broker endpoint.
47+
// If the service name is not found in the cache, then the dialer
48+
// will use a normal TCP connection to the service.
49+
brokerEndpoints *ttlcache.Cache[string, brokerPrefixInfo]
50+
}
51+
)
52+
53+
// NewBrokerDialer creates a new BrokerDialer.
54+
func NewBrokerDialer(ctx context.Context, egrp *errgroup.Group) *BrokerDialer {
55+
56+
dialer := &net.Dialer{
57+
Timeout: param.Transport_DialerTimeout.GetDuration(),
58+
KeepAlive: param.Transport_DialerKeepAlive.GetDuration(),
59+
}
60+
brokerEndpoints := ttlcache.New(
61+
ttlcache.WithTTL[string, brokerPrefixInfo](param.Transport_BrokerEndpointCacheTTL.GetDuration()),
62+
ttlcache.WithDisableTouchOnHit[string, brokerPrefixInfo](),
63+
)
64+
65+
go brokerEndpoints.Start()
66+
egrp.Go(func() error {
67+
<-ctx.Done()
68+
brokerEndpoints.DeleteAll()
69+
brokerEndpoints.Stop()
70+
return nil
71+
})
72+
73+
return &BrokerDialer{
74+
dialerContext: dialer.DialContext,
75+
brokerEndpoints: brokerEndpoints,
76+
}
77+
}
78+
79+
// Set the dialer to use `brokerUrl` as the broker endpoint for
80+
// the service `name`.
81+
func (d *BrokerDialer) UseBroker(serverType server_structs.ServerType, name, brokerUrl string) {
82+
d.brokerEndpoints.Set(name, brokerPrefixInfo{
83+
ServerType: serverType,
84+
BrokerUrl: brokerUrl,
85+
}, ttlcache.DefaultTTL)
86+
}
87+
88+
// DialContext dials a connection to the given network and address using the broker.
89+
func (d *BrokerDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
90+
info := d.brokerEndpoints.Get(addr)
91+
if info == nil {
92+
// If the endpoint is not found in the cache, use the default dialer.
93+
return d.dialerContext(ctx, network, addr)
94+
}
95+
96+
sType := info.Value().ServerType
97+
prefix := ""
98+
if sType.IsEnabled(server_structs.CacheType) {
99+
addrSplit := strings.SplitN(addr, ":", 2)
100+
prefix = "/caches/" + addrSplit[0]
101+
} else {
102+
prefix = "/origins/" + addr
103+
}
104+
return ConnectToService(ctx, info.Value().BrokerUrl, prefix, addr)
105+
}

broker/request_manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type (
3232
PrivateKey string `json:"private_key,omitempty"`
3333
RequestId string `json:"request_id,omitempty"`
3434
Prefix string `json:"prefix,omitempty"`
35-
OriginName string `json:"origin,omitempty"`
35+
OriginName string `json:"origin,omitempty"` // Name of the service for the reversal request. Originally, brokers were for origins-only (hence the inexact name of the parameter).
3636
}
3737

3838
requestInfo struct {
@@ -68,7 +68,7 @@ func getOriginQueue(prefix, origin string) chan reversalRequest {
6868
// Send a request to a given origin's queue.
6969
// Return a requestTimeout error if no origin retrieved the request before the context timed out.
7070
func handleRequest(ctx context.Context, origin string, req reversalRequest, timeout time.Duration) (err error) {
71-
queue := getOriginQueue(req.Prefix, origin)
71+
queue := getOriginQueue("/", origin)
7272
maxTime := timeout - 500*time.Millisecond - time.Duration(rand.Intn(500))*time.Millisecond
7373
if maxTime <= 0 {
7474
maxTime = time.Millisecond
@@ -90,7 +90,7 @@ func handleRequest(ctx context.Context, origin string, req reversalRequest, time
9090
}
9191

9292
// Handle the origin's request to retrieve any pending reversals.
93-
func handleRetrieve(appCtx context.Context, ginCtx context.Context, prefix, origin string, timeout time.Duration) (req reversalRequest, err error) {
93+
func handleRetrieve(appCtx context.Context, ginCtx context.Context, origin string, timeout time.Duration) (req reversalRequest, err error) {
9494
// Return randomly short of the timeout.
9595
maxTime := timeout - 500*time.Millisecond - time.Duration(rand.Intn(500))*time.Millisecond
9696
if maxTime <= 0 {
@@ -99,7 +99,7 @@ func handleRetrieve(appCtx context.Context, ginCtx context.Context, prefix, orig
9999
tick := time.NewTicker(maxTime)
100100
defer tick.Stop()
101101
select {
102-
case req = <-getOriginQueue(prefix, origin):
102+
case req = <-getOriginQueue("/", origin):
103103
break
104104
case <-tick.C:
105105
err = errRetrieveTimeout

broker/server_apis.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ func newBrokerRespTimeout() (result brokerRetrievalResp) {
7171
return
7272
}
7373

74+
// Retrieve any pending reversal requests from the connection broker.
75+
//
76+
// This is long-polled by the service relying on the connection broker
77+
// (e.g., an origin); it will return any reversal requests from a public
78+
// service (e.g., a cache) for the origin to make a connection.
7479
func retrieveRequest(ctx context.Context, ginCtx *gin.Context) {
7580
timeoutStr := "5s"
7681
if val := ginCtx.Request.Header.Get("X-Pelican-Timeout"); val != "" {
@@ -108,7 +113,7 @@ func retrieveRequest(ctx context.Context, ginCtx *gin.Context) {
108113
ginCtx.AbortWithStatusJSON(http.StatusUnauthorized, newBrokerRespFail("Authorization denied"))
109114
}
110115

111-
req, err := handleRetrieve(ctx, ginCtx, originReq.Prefix, originReq.Origin, timeoutVal)
116+
req, err := handleRetrieve(ctx, ginCtx, originReq.Origin, timeoutVal)
112117
if errors.Is(err, errRetrieveTimeout) {
113118
ginCtx.JSON(http.StatusOK, newBrokerRespTimeout())
114119
return
@@ -120,6 +125,10 @@ func retrieveRequest(ctx context.Context, ginCtx *gin.Context) {
120125
ginCtx.JSON(http.StatusOK, newBrokerReqResp(req))
121126
}
122127

128+
// Service a request to the broker to initiate a connection.
129+
//
130+
// The connection reversal request will cause a listening service (e.g., an origin)
131+
// to connect to the endpoint provided by the public service (e.g., a cache).
123132
func reverseRequest(ctx context.Context, ginCtx *gin.Context) {
124133
timeoutStr := "5s"
125134
if val := ginCtx.Request.Header.Get("X-Pelican-Timeout"); val != "" {
@@ -178,13 +187,25 @@ func reverseRequest(ctx context.Context, ginCtx *gin.Context) {
178187
}
179188
}
180189

190+
// Register the central broker functionality with the gin router.
191+
//
192+
// Typically, this is done by the director; two APIs are exposed:
193+
// - `retrieve`: Services needing connection brokering (e.g., origins behind a firewall)
194+
// will long-poll this endpoint to retrieve any connection brokering requests from
195+
// a public service (e.g., a cache).
196+
// - `reverse`: Invoked by a public service (e.g., a cache) that would like to connect
197+
// to a service behind a firewall (e.g., an origin). Official request for the origin
198+
// to make a connection.
181199
func RegisterBroker(ctx context.Context, router *gin.RouterGroup) {
182200
// Establish the routes used for cache/origin redirection
183201
router.POST("/api/v1.0/broker/retrieve", func(ginCtx *gin.Context) { retrieveRequest(ctx, ginCtx) })
184202
router.POST("/api/v1.0/broker/reverse", func(ginCtx *gin.Context) { reverseRequest(ctx, ginCtx) })
185203
}
186204

187-
// Cache's HTTP handler function for callbacks from an origin
205+
// Server's HTTP handler function for callbacks from a remote service behind a broker.
206+
//
207+
// The server will authorize the request and then hand it off to the goroutine waiting for the connection
208+
// reversal. It will return once the other routine has completed the connection reversal.
188209
func handleCallback(ctx context.Context, ginCtx *gin.Context) {
189210
callbackReq := callbackRequest{}
190211
if err := ginCtx.Bind(&callbackReq); err != nil {

0 commit comments

Comments
 (0)