Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,64 @@ pub enum FileError {
NoPath,
}

/// Errors related to IPC communication with the supervisor.
#[derive(Debug, Error, Diagnostic)]
pub enum IpcError {
#[error("failed to connect to supervisor after {attempts} attempts")]
#[diagnostic(code(pitchfork::ipc::connection_failed))]
ConnectionFailed {
attempts: u32,
#[help]
details: Option<String>,
},

#[error("IPC request timed out after {seconds}s")]
#[diagnostic(
code(pitchfork::ipc::timeout),
help(
"the supervisor may be unresponsive or overloaded.\nCheck supervisor status: pitchfork supervisor status\nView logs: pitchfork logs"
)
)]
Timeout { seconds: u64 },

#[error("IPC connection closed unexpectedly")]
#[diagnostic(
code(pitchfork::ipc::connection_closed),
help(
"the supervisor may have crashed or been stopped.\nRestart with: pitchfork supervisor start"
)
)]
ConnectionClosed,

#[error("failed to read IPC response")]
#[diagnostic(code(pitchfork::ipc::read_failed))]
ReadFailed {
#[help]
details: Option<String>,
},

#[error("failed to send IPC request")]
#[diagnostic(code(pitchfork::ipc::send_failed))]
SendFailed {
#[help]
details: Option<String>,
},

#[error("unexpected response from supervisor: expected {expected}, got {actual}")]
#[diagnostic(
code(pitchfork::ipc::unexpected_response),
help("this may indicate a version mismatch between the CLI and supervisor")
)]
UnexpectedResponse { expected: String, actual: String },

#[error("IPC message contains invalid data")]
#[diagnostic(code(pitchfork::ipc::invalid_message))]
InvalidMessage {
#[help]
details: Option<String>,
},
}

/// Find the most similar daemon name for suggestions.
pub fn find_similar_daemon<'a>(
name: &str,
Expand Down Expand Up @@ -217,4 +275,26 @@ mod tests {
let err = FileError::NoPath;
assert!(err.to_string().contains("no file path"));
}

#[test]
fn test_ipc_error_display() {
let err = IpcError::ConnectionFailed {
attempts: 5,
details: Some("connection refused".to_string()),
};
assert!(err.to_string().contains("failed to connect"));
assert!(err.to_string().contains("5 attempts"));

let err = IpcError::Timeout { seconds: 30 };
assert!(err.to_string().contains("timed out"));
assert!(err.to_string().contains("30s"));

let err = IpcError::UnexpectedResponse {
expected: "Ok".to_string(),
actual: "Error".to_string(),
};
assert!(err.to_string().contains("unexpected response"));
assert!(err.to_string().contains("Ok"));
assert!(err.to_string().contains("Error"));
}
}
83 changes: 61 additions & 22 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::daemon::{Daemon, RunOptions};
use crate::error::IpcError;
use crate::ipc::{IpcRequest, IpcResponse, deserialize, fs_name, serialize};
use crate::{Result, supervisor};
use exponential_backoff::Backoff;
use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use interprocess::local_socket::traits::tokio::Stream;
use miette::{Context, IntoDiagnostic, bail, ensure};
use miette::Context;
use std::path::PathBuf;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
Expand All @@ -31,7 +32,13 @@ impl IpcClient {
let client = Self::connect_(&id, "main").await?;
trace!("Connected to IPC socket");
let rsp = client.request(IpcRequest::Connect).await?;
ensure!(rsp.is_ok(), "Failed to connect to IPC main");
if !rsp.is_ok() {
return Err(IpcError::UnexpectedResponse {
expected: "Ok".to_string(),
actual: format!("{:?}", rsp),
}
.into());
}
debug!("Connected to IPC main");
Ok(client)
}
Expand All @@ -58,28 +65,41 @@ impl IpcClient {
tokio::time::sleep(duration).await;
continue;
} else {
bail!("Failed to connect to IPC socket: {:?}", err);
return Err(IpcError::ConnectionFailed {
attempts: CONNECT_ATTEMPTS,
details: Some(format!(
"{err}\nensure the supervisor is running with: pitchfork supervisor start"

Copilot AI Jan 19, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The help text 'ensure the supervisor is running with: pitchfork supervisor start' is duplicated in the details field here (line 70-71) and in the fallback case (lines 81-82). Consider extracting this message to a constant to maintain consistency.

Copilot uses AI. Check for mistakes.
)),
}
.into());
Comment thread
cursor[bot] marked this conversation as resolved.
}
}
}
}
bail!(
"failed to connect to IPC socket after {} attempts",
CONNECT_ATTEMPTS
)
Err(IpcError::ConnectionFailed {
attempts: CONNECT_ATTEMPTS,
details: Some(
"ensure the supervisor is running with: pitchfork supervisor start".to_string(),
),
}
.into())
}

