Skip to content

Commit 8242c59

Browse files
committed
Add e2e integration test for POSC origin
1 parent 4244987 commit 8242c59

1 file changed

Lines changed: 209 additions & 0 deletions

File tree

e2e_fed_tests/posc_origin_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
//go:build !windows
2+
3+
/***************************************************************
4+
*
5+
* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License"); you
8+
* may not use this file except in compliance with the License. You may
9+
* obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*
19+
***************************************************************/
20+
21+
package fed_tests
22+
23+
import (
24+
"context"
25+
"fmt"
26+
"math/rand"
27+
"net/url"
28+
"os"
29+
"path/filepath"
30+
"testing"
31+
"time"
32+
33+
"github.com/stretchr/testify/assert"
34+
"github.com/stretchr/testify/require"
35+
36+
"github.com/pelicanplatform/pelican/client"
37+
"github.com/pelicanplatform/pelican/error_codes"
38+
"github.com/pelicanplatform/pelican/fed_test_utils"
39+
"github.com/pelicanplatform/pelican/param"
40+
"github.com/pelicanplatform/pelican/server_utils"
41+
"github.com/pelicanplatform/pelican/test_utils"
42+
)
43+
44+
/*
45+
TestPOSCOrigin_CancelUpload Does the following:
46+
- Create a large file (500MB) and begin an upload to the origin
47+
- During the upload, a temporary is created under the in-progress/anonymous/ directory
48+
- During the upload, verify that we can't pelican object stat the temporary file
49+
- After verifying that the temporary file is present, kill the upload
50+
- Verify that the temporary file is deleted
51+
- Verify that the attempted uploaded file is not present under the storage prefix
52+
- Assert that a pelican object stat for the attempted uploaded file fails with a 404 error
53+
*/
54+
func TestPOSCOrigin_CancelUpload(t *testing.T) {
55+
t.Cleanup(test_utils.SetupTestLogging(t))
56+
server_utils.ResetTestState()
57+
defer server_utils.ResetTestState()
58+
59+
originConfig := `
60+
Origin:
61+
StorageType: "posix"
62+
Exports:
63+
- StoragePrefix: /<SHOULD BE OVERRIDDEN>
64+
FederationPrefix: /test-namespace
65+
Capabilities: ["PublicReads", "Reads", "Writes", "DirectReads", "Listings"]
66+
`
67+
ft := fed_test_utils.NewFedTest(t, originConfig)
68+
69+
// Get the storage prefix from the export (this is the actual filesystem location)
70+
storagePrefix := ft.Exports[0].StoragePrefix
71+
federationPrefix := ft.Exports[0].FederationPrefix
72+
73+
// The POSC plugin creates temp files under the configured Origin.InProgressLocation.
74+
// For unauthenticated uploads, files go under the "anonymous" subdirectory.
75+
// Get the actual in-progress location from the running configuration.
76+
inProgressLocation := param.Origin_InProgressLocation.GetString()
77+
require.NotEmpty(t, inProgressLocation, "Origin.InProgressLocation should be set")
78+
inProgressDir := filepath.Join(inProgressLocation, "anonymous")
79+
80+
t.Logf("In-progress directory: %s", inProgressDir)
81+
t.Logf("Storage prefix: %s", storagePrefix)
82+
83+
// Create a large file with random content (1GB to ensure upload takes long enough to observe)
84+
fileContent := make([]byte, 1024*1024*1024)
85+
for i := range fileContent {
86+
fileContent[i] = byte(rand.Intn(256))
87+
}
88+
89+
// Write the file to a temporary directory
90+
localFilePath := filepath.Join(t.TempDir(), "large_file.bin")
91+
require.NoError(t, os.WriteFile(localFilePath, fileContent, 0644))
92+
93+
// Get the discovery URL for constructing pelican URLs
94+
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
95+
require.NoError(t, err)
96+
97+
// The remote path where we're trying to upload
98+
remoteFileName := "upload_test_file.bin"
99+
uploadUrl := fmt.Sprintf("pelican://%s%s/%s", discoveryUrl.Host, federationPrefix, remoteFileName)
100+
finalFilePath := filepath.Join(storagePrefix, remoteFileName)
101+
102+
// Create a cancellable context for the upload
103+
uploadCtx, uploadCancel := context.WithCancel(ft.Ctx)
104+
105+
// Channel to signal when upload completes (or errors)
106+
uploadDone := make(chan error, 1)
107+
108+
// Start the upload in a goroutine
109+
go func() {
110+
_, err := client.DoPut(uploadCtx, localFilePath, uploadUrl, false, client.WithTokenLocation(""))
111+
uploadDone <- err
112+
}()
113+
114+
// Wait for a temporary file to appear in the in-progress directory
115+
// The POSC plugin creates temp files here during uploads.
116+
// Note: The anonymous directory is created lazily when the first upload starts.
117+
var tempFilePath string
118+
foundTempFile := assert.Eventually(t, func() bool {
119+
// First check if the in-progress location directory exists
120+
if _, err := os.Stat(inProgressLocation); os.IsNotExist(err) {
121+
t.Logf("In-progress location does not exist yet: %s", inProgressLocation)
122+
return false
123+
}
124+
125+
// Check for files in the anonymous subdirectory
126+
entries, err := os.ReadDir(inProgressDir)
127+
if err != nil {
128+
// Directory might not exist yet - this is expected initially
129+
if !os.IsNotExist(err) {
130+
t.Logf("Error reading in-progress dir: %v", err)
131+
}
132+
return false
133+
}
134+
for _, entry := range entries {
135+
if !entry.IsDir() {
136+
tempFilePath = filepath.Join(inProgressDir, entry.Name())
137+
t.Logf("Found temp file: %s", tempFilePath)
138+
return true
139+
}
140+
}
141+
return false
142+
}, 60*time.Second, 100*time.Millisecond, "Temporary file never appeared in in-progress directory")
143+
144+
if !foundTempFile {
145+
// Log diagnostic information before failing
146+
t.Logf("Diagnostic: checking in-progress location contents")
147+
if entries, err := os.ReadDir(inProgressLocation); err == nil {
148+
for _, e := range entries {
149+
t.Logf(" Found in in-progress location: %s (dir=%v)", e.Name(), e.IsDir())
150+
}
151+
} else {
152+
t.Logf(" Could not read in-progress location: %v", err)
153+
}
154+
uploadCancel()
155+
t.Fatal("Test failed: temporary file was never created during upload")
156+
}
157+
158+
t.Logf("Found temporary file during upload: %s", tempFilePath)
159+
160+
// Verify that the temporary file exists on disk
161+
_, err = os.Stat(tempFilePath)
162+
require.NoError(t, err, "Temporary file should exist on disk")
163+
164+
// Verify that we can't stat the temporary file via pelican
165+
// The in-progress directory should not be accessible via the federation namespace
166+
inProgressStatUrl := fmt.Sprintf("pelican://%s%s/in-progress/anonymous/%s",
167+
discoveryUrl.Host, federationPrefix, filepath.Base(tempFilePath))
168+
_, err = client.DoStat(ft.Ctx, inProgressStatUrl, client.WithTokenLocation(""))
169+
require.Error(t, err, "Should not be able to stat the temporary in-progress file via pelican")
170+
var pe *error_codes.PelicanError
171+
require.ErrorAs(t, err, &pe)
172+
require.Equal(t, "Specification.FileNotFound", pe.ErrorType())
173+
174+
// Also verify that the final file doesn't exist yet (upload is still in progress)
175+
_, err = os.Stat(finalFilePath)
176+
assert.True(t, os.IsNotExist(err), "Final file should not exist yet during upload")
177+
178+
// Cancel the upload context to kill the upload
179+
t.Log("Cancelling upload...")
180+
uploadCancel()
181+
182+
// Wait for the upload goroutine to finish
183+
select {
184+
case uploadErr := <-uploadDone:
185+
// We expect an error since we cancelled the upload
186+
t.Logf("Upload finished with error (expected due to cancellation): %v", uploadErr)
187+
require.ErrorAs(t, uploadErr, &context.Canceled)
188+
case <-time.After(30 * time.Second):
189+
t.Fatal("Upload did not finish within timeout after cancellation")
190+
}
191+
192+
// Verify that the temporary file is deleted after the upload is cancelled
193+
assert.Eventually(t, func() bool {
194+
_, err := os.Stat(tempFilePath)
195+
return os.IsNotExist(err)
196+
}, 30*time.Second, 100*time.Millisecond, "Temporary file was not cleaned up after upload cancellation")
197+
198+
// Verify that the final file was not created (upload was cancelled before completion)
199+
_, err = os.Stat(finalFilePath)
200+
assert.True(t, os.IsNotExist(err), "Final file should not exist after cancelled upload")
201+
202+
// Assert that a pelican object stat for the attempted uploaded file fails with a 404 error
203+
_, err = client.DoStat(ft.Ctx, uploadUrl, client.WithTokenLocation(""))
204+
require.Error(t, err, "Stat for cancelled upload file should return an error")
205+
require.ErrorAs(t, err, &pe)
206+
require.Equal(t, "Specification.FileNotFound", pe.ErrorType())
207+
208+
t.Log("POSC origin test passed: temporary files are properly managed during upload cancellation")
209+
}

0 commit comments

Comments
 (0)