1use std::fmt;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde::{Deserialize, Serialize};
11
12use crate::Transport::{IPCTransport::IPCTransport, WASMTransport::WASMTransportImpl, gRPCTransport::gRPCTransport};
13
14#[async_trait]
19pub trait TransportStrategy: Send + Sync {
20 type Error: std::error::Error + Send + Sync + 'static;
22
23 async fn connect(&self) -> Result<(), Self::Error>;
25
26 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error>;
28
29 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error>;
31
32 async fn close(&self) -> Result<(), Self::Error>;
34
35 fn is_connected(&self) -> bool;
37
38 fn transport_type(&self) -> TransportType;
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44pub enum TransportType {
45 gRPC,
47 IPC,
49 WASM,
51 Unknown,
53}
54
55impl fmt::Display for TransportType {
56 fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result {
57 match self {
58 Self::gRPC => write!(f, "grpc"),
59 Self::IPC => write!(f, "ipc"),
60 Self::WASM => write!(f, "wasm"),
61 Self::Unknown => write!(f, "unknown"),
62 }
63 }
64}
65
66impl std::str::FromStr for TransportType {
67 type Err = anyhow::Error;
68
69 fn from_str(s:&str) -> Result<Self, Self::Err> {
70 match s.to_lowercase().as_str() {
71 "grpc" => Ok(Self::gRPC),
72 "ipc" => Ok(Self::IPC),
73 "wasm" => Ok(Self::WASM),
74 _ => Err(anyhow::anyhow!("Unknown transport type: {}", s)),
75 }
76 }
77}
78
79#[derive(Debug)]
83pub enum Transport {
84 gRPC(gRPCTransport),
86 IPC(IPCTransport),
88 WASM(WASMTransportImpl),
90}
91
92impl Transport {
93 pub fn transport_type(&self) -> TransportType {
95 match self {
96 Self::gRPC(_) => TransportType::gRPC,
97 Self::IPC(_) => TransportType::IPC,
98 Self::WASM(_) => TransportType::WASM,
99 }
100 }
101
102 pub async fn connect(&self) -> anyhow::Result<()> {
104 match self {
105 Self::gRPC(transport) => {
106 transport
107 .connect()
108 .await
109 .map_err(|e| anyhow::anyhow!("gRPC connect error: {}", e))
110 },
111 Self::IPC(transport) => {
112 transport
113 .connect()
114 .await
115 .map_err(|e| anyhow::anyhow!("IPC connect error: {}", e))
116 },
117 Self::WASM(transport) => {
118 transport
119 .connect()
120 .await
121 .map_err(|e| anyhow::anyhow!("WASM connect error: {}", e))
122 },
123 }
124 }
125
126 pub async fn send(&self, request:&[u8]) -> anyhow::Result<Vec<u8>> {
128 match self {
129 Self::gRPC(transport) => {
130 transport
131 .send(request)
132 .await
133 .map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
134 },
135 Self::IPC(transport) => {
136 transport
137 .send(request)
138 .await
139 .map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
140 },
141 Self::WASM(transport) => {
142 transport
143 .send(request)
144 .await
145 .map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
146 },
147 }
148 }
149
150 pub async fn send_no_response(&self, data:&[u8]) -> anyhow::Result<()> {
152 match self {
153 Self::gRPC(transport) => {
154 transport
155 .send_no_response(data)
156 .await
157 .map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
158 },
159 Self::IPC(transport) => {
160 transport
161 .send_no_response(data)
162 .await
163 .map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
164 },
165 Self::WASM(transport) => {
166 transport
167 .send_no_response(data)
168 .await
169 .map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
170 },
171 }
172 }
173
174 pub async fn close(&self) -> anyhow::Result<()> {
176 match self {
177 Self::gRPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("gRPC close error: {}", e)),
178 Self::IPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("IPC close error: {}", e)),
179 Self::WASM(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("WASM close error: {}", e)),
180 }
181 }
182
183 pub fn is_connected(&self) -> bool {
185 match self {
186 Self::gRPC(transport) => transport.is_connected(),
187 Self::IPC(transport) => transport.is_connected(),
188 Self::WASM(transport) => transport.is_connected(),
189 }
190 }
191
192 pub fn AsgRPC(&self) -> Option<&gRPCTransport> {
194 match self {
195 Self::gRPC(Transport) => Some(Transport),
196 _ => None,
197 }
198 }
199
200 pub fn AsIPC(&self) -> Option<&IPCTransport> {
202 match self {
203 Self::IPC(Transport) => Some(Transport),
204 _ => None,
205 }
206 }
207
208 pub fn as_wasm(&self) -> Option<&WASMTransportImpl> {
210 match self {
211 Self::WASM(transport) => Some(transport),
212 _ => None,
213 }
214 }
215}
216
217impl Default for Transport {
218 fn default() -> Self {
219 Self::gRPC(
220 gRPCTransport::New("127.0.0.1:50050").unwrap_or_else(|_| {
221 gRPCTransport::New("0.0.0.0:50050").expect("Failed to create default gRPC transport")
222 }),
223 )
224 }
225}
226
227impl fmt::Display for Transport {
228 fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Transport({})", self.transport_type()) }
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct TransportMessage {
234 pub message_type:String,
236 pub message_id:String,
238 pub timestamp:u64,
240 pub payload:Bytes,
242 pub metadata:Option<serde_json::Value>,
244}
245
246impl TransportMessage {
247 pub fn new(message_type:impl Into<String>, payload:Bytes) -> Self {
249 Self {
250 message_type:message_type.into(),
251 message_id:uuid::Uuid::new_v4().to_string(),
252 timestamp:std::time::SystemTime::now()
253 .duration_since(std::time::UNIX_EPOCH)
254 .map(|d| d.as_secs())
255 .unwrap_or(0),
256 payload,
257 metadata:None,
258 }
259 }
260
261 pub fn with_metadata(mut self, metadata:serde_json::Value) -> Self {
263 self.metadata = Some(metadata);
264 self
265 }
266
267 pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
269 serde_json::to_vec(self).map(Bytes::from).map_err(|e| anyhow::anyhow!("{}", e))
270 }
271
272 pub fn from_bytes(bytes:&[u8]) -> anyhow::Result<Self> {
274 serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!("{}", e))
275 }
276}
277
278#[derive(Debug, Clone, Default, Serialize, Deserialize)]
280pub struct TransportStats {
281 pub messages_sent:u64,
283 pub messages_received:u64,
285 pub errors:u64,
287 pub bytes_sent:u64,
289 pub bytes_received:u64,
291 pub avg_latency_us:u64,
293 pub uptime_seconds:u64,
295}
296
297impl TransportStats {
298 pub fn record_sent(&mut self, bytes:u64, latency_us:u64) {
300 self.messages_sent += 1;
301 self.bytes_sent += bytes;
302
303 if self.messages_sent > 0 {
305 self.avg_latency_us = (self.avg_latency_us * (self.messages_sent - 1) + latency_us) / self.messages_sent;
306 }
307 }
308
309 pub fn record_received(&mut self, bytes:u64) {
311 self.messages_received += 1;
312 self.bytes_received += bytes;
313 }
314
315 pub fn record_error(&mut self) { self.errors += 1; }
317}
318
319#[cfg(test)]
320mod tests {
321 use super::*;
322
323 #[test]
324 fn test_transport_type_to_string() {
325 assert_eq!(TransportType::gRPC.to_string(), "grpc");
326 assert_eq!(TransportType::IPC.to_string(), "ipc");
327 assert_eq!(TransportType::WASM.to_string(), "wasm");
328 }
329
330 #[test]
331 fn test_transport_type_from_str() {
332 assert_eq!("grpc".parse::<TransportType>().unwrap(), TransportType::gRPC);
333 assert_eq!("ipc".parse::<TransportType>().unwrap(), TransportType::IPC);
334 assert_eq!("wasm".parse::<TransportType>().unwrap(), TransportType::WASM);
335 assert!("unknown".parse::<TransportType>().is_err());
336 }
337
338 #[test]
339 fn test_transport_display() {
340 let transport = Transport::default();
343 let display = format!("{}", transport);
344 assert!(display.contains("Transport"));
345 }
346
347 #[test]
348 fn test_transport_message_creation() {
349 let message = TransportMessage::new("test_type", Bytes::from("hello"));
350 assert_eq!(message.message_type, "test_type");
351 assert_eq!(message.payload, Bytes::from("hello"));
352 assert!(!message.message_id.is_empty());
353 }
354
355 #[test]
356 fn test_transport_message_serialization() {
357 let message = TransportMessage::new("test", Bytes::from("data"));
358 let bytes = message.to_bytes().unwrap();
359 let deserialized = TransportMessage::from_bytes(&bytes).unwrap();
360 assert_eq!(deserialized.message_type, message.message_type);
361 assert_eq!(deserialized.payload, message.payload);
362 }
363
364 #[test]
365 fn test_transport_stats() {
366 let mut stats = TransportStats::default();
367 stats.record_sent(100, 1000);
368 stats.record_received(50);
369 stats.record_error();
370
371 assert_eq!(stats.messages_sent, 1);
372 assert_eq!(stats.messages_received, 1);
373 assert_eq!(stats.errors, 1);
374 assert_eq!(stats.bytes_sent, 100);
375 assert_eq!(stats.bytes_received, 50);
376 assert_eq!(stats.avg_latency_us, 1000);
377 }
378}