Skip to content

Commit 52a7efb

Browse files
committed
Add support for prestage async command
1 parent 41665f7 commit 52a7efb

7 files changed

Lines changed: 306 additions & 5 deletions

File tree

client/handle_http.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2519,7 +2519,7 @@ func fetchChecksum(ctx context.Context, types []ChecksumType, url *url.URL, toke
25192519
// Verify that a file on disk matches the expected size. We ignore directories
25202520
// and generic stat failures unless the file doesn't exist.
25212521
func verifyFileSize(dest string, expectedSize int64, fields log.Fields) error {
2522-
if dest == os.DevNull {
2522+
if dest == os.DevNull || dest == "" {
25232523
log.WithFields(fields).Debugf("Skipping size check because destination is (%s)", os.DevNull)
25242524
return nil
25252525
}
@@ -3036,7 +3036,7 @@ Loop:
30363036

30373037
// Second sanity check to verify the file size as it appears on disk.
30383038
if err = verifyFileSize(dest, totalSize, fields); err != nil {
3039-
err = errors.Wrap(err, "failed to verify size of downloaded file on disk")
3039+
err = errors.Wrapf(err, "failed to verify size of downloaded file (%s) on disk", dest)
30403040
return
30413041
}
30423042
}

client_agent/cli_test.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,163 @@ func TestCLIAsyncPut(t *testing.T) {
346346
t.Logf("TestCLIAsyncPut complete, total time: %s", time.Since(startTime))
347347
}
348348

349+
// TestCLIAsyncPrestage tests the pelican object prestage --async command
350+
func TestCLIAsyncPrestage(t *testing.T) {
351+
// Reset test state
352+
server_utils.ResetTestState()
353+
354+
// Create test federation
355+
fed := fed_test_utils.NewFedTest(t, testOriginConfig)
356+
357+
// Create temporary directory
358+
tempDir := t.TempDir()
359+
360+
// Create token
361+
err := param.Set(param.IssuerKeysDirectory.GetName(), t.TempDir())
362+
require.NoError(t, err)
363+
issuer, err := config.GetServerIssuerURL()
364+
require.NoError(t, err)
365+
366+
tokenConfig := token.NewWLCGToken()
367+
tokenConfig.Lifetime = time.Minute * 5
368+
tokenConfig.Issuer = issuer
369+
tokenConfig.Subject = "test-cli-async-prestage"
370+
tokenConfig.AddAudienceAny()
371+
372+
scopes := []token_scopes.TokenScope{}
373+
readScope, err := token_scopes.Wlcg_Storage_Read.Path("/")
374+
require.NoError(t, err)
375+
scopes = append(scopes, readScope)
376+
modScope, err := token_scopes.Wlcg_Storage_Modify.Path("/")
377+
require.NoError(t, err)
378+
scopes = append(scopes, modScope)
379+
tokenConfig.AddScopes(scopes...)
380+
381+
tkn, err := tokenConfig.CreateToken()
382+
require.NoError(t, err)
383+
384+
tokenFile := filepath.Join(tempDir, "token")
385+
err = os.WriteFile(tokenFile, []byte(tkn), 0644)
386+
require.NoError(t, err)
387+
388+
// Set up client API server
389+
serverConfig, _ := client_agent.CreateTestServerConfig(t)
390+
391+
egrp, egrpCtx := errgroup.WithContext(context.Background())
392+
ctx := context.WithValue(egrpCtx, config.EgrpKey, egrp)
393+
394+
server, err := client_agent.NewServer(ctx, serverConfig)
395+
require.NoError(t, err)
396+
397+
err = server.Start()
398+
require.NoError(t, err)
399+
400+
t.Cleanup(func() {
401+
err := server.Shutdown()
402+
assert.NoError(t, err)
403+
})
404+
405+
// Build pelican binary
406+
pelicanBin := buildPelicanBinary(t)
407+
408+
// Create test file and upload it first
409+
testContent := []byte("Test file for async prestage\n")
410+
uploadFile := filepath.Join(tempDir, "prestage-upload.txt")
411+
err = os.WriteFile(uploadFile, testContent, 0644)
412+
require.NoError(t, err)
413+
414+
federationPrefix := fed.Exports[0].FederationPrefix
415+
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
416+
require.NoError(t, err)
417+
uploadURL := fmt.Sprintf("pelican://%s%s/prestage-test.txt", discoveryUrl.Host, federationPrefix)
418+
419+
// Upload file first using async + wait
420+
uploadCmd := exec.Command(pelicanBin, "object", "put", "--async", "--wait", uploadFile, uploadURL, "--token", tokenFile)
421+
uploadCmd.Env = append(os.Environ(), fmt.Sprintf("PELICAN_CLIENTAGENT_SOCKET=%s", serverConfig.SocketPath))
422+
output, err := uploadCmd.CombinedOutput()
423+
require.NoError(t, err, "Failed to upload file: %s", output)
424+
425+
// Test async prestage without --wait
426+
t.Run("AsyncPrestageWithoutWait", func(t *testing.T) {
427+
cmd := exec.Command(pelicanBin, "object", "prestage", "--async", uploadURL, "--token", tokenFile)
428+
cmd.Env = append(os.Environ(), fmt.Sprintf("PELICAN_CLIENTAGENT_SOCKET=%s", serverConfig.SocketPath))
429+
430+
output, err := cmd.CombinedOutput()
431+
require.NoError(t, err, "Failed to run async prestage: %s", output)
432+
433+
outputStr := string(output)
434+
t.Logf("Command output: %s", outputStr)
435+
436+
// Should contain job ID
437+
assert.Contains(t, outputStr, "Job created:")
438+
assert.Contains(t, outputStr, "Check status with: pelican job status")
439+
440+
// Extract job ID from output
441+
re := regexp.MustCompile(`Job created: ([a-f0-9-]+)`)
442+
matches := re.FindStringSubmatch(outputStr)
443+
require.Len(t, matches, 2, "Could not extract job ID from output")
444+
jobID := matches[1]
445+
t.Logf("Created job ID: %s", jobID)
446+
})
447+
448+
// Test async prestage with --wait
449+
t.Run("AsyncPrestageWithWait", func(t *testing.T) {
450+
cmd := exec.Command(pelicanBin, "object", "prestage", "--async", "--wait", uploadURL, "--token", tokenFile)
451+
cmd.Env = append(os.Environ(), fmt.Sprintf("PELICAN_CLIENTAGENT_SOCKET=%s", serverConfig.SocketPath))
452+
453+
output, err := cmd.CombinedOutput()
454+
outputStr := string(output)
455+
t.Logf("Command output: %s", outputStr)
456+
457+
// Should contain job creation message
458+
assert.Contains(t, outputStr, "Job created:")
459+
assert.Contains(t, outputStr, "Waiting for job to complete")
460+
461+
// If prestage failed, log the client-agent log for debugging
462+
if err != nil {
463+
t.Logf("Prestage failed with error: %v", err)
464+
465+
// Extract job ID to query its status
466+
re := regexp.MustCompile(`Job created: ([a-f0-9-]+)`)
467+
matches := re.FindStringSubmatch(outputStr)
468+
if len(matches) >= 2 {
469+
jobID := matches[1]
470+
t.Logf("Querying failed job ID: %s", jobID)
471+
472+
// Query the job status via API to get detailed error info
473+
apiClient, apiErr := apiclient.NewAPIClient(serverConfig.SocketPath)
474+
if apiErr != nil {
475+
t.Logf("Failed to create API client: %v", apiErr)
476+
} else {
477+
ctx := context.Background()
478+
if jobStatus, statusErr := apiClient.GetJobStatus(ctx, jobID); statusErr == nil {
479+
t.Logf("Job Status: %s", jobStatus.Status)
480+
if jobStatus.Error != "" {
481+
t.Logf("Job Error: %s", jobStatus.Error)
482+
}
483+
if len(jobStatus.Transfers) > 0 {
484+
for i, transfer := range jobStatus.Transfers {
485+
t.Logf("Transfer %d: Status=%s, Operation=%s, Source=%s",
486+
i, transfer.Status, transfer.Operation, transfer.Source)
487+
if transfer.Error != "" {
488+
t.Logf(" Transfer %d Error: %s", i, transfer.Error)
489+
}
490+
}
491+
}
492+
} else {
493+
t.Logf("Failed to get job status: %v", statusErr)
494+
}
495+
}
496+
}
497+
498+
require.NoError(t, err, "Prestage operation should not fail")
499+
}
500+
501+
// Should contain completion message
502+
assert.Contains(t, outputStr, "Job completed successfully")
503+
})
504+
}
505+
349506
// TestCLIJobCommands tests the pelican job subcommands
350507
func TestCLIJobCommands(t *testing.T) {
351508
// Reset test state

client_agent/handlers.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ func (s *Server) CreateJobHandler(c *gin.Context) {
5353
return
5454
}
5555

56+
// Additional validation for specific operations
57+
for _, transfer := range req.Transfers {
58+
if transfer.Operation != "prestage" && transfer.Destination == "" {
59+
c.JSON(http.StatusBadRequest, ErrorResponse{
60+
Code: ErrCodeInvalidRequest,
61+
Error: "Destination is required for " + transfer.Operation + " operations",
62+
})
63+
return
64+
}
65+
}
66+
5667
// Build transfer options
5768
options := buildTransferOptions(req.Options)
5869

client_agent/models.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import (
2424

2525
// TransferRequest represents a single transfer operation within a job
2626
type TransferRequest struct {
27-
Operation string `json:"operation" binding:"required,oneof=get put copy"`
27+
Operation string `json:"operation" binding:"required,oneof=get put copy prestage"`
2828
Source string `json:"source" binding:"required"`
29-
Destination string `json:"destination" binding:"required"`
29+
Destination string `json:"destination"`
3030
Recursive bool `json:"recursive"`
3131
}
3232

client_agent/transfer_manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,8 @@ func (tm *TransferManager) executeTransfer(transfer *Transfer, options []client.
475475
results, err = client.DoPut(transfer.ctx, transfer.Source, transfer.Destination, transfer.Recursive, options...)
476476
case "copy":
477477
results, err = client.DoCopy(transfer.ctx, transfer.Source, transfer.Destination, transfer.Recursive, options...)
478+
case "prestage":
479+
results, err = client.DoPrestage(transfer.ctx, transfer.Source, options...)
478480
default:
479481
err = errors.Errorf("unknown operation: %s", transfer.Operation)
480482
}

client_agent/types/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type StoredJob struct {
3838
type StoredTransfer struct {
3939
ID string
4040
JobID string
41-
Operation string // "get", "put", "copy"
41+
Operation string // "get", "put", "copy", "prestage"
4242
Source string
4343
Destination string
4444
Recursive bool

cmd/object_prestage.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919
package main
2020

2121
import (
22+
"encoding/json"
23+
"fmt"
2224
"os"
25+
"time"
2326

2427
"github.com/pkg/errors"
2528
log "github.com/sirupsen/logrus"
2629
"github.com/spf13/cobra"
2730

2831
"github.com/pelicanplatform/pelican/client"
32+
"github.com/pelicanplatform/pelican/client_agent"
2933
"github.com/pelicanplatform/pelican/config"
3034
"github.com/pelicanplatform/pelican/error_codes"
3135
"github.com/pelicanplatform/pelican/param"
@@ -48,6 +52,8 @@ func init() {
4852
flagSet.StringP("cache", "c", "", `A comma-separated list of preferred caches to try for the transfer, where a "+" in the list indicates
4953
the client should fallback to discovered caches if all preferred caches fail.`)
5054
flagSet.StringP("token", "t", "", "Token file to use for transfer")
55+
flagSet.Bool("async", false, "Run the prestage asynchronously through the client API server and return a job ID")
56+
flagSet.Bool("wait", false, "When used with --async, wait for the job to complete before returning")
5157
objectCmd.AddCommand(prestageCmd)
5258
}
5359

@@ -66,6 +72,131 @@ func prestageMain(cmd *cobra.Command, args []string) {
6672
}
6773
}
6874

75+
// Check for async mode
76+
isAsync, _ := cmd.Flags().GetBool("async")
77+
if isAsync {
78+
// Validate arguments
79+
if len(args) < 1 {
80+
log.Errorln("Prefix(es) to prestage must be specified")
81+
err = cmd.Help()
82+
if err != nil {
83+
log.Errorln("Failed to print out help:", err)
84+
}
85+
os.Exit(1)
86+
}
87+
88+
// Ensure server is running, starting it if necessary
89+
apiClient, err := ensureClientAgentRunning(cmd.Context(), 5)
90+
if err != nil {
91+
log.Errorln("Failed to ensure API server is running:", err)
92+
log.Errorln("You can manually start it with 'pelican client-api serve --daemonize'")
93+
os.Exit(1)
94+
}
95+
96+
// Get flags for transfer options
97+
tokenLocation, _ := cmd.Flags().GetString("token")
98+
99+
// Get preferred caches
100+
caches, err := getPreferredCaches()
101+
if err != nil {
102+
log.Errorln("Failed to get preferred caches:", err)
103+
os.Exit(1)
104+
}
105+
106+
// Convert caches to strings
107+
cacheStrings := make([]string, len(caches))
108+
for i, cache := range caches {
109+
cacheStrings[i] = cache.String()
110+
}
111+
112+
// Build transfer options
113+
options := client_agent.TransferOptions{
114+
Token: tokenLocation,
115+
Caches: cacheStrings,
116+
}
117+
118+
// Create transfers for each source prefix
119+
transfers := make([]client_agent.TransferRequest, len(args))
120+
for i, src := range args {
121+
if !isPelicanUrl(src) {
122+
log.Errorln("Provided URL is not a valid Pelican URL:", src)
123+
os.Exit(1)
124+
}
125+
transfers[i] = client_agent.TransferRequest{
126+
Operation: "prestage",
127+
Source: src,
128+
Destination: "", // Prestage doesn't have a destination
129+
Recursive: false,
130+
}
131+
}
132+
133+
// Create job
134+
jobID, err := apiClient.CreateJob(ctx, transfers, options)
135+
if err != nil {
136+
log.Errorln("Failed to create job:", err)
137+
os.Exit(1)
138+
}
139+
140+
if outputJSON {
141+
result := map[string]interface{}{
142+
"job_id": jobID,
143+
"status": "created",
144+
}
145+
jsonBytes, err := json.MarshalIndent(result, "", " ")
146+
if err != nil {
147+
log.Errorln("Failed to marshal JSON:", err)
148+
os.Exit(1)
149+
}
150+
fmt.Println(string(jsonBytes))
151+
} else {
152+
fmt.Printf("Job created: %s\n", jobID)
153+
}
154+
155+
// Check if we should wait for completion
156+
shouldWait, _ := cmd.Flags().GetBool("wait")
157+
if shouldWait {
158+
if !outputJSON {
159+
fmt.Println("Waiting for job to complete...")
160+
}
161+
162+
// Wait with a reasonable timeout (e.g., 1 hour)
163+
err := apiClient.WaitForJob(ctx, jobID, 1*time.Hour)
164+
if err != nil {
165+
log.Errorln("Error waiting for job:", err)
166+
os.Exit(1)
167+
}
168+
169+
// Get final job status
170+
status, err := apiClient.GetJobStatus(ctx, jobID)
171+
if err != nil {
172+
log.Errorln("Failed to get job status:", err)
173+
os.Exit(1)
174+
}
175+
176+
if outputJSON {
177+
jsonBytes, err := json.MarshalIndent(status, "", " ")
178+
if err != nil {
179+
log.Errorln("Failed to marshal JSON:", err)
180+
os.Exit(1)
181+
}
182+
fmt.Println(string(jsonBytes))
183+
} else {
184+
fmt.Printf("Job completed successfully\n")
185+
}
186+
187+
if status.Status != "completed" {
188+
os.Exit(1)
189+
}
190+
} else {
191+
if !outputJSON {
192+
fmt.Printf("Check status with: pelican job status %s\n", jobID)
193+
}
194+
}
195+
196+
return
197+
}
198+
199+
// Original synchronous behavior
69200
tokenLocation, _ := cmd.Flags().GetString("token")
70201

71202
pb := newProgressBar()

0 commit comments

Comments
 (0)