Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ type (
)

const (
ewmaInterval = 15 * time.Second
// The EWMA library we use assumes there's a single tick per second
ewmaInterval = time.Second

attrProjectName classAdAttr = "ProjectName"
attrJobId classAdAttr = "GlobalJobId"
Expand Down Expand Up @@ -785,7 +786,7 @@ func NewTransferEngine(ctx context.Context) (te *TransferEngine, err error) {
closeChan: make(chan bool),
closeDoneChan: make(chan bool),
ewmaTick: time.NewTicker(ewmaInterval),
ewma: ewma.NewMovingAverage(),
ewma: ewma.NewMovingAverage(20), // By explicitly setting the age to 20s, the first 10 seconds will use an average of historical samples instead of EWMA
pelicanUrlCache: pelicanUrlCache,
}
workerCount := param.Client_WorkerCount.GetInt()
Expand Down Expand Up @@ -2531,7 +2532,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
if totalSize >= 0 {
finalSize = totalSize
}
callback(dest, downloaded, finalSize, true)
callback(dest, downloaded+bytesSoFar, finalSize, true)
}
if te != nil {
te.ewmaCtr.Add(int64(time.Since(lastUpdate)))
Expand Down Expand Up @@ -2609,6 +2610,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
req.Header.Set("X-Pelican-Timeout", headerTimeout.Round(time.Millisecond).String())
if bytesSoFar > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", bytesSoFar))
log.Debugln("Resuming transfer starting at offset", bytesSoFar)
}
req.Header.Set("TE", "trailers")
req.Header.Set("User-Agent", userAgent)
Expand Down Expand Up @@ -2713,6 +2715,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
stoppedTransferTimeout := compatToDuration(param.Client_StoppedTransferTimeout.GetDuration(), "Client.StoppedTransferTimeout")
slowTransferRampupTime := compatToDuration(param.Client_SlowTransferRampupTime.GetDuration(), "Client.SlowTransferRampupTime")
slowTransferWindow := compatToDuration(param.Client_SlowTransferWindow.GetDuration(), "Client.SlowTransferWindow")
progressInterval := param.Logging_Client_ProgressInterval.GetDuration()
stoppedTransferDebugLine.Do(func() {
log.WithFields(fields).Debugf("Configuration values for stopped transfer timeout: %s; slow transfer ramp-up: %s; slow transfer look-back window: %s",
stoppedTransferTimeout.String(), slowTransferRampupTime.String(), slowTransferWindow.String())
Expand All @@ -2736,6 +2739,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
defer pw.Close()

// Loop of the download
lastProgressUpdate := time.Now()
Loop:
for {
select {
Expand All @@ -2747,7 +2751,7 @@ Loop:
}
lastUpdate = currentTime
if callback != nil {
callback(dest, downloaded, totalSize, false)
callback(dest, downloaded+bytesSoFar, totalSize, false)
}

case <-t.C:
Expand Down Expand Up @@ -2780,6 +2784,11 @@ Loop:
limit = int64(float64(limit) / concurrency)
}
transferRate := pw.BytesPerSecond()
if progressInterval > 0 && time.Since(lastProgressUpdate) > progressInterval {
lastProgressUpdate = time.Now()
log.Infof("Download of %s has %d bytes complete out of %d; recent transfer rate is %s/s", transferUrl.String(), downloaded, totalSize, ByteCountSI(transferRate))
}

if transferRate < limit {
// Give the download `slowTransferRampupTime` (default 100s) to ramp up before enforcing the minimum speed
if time.Since(downloadStart) < slowTransferRampupTime {
Expand All @@ -2791,6 +2800,7 @@ Loop:
log.WithFields(fields).Errorln("Problem displaying slow message", err, status)
continue
}
log.Warningln("Transfer of", transferUrl.String(), "is below threshold; attempt will error out if it remains below threshold for", slowTransferWindow)
startBelowLimit = time.Now()
continue
} else if time.Since(startBelowLimit) < slowTransferWindow {
Expand All @@ -2800,7 +2810,11 @@ Loop:
// The download has stayed below the threshold for longer than `slowTransferWindow`; cancel the download
cancel()

log.WithFields(fields).Errorf("Cancelled: Download speed of %s/s is below the limit of %s/s", ByteCountSI(transferRate), ByteCountSI(int64(downloadLimit)))
if concurrency > 1 {
log.WithFields(fields).Errorf("Cancelling download attempt of %s: Download speed of %s/s is below the computed limit of %s/s (configured limit of %s/s divided by estimated %.1f concurrent transfers on average)", transferUrl.String(), ByteCountSI(transferRate), ByteCountSI(int64(limit)), ByteCountSI(int64(downloadLimit)), concurrency)
} else {
log.WithFields(fields).Errorf("Cancelling download attempt of %s: Download speed of %s/s is below the limit of %s/s", transferUrl.String(), ByteCountSI(transferRate), ByteCountSI(int64(downloadLimit)))
}

err = &SlowTransferError{
BytesTransferred: pw.BytesComplete(),
Expand Down Expand Up @@ -2939,16 +2953,22 @@ func (pw *progressWriter) Write(p []byte) (n int, err error) {
pw.firstByteTime = time.Now()
}
now := time.Now()
elapsed := now.Sub(pw.firstByteTime)
elapsedUS := elapsed.Microseconds()
if elapsed < 5*time.Second && elapsedUS > 0 {
pw.bytesPerSecond.Store(1000000 * pw.bytesWritten.Load() / elapsedUS)
} else if elapsedUS > 0 {
oldBytesPerSecond := pw.bytesPerSecond.Load()
elapsed = now.Sub(pw.lastRateSample)
alpha := math.Exp(-1 * float64(elapsed) / float64(10*time.Second))
recentRate := 1000000 * int64(len(p)) / elapsedUS
pw.bytesPerSecond.Store(int64(float64(oldBytesPerSecond)*alpha + float64(recentRate)*(1-alpha)))
startupTime := now.Sub(pw.firstByteTime)
startupTimeUS := startupTime.Microseconds()
// Transfer is ramping up, less than 5 seconds total. Take the average rate since start.
if startupTime < 5*time.Second && startupTimeUS > 0 {
pw.bytesPerSecond.Store(1000000 * pw.bytesWritten.Load() / startupTimeUS)
pw.lastRateSample = now
} else {
elapsed := now.Sub(pw.lastRateSample)
pw.lastRateSample = now
elapsedUS := elapsed.Microseconds()
if elapsedUS > 0 {
oldBytesPerSecond := pw.bytesPerSecond.Load()
alpha := math.Exp(-1 * float64(elapsed) / float64(10*time.Second))
recentRate := 1000000 * int64(len(p)) / elapsedUS
pw.bytesPerSecond.Store(int64(float64(oldBytesPerSecond)*alpha + float64(recentRate)*(1-alpha)))
}
}
n, err = pw.writer.Write(p)
pw.bytesWritten.Add(int64(n))
Expand Down
27 changes: 17 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,8 +1702,12 @@ func SetClientDefaults(v *viper.Viper) error {
v.SetDefault(param.Client_WorkerCount.GetName(), 5)
v.SetDefault(param.Server_TLSCACertificateFile.GetName(), filepath.Join(configDir, "certificates", "tlsca.pem"))

var downloadLimit int64 = 1024 * 100
v.SetDefault(param.Client_MinimumDownloadSpeed.GetName(), downloadLimit)
// Default is set outside of defaults.yaml to allow SetDefault call below to override
viper.SetDefault(param.Client_MinimumDownloadSpeed.GetName(), 102400)
if param.MinimumDownloadSpeed.IsSet() {
viper.SetDefault(param.Client_MinimumDownloadSpeed.GetName(), param.MinimumDownloadSpeed.GetInt())
}

if v == viper.GetViper() {
viper.AutomaticEnv()
upperPrefix := GetPreferredPrefix()
Expand Down Expand Up @@ -1763,7 +1767,6 @@ func SetClientDefaults(v *viper.Viper) error {
}
}
// Check the environment variable STASHCP_MINIMUM_DOWNLOAD_SPEED (and all the prefix variants)
var downloadLimit int64 = 1024 * 100
var prefixes_with_cp []ConfigPrefix
for _, prefix := range prefixes {
prefixes_with_cp = append(prefixes_with_cp, prefix+"CP")
Expand All @@ -1773,19 +1776,23 @@ func SetClientDefaults(v *viper.Viper) error {
if len(downloadLimitStr) == 0 {
continue
}
var err error
downloadLimit, err = strconv.ParseInt(downloadLimitStr, 10, 64)
downloadLimit, err := strconv.ParseInt(downloadLimitStr, 10, 64)
if err != nil {
log.Errorf("Environment variable %s_MINIMUM_DOWNLOAD_SPEED=%s is not parsable as integer: %s",
prefixes, downloadLimitStr, err.Error())
continue
}
if downloadLimit < 0 {
log.Errorf("Environment variable %s_MINIMUM_DOWNLOAD_SPEED=%s is negative value; ignoring and will use"+
"built-in default of %s", prefixes, downloadLimitStr, viper.Get(param.Client_MinimumDownloadSpeed.GetName()))
continue
}

// Backward-compatibility environment variables are applied only as a default, so they never override the new-style settings
viper.SetDefault(param.Client_MinimumDownloadSpeed.GetName(), downloadLimit)

break
}
if param.MinimumDownloadSpeed.IsSet() {
viper.SetDefault(param.Client_MinimumDownloadSpeed.GetName(), param.MinimumDownloadSpeed.GetInt())
} else {
viper.Set(param.Client_MinimumDownloadSpeed.GetName(), downloadLimit)
}
// Handle more legacy config options
if param.DisableProxyFallback.IsSet() {
viper.SetDefault(param.Client_DisableProxyFallback.GetName(), param.DisableProxyFallback.GetBool())
Expand Down
2 changes: 2 additions & 0 deletions config/resources/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

Logging:
Level: error
Client:
ProgressInterval: 1m
Client:
SlowTransferRampupTime: 100s
SlowTransferWindow: 30s
Expand Down
7 changes: 7 additions & 0 deletions docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ type: bool
default: false
components: ["Client"]
---
name: Logging.Client.ProgressInterval
Comment thread
jhiemstrawisc marked this conversation as resolved.
description: |+
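Interval at which the client logs the progress of an in-flight download (bytes completed and recent transfer rate). A value of zero or less disables progress logging.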
type: duration
default: 1m
components: ["client"]
---
name: Logging.Origin.Cms
description: |+
Trace level of XRootD cluster management service, one of the main XRootD executables.
Expand Down
1 change: 1 addition & 0 deletions param/parameters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions param/parameters_struct.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading