Skip to content

Commit 1e3ec54

Browse files
committed
Add support for prestaging API in the cache
We auto-detect whether the cache has the prestaging API loaded; if so, switch to that instead of downloading to /dev/null as a way to simulate prestaging.
1 parent 4a7c411 commit 1e3ec54

5 files changed

Lines changed: 702 additions & 53 deletions

File tree

client/fed_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,3 +1108,100 @@ func TestPrestage(t *testing.T) {
11081108
}, 2*time.Second, 100*time.Millisecond, "object should be cached after prestage")
11091109
}
11101110
}
1111+
1112+
// TestPrestageWithAPI tests the new Pelican prestage API when available
1113+
func TestPrestageWithAPI(t *testing.T) {
1114+
t.Cleanup(test_utils.SetupTestLogging(t))
1115+
server_utils.ResetTestState()
1116+
defer server_utils.ResetTestState()
1117+
1118+
// Note: This test requires a cache that implements the Pelican prestage API
1119+
// For now, we'll test the detection and fallback logic
1120+
// A full integration test would require deploying a cache with the API support
1121+
1122+
fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)
1123+
1124+
te, err := client.NewTransferEngine(fed.Ctx)
1125+
require.NoError(t, err)
1126+
1127+
// Create test file
1128+
testFileContent := strings.Repeat("test file content", 10000)
1129+
tempFile, err := os.CreateTemp(t.TempDir(), "test")
1130+
assert.NoError(t, err, "Error creating temp file")
1131+
defer os.Remove(tempFile.Name())
1132+
_, err = tempFile.WriteString(testFileContent)
1133+
assert.NoError(t, err, "Error writing to temp file")
1134+
tempFile.Close()
1135+
1136+
tempToken, _ := getTempToken(t)
1137+
defer tempToken.Close()
1138+
defer os.Remove(tempToken.Name())
1139+
require.NoError(t, param.Set("Logging.DisableProgressBars", true))
1140+
1141+
oldPref, err := config.SetPreferredPrefix(config.PelicanPrefix)
1142+
assert.NoError(t, err)
1143+
defer func() {
1144+
_, err := config.SetPreferredPrefix(oldPref)
1145+
require.NoError(t, err)
1146+
}()
1147+
1148+
for idx, export := range fed.Exports {
1149+
t.Logf("Testing export %d: %s", idx, export.FederationPrefix)
1150+
tempPath := tempFile.Name()
1151+
fileName := filepath.Base(tempPath)
1152+
uploadURL := fmt.Sprintf("pelican://%s:%s%s/prestage-api/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
1153+
export.FederationPrefix, fileName)
1154+
1155+
// Upload the file
1156+
transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name()))
1157+
assert.NoError(t, err)
1158+
assert.Equal(t, int64(len(testFileContent)), transferResultsUpload[0].TransferredBytes)
1159+
1160+
// Create a new client for each export iteration
1161+
tc, err := te.NewClient(client.WithTokenLocation(tempToken.Name()))
1162+
require.NoError(t, err)
1163+
1164+
innerFileUrl, err := url.Parse(uploadURL)
1165+
require.NoError(t, err)
1166+
1167+
// Verify the file is not initially cached
1168+
_, size, err := tc.CacheInfo(fed.Ctx, innerFileUrl)
1169+
require.NoError(t, err)
1170+
require.Equal(t, int64(len(testFileContent)), size)
1171+
1172+
// Prestage the object with forced API usage
1173+
// The test verifies that when forcing API usage, the client properly detects
1174+
// whether the cache supports the API and returns appropriate errors if not
1175+
tj, err := tc.NewPrestageJob(fed.Ctx, innerFileUrl, client.WithForcePrestageAPI(true))
1176+
require.NoError(t, err)
1177+
err = tc.Submit(tj)
1178+
require.NoError(t, err)
1179+
1180+
// Shutdown with timeout - use a separate goroutine to enforce the timeout
1181+
type shutdownResult struct {
1182+
results []client.TransferResults
1183+
err error
1184+
}
1185+
shutdownChan := make(chan shutdownResult, 1)
1186+
go func() {
1187+
results, err := tc.Shutdown()
1188+
shutdownChan <- shutdownResult{results: results, err: err}
1189+
}()
1190+
1191+
// Wait for shutdown or timeout
1192+
var result shutdownResult
1193+
select {
1194+
case result = <-shutdownChan:
1195+
// Shutdown completed
1196+
case <-time.After(20 * time.Second):
1197+
require.Fail(t, "Prestage operation timed out after 20 seconds")
1198+
}
1199+
1200+
require.NoError(t, result.err)
1201+
assert.Equal(t, 1, len(result.results))
1202+
1203+
// Test the Pelican prestage API with forced usage
1204+
require.NoError(t, result.results[0].Error, "Prestage with forced API failed: %v", result.results[0].Error)
1205+
t.Logf("Prestage with forced API succeeded")
1206+
}
1207+
}

