Skip to main content

Grove/Transport/
Strategy.rs

1//! Transport Strategy Module
2//!
3//! Defines the transport strategy trait and types for different
4//! communication methods (gRPC, IPC, WASM).
5
6use std::fmt;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde::{Deserialize, Serialize};
11
12use crate::Transport::{IPCTransport::IPCTransport, WASMTransport::WASMTransportImpl, gRPCTransport::gRPCTransport};
13
14/// Transport strategy trait
15///
16/// All transport implementations must implement this trait to provide
17/// a common interface for connecting, sending, and closing connections.
18#[async_trait]
19pub trait TransportStrategy: Send + Sync {
20	/// Error type for this transport
21	type Error: std::error::Error + Send + Sync + 'static;
22
23	/// Connect to the transport endpoint
24	async fn connect(&self) -> Result<(), Self::Error>;
25
26	/// Send a request and receive a response
27	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error>;
28
29	/// Send data without expecting a response (fire and forget)
30	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error>;
31
32	/// Close the transport connection
33	async fn close(&self) -> Result<(), Self::Error>;
34
35	/// Check if the transport is connected
36	fn is_connected(&self) -> bool;
37
38	/// Get the transport type identifier
39	fn transport_type(&self) -> TransportType;
40}
41
42/// Transport type enumeration
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44pub enum TransportType {
45	/// gRPC transport
46	gRPC,
47	/// Inter-process communication
48	IPC,
49	/// Direct WASM module communication
50	WASM,
51	/// Unknown/unspecified transport
52	Unknown,
53}
54
55impl fmt::Display for TransportType {
56	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result {
57		match self {
58			Self::gRPC => write!(f, "grpc"),
59			Self::IPC => write!(f, "ipc"),
60			Self::WASM => write!(f, "wasm"),
61			Self::Unknown => write!(f, "unknown"),
62		}
63	}
64}
65
66impl std::str::FromStr for TransportType {
67	type Err = anyhow::Error;
68
69	fn from_str(s:&str) -> Result<Self, Self::Err> {
70		match s.to_lowercase().as_str() {
71			"grpc" => Ok(Self::gRPC),
72			"ipc" => Ok(Self::IPC),
73			"wasm" => Ok(Self::WASM),
74			_ => Err(anyhow::anyhow!("Unknown transport type: {}", s)),
75		}
76	}
77}
78
79/// Transport enumeration.
80///
81/// Union type wrapping all supported transport implementations.
82#[derive(Debug)]
83pub enum Transport {
84	/// gRPC-based transport (Mountain/Air communication).
85	gRPC(gRPCTransport),
86	/// IPC transport (same-machine process communication).
87	IPC(IPCTransport),
88	/// Direct WASM module transport (browser).
89	WASM(WASMTransportImpl),
90}
91
92impl Transport {
93	/// Get the transport type
94	pub fn transport_type(&self) -> TransportType {
95		match self {
96			Self::gRPC(_) => TransportType::gRPC,
97			Self::IPC(_) => TransportType::IPC,
98			Self::WASM(_) => TransportType::WASM,
99		}
100	}
101
102	/// Connect to the transport
103	pub async fn connect(&self) -> anyhow::Result<()> {
104		match self {
105			Self::gRPC(transport) => {
106				transport
107					.connect()
108					.await
109					.map_err(|e| anyhow::anyhow!("gRPC connect error: {}", e))
110			},
111			Self::IPC(transport) => {
112				transport
113					.connect()
114					.await
115					.map_err(|e| anyhow::anyhow!("IPC connect error: {}", e))
116			},
117			Self::WASM(transport) => {
118				transport
119					.connect()
120					.await
121					.map_err(|e| anyhow::anyhow!("WASM connect error: {}", e))
122			},
123		}
124	}
125
126	/// Send a request and receive a response
127	pub async fn send(&self, request:&[u8]) -> anyhow::Result<Vec<u8>> {
128		match self {
129			Self::gRPC(transport) => {
130				transport
131					.send(request)
132					.await
133					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
134			},
135			Self::IPC(transport) => {
136				transport
137					.send(request)
138					.await
139					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
140			},
141			Self::WASM(transport) => {
142				transport
143					.send(request)
144					.await
145					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
146			},
147		}
148	}
149
150	/// Send data without expecting a response
151	pub async fn send_no_response(&self, data:&[u8]) -> anyhow::Result<()> {
152		match self {
153			Self::gRPC(transport) => {
154				transport
155					.send_no_response(data)
156					.await
157					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
158			},
159			Self::IPC(transport) => {
160				transport
161					.send_no_response(data)
162					.await
163					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
164			},
165			Self::WASM(transport) => {
166				transport
167					.send_no_response(data)
168					.await
169					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
170			},
171		}
172	}
173
174	/// Close the transport
175	pub async fn close(&self) -> anyhow::Result<()> {
176		match self {
177			Self::gRPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("gRPC close error: {}", e)),
178			Self::IPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("IPC close error: {}", e)),
179			Self::WASM(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("WASM close error: {}", e)),
180		}
181	}
182
183	/// Check if the transport is connected
184	pub fn is_connected(&self) -> bool {
185		match self {
186			Self::gRPC(transport) => transport.is_connected(),
187			Self::IPC(transport) => transport.is_connected(),
188			Self::WASM(transport) => transport.is_connected(),
189		}
190	}
191
192	/// Get gRPC transport reference (if applicable)
193	pub fn AsgRPC(&self) -> Option<&gRPCTransport> {
194		match self {
195			Self::gRPC(Transport) => Some(Transport),
196			_ => None,
197		}
198	}
199
200	/// Returns the IPC transport reference if this is an IPC transport.
201	pub fn AsIPC(&self) -> Option<&IPCTransport> {
202		match self {
203			Self::IPC(Transport) => Some(Transport),
204			_ => None,
205		}
206	}
207
208	/// Get WASM transport reference (if applicable)
209	pub fn as_wasm(&self) -> Option<&WASMTransportImpl> {
210		match self {
211			Self::WASM(transport) => Some(transport),
212			_ => None,
213		}
214	}
215}
216
217impl Default for Transport {
218	fn default() -> Self {
219		Self::gRPC(
220			gRPCTransport::New("127.0.0.1:50050").unwrap_or_else(|_| {
221				gRPCTransport::New("0.0.0.0:50050").expect("Failed to create default gRPC transport")
222			}),
223		)
224	}
225}
226
227impl fmt::Display for Transport {
228	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Transport({})", self.transport_type()) }
229}
230
231/// Transport message wrapper
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct TransportMessage {
234	/// Message type identifier
235	pub message_type:String,
236	/// Message ID for correlation
237	pub message_id:String,
238	/// Timestamp (Unix epoch)
239	pub timestamp:u64,
240	/// Message payload
241	pub payload:Bytes,
242	/// Optional metadata
243	pub metadata:Option<serde_json::Value>,
244}
245
246impl TransportMessage {
247	/// Create a new transport message
248	pub fn new(message_type:impl Into<String>, payload:Bytes) -> Self {
249		Self {
250			message_type:message_type.into(),
251			message_id:uuid::Uuid::new_v4().to_string(),
252			timestamp:std::time::SystemTime::now()
253				.duration_since(std::time::UNIX_EPOCH)
254				.map(|d| d.as_secs())
255				.unwrap_or(0),
256			payload,
257			metadata:None,
258		}
259	}
260
261	/// Set metadata for the message
262	pub fn with_metadata(mut self, metadata:serde_json::Value) -> Self {
263		self.metadata = Some(metadata);
264		self
265	}
266
267	/// Serialize the message to bytes
268	pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
269		serde_json::to_vec(self).map(Bytes::from).map_err(|e| anyhow::anyhow!("{}", e))
270	}
271
272	/// Deserialize message from bytes
273	pub fn from_bytes(bytes:&[u8]) -> anyhow::Result<Self> {
274		serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!("{}", e))
275	}
276}
277
278/// Transport statistics
279#[derive(Debug, Clone, Default, Serialize, Deserialize)]
280pub struct TransportStats {
281	/// Number of messages sent
282	pub messages_sent:u64,
283	/// Number of messages received
284	pub messages_received:u64,
285	/// Number of errors encountered
286	pub errors:u64,
287	/// Total bytes sent
288	pub bytes_sent:u64,
289	/// Total bytes received
290	pub bytes_received:u64,
291	/// Average latency in microseconds
292	pub avg_latency_us:u64,
293	/// Connection uptime in seconds
294	pub uptime_seconds:u64,
295}
296
297impl TransportStats {
298	/// Update statistics with a sent message
299	pub fn record_sent(&mut self, bytes:u64, latency_us:u64) {
300		self.messages_sent += 1;
301		self.bytes_sent += bytes;
302
303		// Update average latency
304		if self.messages_sent > 0 {
305			self.avg_latency_us = (self.avg_latency_us * (self.messages_sent - 1) + latency_us) / self.messages_sent;
306		}
307	}
308
309	/// Update statistics with a received message
310	pub fn record_received(&mut self, bytes:u64) {
311		self.messages_received += 1;
312		self.bytes_received += bytes;
313	}
314
315	/// Record an error
316	pub fn record_error(&mut self) { self.errors += 1; }
317}
318
319#[cfg(test)]
320mod tests {
321	use super::*;
322
323	#[test]
324	fn test_transport_type_to_string() {
325		assert_eq!(TransportType::gRPC.to_string(), "grpc");
326		assert_eq!(TransportType::IPC.to_string(), "ipc");
327		assert_eq!(TransportType::WASM.to_string(), "wasm");
328	}
329
330	#[test]
331	fn test_transport_type_from_str() {
332		assert_eq!("grpc".parse::<TransportType>().unwrap(), TransportType::gRPC);
333		assert_eq!("ipc".parse::<TransportType>().unwrap(), TransportType::IPC);
334		assert_eq!("wasm".parse::<TransportType>().unwrap(), TransportType::WASM);
335		assert!("unknown".parse::<TransportType>().is_err());
336	}
337
338	#[test]
339	fn test_transport_display() {
340		// Create a dummy transport to test Display implementation
341		// In real tests, we'd use an actual transport
342		let transport = Transport::default();
343		let display = format!("{}", transport);
344		assert!(display.contains("Transport"));
345	}
346
347	#[test]
348	fn test_transport_message_creation() {
349		let message = TransportMessage::new("test_type", Bytes::from("hello"));
350		assert_eq!(message.message_type, "test_type");
351		assert_eq!(message.payload, Bytes::from("hello"));
352		assert!(!message.message_id.is_empty());
353	}
354
355	#[test]
356	fn test_transport_message_serialization() {
357		let message = TransportMessage::new("test", Bytes::from("data"));
358		let bytes = message.to_bytes().unwrap();
359		let deserialized = TransportMessage::from_bytes(&bytes).unwrap();
360		assert_eq!(deserialized.message_type, message.message_type);
361		assert_eq!(deserialized.payload, message.payload);
362	}
363
364	#[test]
365	fn test_transport_stats() {
366		let mut stats = TransportStats::default();
367		stats.record_sent(100, 1000);
368		stats.record_received(50);
369		stats.record_error();
370
371		assert_eq!(stats.messages_sent, 1);
372		assert_eq!(stats.messages_received, 1);
373		assert_eq!(stats.errors, 1);
374		assert_eq!(stats.bytes_sent, 100);
375		assert_eq!(stats.bytes_received, 50);
376		assert_eq!(stats.avg_latency_us, 1000);
377	}
378}