Skip to main content

Grove/Transport/
IPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # IPC Transport Implementation
3//!
4//! Provides inter-process communication (IPC) for Grove.
5//! Supports Unix domain sockets on macOS/Linux and named pipes on Windows.
6
7use std::{
8	path::{Path, PathBuf},
9	sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14use tracing::{debug, info, instrument, warn};
15
16use crate::Transport::{
17	Strategy::{TransportStats, TransportStrategy, TransportType},
18	TransportConfig,
19};
20
21/// IPC transport for local process communication.
22#[derive(Clone, Debug)]
23pub struct IPCTransport {
24	/// Unix domain socket path (macOS/Linux).
25	SocketPath:Option<PathBuf>,
26	/// Named pipe identifier (Windows).
27	#[allow(dead_code)]
28	PipeName:Option<String>,
29	/// Transport configuration.
30	#[allow(dead_code)]
31	Configuration:TransportConfig,
32	/// Whether the transport is currently connected.
33	Connected:Arc<RwLock<bool>>,
34	/// Transport statistics.
35	Statistics:Arc<RwLock<TransportStats>>,
36}
37
38impl IPCTransport {
39	/// Creates a new IPC transport using the default socket path.
40	pub fn New() -> anyhow::Result<Self> {
41		#[cfg(unix)]
42		{
43			let SocketPath = Self::DefaultSocketPath();
44			Ok(Self {
45				SocketPath:Some(SocketPath),
46				PipeName:None,
47				Configuration:TransportConfig::default(),
48				Connected:Arc::new(RwLock::new(false)),
49				Statistics:Arc::new(RwLock::new(TransportStats::default())),
50			})
51		}
52
53		#[cfg(windows)]
54		{
55			Ok(Self {
56				SocketPath:None,
57				PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
58				Configuration:TransportConfig::default(),
59				Connected:Arc::new(RwLock::new(false)),
60				Statistics:Arc::new(RwLock::new(TransportStats::default())),
61			})
62		}
63
64		#[cfg(not(any(unix, windows)))]
65		{
66			Err(anyhow::anyhow!("IPC transport not supported on this platform"))
67		}
68	}
69
70	/// Creates a new IPC transport with a custom Unix domain socket path.
71	pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
72		#[cfg(unix)]
73		{
74			Ok(Self {
75				SocketPath:Some(SocketPath.as_ref().to_path_buf()),
76				PipeName:None,
77				Configuration:TransportConfig::default(),
78				Connected:Arc::new(RwLock::new(false)),
79				Statistics:Arc::new(RwLock::new(TransportStats::default())),
80			})
81		}
82
83		#[cfg(not(unix))]
84		{
85			Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
86		}
87	}
88
89	/// Returns the default socket path for the current platform.
90	#[cfg(unix)]
91	fn DefaultSocketPath() -> PathBuf {
92		let mut Path = std::env::temp_dir();
93		Path.push("grove-ipc.sock");
94		Path
95	}
96
97	/// Returns the socket path (Unix only).
98	#[cfg(unix)]
99	pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
100
101	/// Returns a snapshot of transport statistics.
102	pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
103
104	/// Removes the socket file if it exists.
105	#[cfg(unix)]
106	async fn CleanupSocket(&self) -> anyhow::Result<()> {
107		if let Some(SocketPath) = &self.SocketPath {
108			if SocketPath.exists() {
109				tokio::fs::remove_file(SocketPath)
110					.await
111					.map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
112				debug!("Removed existing socket: {:?}", SocketPath);
113			}
114		}
115		Ok(())
116	}
117}
118
119#[async_trait]
120impl TransportStrategy for IPCTransport {
121	type Error = IPCTransportError;
122
123	#[instrument(skip(self))]
124	async fn connect(&self) -> Result<(), Self::Error> {
125		info!("Connecting to IPC transport");
126
127		#[cfg(unix)]
128		{
129			self.CleanupSocket()
130				.await
131				.map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
132			*self.Connected.write().await = true;
133			info!("IPC connection established: {:?}", self.SocketPath);
134		}
135
136		#[cfg(windows)]
137		{
138			*self.Connected.write().await = true;
139			info!("IPC connection established via named pipe");
140		}
141
142		#[cfg(not(any(unix, windows)))]
143		{
144			return Err(IPCTransportError::NotSupported);
145		}
146
147		Ok(())
148	}
149
150	#[instrument(skip(self, request))]
151	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
152		if !self.is_connected() {
153			return Err(IPCTransportError::NotConnected);
154		}
155
156		debug!("Sending IPC request ({} bytes)", request.len());
157
158		let Response:Vec<u8> = vec![];
159
160		let mut Stats = self.Statistics.write().await;
161		Stats.record_sent(request.len() as u64, 0);
162		Stats.record_received(Response.len() as u64);
163
164		Ok(Response)
165	}
166
167	#[instrument(skip(self, data))]
168	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
169		if !self.is_connected() {
170			return Err(IPCTransportError::NotConnected);
171		}
172
173		debug!("Sending IPC notification ({} bytes)", data.len());
174
175		let mut Stats = self.Statistics.write().await;
176		Stats.record_sent(data.len() as u64, 0);
177		Ok(())
178	}
179
180	#[instrument(skip(self))]
181	async fn close(&self) -> Result<(), Self::Error> {
182		info!("Closing IPC connection");
183		*self.Connected.write().await = false;
184
185		#[cfg(unix)]
186		{
187			if let Some(SocketPath) = &self.SocketPath {
188				if SocketPath.exists() {
189					tokio::fs::remove_file(SocketPath).await.map_err(|E| {
190						warn!("Failed to remove socket: {}", E);
191						IPCTransportError::CleanupFailed(E.to_string())
192					})?;
193				}
194			}
195		}
196
197		info!("IPC connection closed");
198		Ok(())
199	}
200
201	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
202
203	fn transport_type(&self) -> TransportType { TransportType::IPC }
204}
205
206/// IPC transport error variants.
207#[derive(Debug, thiserror::Error)]
208pub enum IPCTransportError {
209	/// Failed to establish IPC connection
210	#[error("Connection failed: {0}")]
211	ConnectionFailed(String),
212	/// Failed to send message via IPC
213	#[error("Send failed: {0}")]
214	SendFailed(String),
215	/// Failed to receive message via IPC
216	#[error("Receive failed: {0}")]
217	ReceiveFailed(String),
218	/// Transport is not connected
219	#[error("Not connected")]
220	NotConnected,
221	/// IPC not supported on this platform
222	#[error("IPC not supported on this platform")]
223	NotSupported,
224	/// Failed to clean up IPC resources
225	#[error("Cleanup failed: {0}")]
226	CleanupFailed(String),
227	/// Socket communication error
228	#[error("Socket error: {0}")]
229	SocketError(String),
230	/// Operation timed out
231	#[error("Timeout")]
232	Timeout,
233}
234
#[cfg(test)]
mod tests {
	use super::*;

	/// The default constructor succeeds on every supported platform.
	#[cfg(any(unix, windows))]
	#[test]
	fn TestIPCTransportCreation() {
		assert!(IPCTransport::New().is_ok());
	}

	/// A custom socket path is stored unchanged and handed back by the getter.
	#[cfg(unix)]
	#[test]
	fn TestIPCTransportWithSocketPath() {
		let Expected = Path::new("/tmp/test.sock");
		let Transport = IPCTransport::WithSocketPath(Expected).expect("Unix socket paths are supported on Unix");
		assert_eq!(Transport.GetSocketPath(), Some(Expected));
	}

	/// A freshly created transport reports itself as disconnected.
	///
	/// This is a plain synchronous test: nothing is awaited, and querying the
	/// connection state through a synchronous accessor must not depend on an
	/// async runtime being active.
	#[cfg(any(unix, windows))]
	#[test]
	fn TestIPCTransportNotConnected() {
		let Transport = IPCTransport::New().expect("IPC is supported on this platform");
		assert!(!Transport.is_connected());
	}
}