grove/Transport/
IPCTransport.rs

//! IPC Transport Implementation
//!
//! Provides inter-process communication (IPC) for Grove.
//! Targets Unix domain sockets on macOS/Linux and named pipes on Windows.
5
6use std::{
7	path::{Path, PathBuf},
8	sync::Arc,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use tokio::sync::RwLock;
14use tracing::{debug, info, instrument, warn};
15
16use crate::Transport::TransportStrategy;
17use crate::Transport::TransportType;
18use crate::Transport::TransportStats;
19use crate::Transport::TransportConfig;
20
21/// IPC transport for local process communication
22#[derive(Clone, Debug)]
23pub struct IPCTransportImpl {
24	/// Socket path
25	socket_path:Option<PathBuf>,
26	/// Named pipe name (Windows)
27	pipe_name:Option<String>,
28	/// Transport configuration
29	config:TransportConfig,
30	/// Connection state
31	connected:Arc<RwLock<bool>>,
32	/// Transport statistics
33	stats:Arc<RwLock<TransportStats>>,
34}
35
36impl IPCTransportImpl {
37	/// Create a new IPC transport with default socket path
38	pub fn new() -> anyhow::Result<Self> {
39		#[cfg(unix)]
40		{
41			let socket_path = Self::default_socket_path();
42			Ok(Self {
43				socket_path:Some(socket_path),
44				pipe_name:None,
45				config:TransportConfig::default(),
46				connected:Arc::new(RwLock::new(false)),
47				stats:Arc::new(RwLock::new(TransportStats::default())),
48			})
49		}
50
51		#[cfg(windows)]
52		{
53			Ok(Self {
54				socket_path:None,
55				pipe_name:Some(r"\\.\pipe\grove-ipc".to_string()),
56				config:TransportConfig::default(),
57				connected:Arc::new(RwLock::new(false)),
58				stats:Arc::new(RwLock::new(TransportStats::default())),
59			})
60		}
61
62		#[cfg(not(any(unix, windows)))]
63		{
64			Err(anyhow::anyhow!("IPC transport not supported on this platform"))
65		}
66	}
67
68	/// Create a new IPC transport with a custom socket path
69	///
70	/// # Arguments
71	///
72	/// * `socket_path` - Path to the Unix domain socket
73	///
74	/// # Example
75	///
76	/// ```rust,no_run
77	/// use std::path::Path;
78	///
79	/// use grove::Transport::IPCTransport;
80	///
81	/// # #[cfg(unix)]
82	/// let transport = IPCTransport::with_socket_path(Path::new("/tmp/grove.sock"))?;
83	/// # Ok::<(), anyhow::Error>(())
84	/// ```
85	pub fn with_socket_path<P:AsRef<Path>>(socket_path:P) -> anyhow::Result<Self> {
86		#[cfg(unix)]
87		{
88			Ok(Self {
89				socket_path:Some(socket_path.as_ref().to_path_buf()),
90				pipe_name:None,
91				config:TransportConfig::default(),
92				connected:Arc::new(RwLock::new(false)),
93				stats:Arc::new(RwLock::new(TransportStats::default())),
94			})
95		}
96
97		#[cfg(not(unix))]
98		{
99			Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
100		}
101	}
102
103	/// Create a new IPC transport with a custom pipe name (Windows)
104	#[cfg(windows)]
105	pub fn with_pipe_name(pipe_name:&str) -> anyhow::Result<Self> {
106		Ok(Self {
107			socket_path:None,
108			pipe_name:Some(pipe_name.to_string()),
109			config:TransportConfig::default(),
110			connected:Arc::new(RwLock::new(false)),
111			stats:Arc::new(RwLock::new(TransportStats::default())),
112		})
113	}
114
115	/// Create a new IPC transport with custom configuration
116	pub fn with_config(config:TransportConfig) -> anyhow::Result<Self> {
117		#[cfg(unix)]
118		{
119			let socket_path = Self::default_socket_path();
120			Ok(Self {
121				socket_path:Some(socket_path),
122				pipe_name:None,
123				config,
124				connected:Arc::new(RwLock::new(false)),
125				stats:Arc::new(RwLock::new(TransportStats::default())),
126			})
127		}
128
129		#[cfg(windows)]
130		{
131			Ok(Self {
132				socket_path:None,
133				pipe_name:Some(r"\\.\pipe\grove-ipc".to_string()),
134				config,
135				connected:Arc::new(RwLock::new(false)),
136				stats:Arc::new(RwLock::new(TransportStats::default())),
137			})
138		}
139
140		#[cfg(not(any(unix, windows)))]
141		{
142			Err(anyhow::anyhow!("IPC transport not supported on this platform"))
143		}
144	}
145
146	/// Get the default socket path for the current platform
147	#[cfg(unix)]
148	fn default_socket_path() -> PathBuf {
149		let mut path = std::env::temp_dir();
150		path.push("grove-ipc.sock");
151		path
152	}
153
154	/// Get the socket path (Unix only)
155	#[cfg(unix)]
156	pub fn socket_path(&self) -> Option<&Path> { self.socket_path.as_deref() }
157
158	/// Get the pipe name (Windows only)
159	#[cfg(windows)]
160	pub fn pipe_name(&self) -> Option<&str> { self.pipe_name.as_deref() }
161
162	/// Get transport statistics
163	pub async fn stats(&self) -> TransportStats { self.stats.read().await.clone() }
164
165	/// Clean up the socket file if it exists
166	#[cfg(unix)]
167	async fn cleanup_socket(&self) -> anyhow::Result<()> {
168		if let Some(path) = &self.socket_path {
169			if path.exists() {
170				tokio::fs::remove_file(path)
171					.await
172					.map_err(|e| anyhow::anyhow!("Failed to remove socket: {}", e))?;
173				debug!("Removed existing socket: {:?}", path);
174			}
175		}
176		Ok(())
177	}
178}
179
180#[async_trait]
181impl TransportStrategy for IPCTransportImpl {
182	type Error = IPCTransportError;
183
184	#[instrument(skip(self))]
185	async fn connect(&self) -> Result<(), Self::Error> {
186		info!("Connecting to IPC transport");
187
188		#[cfg(unix)]
189		{
190			self.cleanup_socket()
191				.await
192				.map_err(|e| IPCTransportError::ConnectionFailed(e.to_string()))?;
193
194			// For a complete implementation, we would create the Unix socket here
195			// For now, we just mark as connected
196			*self.connected.write().await = true;
197
198			info!("IPC connection established: {:?}", self.socket_path);
199		}
200
201		#[cfg(windows)]
202		{
203			*self.connected.write().await = true;
204			info!("IPC connection established: {:?}", self.pipe_name);
205		}
206
207		#[cfg(not(any(unix, windows)))]
208		{
209			return Err(IPCTransportError::NotSupported);
210		}
211
212		Ok(())
213	}
214
215	#[instrument(skip(self, request))]
216	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
217		if !self.is_connected() {
218			return Err(IPCTransportError::NotConnected);
219		}
220
221		debug!("Sending IPC request ({} bytes)", request.len());
222
223		// For a complete implementation, this would send the request via the
224		// socket/pipe For now, we return a mock response
225		let response = vec![];
226
227		// Update statistics
228		let mut stats = self.stats.write().await;
229		stats.record_sent(request.len() as u64, 0);
230		stats.record_received(response.len() as u64);
231
232		Ok(response)
233	}
234
235	#[instrument(skip(self, data))]
236	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
237		if !self.is_connected() {
238			return Err(IPCTransportError::NotConnected);
239		}
240
241		debug!("Sending IPC request without response ({} bytes)", data.len());
242
243		// For a complete implementation, this would send the data via the socket/pipe
244		// For now, we just update statistics
245		let mut stats = self.stats.write().await;
246		stats.record_sent(data.len() as u64, 0);
247
248		Ok(())
249	}
250
251	#[instrument(skip(self))]
252	async fn close(&self) -> Result<(), Self::Error> {
253		info!("Closing IPC connection");
254
255		*self.connected.write().await = false;
256
257		#[cfg(unix)]
258		{
259			// Clean up socket file
260			if let Some(path) = &self.socket_path {
261				if path.exists() {
262					tokio::fs::remove_file(path).await.map_err(|e| {
263						warn!("Failed to remove socket: {}", e);
264						// Don't fail the close operation on cleanup failure
265						IPCTransportError::CleanupFailed(e.to_string())
266					})?;
267				}
268			}
269		}
270
271		info!("IPC connection closed");
272
273		Ok(())
274	}
275
276	fn is_connected(&self) -> bool { self.connected.blocking_read().to_owned() }
277
278	fn transport_type(&self) -> TransportType {
279		TransportType::IPC
280	}
281}
282
283/// IPC transport errors
284#[derive(Debug, thiserror::Error)]
285pub enum IPCTransportError {
286/// Connection failed error
287#[error("Connection failed: {0}")]
288ConnectionFailed(String),
289
290/// Send failed error
291#[error("Send failed: {0}")]
292SendFailed(String),
293
294/// Receive failed error
295#[error("Receive failed: {0}")]
296ReceiveFailed(String),
297
298/// Not connected error
299#[error("Not connected")]
300NotConnected,
301
302/// IPC not supported on this platform error
303#[error("IPC not supported on this platform")]
304NotSupported,
305
306/// Cleanup failed error
307#[error("Cleanup failed: {0}")]
308CleanupFailed(String),
309
310/// Socket error
311#[error("Socket error: {0}")]
312SocketError(String),
313
314/// Timeout error
315#[error("Timeout")]
316Timeout,
317}
318
#[cfg(test)]
mod tests {
	use super::*;
	use crate::Transport::Strategy::TransportStrategy;

