Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 53 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,35 @@ func (s *StreamState) stopStream(logger *zap.Logger) {

logger.Info("Stopping existing stream...")

// Stop FFmpeg process
if s.ffmpegCmd != nil && s.ffmpegCmd.Process != nil {
logger.Debug("Terminating FFmpeg process")
if err := s.ffmpegCmd.Process.Kill(); err != nil {
logger.Warn("Failed to kill FFmpeg process", zap.Error(err))
}
}

// Cancel Chrome context
if s.chromeCancel != nil {
logger.Debug("Cancelling Chrome context")
s.chromeCancel()
}

// Ask FFmpeg to terminate gracefully
if s.ffmpegCmd != nil && s.ffmpegCmd.Process != nil {
logger.Debug("Sending SIGTERM to FFmpeg process")
_ = s.ffmpegCmd.Process.Signal(syscall.SIGTERM)

done := make(chan struct{})
go func(cmd *exec.Cmd) {
// Wait will reap the process and free OS resources
_, _ = cmd.Process.Wait()
close(done)
}(s.ffmpegCmd)

select {
case <-done:
logger.Debug("FFmpeg exited gracefully")
case <-time.After(5 * time.Second):
logger.Warn("FFmpeg did not exit in time, killing")
_ = s.ffmpegCmd.Process.Kill()
// Ensure we still reap it
go func(cmd *exec.Cmd) { _, _ = cmd.Process.Wait() }(s.ffmpegCmd)
}
}

// Cancel main stream context
if s.cancelFunc != nil {
logger.Debug("Cancelling stream context")
Expand Down Expand Up @@ -202,17 +217,20 @@ func main() {
}()

// Setup stream status checker (will start monitoring after stream begins)
setupStreamStatusChecker(ctx, config)
cronScheduler := setupStreamStatusChecker(ctx, config)

// Handle graceful shutdown
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
logger.Info("Received shutdown signal, stopping...")

signal.Stop(c)
// Stop current stream if running
StopCurrentStream(ctx)
if cronScheduler != nil {
cronScheduler.Stop()
}

// Shutdown HTTP server
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -497,6 +515,28 @@ func startFFmpegStream(ctx context.Context, config *Config, display string, stre
return fmt.Errorf("failed to start ffmpeg: %v", err)
}

// Start a goroutine to periodically flush the zapWriter while the command is running
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Check if the process is still running
if cmd.Process != nil && cmd.ProcessState == nil {
// Sync forces any buffered output to be written
zapWriter.Sync()
} else {
// Process is done or never started
return
}
case <-ctx.Done():
return
}
}
}()

// Register this stream as running
globalStreamState.setStreamRunning(streamCancel, chromeCancel, cmd)

Expand Down Expand Up @@ -526,7 +566,7 @@ func startFFmpegStream(ctx context.Context, config *Config, display string, stre
// If the proper enviromental variables are set, setup a cron job to check the status of the stream
// If the stream is not live, then restart the stream
// This is used because various platforms have maximum stream durations and after that we need to restart
func setupStreamStatusChecker(ctx context.Context, config *Config) {
func setupStreamStatusChecker(ctx context.Context, config *Config) *cron.Cron {
logger := utils.GetLoggerFromContext(ctx)

logger.Debug("Setting up stream status checker")
Expand Down Expand Up @@ -568,7 +608,9 @@ func setupStreamStatusChecker(ctx context.Context, config *Config) {
})
c.Start()
logger.Info("Stream status checker started", zap.String("cronString", cronString))
return c
} else {
logger.Debug("Stream status checker not configured, skipping setup")
}
return nil
}