pub async fn send(&self, msg: IpcRequest) -> Result<()> {
let mut msg = serialize(&msg)?;
if msg.contains(&0) {
bail!("IPC message contains null byte");
return Err(IpcError::InvalidMessage {
details: Some("message contains null byte".to_string()),
}
.into());
}
msg.push(0);
let mut send = self.send.lock().await;
send.write_all(&msg)
.await
.into_diagnostic()
.wrap_err("failed to send IPC message")?;
.map_err(|e| IpcError::SendFailed {
details: Some(e.to_string()),
})?;
Ok(())
}

Expand All @@ -88,11 +108,21 @@ impl IpcClient {
let mut bytes = Vec::new();
match tokio::time::timeout(timeout, recv.read_until(0, &mut bytes)).await {
Ok(Ok(_)) => {}
Ok(Err(err)) => bail!("failed to read IPC message: {err}"),
Err(_) => bail!("IPC read timed out after {timeout:?}"),
Ok(Err(err)) => {
return Err(IpcError::ReadFailed {
details: Some(err.to_string()),
}
.into());
}
Err(_) => {
return Err(IpcError::Timeout {
seconds: timeout.as_secs(),
}
.into());
}
}
if bytes.is_empty() {
bail!("IPC connection closed unexpectedly (supervisor may have stopped)");
return Err(IpcError::ConnectionClosed.into());
}
deserialize(&bytes).wrap_err("failed to deserialize IPC response")
}
Expand All @@ -101,6 +131,13 @@ impl IpcClient {
self.request_with_timeout(msg, REQUEST_TIMEOUT).await
}

fn unexpected_response(expected: &str, actual: &IpcResponse) -> IpcError {
IpcError::UnexpectedResponse {
expected: expected.to_string(),
actual: format!("{:?}", actual),
}
}

async fn request_with_timeout(
&self,
msg: IpcRequest,
Expand All @@ -121,7 +158,7 @@ impl IpcClient {
info!("daemon {} already enabled", id);
Ok(false)
}
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => Err(Self::unexpected_response("Yes or No", &rsp).into()),
}
}

Expand All @@ -136,7 +173,7 @@ impl IpcClient {
info!("daemon {} already disabled", id);
Ok(false)
}
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => Err(Self::unexpected_response("Yes or No", &rsp).into()),
}
}

Expand Down Expand Up @@ -185,7 +222,9 @@ impl IpcClient {
error!("Failed to print logs: {}", e);
}
}
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => {
return Err(Self::unexpected_response("DaemonStart or DaemonReady", &rsp).into());
}
}
Ok((started_daemons, exit_code))
}
Expand All @@ -194,7 +233,7 @@ impl IpcClient {
let rsp = self.request(IpcRequest::GetActiveDaemons).await?;
match rsp {
IpcResponse::ActiveDaemons(daemons) => Ok(daemons),
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => Err(Self::unexpected_response("ActiveDaemons", &rsp).into()),
}
}

