Grove/Transport/
IPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::{
8 path::{Path, PathBuf},
9 sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14use tracing::{debug, info, instrument, warn};
15
16use crate::Transport::{
17 Strategy::{TransportStats, TransportStrategy, TransportType},
18 TransportConfig,
19};
20
21#[derive(Clone, Debug)]
23pub struct IPCTransport {
24 SocketPath:Option<PathBuf>,
26 #[allow(dead_code)]
28 PipeName:Option<String>,
29 #[allow(dead_code)]
31 Configuration:TransportConfig,
32 Connected:Arc<RwLock<bool>>,
34 Statistics:Arc<RwLock<TransportStats>>,
36}
37
38impl IPCTransport {
39 pub fn New() -> anyhow::Result<Self> {
41 #[cfg(unix)]
42 {
43 let SocketPath = Self::DefaultSocketPath();
44 Ok(Self {
45 SocketPath:Some(SocketPath),
46 PipeName:None,
47 Configuration:TransportConfig::default(),
48 Connected:Arc::new(RwLock::new(false)),
49 Statistics:Arc::new(RwLock::new(TransportStats::default())),
50 })
51 }
52
53 #[cfg(windows)]
54 {
55 Ok(Self {
56 SocketPath:None,
57 PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
58 Configuration:TransportConfig::default(),
59 Connected:Arc::new(RwLock::new(false)),
60 Statistics:Arc::new(RwLock::new(TransportStats::default())),
61 })
62 }
63
64 #[cfg(not(any(unix, windows)))]
65 {
66 Err(anyhow::anyhow!("IPC transport not supported on this platform"))
67 }
68 }
69
70 pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
72 #[cfg(unix)]
73 {
74 Ok(Self {
75 SocketPath:Some(SocketPath.as_ref().to_path_buf()),
76 PipeName:None,
77 Configuration:TransportConfig::default(),
78 Connected:Arc::new(RwLock::new(false)),
79 Statistics:Arc::new(RwLock::new(TransportStats::default())),
80 })
81 }
82
83 #[cfg(not(unix))]
84 {
85 Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
86 }
87 }
88
89 #[cfg(unix)]
91 fn DefaultSocketPath() -> PathBuf {
92 let mut Path = std::env::temp_dir();
93 Path.push("grove-ipc.sock");
94 Path
95 }
96
97 #[cfg(unix)]
99 pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
100
101 pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
103
104 #[cfg(unix)]
106 async fn CleanupSocket(&self) -> anyhow::Result<()> {
107 if let Some(SocketPath) = &self.SocketPath {
108 if SocketPath.exists() {
109 tokio::fs::remove_file(SocketPath)
110 .await
111 .map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
112 debug!("Removed existing socket: {:?}", SocketPath);
113 }
114 }
115 Ok(())
116 }
117}
118
119#[async_trait]
120impl TransportStrategy for IPCTransport {
121 type Error = IPCTransportError;
122
123 #[instrument(skip(self))]
124 async fn connect(&self) -> Result<(), Self::Error> {
125 info!("Connecting to IPC transport");
126
127 #[cfg(unix)]
128 {
129 self.CleanupSocket()
130 .await
131 .map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
132 *self.Connected.write().await = true;
133 info!("IPC connection established: {:?}", self.SocketPath);
134 }
135
136 #[cfg(windows)]
137 {
138 *self.Connected.write().await = true;
139 info!("IPC connection established via named pipe");
140 }
141
142 #[cfg(not(any(unix, windows)))]
143 {
144 return Err(IPCTransportError::NotSupported);
145 }
146
147 Ok(())
148 }
149
150 #[instrument(skip(self, request))]
151 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
152 if !self.is_connected() {
153 return Err(IPCTransportError::NotConnected);
154 }
155
156 debug!("Sending IPC request ({} bytes)", request.len());
157
158 let Response:Vec<u8> = vec![];
159
160 let mut Stats = self.Statistics.write().await;
161 Stats.record_sent(request.len() as u64, 0);
162 Stats.record_received(Response.len() as u64);
163
164 Ok(Response)
165 }
166
167 #[instrument(skip(self, data))]
168 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
169 if !self.is_connected() {
170 return Err(IPCTransportError::NotConnected);
171 }
172
173 debug!("Sending IPC notification ({} bytes)", data.len());
174
175 let mut Stats = self.Statistics.write().await;
176 Stats.record_sent(data.len() as u64, 0);
177 Ok(())
178 }
179
180 #[instrument(skip(self))]
181 async fn close(&self) -> Result<(), Self::Error> {
182 info!("Closing IPC connection");
183 *self.Connected.write().await = false;
184
185 #[cfg(unix)]
186 {
187 if let Some(SocketPath) = &self.SocketPath {
188 if SocketPath.exists() {
189 tokio::fs::remove_file(SocketPath).await.map_err(|E| {
190 warn!("Failed to remove socket: {}", E);
191 IPCTransportError::CleanupFailed(E.to_string())
192 })?;
193 }
194 }
195 }
196
197 info!("IPC connection closed");
198 Ok(())
199 }
200
201 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
202
203 fn transport_type(&self) -> TransportType { TransportType::IPC }
204}
205
206#[derive(Debug, thiserror::Error)]
208pub enum IPCTransportError {
209 #[error("Connection failed: {0}")]
211 ConnectionFailed(String),
212 #[error("Send failed: {0}")]
214 SendFailed(String),
215 #[error("Receive failed: {0}")]
217 ReceiveFailed(String),
218 #[error("Not connected")]
220 NotConnected,
221 #[error("IPC not supported on this platform")]
223 NotSupported,
224 #[error("Cleanup failed: {0}")]
226 CleanupFailed(String),
227 #[error("Socket error: {0}")]
229 SocketError(String),
230 #[error("Timeout")]
232 Timeout,
233}
234
#[cfg(test)]
mod tests {
	use super::*;

	/// The default constructor succeeds on every supported platform.
	#[test]
	fn TestIPCTransportCreation() {
		#[cfg(any(unix, windows))]
		{
			let Created = IPCTransport::New();
			assert!(Created.is_ok());
		}
	}

	/// A transport built from an explicit path reports that same path back.
	#[cfg(unix)]
	#[test]
	fn TestIPCTransportWithSocketPath() {
		let Expected = Path::new("/tmp/test.sock");
		let Created = IPCTransport::WithSocketPath(Expected);
		assert!(Created.is_ok());
		let Transport = Created.unwrap();
		assert_eq!(Transport.GetSocketPath(), Some(Expected));
	}

	/// A freshly constructed transport is not connected.
	///
	/// Deliberately a plain synchronous test: `is_connected` is a synchronous
	/// accessor, and a blocking lock read performed inside a Tokio runtime
	/// panics, so the accessor is exercised outside of any runtime.
	#[test]
	fn TestIPCTransportNotConnected() {
		#[cfg(any(unix, windows))]
		{
			let Transport = IPCTransport::New().unwrap();
			assert!(!Transport.is_connected());
		}
	}
}
265}