Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4389a6a
fix(supervisor): prevent file descriptor leaks in SSE streaming and IPC
benjaminwestern Mar 4, 2026
3f1ef0d
refactor(logs): address PR review feedback for SSE streaming
benjaminwestern Mar 4, 2026
0ed5f7b
fix: address PR review - file rotation regression and IPC error handling
benjaminwestern Mar 4, 2026
83ddd64
Merge branch 'main' into main
benjaminwestern Mar 4, 2026
869d7a5
Merge branch 'main' into main
benjaminwestern Mar 5, 2026
757f649
Merge branch 'jdx:main' into main
benjaminwestern Mar 7, 2026
c6ba744
fix(logs): track inode to detect rotation when opening fresh file handle
benjaminwestern Mar 7, 2026
d39cd87
fix(logs): fix cross-platform compilation and seek error handling
benjaminwestern Mar 7, 2026
4fb7352
fix(logs): add clear event on file rotation
benjaminwestern Mar 7, 2026
a03e967
fix(logs): seed inode tracking and eliminate redundant metadata calls
benjaminwestern Mar 7, 2026
8da097e
fix(logs): add non-unix fallback for cached_metadata and fresh_ino
benjaminwestern Mar 7, 2026
43a71be
fix(logs): remove unreachable code and blocking metadata call
benjaminwestern Mar 7, 2026
17fa531
fix(logs): add debug logging for SeekFailed and ReadFailed errors
benjaminwestern Mar 7, 2026
09f931c
fix(logs): discard broken handle on metadata failure and remove redun…
benjaminwestern Mar 7, 2026
9a76a46
fix(logs): preserve SSE tail semantics during reconnects
benjaminwestern Mar 8, 2026
f1f6093
test(logs): harden SSE rotation regression coverage
benjaminwestern Mar 8, 2026
c0e76b7
test(logs): harden SSE web startup and polling
benjaminwestern Mar 8, 2026
5056980
test(logs): polish SSE cleanup and inode polling
benjaminwestern Mar 8, 2026
d19e09e
test(logs): align SSE checks with platform support
benjaminwestern Mar 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions src/ipc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ impl IpcServer {
while let Some(req) = incoming_chan.recv().await {
if let Err(err) = tx.send((req, outgoing_chan.clone())).await {
debug!("Failed to send message: {err:?}");
break;
}
}
trace!("IPC connection handler task terminated cleanly");
});
}
Err(err) => {
Expand All @@ -140,9 +142,7 @@ impl IpcServer {
bail!("IPC message contains null byte");
}
msg.push(0);
if let Err(err) = send.write_all(&msg).await {
trace!("Failed to send message: {err:?}");
}
send.write_all(&msg).await.into_diagnostic()?;
Comment thread
benjaminwestern marked this conversation as resolved.
Ok(())
}

Expand Down Expand Up @@ -207,8 +207,10 @@ impl IpcServer {

if let Err(err) = tx.send(msg).await {
warn!("Failed to emit message: {err:?}");
break;
}
}
trace!("IPC read task terminated cleanly");
});
rx
}
Expand All @@ -228,9 +230,35 @@ impl IpcServer {
}
};
if let Err(err) = Self::send(&mut send, msg).await {
warn!("Failed to send message: {err:?}");
// Broken-pipe / reset is expected when a client disconnects normally
// Traverse the error source chain to find the original io::Error
// since miette wraps it in a DiagnosticError
use std::error::Error as StdError;
let is_disconnect = {
let mut cur: Option<&dyn StdError> = Some(err.as_ref() as &dyn StdError);
let mut found = false;
while let Some(e) = cur {
if let Some(io) = e.downcast_ref::<std::io::Error>() {
found = matches!(
io.kind(),
std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::ConnectionReset
);
break;
}
cur = e.source();
}
found
};
Comment thread
benjaminwestern marked this conversation as resolved.
if is_disconnect {
debug!("IPC client disconnected: {err:?}");
} else {
warn!("Failed to send message: {err:?}");
}
break;
}
}
trace!("IPC send task terminated cleanly");
});
tx
}
Expand Down
270 changes: 248 additions & 22 deletions src/web/routes/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::pitchfork_toml::PitchforkToml;
use crate::state_file::StateFile;
use crate::web::bp;
use crate::web::helpers::{html_escape, url_encode};
use std::io::{Read, Seek, SeekFrom};