Expand All @@ -209,7 +248,7 @@ impl IpcClient {
IpcResponse::Ok => {
trace!("updated shell dir for pid {shell_pid} to {}", dir.display());
}
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => return Err(Self::unexpected_response("Ok", &rsp).into()),
}
Ok(())
}
Expand All @@ -220,7 +259,7 @@ impl IpcClient {
IpcResponse::Ok => {
trace!("cleaned");
}
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => return Err(Self::unexpected_response("Ok", &rsp).into()),
}
Ok(())
}
Expand All @@ -229,15 +268,15 @@ impl IpcClient {
let rsp = self.request(IpcRequest::GetDisabledDaemons).await?;
match rsp {
IpcResponse::DisabledDaemons(daemons) => Ok(daemons),
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => Err(Self::unexpected_response("DisabledDaemons", &rsp).into()),
}
}

pub async fn get_notifications(&self) -> Result<Vec<(log::LevelFilter, String)>> {
let rsp = self.request(IpcRequest::GetNotifications).await?;
match rsp {
IpcResponse::Notifications(notifications) => Ok(notifications),
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => Err(Self::unexpected_response("Notifications", &rsp).into()),
}
}

Expand All @@ -252,7 +291,7 @@ impl IpcClient {
warn!("daemon {} is not running", id);
Ok(())
}
rsp => bail!("unexpected IPC response: {rsp:?}"),
rsp => Err(Self::unexpected_response("Ok or DaemonAlreadyStopped", &rsp).into()),
}
}
}
31 changes: 19 additions & 12 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::Result;
use crate::daemon::{Daemon, RunOptions};
use crate::env;
use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
use miette::IntoDiagnostic;
use miette::{Context, IntoDiagnostic};
use std::path::PathBuf;

pub(crate) mod client;
Expand Down Expand Up @@ -60,22 +60,29 @@ fn fs_name(name: &str) -> Result<Name<'_>> {
}

fn serialize<T: serde::Serialize>(msg: &T) -> Result<Vec<u8>> {
let msg = if *env::IPC_JSON {
serde_json::to_vec(msg).into_diagnostic()?
if *env::IPC_JSON {
serde_json::to_vec(msg)
.into_diagnostic()
.wrap_err("failed to serialize IPC message as JSON")
} else {
rmp_serde::to_vec(msg).into_diagnostic()?
};
Ok(msg)
rmp_serde::to_vec(msg)
.into_diagnostic()
.wrap_err("failed to serialize IPC message as MessagePack")
}
}

fn deserialize<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T> {
let mut bytes = bytes.to_vec();
bytes.pop();
trace!("msg: {:?}", std::str::from_utf8(&bytes).unwrap_or_default());
let msg = if *env::IPC_JSON {
serde_json::from_slice(&bytes).into_diagnostic()?
let preview = std::str::from_utf8(&bytes).unwrap_or("<binary>");
trace!("msg: {:?}", preview);
Comment on lines +77 to +78

Copilot AI Jan 19, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name 'preview' suggests this is a limited view of the data, but the code attempts to convert the entire bytes buffer to UTF-8. Consider renaming to 'message_text' or 'decoded_message' to better reflect that it represents the full decoded content or a fallback label.

Suggested change
let preview = std::str::from_utf8(&bytes).unwrap_or("<binary>");
trace!("msg: {:?}", preview);
let decoded_message = std::str::from_utf8(&bytes).unwrap_or("<binary>");
trace!("msg: {:?}", decoded_message);

Copilot uses AI. Check for mistakes.
if *env::IPC_JSON {
serde_json::from_slice(&bytes)
.into_diagnostic()
.wrap_err("failed to deserialize IPC JSON response")
} else {
rmp_serde::from_slice(&bytes).into_diagnostic()?
};
Ok(msg)
rmp_serde::from_slice(&bytes)
.into_diagnostic()
.wrap_err("failed to deserialize IPC MessagePack response")
}
}