grove/Transport/
IPCTransport.rs1use std::{
7 path::{Path, PathBuf},
8 sync::Arc,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use tokio::sync::RwLock;
14use tracing::{debug, info, instrument, warn};
15
16use crate::Transport::TransportStrategy;
17use crate::Transport::TransportType;
18use crate::Transport::TransportStats;
19use crate::Transport::TransportConfig;
20
21#[derive(Clone, Debug)]
23pub struct IPCTransportImpl {
24 socket_path:Option<PathBuf>,
26 pipe_name:Option<String>,
28 config:TransportConfig,
30 connected:Arc<RwLock<bool>>,
32 stats:Arc<RwLock<TransportStats>>,
34}
35
36impl IPCTransportImpl {
37 pub fn new() -> anyhow::Result<Self> {
39 #[cfg(unix)]
40 {
41 let socket_path = Self::default_socket_path();
42 Ok(Self {
43 socket_path:Some(socket_path),
44 pipe_name:None,
45 config:TransportConfig::default(),
46 connected:Arc::new(RwLock::new(false)),
47 stats:Arc::new(RwLock::new(TransportStats::default())),
48 })
49 }
50
51 #[cfg(windows)]
52 {
53 Ok(Self {
54 socket_path:None,
55 pipe_name:Some(r"\\.\pipe\grove-ipc".to_string()),
56 config:TransportConfig::default(),
57 connected:Arc::new(RwLock::new(false)),
58 stats:Arc::new(RwLock::new(TransportStats::default())),
59 })
60 }
61
62 #[cfg(not(any(unix, windows)))]
63 {
64 Err(anyhow::anyhow!("IPC transport not supported on this platform"))
65 }
66 }
67
68 pub fn with_socket_path<P:AsRef<Path>>(socket_path:P) -> anyhow::Result<Self> {
86 #[cfg(unix)]
87 {
88 Ok(Self {
89 socket_path:Some(socket_path.as_ref().to_path_buf()),
90 pipe_name:None,
91 config:TransportConfig::default(),
92 connected:Arc::new(RwLock::new(false)),
93 stats:Arc::new(RwLock::new(TransportStats::default())),
94 })
95 }
96
97 #[cfg(not(unix))]
98 {
99 Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
100 }
101 }
102
103 #[cfg(windows)]
105 pub fn with_pipe_name(pipe_name:&str) -> anyhow::Result<Self> {
106 Ok(Self {
107 socket_path:None,
108 pipe_name:Some(pipe_name.to_string()),
109 config:TransportConfig::default(),
110 connected:Arc::new(RwLock::new(false)),
111 stats:Arc::new(RwLock::new(TransportStats::default())),
112 })
113 }
114
115 pub fn with_config(config:TransportConfig) -> anyhow::Result<Self> {
117 #[cfg(unix)]
118 {
119 let socket_path = Self::default_socket_path();
120 Ok(Self {
121 socket_path:Some(socket_path),
122 pipe_name:None,
123 config,
124 connected:Arc::new(RwLock::new(false)),
125 stats:Arc::new(RwLock::new(TransportStats::default())),
126 })
127 }
128
129 #[cfg(windows)]
130 {
131 Ok(Self {
132 socket_path:None,
133 pipe_name:Some(r"\\.\pipe\grove-ipc".to_string()),
134 config,
135 connected:Arc::new(RwLock::new(false)),
136 stats:Arc::new(RwLock::new(TransportStats::default())),
137 })
138 }
139
140 #[cfg(not(any(unix, windows)))]
141 {
142 Err(anyhow::anyhow!("IPC transport not supported on this platform"))
143 }
144 }
145
146 #[cfg(unix)]
148 fn default_socket_path() -> PathBuf {
149 let mut path = std::env::temp_dir();
150 path.push("grove-ipc.sock");
151 path
152 }
153
154 #[cfg(unix)]
156 pub fn socket_path(&self) -> Option<&Path> { self.socket_path.as_deref() }
157
158 #[cfg(windows)]
160 pub fn pipe_name(&self) -> Option<&str> { self.pipe_name.as_deref() }
161
162 pub async fn stats(&self) -> TransportStats { self.stats.read().await.clone() }
164
165 #[cfg(unix)]
167 async fn cleanup_socket(&self) -> anyhow::Result<()> {
168 if let Some(path) = &self.socket_path {
169 if path.exists() {
170 tokio::fs::remove_file(path)
171 .await
172 .map_err(|e| anyhow::anyhow!("Failed to remove socket: {}", e))?;
173 debug!("Removed existing socket: {:?}", path);
174 }
175 }
176 Ok(())
177 }
178}
179
180#[async_trait]
181impl TransportStrategy for IPCTransportImpl {
182 type Error = IPCTransportError;
183
184 #[instrument(skip(self))]
185 async fn connect(&self) -> Result<(), Self::Error> {
186 info!("Connecting to IPC transport");
187
188 #[cfg(unix)]
189 {
190 self.cleanup_socket()
191 .await
192 .map_err(|e| IPCTransportError::ConnectionFailed(e.to_string()))?;
193
194 *self.connected.write().await = true;
197
198 info!("IPC connection established: {:?}", self.socket_path);
199 }
200
201 #[cfg(windows)]
202 {
203 *self.connected.write().await = true;
204 info!("IPC connection established: {:?}", self.pipe_name);
205 }
206
207 #[cfg(not(any(unix, windows)))]
208 {
209 return Err(IPCTransportError::NotSupported);
210 }
211
212 Ok(())
213 }
214
215 #[instrument(skip(self, request))]
216 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
217 if !self.is_connected() {
218 return Err(IPCTransportError::NotConnected);
219 }
220
221 debug!("Sending IPC request ({} bytes)", request.len());
222
223 let response = vec![];
226
227 let mut stats = self.stats.write().await;
229 stats.record_sent(request.len() as u64, 0);
230 stats.record_received(response.len() as u64);
231
232 Ok(response)
233 }
234
235 #[instrument(skip(self, data))]
236 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
237 if !self.is_connected() {
238 return Err(IPCTransportError::NotConnected);
239 }
240
241 debug!("Sending IPC request without response ({} bytes)", data.len());
242
243 let mut stats = self.stats.write().await;
246 stats.record_sent(data.len() as u64, 0);
247
248 Ok(())
249 }
250
251 #[instrument(skip(self))]
252 async fn close(&self) -> Result<(), Self::Error> {
253 info!("Closing IPC connection");
254
255 *self.connected.write().await = false;
256
257 #[cfg(unix)]
258 {
259 if let Some(path) = &self.socket_path {
261 if path.exists() {
262 tokio::fs::remove_file(path).await.map_err(|e| {
263 warn!("Failed to remove socket: {}", e);
264 IPCTransportError::CleanupFailed(e.to_string())
266 })?;
267 }
268 }
269 }
270
271 info!("IPC connection closed");
272
273 Ok(())
274 }
275
276 fn is_connected(&self) -> bool { self.connected.blocking_read().to_owned() }
277
278 fn transport_type(&self) -> TransportType {
279 TransportType::IPC
280 }
281}
282
283#[derive(Debug, thiserror::Error)]
285pub enum IPCTransportError {
286#[error("Connection failed: {0}")]
288ConnectionFailed(String),
289
290#[error("Send failed: {0}")]
292SendFailed(String),
293
294#[error("Receive failed: {0}")]
296ReceiveFailed(String),
297
298#[error("Not connected")]
300NotConnected,
301
302#[error("IPC not supported on this platform")]
304NotSupported,
305
306#[error("Cleanup failed: {0}")]
308CleanupFailed(String),
309
310#[error("Socket error: {0}")]
312SocketError(String),
313
314#[error("Timeout")]
316Timeout,
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Transport::Strategy::TransportStrategy;

    /// The default constructor succeeds wherever IPC is supported.
    #[test]
    fn test_ipc_transport_creation() {
        #[cfg(any(unix, windows))]
        {
            let created = IPCTransportImpl::new();
            assert!(created.is_ok());
        }
    }

    /// The default constructor fails wherever IPC is not supported.
    #[test]
    fn test_ipc_transport_not_supported() {
        #[cfg(not(any(unix, windows)))]
        {
            let created = IPCTransportImpl::new();
            assert!(created.is_err());
        }
    }

    /// A custom socket path is stored and handed back unchanged.
    #[cfg(unix)]
    #[test]
    fn test_ipc_transport_with_socket_path() {
        let wanted = Path::new("/tmp/test.sock");

        let built = IPCTransportImpl::with_socket_path(wanted);
        assert!(built.is_ok());

        let transport = built.unwrap();
        assert_eq!(transport.socket_path(), Some(wanted));
    }

    /// A custom pipe name is stored and handed back unchanged.
    #[cfg(windows)]
    #[test]
    fn test_ipc_transport_with_pipe_name() {
        let wanted = r"\\.\pipe\test";

        let built = IPCTransportImpl::with_pipe_name(wanted);
        assert!(built.is_ok());

        let transport = built.unwrap();
        assert_eq!(transport.pipe_name(), Some(wanted));
    }

    /// A freshly built transport reports "not connected".
    ///
    /// This runs inside a tokio runtime, so `is_connected` must not block
    /// the current thread.
    #[tokio::test]
    async fn test_ipc_transport_not_connected() {
        #[cfg(any(unix, windows))]
        {
            let fresh = IPCTransportImpl::new().unwrap();
            assert!(!fresh.is_connected());
        }
    }
}