Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Build artifacts
stream

# Dependency directories (remove the comment below to include it)
# vendor/

Expand Down
140 changes: 133 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"os/signal"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -82,18 +83,52 @@ func (s *StreamState) stopStream(logger *zap.Logger) {

logger.Info("Stopping existing stream...")

// Stop FFmpeg process
// Stop FFmpeg process with proper cleanup
if s.ffmpegCmd != nil && s.ffmpegCmd.Process != nil {
logger.Debug("Terminating FFmpeg process")
if err := s.ffmpegCmd.Process.Kill(); err != nil {
logger.Warn("Failed to kill FFmpeg process", zap.Error(err))

// First try graceful termination with SIGTERM
if err := s.ffmpegCmd.Process.Signal(syscall.SIGTERM); err != nil {
logger.Warn("Failed to send SIGTERM to FFmpeg process", zap.Error(err))
} else {
// Give process 3 seconds to terminate gracefully
done := make(chan error, 1)
go func() {
done <- s.ffmpegCmd.Wait()
}()

select {
case <-done:
logger.Debug("FFmpeg process terminated gracefully")
case <-time.After(3 * time.Second):
logger.Debug("FFmpeg process did not terminate gracefully, forcing kill")
// Force kill if it doesn't terminate gracefully
if err := s.ffmpegCmd.Process.Kill(); err != nil {
logger.Warn("Failed to kill FFmpeg process", zap.Error(err))
}
// Wait a bit more for the kill to take effect
select {
case <-done:
case <-time.After(2 * time.Second):
logger.Warn("FFmpeg process may not have been killed properly")
}
}
}

// Kill any remaining child processes in the process group
if pgid, err := syscall.Getpgid(s.ffmpegCmd.Process.Pid); err == nil {
logger.Debug("Killing process group", zap.Int("pgid", pgid))
// Kill entire process group to ensure all child processes are terminated
syscall.Kill(-pgid, syscall.SIGKILL)
}
}

// Cancel Chrome context
// Cancel Chrome context with timeout
if s.chromeCancel != nil {
logger.Debug("Cancelling Chrome context")
s.chromeCancel()
// Give Chrome context time to clean up
time.Sleep(1 * time.Second)
}

// Cancel main stream context
Expand Down Expand Up @@ -179,6 +214,9 @@ func main() {
zap.Int("width", config.Width),
zap.Int("height", config.Height))

// Log initial memory stats
logMemoryStats(logger)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -225,6 +263,7 @@ func main() {
}()

// Run the stream in a loop to handle restarts from the cron job or manual restarts
var restartCount int
for {
select {
// Case to handle an expected shutdown signal
Expand All @@ -234,7 +273,20 @@ func main() {
// Default case to start or restart the stream
// This will be triggered by the cron job or manual restarts
default:
logger.Info("Starting/restarting stream...")
logger.Info("Starting/restarting stream...", zap.Int("restartCount", restartCount))

// Perform periodic cleanup every 10 restarts to prevent resource accumulation
if restartCount > 0 && restartCount%10 == 0 {
logger.Info("Performing periodic cleanup", zap.Int("restartCount", restartCount))
logMemoryStats(logger)
cleanupChromeProcesses(logger)
runtime.GC()
// Give system time to cleanup
time.Sleep(2 * time.Second)
logger.Info("Periodic cleanup completed")
logMemoryStats(logger)
}

if err := streamWebpage(ctx, config); err != nil {
if ctx.Err() != nil {
logger.Info("Stream stopped due to context cancellation")
Expand All @@ -243,6 +295,7 @@ func main() {
logger.Info("Stream ended, will restart in 5 seconds", zap.Error(err))
time.Sleep(5 * time.Second)
}
restartCount++
}
}
}
Expand Down Expand Up @@ -324,6 +377,10 @@ func streamWebpage(ctx context.Context, config *Config) error {
time.Sleep(2 * time.Second)
}

// Force garbage collection before starting new stream to free up memory
runtime.GC()
logger.Debug("Forced garbage collection before starting new stream")

// Create a cancellable context for this stream session
streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()
Expand All @@ -347,14 +404,32 @@ func streamWebpage(ctx context.Context, config *Config) error {
chromedp.Flag("disable-blink-features", "AutomationControlled"),
chromedp.Flag("mute-audio", false),
chromedp.Flag("window-position", "0,0"),
// Add memory management flags for Chrome
chromedp.Flag("max_old_space_size", "512"),
chromedp.Flag("memory-pressure-off", true),
chromedp.Flag("disable-background-timer-throttling", true),
chromedp.Flag("disable-renderer-backgrounding", true),
chromedp.Flag("disable-backgrounding-occluded-windows", true),
chromedp.WindowSize(config.Width, config.Height),
)

allocCtx, allocCancel := chromedp.NewExecAllocator(streamCtx, opts...)
defer allocCancel()
defer func() {
logger.Debug("Cancelling Chrome allocator context")
allocCancel()
// Give allocator time to clean up
time.Sleep(500 * time.Millisecond)
}()

chromeCtx, chromeCancel := chromedp.NewContext(allocCtx)
defer chromeCancel()
defer func() {
logger.Debug("Cancelling Chrome context")
chromeCancel()
// Give Chrome context time to clean up
time.Sleep(500 * time.Millisecond)
// Force cleanup of any remaining Chrome processes
cleanupChromeProcesses(logger)
}()

// Start Chrome and navigate to webpage
logger.Info("Starting Chrome browser", zap.String("url", config.WebpageURL))
Expand All @@ -381,6 +456,50 @@ func streamWebpage(ctx context.Context, config *Config) error {
return startFFmpegStream(streamCtx, config, displayInfo, streamCancel, chromeCancel)
}

// Function to clean up any remaining Chrome processes
func cleanupChromeProcesses(logger *zap.Logger) {
logger.Debug("Cleaning up Chrome processes")

// Kill any remaining chrome processes
cmd := exec.Command("pkill", "-f", "chrome")
if err := cmd.Run(); err != nil {
logger.Debug("No Chrome processes to cleanup or cleanup failed", zap.Error(err))
}

// Force kill any stubborn chrome processes
cmd = exec.Command("pkill", "-9", "-f", "chrome")
if err := cmd.Run(); err != nil {
logger.Debug("No Chrome processes to force kill or force kill failed", zap.Error(err))
}

// Also cleanup chromium processes (in case they exist)
cmd = exec.Command("pkill", "-f", "chromium")
if err := cmd.Run(); err != nil {
logger.Debug("No Chromium processes to cleanup or cleanup failed", zap.Error(err))
}

logger.Debug("Chrome process cleanup completed")
}

// Function to log memory usage statistics for monitoring
func logMemoryStats(logger *zap.Logger) {
var m runtime.MemStats
runtime.ReadMemStats(&m)

logger.Info("Memory statistics",
zap.Uint64("allocated_mb", bToMb(m.Alloc)),
zap.Uint64("total_allocated_mb", bToMb(m.TotalAlloc)),
zap.Uint64("heap_objects", m.HeapObjects),
zap.Uint32("num_gc", m.NumGC),
zap.Uint64("sys_mb", bToMb(m.Sys)),
)
}

// Helper function to convert bytes to megabytes
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

// Function to get the display info generated by the start.sh script and feed it to FFmpeg
func getDisplayInfo() (string, error) {
// Try to get the DISPLAY environment variable
Expand Down Expand Up @@ -490,6 +609,9 @@ func startFFmpegStream(ctx context.Context, config *Config, display string, stre
cmd := exec.CommandContext(ctx, "ffmpeg", args...)
cmd.Stdout = zapWriter
cmd.Stderr = zapWriter

// Set process group ID to allow killing all child processes
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

logger.Debug("Starting FFmpeg with command", zap.Strings("args", args))

Expand All @@ -513,6 +635,10 @@ func startFFmpegStream(ctx context.Context, config *Config, display string, stre
globalStreamState.chromeCancel = nil
globalStreamState.ffmpegCmd = nil
globalStreamState.mu.Unlock()

// Force garbage collection after stream ends
runtime.GC()
logger.Debug("Forced garbage collection after stream ended")
}()

if ctx.Err() != nil {
Expand Down
45 changes: 45 additions & 0 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"os/exec"
"syscall"
"testing"

"go.uber.org/zap"
Expand Down Expand Up @@ -363,3 +364,47 @@ func TestGetDisplayInfo(t *testing.T) {
}
})
}

func TestCleanupChromeProcesses(t *testing.T) {
t.Run("Chrome Process Cleanup Function", func(t *testing.T) {
logger, _ := zap.NewDevelopment()

// This test just ensures the function doesn't crash
// In a real environment, it would clean up Chrome processes
cleanupChromeProcesses(logger)

// If we reach here, the function completed without panicking
// which is what we want to verify
})
}

func TestStopStreamWithProcessGroup(t *testing.T) {
t.Cleanup(resetGlobalStreamState)

t.Run("Stop Stream With Process Group Cleanup", func(t *testing.T) {
logger, _ := zap.NewDevelopment()

// Create a mock command (sleep command that won't actually run long)
cmd := exec.Command("sleep", "0.1")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
cmd.Start()

// Set up stream state with the mock command
globalStreamState.mu.Lock()
globalStreamState.isRunning = true
globalStreamState.ffmpegCmd = cmd
globalStreamState.mu.Unlock()

// Test that stopStream can handle process group cleanup
globalStreamState.stopStream(logger)

// Verify the stream state is cleaned up
if globalStreamState.isStreamRunning() {
t.Error("Expected stream to be stopped")
}

if globalStreamState.ffmpegCmd != nil {
t.Error("Expected ffmpegCmd to be nil after stopping")
}
})
}
11 changes: 11 additions & 0 deletions start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,16 @@ done
echo "✓ PulseAudio is responding"

echo "=== All Dependencies Ready - Starting Application ==="

# Set memory management environment variables
export GOMEMLIMIT="512MiB" # Limit Go memory usage
export GOGC=100 # Standard garbage collection target

# Cleanup any existing processes to free memory
echo "Cleaning up any existing processes..."
pkill -f chrome 2>/dev/null || true
pkill -f chromium 2>/dev/null || true
pkill -f ffmpeg 2>/dev/null || true

# Start the stream application
exec /stream