Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ func SetServerDefaults(v *viper.Viper) error {
v.SetDefault(param.Xrootd_ScitokensConfig.GetName(), filepath.Join(configDir, "xrootd", "scitokens.cfg"))
v.SetDefault(param.Xrootd_Authfile.GetName(), filepath.Join(configDir, "xrootd", "authfile"))
v.SetDefault(param.Xrootd_MacaroonsKeyFile.GetName(), filepath.Join(configDir, "macaroons-secret"))
v.SetDefault(param.Xrootd_ShutdownTimeout.GetName(), 1*time.Minute)
v.SetDefault(param.IssuerKey.GetName(), filepath.Join(configDir, "issuer.jwk"))
v.SetDefault(param.IssuerKeysDirectory.GetName(), filepath.Join(configDir, "issuer-keys"))
v.SetDefault(param.Server_UIPasswordFile.GetName(), filepath.Join(configDir, "server-web-passwd"))
Expand Down
8 changes: 7 additions & 1 deletion daemon/launch_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

"github.com/pelicanplatform/pelican/database"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
)

Expand Down Expand Up @@ -233,9 +234,14 @@ func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Gro
if !ok {
panic(errors.New("Unable to convert signal to syscall.Signal"))
}
if sys_sig == syscall.SIGTERM {
log.Warnf("Received SIGTERM, pausing the signal forwarding to daemons for %s", param.Xrootd_ShutdownTimeout.GetDuration().String())
time.Sleep(param.Xrootd_ShutdownTimeout.GetDuration())
}
log.Warnf("Forwarding signal %v to daemons\n", sys_sig)
var lastErr error
for idx, daemon := range daemons {
for idx := range daemons {
daemon := &daemons[idx]
if err = daemon.killFunc(daemon.pid, int(sys_sig)); err != nil {
lastErr = errors.Wrapf(err, "Failed to forward signal to %s process", launchers[idx].Name())
}
Expand Down
13 changes: 8 additions & 5 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ import (
type filterType string

const (
permFiltered filterType = "permFiltered" // Read from Director.FilteredServers
tempFiltered filterType = "tempFiltered" // Filtered by web UI, e.g. the server is put in downtime via the director website
serverFiltered filterType = "serverFiltered" // Filtered by the server itself, e.g. the server is put in downtime by the server admin
topoFiltered filterType = "topologyFiltered" // Filtered by Topology, e.g. the server is put in downtime via the OSDF Topology change
tempAllowed filterType = "tempAllowed" // Read from Director.FilteredServers but mutated by web UI
permFiltered filterType = "permFiltered" // Read from Director.FilteredServers
tempFiltered filterType = "tempFiltered" // Filtered by web UI, e.g. the server is put in downtime via the director website
serverFiltered filterType = "serverFiltered" // Filtered by the server itself, e.g. the server is put in downtime by the server admin
shutdownFiltered filterType = "shutdownFiltered" // Filtered by the server itself to block new work during the pre-shutdown drain period
topoFiltered filterType = "topologyFiltered" // Filtered by Topology, e.g. the server is put in downtime via the OSDF Topology change
tempAllowed filterType = "tempAllowed" // Read from Director.FilteredServers but mutated by web UI
)

var (
Expand Down Expand Up @@ -79,6 +80,8 @@ func (f filterType) String() string {
return "Temporarily disabled via the admin website"
case serverFiltered:
return "Temporarily disabled by the server admin"
case shutdownFiltered:
return "Disabled due to the server shutdown in progress"
case topoFiltered:
return "Disabled via the Topology policy"
case tempAllowed:
Expand Down
34 changes: 31 additions & 3 deletions director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,16 +1171,15 @@ func registerServerAd(engineCtx context.Context, ctx *gin.Context, sType server_
adV2.Version = "unknown"
}

sn := adV2.Name
// Process received server(origin/cache) downtimes and toggle the director's in-memory downtime tracker when necessary
if adV2.Downtimes != nil {
filteredServersMutex.Lock()
defer filteredServersMutex.Unlock()

// Update the cached server downtime list
serverDowntimes[adV2.Name] = adV2.Downtimes
serverDowntimes[sn] = adV2.Downtimes

now := time.Now().UTC().UnixMilli()
sn := adV2.Name
active := false // Flag to indicate if this server has active downtime in this server ad
for _, dt := range adV2.Downtimes {
if dt.StartTime <= now &&
Expand All @@ -1204,6 +1203,35 @@ func registerServerAd(engineCtx context.Context, ctx *gin.Context, sType server_
delete(filteredServers, sn)
}
}
filteredServersMutex.Unlock()
}

// "Status" represents the server's overall health status. It is introduced in Pelican 7.17.0
if adV2.Status != "" { // For backward compatibility, we only process this if it is set
// If the server is about to shutdown, we silently put it into downtime.
// Then it will not receive new requests from the Director, but it will still be able to serve the existing ones.
if metrics.ParseHealthStatus(adV2.Status) == metrics.StatusShuttingDown {
filteredServersMutex.Lock()
// Inspect the existing downtime status for this server
existingFilterType, isServerFiltered := filteredServers[sn]

// Put the server in downtime only if no filter (downtime) exists or it was tempAllowed
if !isServerFiltered || existingFilterType == tempAllowed {
filteredServers[sn] = shutdownFiltered
log.Debugf("Server %s is shutting down, applying downtime to prevent new transfer requests", sn)
}
filteredServersMutex.Unlock()
} else {
// If the server is back online, we flush out existing shutdown filter if it exists
filteredServersMutex.Lock()
if existingFilterType, isServerFiltered := filteredServers[sn]; isServerFiltered {
if existingFilterType == shutdownFiltered {
delete(filteredServers, sn)
log.Debugf("Removed the active downtime for server %s as it has come back online", sn)
}
}
filteredServersMutex.Unlock()
}
}

// Forward to other directors, if applicable
Expand Down
2 changes: 2 additions & 0 deletions director/director_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func checkFilter(serverName string) (bool, filterType) {
return true, topoFiltered
case serverFiltered:
return true, serverFiltered
case shutdownFiltered:
return true, shutdownFiltered
case tempAllowed:
return false, tempAllowed
default:
Expand Down
9 changes: 9 additions & 0 deletions docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,15 @@ default: 10s
hidden: true
components: ["origin", "cache"]
---
name: Xrootd.ShutdownTimeout
description: |+
The maximum amount of time Pelican will wait for the XRootD daemons to shut down gracefully before killing ongoing transfers.
During this period, the Director will stop redirecting clients to the Origin/Cache,
while in-flight transfers are allowed to proceed until timeout.
type: duration
default: 1m
Comment thread
jhiemstrawisc marked this conversation as resolved.
components: ["origin", "cache"]
---
############################
# Monitoring-level configs #
############################
Expand Down
60 changes: 41 additions & 19 deletions launchers/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"

"github.com/gin-gonic/gin"
"github.com/pkg/errors"
Expand Down Expand Up @@ -62,25 +63,6 @@ func LaunchModules(ctx context.Context, modules server_structs.ServerType) (serv

config.LogPelicanVersion()

egrp.Go(func() error {
Comment thread
h2zh marked this conversation as resolved.
_ = config.RestartFlag
log.Debug("Will shutdown process on signal")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
select {
case sig := <-sigs:
log.Warningf("Received signal %v; will shutdown process", sig)
shutdownCancel()
return ErrExitOnSignal
case <-config.RestartFlag:
log.Warningf("Received restart request; will restart the process")
shutdownCancel()
return ErrRestart
case <-ctx.Done():
return nil
}
})

var engine *gin.Engine
engine, err = web_ui.GetEngine()
if err != nil {
Expand Down Expand Up @@ -382,5 +364,45 @@ func LaunchModules(ctx context.Context, modules server_structs.ServerType) (serv
egrp.Go(func() error { return web_ui.InitServerWebLogin(ctx) })
}

egrp.Go(func() error {
_ = config.RestartFlag
log.Debug("Will shutdown process on signal")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
select {
case sig := <-sigs:
log.Warningf("Received signal %v; will shutdown process", sig)
// Graceful shutdown if received SIGTERM
if sig == syscall.SIGTERM {
handleGracefulShutdown(ctx, modules, servers)
}
shutdownCancel()
return ErrExitOnSignal
case <-config.RestartFlag:
Comment thread
h2zh marked this conversation as resolved.
log.Warningf("Received restart request; will restart the process")
handleGracefulShutdown(ctx, modules, servers)
shutdownCancel()
return ErrRestart
case <-ctx.Done():
return nil
}
})

return
}

// handleGracefulShutdown gives an origin/cache a grace period for in-flight
// transfers before the caller proceeds with shutting the process down.
//
// It is a no-op unless the origin or cache module is enabled. Otherwise it
// marks the XRootD component as shutting down, advertises once so the new
// status is published, and then blocks until Xrootd.ShutdownTimeout elapses
// or ctx is cancelled, whichever happens first. An advertise failure is only
// logged; it does not shorten or skip the wait.
func handleGracefulShutdown(ctx context.Context, modules server_structs.ServerType, servers []server_structs.XRootDServer) {
	if !modules.IsEnabled(server_structs.OriginType) && !modules.IsEnabled(server_structs.CacheType) {
		return
	}

	// Read the timeout once so the logged value and the actual wait cannot diverge.
	gracePeriod := param.Xrootd_ShutdownTimeout.GetDuration()
	log.Warnf("Waiting %s for in-flight transfers before shutting down", gracePeriod.String())

	// Set component's health status, so the ad could pick up the shutdown flag (`Status`: `shutting down`).
	// Nothing in this function resets the status afterwards.
	metrics.SetComponentHealthStatus(metrics.OriginCache_XRootD, metrics.StatusShuttingDown, "The server is shutting down")

	if advErr := launcher_utils.Advertise(ctx, servers); advErr != nil {
		log.Errorf("Failed to advertise before shutdown: %v", advErr)
	}

	// Wait on a timer instead of time.Sleep so that a cancelled context ends
	// the grace period early rather than stalling shutdown for the full timeout.
	timer := time.NewTimer(gracePeriod)
	defer timer.Stop()

	select {
	case <-timer.C:
		log.Warn("Shutdown grace period elapsed; proceeding with shutdown and discarding incomplete transfers")
	case <-ctx.Done():
		log.Warn("Context cancelled during the shutdown grace period; proceeding with shutdown immediately")
	}
}
6 changes: 3 additions & 3 deletions metrics/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type (

// HealthStatusEnum are stored as Prometheus values and internal struct
const (
StatusCritical HealthStatusEnum = iota + 1
StatusShuttingDown
StatusShuttingDown HealthStatusEnum = iota + 1
StatusCritical
StatusDegraded
StatusWarning
StatusOK
Expand Down Expand Up @@ -118,7 +118,7 @@ func ParseHealthStatus(status string) HealthStatusEnum {
// matched string representation, so we will return "Error: status string index out of range"
// as an indicator
func (status HealthStatusEnum) String() string {
strings := [...]string{"critical", "shutting down", "degraded", "warning", "ok", "unknown"}
strings := [...]string{"shutting down", "critical", "degraded", "warning", "ok", "unknown"}

if int(status) < 1 || int(status) > len(strings) {
return statusIndexErrorMessage
Expand Down
2 changes: 1 addition & 1 deletion metrics/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func TestHealthStatusString(t *testing.T) {
expectedStrings := [...]string{"critical", "shutting down", "degraded", "warning", "ok", "unknown"}
expectedStrings := [...]string{"shutting down", "critical", "degraded", "warning", "ok", "unknown"}

t.Run("health-status-string-handles-out-of-range-index", func(t *testing.T) {
invalidIndex := len(expectedStrings) + 1
Expand Down
1 change: 1 addition & 0 deletions param/parameters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions param/parameters_struct.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server_structs/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type (
StrategyType string
SortType string

// OriginAdvertiseV2 is the struct used to advertise BOTH Origin and Cache server to the director
OriginAdvertiseV2 struct {
ServerBaseAd
// The namespace prefix to register/look up the server in the registry.
Expand Down
17 changes: 16 additions & 1 deletion web_ui/frontend/app/config/components/RestartBox.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Button } from '@mui/material';
import { Replay } from '@mui/icons-material';
import { useState } from 'react';

import { AlertDispatchContext } from '@/components/AlertProvider';
import { useContext } from 'react';
Expand All @@ -8,12 +9,26 @@ import { restartServer } from '@/helpers/api';

export const RestartBox = () => {
const dispatch = useContext(AlertDispatchContext);
const [isDisabled, setIsDisabled] = useState(false);

const handleRestart = async () => {
  // How long the button stays disabled after a restart request, in milliseconds.
  // TODO: Disable button for a given time set by param.Xrootd_ShutdownTimeout.GetDuration().Milliseconds()
  const reenableDelayMs = 60000;
  const reenableButton = () => setIsDisabled(false);

  // Disable the button as soon as the restart is requested.
  setIsDisabled(true);
  try {
    await alertOnError(restartServer, 'Restart Server', dispatch);
  } finally {
    // Re-enable after the delay whether or not the restart request succeeded.
    setTimeout(reenableButton, reenableDelayMs);
  }
};

return (
<Button
variant='outlined'
endIcon={<Replay />}
onClick={() => alertOnError(restartServer, 'Restart Server', dispatch)}
onClick={handleRestart}
disabled={isDisabled}
>
Restart Server
</Button>
Expand Down
Loading