client/handle_http.go

Lines changed: 152 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ type (
280280
writer io.WriteCloser // Optional writer for downloads - if set, write to this instead of localPath
281281
reader io.ReadCloser // Optional reader for uploads - if set, read from this instead of localPath
282282
inPlace bool // If true, write directly to final destination; if false, use temporary file
283+
forcePrestageAPI bool // If true, force use of prestage API and error if not supported (no fallback)
283284
}
284285

285286
// A TransferJob associated with a client's request
@@ -300,25 +301,27 @@ type (
300301

301302
// An object able to process transfer jobs.
302303
TransferEngine struct {
303-
ctx context.Context // The context provided upon creation of the engine.
304-
cancel context.CancelFunc
305-
egrp *errgroup.Group // The errgroup for the worker goroutines
306-
work chan *clientTransferJob
307-
files chan *clientTransferFile
308-
results chan *clientTransferResults
309-
jobLookupDone chan *clientTransferJob // Indicates the job lookup handler is done with the job
310-
workersActive int
311-
resultsMap map[uuid.UUID]chan *TransferResults
312-
workMap map[uuid.UUID]chan *TransferJob
313-
notifyChan chan bool
314-
closeChan chan bool
315-
closeDoneChan chan bool
316-
ewmaTick *time.Ticker
317-
ewma ewma.MovingAverage
318-
ewmaVal atomic.Int64
319-
ewmaCtr atomic.Int64
320-
clientLock sync.RWMutex
321-
pelicanUrlCache *pelican_url.Cache
304+
ctx context.Context // The context provided upon creation of the engine.
305+
cancel context.CancelFunc
306+
egrp *errgroup.Group // The errgroup for the worker goroutines
307+
work chan *clientTransferJob
308+
files chan *clientTransferFile
309+
results chan *clientTransferResults
310+
jobLookupDone chan *clientTransferJob // Indicates the job lookup handler is done with the job
311+
workersActive int
312+
resultsMap map[uuid.UUID]chan *TransferResults
313+
workMap map[uuid.UUID]chan *TransferJob
314+
notifyChan chan bool
315+
closeChan chan bool
316+
closeDoneChan chan bool
317+
ewmaTick *time.Ticker
318+
ewma ewma.MovingAverage
319+
ewmaVal atomic.Int64
320+
ewmaCtr atomic.Int64
321+
clientLock sync.RWMutex
322+
pelicanUrlCache *pelican_url.Cache
323+
prestageAPISupport map[string]bool // Lookup table for caches that support the Pelican prestage API (key: host)
324+
prestageAPIMutex sync.RWMutex // Protects the prestageAPISupport map
322325
}
323326

324327
TransferCallbackFunc = func(path string, downloaded int64, totalSize int64, completed bool)
@@ -343,22 +346,23 @@ type (
343346
setupResults sync.Once
344347
}
345348

346-
TransferOption = option.Interface
347-
identTransferOptionCaches struct{}
348-
identTransferOptionCallback struct{}
349-
identTransferOptionTokenLocation struct{}
350-
identTransferOptionAcquireToken struct{}
351-
identTransferOptionToken struct{}
352-
identTransferOptionSynchronize struct{}
353-
identTransferOptionCollectionsUrl struct{}
354-
identTransferOptionChecksums struct{}
355-
identTransferOptionRequireChecksum struct{}
356-
identTransferOptionRecursive struct{}
357-
identTransferOptionDepth struct{}
358-
identTransferOptionWriter struct{}
359-
identTransferOptionReader struct{}
360-
identTransferOptionInPlace struct{}
361-
identTransferOptionDryRun struct{}
349+
TransferOption = option.Interface
350+
identTransferOptionCaches struct{}
351+
identTransferOptionCallback struct{}
352+
identTransferOptionTokenLocation struct{}
353+
identTransferOptionAcquireToken struct{}
354+
identTransferOptionToken struct{}
355+
identTransferOptionSynchronize struct{}
356+
identTransferOptionCollectionsUrl struct{}
357+
identTransferOptionChecksums struct{}
358+
identTransferOptionRequireChecksum struct{}
359+
identTransferOptionRecursive struct{}
360+
identTransferOptionDepth struct{}
361+
identTransferOptionWriter struct{}
362+
identTransferOptionReader struct{}
363+
identTransferOptionInPlace struct{}
364+
identTransferOptionDryRun struct{}
365+
identTransferOptionForcePrestageAPI struct{}
362366

363367
transferDetailsOptions struct {
364368
NeedsToken bool
@@ -619,21 +623,22 @@ func NewTransferEngine(ctx context.Context) (te *TransferEngine, err error) {
619623
pelicanUrlCache := pelican_url.StartCache(ctx, egrp)
620624

621625
te = &TransferEngine{
622-
ctx: ctx,
623-
cancel: cancel,
624-
egrp: egrp,
625-
work: work,
626-
files: files,
627-
results: results,
628-
resultsMap: make(map[uuid.UUID]chan *TransferResults),
629-
workMap: make(map[uuid.UUID]chan *TransferJob),
630-
jobLookupDone: make(chan *clientTransferJob, 5),
631-
notifyChan: make(chan bool),
632-
closeChan: make(chan bool),
633-
closeDoneChan: make(chan bool),
634-
ewmaTick: time.NewTicker(ewmaInterval),
635-
ewma: ewma.NewMovingAverage(20), // By explicitly setting the age to 20s, the first 10 seconds will use an average of historical samples instead of EWMA
636-
pelicanUrlCache: pelicanUrlCache,
626+
ctx: ctx,
627+
cancel: cancel,
628+
egrp: egrp,
629+
work: work,
630+
files: files,
631+
results: results,
632+
resultsMap: make(map[uuid.UUID]chan *TransferResults),
633+
workMap: make(map[uuid.UUID]chan *TransferJob),
634+
jobLookupDone: make(chan *clientTransferJob, 5),
635+
notifyChan: make(chan bool),
636+
closeChan: make(chan bool),
637+
closeDoneChan: make(chan bool),
638+
ewmaTick: time.NewTicker(ewmaInterval),
639+
ewma: ewma.NewMovingAverage(20), // By explicitly setting the age to 20s, the first 10 seconds will use an average of historical samples instead of EWMA
640+
pelicanUrlCache: pelicanUrlCache,
641+
prestageAPISupport: make(map[string]bool),
637642
}
638643
workerCount := param.Client_WorkerCount.GetInt()
639644
if workerCount <= 0 {
@@ -762,6 +767,15 @@ func WithDryRun(enable bool) TransferOption {
762767
return option.New(identTransferOptionDryRun{}, enable)
763768
}
764769

770+
// Create an option to force use of the Pelican prestage API
771+
//
772+
// When enabled for prestage transfers, the client will return an error if the cache
773+
// does not support the Pelican prestage API instead of falling back to the traditional
774+
// method. This is useful for testing to ensure the API is actually being used.
775+
func WithForcePrestageAPI(force bool) TransferOption {
776+
return option.New(identTransferOptionForcePrestageAPI{}, force)
777+
}
778+
765779
// Create a new client to work with an engine
766780
func (te *TransferEngine) NewClient(options ...TransferOption) (client *TransferClient, err error) {
767781
log.Debugln("Making new clients")
@@ -794,6 +808,9 @@ func (te *TransferEngine) NewClient(options ...TransferOption) (client *Transfer
794808
client.syncLevel = option.Value().(SyncLevel)
795809
case identTransferOptionDryRun{}:
796810
client.dryRun = option.Value().(bool)
811+
case identTransferOptionForcePrestageAPI{}:
812+
// This option is handled at the job level, not client level
813+
// Skip it here; it will be processed in NewTransferJob/NewPrestageJob
797814
}
798815
}
799816
func() {
@@ -1311,6 +1328,8 @@ func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL
13111328
tj.token.SetToken(option.Value().(string))
13121329
case identTransferOptionSynchronize{}:
13131330
tj.syncLevel = option.Value().(SyncLevel)
1331+
case identTransferOptionForcePrestageAPI{}:
1332+
tj.forcePrestageAPI = option.Value().(bool)
13141333
}
13151334
}
13161335

@@ -2111,7 +2130,89 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
21112130
return
21122131
}
21132132
}
2114-
} else {
2133+
} else { // Prestage case
2134+
// Check if we should use the Pelican prestage API
2135+
// We'll try the API for the first attempt (if supported), then fall back to the traditional method
2136+
if len(transfer.attempts) > 0 {
2137+
firstAttempt := transfer.attempts[0]
2138+
cacheHost := firstAttempt.Url.Host
2139+
2140+
// Check if this cache supports the prestage API
2141+
supportsAPI := false
2142+
if transfer.engine != nil {
2143+
// First check with read lock
2144+
transfer.engine.prestageAPIMutex.RLock()
2145+
supported, checked := transfer.engine.prestageAPISupport[cacheHost]
2146+
transfer.engine.prestageAPIMutex.RUnlock()
2147+
2148+
if !checked {
2149+
// Acquire write lock to perform the check
2150+
transfer.engine.prestageAPIMutex.Lock()
2151+
// Double-check in case another thread already did the check while we were waiting
2152+
supported, checked = transfer.engine.prestageAPISupport[cacheHost]
2153+
if !checked {
2154+
// We're the first thread to check, perform the API support test
2155+
supportsAPI = checkPrestageAPISupport(transfer.ctx, firstAttempt.Url, transfer.token)
2156+
transfer.engine.prestageAPISupport[cacheHost] = supportsAPI
2157+
} else {
2158+
supportsAPI = supported
2159+
}
2160+
transfer.engine.prestageAPIMutex.Unlock()
2161+
} else {
2162+
supportsAPI = supported
2163+
}
2164+
}
2165+
2166+
if supportsAPI {
2167+
// Use the Pelican prestage API
2168+
log.Debugln("Using Pelican prestage API for", transfer.remoteURL.Path, "at", cacheHost)
2169+
transferResults = newTransferResults(transfer.job)
2170+
transferStartTime := time.Now()
2171+
2172+
bytesTransferred, err := invokePrestageAPI(transfer.ctx, firstAttempt.Url, transfer.remoteURL.Path, transfer.token, transfer.callback)
2173+
2174+
endTime := time.Now()
2175+
attempt := TransferResult{
2176+
CacheAge: -1,
2177+
Number: 0,
2178+
Endpoint: cacheHost,
2179+
TransferEndTime: endTime,
2180+
TransferTime: endTime.Sub(transferStartTime),
2181+
TransferFileBytes: bytesTransferred,
2182+
}
2183+
2184+
if err != nil {
2185+
log.Debugln("Prestage API failed:", err)
2186+
attempt.Error = newTransferAttemptError(cacheHost, "", false, false, err)
2187+
transferResults.Error = err
2188+
} else {
2189+
transferResults.TransferredBytes = bytesTransferred
2190+
}
2191+
2192+
transferResults.Attempts = append(transferResults.Attempts, attempt)
2193+
transferResults.TransferStartTime = transferStartTime
2194+
2195+
// If the API succeeded, return early
2196+
if err == nil {
2197+
return transferResults, nil
2198+
}
2199+
2200+
// If API failed and we're forcing API usage, return the error
2201+
if transfer.job != nil && transfer.job.forcePrestageAPI {
2202+
return transferResults, errors.Wrap(err, "prestage API required but failed")
2203+
}
2204+
2205+
// If API failed, fall through to traditional method
2206+
log.Debugln("Falling back to traditional prestage method")
2207+
} else if transfer.job != nil && transfer.job.forcePrestageAPI {
2208+
// API not supported but forced - return error immediately
2209+
transferResults = newTransferResults(transfer.job)
2210+
transferResults.Error = errors.Errorf("cache %s does not support the Pelican prestage API, but API usage is required", cacheHost)
2211+
return transferResults, transferResults.Error
2212+
}
2213+
}
2214+
2215+
// Traditional prestage: download to /dev/null
21152216
localPath = os.DevNull
21162217
fileWriter = io.Discard
21172218
}

0 commit comments

Comments
 (0)