	/// The default constructor succeeds wherever IPC is available.
	#[test]
	fn test_ipc_transport_creation() {
		#[cfg(any(unix, windows))]
		{
			assert!(IPCTransportImpl::new().is_ok());
		}
	}

	/// The default constructor is rejected on every other platform.
	#[test]
	fn test_ipc_transport_not_supported() {
		#[cfg(not(any(unix, windows)))]
		{
			assert!(IPCTransportImpl::new().is_err());
		}
	}

	/// A custom socket path is stored and handed back unchanged.
	#[cfg(unix)]
	#[test]
	fn test_ipc_transport_with_socket_path() {
		let created = IPCTransportImpl::with_socket_path(Path::new("/tmp/test.sock"));
		assert!(created.is_ok());

		let transport = created.unwrap();
		let stored = transport.socket_path();
		assert_eq!(stored, Some(Path::new("/tmp/test.sock")));
	}

	/// A custom pipe name is stored and handed back unchanged.
	#[cfg(windows)]
	#[test]
	fn test_ipc_transport_with_pipe_name() {
		let created = IPCTransportImpl::with_pipe_name(r"\\.\pipe\test");
		assert!(created.is_ok());

		let transport = created.unwrap();
		let stored = transport.pipe_name();
		assert_eq!(stored, Some(r"\\.\pipe\test"));
	}

	/// A freshly built transport reports itself as disconnected.
	///
	/// NOTE(review): `is_connected` is invoked from inside the Tokio test
	/// runtime here - confirm its implementation never blocks the thread.
	#[tokio::test]
	async fn test_ipc_transport_not_connected() {
		#[cfg(any(unix, windows))]
		{
			let transport = IPCTransportImpl::new().unwrap();
			let connected = transport.is_connected();
			assert!(!connected);
		}
	}
}