Skip to main content

Grove/Transport/
gRPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # gRPC Transport Implementation
3//!
4//! Provides gRPC-based communication for Grove.
5//! Connects to Mountain or other gRPC services.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12use tracing::{debug, info, instrument};
13
14use crate::Transport::{
15	Strategy::{TransportStats, TransportStrategy, TransportType},
16	TransportConfig,
17};
18
19/// gRPC transport for communication with Mountain and other gRPC services.
20#[derive(Clone, Debug)]
21pub struct gRPCTransport {
22	/// Connection endpoint address.
23	Endpoint:String,
24	/// gRPC channel (lazily connected).
25	Channel:Arc<RwLock<Option<Channel>>>,
26	/// Transport configuration.
27	Configuration:TransportConfig,
28	/// Whether the transport is currently connected.
29	Connected:Arc<RwLock<bool>>,
30	/// Transport statistics.
31	Statistics:Arc<RwLock<TransportStats>>,
32}
33
34impl gRPCTransport {
35	/// Creates a new gRPC transport with the given address.
36	pub fn New(Address:&str) -> anyhow::Result<Self> {
37		Ok(Self {
38			Endpoint:Address.to_string(),
39			Channel:Arc::new(RwLock::new(None)),
40			Configuration:TransportConfig::default(),
41			Connected:Arc::new(RwLock::new(false)),
42			Statistics:Arc::new(RwLock::new(TransportStats::default())),
43		})
44	}
45
46	/// Creates a new gRPC transport with custom configuration.
47	pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
48		Ok(Self {
49			Endpoint:Address.to_string(),
50			Channel:Arc::new(RwLock::new(None)),
51			Configuration,
52			Connected:Arc::new(RwLock::new(false)),
53			Statistics:Arc::new(RwLock::new(TransportStats::default())),
54		})
55	}
56
57	/// Returns the connection endpoint address.
58	pub fn Address(&self) -> &str { &self.Endpoint }
59
60	/// Returns the active gRPC channel.
61	pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
62		self.Channel
63			.read()
64			.await
65			.as_ref()
66			.cloned()
67			.ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
68	}
69
70	/// Returns a snapshot of transport statistics.
71	pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
72
73	/// Builds an endpoint from the address string.
74	fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
75		let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
76			.timeout(self.Configuration.ConnectionTimeout)
77			.connect_timeout(self.Configuration.ConnectionTimeout)
78			.tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
79		Ok(EndpointValue)
80	}
81}
82
83#[async_trait]
84impl TransportStrategy for gRPCTransport {
85	type Error = gRPCTransportError;
86
87	#[instrument(skip(self))]
88	async fn connect(&self) -> Result<(), Self::Error> {
89		info!("Connecting to gRPC endpoint: {}", self.Endpoint);
90
91		let EndpointValue = self
92			.BuildEndpoint()
93			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
94
95		let ChannelValue = EndpointValue
96			.connect()
97			.await
98			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
99
100		*self.Channel.write().await = Some(ChannelValue);
101		*self.Connected.write().await = true;
102
103		info!("gRPC connection established: {}", self.Endpoint);
104		Ok(())
105	}
106
107	#[instrument(skip(self, request))]
108	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
109		let Start = std::time::Instant::now();
110
111		if !self.is_connected() {
112			return Err(gRPCTransportError::NotConnected);
113		}
114
115		debug!("Sending gRPC request ({} bytes)", request.len());
116
117		let Response:Vec<u8> = vec![];
118		let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
119
120		let mut Stats = self.Statistics.write().await;
121		Stats.record_sent(request.len() as u64, LatencyMicroseconds);
122		Stats.record_received(Response.len() as u64);
123
124		debug!("gRPC request completed in {}µs", LatencyMicroseconds);
125		Ok(Response)
126	}
127
128	#[instrument(skip(self, data))]
129	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
130		if !self.is_connected() {
131			return Err(gRPCTransportError::NotConnected);
132		}
133
134		debug!("Sending gRPC notification ({} bytes)", data.len());
135
136		let mut Stats = self.Statistics.write().await;
137		Stats.record_sent(data.len() as u64, 0);
138		Ok(())
139	}
140
141	#[instrument(skip(self))]
142	async fn close(&self) -> Result<(), Self::Error> {
143		info!("Closing gRPC connection: {}", self.Endpoint);
144		*self.Channel.write().await = None;
145		*self.Connected.write().await = false;
146		info!("gRPC connection closed: {}", self.Endpoint);
147		Ok(())
148	}
149
150	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
151
152	fn transport_type(&self) -> TransportType { TransportType::gRPC }
153}
154
155/// gRPC transport error variants.
156#[derive(Debug, thiserror::Error)]
157pub enum gRPCTransportError {
158	/// Failed to establish connection to gRPC server
159	#[error("Connection failed: {0}")]
160	ConnectionFailed(String),
161	/// Failed to send message to gRPC server
162	#[error("Send failed: {0}")]
163	SendFailed(String),
164	/// Failed to receive message from gRPC server
165	#[error("Receive failed: {0}")]
166	ReceiveFailed(String),
167	/// Transport is not connected
168	#[error("Not connected")]
169	NotConnected,
170	/// Operation timed out
171	#[error("Timeout")]
172	Timeout,
173	/// Generic gRPC error
174	#[error("gRPC error: {0}")]
175	Error(String),
176}
177
178impl From<tonic::transport::Error> for gRPCTransportError {
179	fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
180}
181
182impl From<tonic::Status> for gRPCTransportError {
183	fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
184}
185
#[cfg(test)]
mod tests {
	use super::*;

	/// A freshly constructed transport reports the address it was given.
	#[test]
	fn TestgRPCTransportCreation() {
		let Transport = gRPCTransport::New("127.0.0.1:50050").expect("constructing a transport should not fail");

		assert_eq!(Transport.Address(), "127.0.0.1:50050");
	}

	/// A transport that has never been connected reports itself as
	/// disconnected.
	///
	/// This is deliberately a plain `#[test]`: `is_connected` is a
	/// synchronous call and the test awaits nothing, so no runtime is needed.
	/// Running on a plain thread also keeps the test valid if the
	/// implementation takes a blocking lock read, which panics inside a
	/// Tokio runtime.
	#[test]
	fn TestgRPCTransportNotConnected() {
		let Transport = gRPCTransport::New("127.0.0.1:50050").expect("constructing a transport should not fail");

		assert!(!Transport.is_connected());
	}
}