Allow connection broker to be used for director->cache communications #2378
When the director wants to connect to services leveraging the broker, have the global transport object use a new broker-aware dialer function.
h2zh left a comment:
My review focuses on the new dialer. It seems that the key of brokerEndpoints is designed to store the service name, but the code actually uses the URL.
When you address these comments, could you add the changes as a new commit instead of rebasing them into the existing commits? That way, I can see what has changed.
brokerDialer.UseBroker(sType, sAd.WebURL.Host, sAd.BrokerURL.String())
if sAd.Type == server_structs.OriginType.String() {
	brokerDialer.UseBroker(sType, sAd.URL.Host, sAd.BrokerURL.String())
}
If we use service name as the key for brokerEndpoints (as what it is in this PR), the code here should be:
- brokerDialer.UseBroker(sType, sAd.WebURL.Host, sAd.BrokerURL.String())
- if sAd.Type == server_structs.OriginType.String() {
- 	brokerDialer.UseBroker(sType, sAd.URL.Host, sAd.BrokerURL.String())
- }
+ brokerDialer.UseBroker(sType, sAd.Name, sAd.BrokerURL.String())
I'll tweak the comments and inline documentation but the original form is correct.
The second argument is the address - as passed to the DialContext function. This is typically the host:port used in a URL to contact the server (for the origin, this is the xrootd URL and web URL; for the cache, only the web URL).
Long-term, this is indeed something I'd like to clean up: what sort of "hostname" should be provided in URLs that are meant to be used with the broker? It's not clear that the self-identified hostname is the right thing.
For now, I think we shouldn't try to clean this piece up in this PR.
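To make the keying argument concrete, here is a minimal sketch of a lookup table keyed by host:port. The endpointTable type, its methods, the service name "my-cache", and both URLs are placeholders invented for this example; the real brokerEndpoints cache in the PR has a different type. The point it illustrates is that the dial address is the URL's host:port, so an entry stored under a service name would never be found at dial time.

```go
package main

import (
	"fmt"
	"net/url"
)

// endpointTable is a hypothetical stand-in for brokerEndpoints:
// it maps a dial address (host:port) to a broker URL.
type endpointTable map[string]string

// useBroker registers a broker URL for the given dial address.
func (t endpointTable) useBroker(addr, brokerURL string) {
	t[addr] = brokerURL
}

// lookup returns the broker URL for addr, if one was registered.
func (t endpointTable) lookup(addr string) (string, bool) {
	b, ok := t[addr]
	return b, ok
}

func main() {
	// Placeholder web URL for a cache; only the Host part (host:port) is used as the key.
	webURL, err := url.Parse("https://cache.example.com:8444")
	if err != nil {
		fmt.Println("bad URL:", err)
		return
	}

	table := endpointTable{}
	table.useBroker(webURL.Host, "https://broker.example.com/retrieve") // placeholder broker URL

	// A dialer sees the host:port form, so this lookup succeeds.
	_, byAddr := table.lookup("cache.example.com:8444")
	// A service name is never what the dialer receives, so this lookup misses.
	_, byName := table.lookup("my-cache")

	fmt.Println("found by host:port:", byAddr)
	fmt.Println("found by service name:", byName)
}
```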
func (d *BrokerDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
	info := d.brokerEndpoints.Get(addr)
	if info == nil {
		// If the endpoint is not found in the cache, use the default dialer.
		return d.dialerContext(ctx, network, addr)
	}

	sType := info.Value().ServerType
	prefix := ""
	if sType.IsEnabled(server_structs.CacheType) {
		addrSplit := strings.SplitN(addr, ":", 2)
		prefix = "/caches/" + addrSplit[0]
	} else {
		prefix = "/origins/" + addr
	}
	return ConnectToOrigin(ctx, info.Value().BrokerUrl, prefix, addr)
Pelican service name, prefix and URL/WebURL have the following pattern (no exception in Director data):
Origin: prefix = "/origins/" + webUrl
Cache: prefix = "/caches/" + name
addr in this function seems to be the webUrl, so the prefix this snippet produces is incorrect for all cache servers.
However, there is no fixed pattern relating the Pelican service name to the webURL, which means we can't convert addr (the webURL) to a name and then to a prefix. According to the function signature of ConnectToOrigin, its last input parameter is the name of the origin/cache, which is missing in this code snippet. Also, brokerEndpoints requires the name as the key.
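The mismatch described above can be reproduced in isolation. The sketch below mirrors only the prefix logic of the reviewed snippet; brokerPrefix is a hypothetical helper name, and the host names and the service name "my-cache" are made-up examples rather than real Director data.

```go
package main

import (
	"fmt"
	"strings"
)

// brokerPrefix mirrors the prefix derivation in the reviewed DialContext snippet:
// caches use the host part of addr, origins use the full host:port.
// The function name and boolean parameter are hypothetical.
func brokerPrefix(isCache bool, addr string) string {
	if isCache {
		host := strings.SplitN(addr, ":", 2)[0]
		return "/caches/" + host
	}
	return "/origins/" + addr
}

func main() {
	// Prefix derived from the dial address, as in the snippet.
	derived := brokerPrefix(true, "cache.example.com:8444")

	// Prefix the reviewer expects for a cache whose service name is "my-cache" (made-up name).
	expected := "/caches/" + "my-cache"

	fmt.Println("derived from addr:", derived)
	fmt.Println("expected from name:", expected)
	fmt.Println("match:", derived == expected)

	// For origins the snippet keeps the port in the prefix.
	fmt.Println("origin prefix:", brokerPrefix(false, "origin.example.com:8443"))
}
```

The two cache prefixes agree only when a cache's service name happens to equal its web host name, which is the case the reviewer says cannot be assumed.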
Per the unit tests, this seems to match the current default behaviors, no? Are you saying this isn't matching what's currently in the OSDF?
Unfortunately, if the unit tests don't match OSDF's behavior, we might have to accept this as scaffolding to a larger set of fixes for the broker URL & naming in general as a follow-up in v7.18 as discussed today.
Tries to make the functionality clearer (via variable and method names) and adds a few comments in code the reviewer found confusing.
Since I've got a few pending PRs that depend on this one, and the remaining issues are around the registry naming (which we know needs to be cleaned up), I'm going to pull PI privilege and go ahead and merge this. Will create a follow-up issue for the naming piece.
This hooks the use of the broker into the director, allowing it to contact the cache.
With this, caches no longer need to allow incoming connections from the director to enable functionality like monitoring.