fn base_html(title: &str, content: &str) -> String {
let bp = bp();
Expand Down Expand Up @@ -341,35 +342,260 @@ pub async fn stream_sse(
}
};
let log_path = daemon_id.log_path();
let mut last_size = std::fs::metadata(&log_path).map(|m| m.len()).unwrap_or(0);
let (mut last_size, mut last_path_ino) = match tokio::task::spawn_blocking({
let path = log_path.clone();
move || {
match std::fs::metadata(&path) {
Ok(metadata) => {
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
(metadata.len(), Some(metadata.ino()))
}
#[cfg(not(unix))]
{
(metadata.len(), None)
}
}
Err(_) => (0, None),
}
}
})
.await
{
Ok(state) => state,
Err(err) => {
warn!(
"SSE log stream: failed to read initial metadata for '{}': {err:?}",
log_path.display()
);
(0, None)
}
};
let mut file_handle: Option<std::fs::File> = None;

// Internal result type for file operations within spawn_blocking
enum FileOpResult {
Data(Vec<u8>),
Truncated,
FileRotated,
SeekFailed,
ReadFailed,
Eof,
}

struct FileOpOutput {
file: Option<std::fs::File>,
size: u64,
result: Option<FileOpResult>,
inode: Option<u64>,
}

loop {
tokio::time::sleep(Duration::from_millis(500)).await;

if let Ok(metadata) = std::fs::metadata(&log_path) {
let current_size = metadata.len();

if current_size > last_size {
// Read new content as bytes to handle invalid UTF-8
if let Ok(file) = std::fs::File::open(&log_path) {
use std::io::{Read, Seek, SeekFrom};
let mut file = file;
if file.seek(SeekFrom::Start(last_size)).is_ok() {
let mut buffer = Vec::new();
if file.read_to_end(&mut buffer).is_ok() && !buffer.is_empty() {
// Use lossy conversion to handle invalid UTF-8 gracefully
let new_content = String::from_utf8_lossy(&buffer);
let escaped = html_escape(&new_content);
yield Ok(Event::default().event("message").data(escaped));
// Use spawn_blocking for all blocking file operations
let file_op_result = {
let path = log_path.clone();
let fh = file_handle.take();
let mut ls = last_size;
let prev_ino = last_path_ino;
tokio::task::spawn_blocking(move || {
let mut file = match fh {
Some(f) => f,
None => match std::fs::File::open(&path) {
Ok(f) => f,
Err(_) => {
return FileOpOutput {
file: None,
size: ls,
result: None,
inode: prev_ino,
};
}
},
Comment thread
greptile-apps[bot] marked this conversation as resolved.
};

// Check if file was rotated while we had no handle (fresh open case)
// and cache metadata to avoid redundant fstat calls
#[cfg(unix)]
let (fresh_open_rotated, cached_metadata, fresh_ino) = {
use std::os::unix::fs::MetadataExt;
if let Ok(meta) = file.metadata() {
let ino = meta.ino();
if let Some(prev_ino_val) = prev_ino {
if ino != prev_ino_val {
// File was rotated since we last had a handle
(true, Some(meta), Some(ino))
} else {
(false, Some(meta), Some(ino))
}
} else {
// No previous inode, capture current one for future checks
(false, Some(meta), Some(ino))
}
} else {
(false, None, None)
}
};
#[cfg(unix)]
if fresh_open_rotated {
return FileOpOutput {
file: None,
size: 0,
result: Some(FileOpResult::FileRotated),
inode: fresh_ino,
};
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment thread
benjaminwestern marked this conversation as resolved.

// Fallback for non-unix platforms
#[cfg(not(unix))]
let (cached_metadata, fresh_ino): (Option<std::fs::Metadata>, Option<u64>) = (None, None);

let metadata = match cached_metadata {
Some(m) => m,
None => match file.metadata() {
Ok(m) => m,
Err(_) => {
return FileOpOutput {
file: None,
size: ls,
result: None,
inode: fresh_ino,
};
}
},
};
Comment thread
greptile-apps[bot] marked this conversation as resolved.
let current_size = metadata.len();
Comment thread
greptile-apps[bot] marked this conversation as resolved.

// Check if file was recreated (inode changed) on Unix systems
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
let path_ino = std::fs::metadata(&path).map(|m| m.ino()).ok();
let file_ino = metadata.ino();
if let Some(path_ino) = path_ino && path_ino != file_ino {
// File was recreated; drop handle and reset
return FileOpOutput {
file: None,
size: 0,
result: Some(FileOpResult::FileRotated),
inode: Some(path_ino),
};
}
}
Comment thread
benjaminwestern marked this conversation as resolved.
Comment thread
greptile-apps[bot] marked this conversation as resolved.

// Use the current file's inode for future rotation checks
// metadata is already available from above, no need for extra fstat
#[cfg(unix)]
let current_ino = fresh_ino.or_else(|| {
use std::os::unix::fs::MetadataExt;
Some(metadata.ino())
});
Comment thread
benjaminwestern marked this conversation as resolved.
#[cfg(not(unix))]
let current_ino: Option<u64> = None;

if current_size > ls {
// Read new content as bytes to handle invalid UTF-8
if file.seek(SeekFrom::Start(ls)).is_err() {
return FileOpOutput {
file: None,
size: ls,
result: Some(FileOpResult::SeekFailed),
inode: current_ino,
};
}
Comment thread
benjaminwestern marked this conversation as resolved.

const MAX_READ_SIZE: u64 = 1024 * 1024;
let bytes_to_read = (current_size - ls).min(MAX_READ_SIZE) as usize;
let mut buffer = Vec::with_capacity(bytes_to_read);
match (&mut file).take(bytes_to_read as u64).read_to_end(&mut buffer) {
Ok(0) => {
return FileOpOutput {
file: Some(file),
size: ls,
result: Some(FileOpResult::Eof),
inode: current_ino,
};
}
Ok(n) => {
ls += n as u64;
return FileOpOutput {
file: Some(file),
size: ls,
result: Some(FileOpResult::Data(buffer)),
inode: current_ino,
};
}
Err(_) => {
return FileOpOutput {
file: None,
size: ls,
result: Some(FileOpResult::ReadFailed),
inode: current_ino,
};
}
// Always update last_size to avoid stalling on invalid content
last_size = current_size;
}
} else if current_size < ls {
// File was truncated (cleared)
return FileOpOutput {
file: Some(file),
size: 0,
result: Some(FileOpResult::Truncated),
inode: current_ino,
};
Comment thread
benjaminwestern marked this conversation as resolved.
}
} else if current_size < last_size {
// File was truncated (cleared), send clear event and reset
yield Ok(Event::default().event("clear").data(""));
last_size = current_size;

FileOpOutput {
file: Some(file),
size: ls,
result: None,
inode: current_ino,
}
}).await
};

match file_op_result {
Ok(output) => {
file_handle = output.file;
last_size = output.size;
last_path_ino = output.inode;

match output.result {
Some(FileOpResult::Data(buffer)) => {
let new_content = String::from_utf8_lossy(&buffer);
let escaped = html_escape(&new_content);
yield Ok(Event::default().event("message").data(escaped));
}
Some(FileOpResult::Truncated) => {
yield Ok(Event::default().event("clear").data(""));
}
Some(FileOpResult::FileRotated) => {
// Signal the client to clear stale content from the previous file
yield Ok(Event::default().event("clear").data(""));
}
Some(FileOpResult::SeekFailed) => {
debug!("SSE log stream: seek failed on '{}', will reopen", log_path.display());
}
Some(FileOpResult::ReadFailed) => {
debug!("SSE log stream: read failed on '{}', will reopen", log_path.display());
}
Some(FileOpResult::Eof) => {
debug!(
"SSE log stream: read 0 bytes from '{}' after size check; retrying",
log_path.display()
);
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment thread
benjaminwestern marked this conversation as resolved.
_ => {}
}
}
Err(err) => {
// file_handle is already None after take() above;
// last_size remains valid from before this iteration.
warn!(
"SSE log stream: spawn_blocking panicked for '{}': {err:?}",
log_path.display()
);
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ impl TestEnv {
/// - A short ID (e.g., "api") - will look for logs in the "project" namespace
/// - A qualified ID (e.g., "project/api") - will convert to filesystem-safe path
pub fn read_logs(&self, daemon_id: &str) -> String {
fs::read_to_string(self.log_path(daemon_id)).unwrap_or_default()
}

/// Get log file path for a daemon
pub fn log_path(&self, daemon_id: &str) -> PathBuf {
// If the ID doesn't contain a namespace, assume it's from the project directory
let qualified_id = if daemon_id.contains('/') {
daemon_id.to_string()
Expand All @@ -154,16 +159,13 @@ impl TestEnv {
// Convert to filesystem-safe path (replace "/" with "--")
let safe_id = qualified_id.replace('/', "--");

let log_path = self
.home_dir
self.home_dir
.join(".local")
.join("state")
.join("pitchfork")
.join("logs")
.join(&safe_id)
.join(format!("{safe_id}.log"));

fs::read_to_string(log_path).unwrap_or_default()
.join(format!("{safe_id}.log"))
}

/// Get the home directory for this test environment
Expand Down
Loading