Grove/Protocol/
SpineConnection.rs1use std::sync::Arc;
38
39use anyhow::Result;
40use tokio::sync::RwLock;
41use tracing::{debug, info, instrument, warn};
42
43use crate::Protocol::ProtocolConfig;
44#[cfg(feature = "grove_echo")]
45use crate::vine::generated::vine::{
46 EchoAction,
47 EchoActionResponse,
48 echo_action_service_client::EchoActionServiceClient,
49};
50
/// Lifecycle state of the connection to Spine.
///
/// The enum carries no data, so it is `Copy` and derives full equality
/// (`Eq` in addition to `PartialEq`); values can be compared with `==`,
/// used in `assert_eq!`, and passed to APIs that require `Eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection is established.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is in an error state.
    Error,
}
63
/// Heartbeat settings for a Spine connection.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    /// Heartbeat interval, in seconds.
    pub interval_seconds:u64,
    /// Missed-heartbeat threshold.
    // NOTE(review): presumably the number of consecutive misses tolerated
    // before the connection is treated as failed — confirm against the reader
    // of this field.
    pub max_missed:u32,
    /// Whether heartbeating is switched on.
    pub enabled:bool,
}

impl Default for HeartbeatConfig {
    /// Returns the default settings: a 30-second interval, a missed-heartbeat
    /// threshold of 3, and heartbeating enabled.
    fn default() -> Self {
        Self {
            interval_seconds:30,
            max_missed:3,
            enabled:true,
        }
    }
}
79
/// Counters describing the activity of a connection.
///
/// Every counter starts at zero through the derived `Default`.
#[derive(Debug, Default, Clone)]
pub struct ConnectionMetrics {
    /// Number of requests counted so far.
    pub total_requests:u64,
    /// Number of requests counted as successful.
    pub successful_requests:u64,
    /// Number of requests counted as failed.
    pub failed_requests:u64,
    /// Connection uptime, in seconds.
    pub uptime_seconds:u64,
    /// Number of reconnections.
    pub reconnections:u64,
}
94
95pub struct SpineConnectionImpl {
97 config:Arc<RwLock<ProtocolConfig>>,
99 state:Arc<RwLock<ConnectionState>>,
101
102 #[cfg(feature = "grove_echo")]
103 echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
105
106 heartbeat_config:HeartbeatConfig,
108 last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
110 metrics:Arc<RwLock<ConnectionMetrics>>,
112}
113
114impl SpineConnectionImpl {
115 #[instrument(skip(config))]
125 pub fn new(config:ProtocolConfig) -> Self {
126 Self {
127 config:Arc::new(RwLock::new(config)),
128 state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
129
130 #[cfg(feature = "grove_echo")]
131 echo_client:None,
132
133 heartbeat_config:HeartbeatConfig::default(),
134 last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
135 metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
136 }
137 }
138
139 #[instrument(skip(self))]
141 pub async fn Connect(&mut self) -> Result<()> {
142 let guard = self.config.read().await;
143 let url = guard.mountain_endpoint.clone();
144 drop(guard);
145
146 info!("Connecting to Spine at: {}", url);
147 *self.state.write().await = ConnectionState::Connecting;
148 *self.state.write().await = ConnectionState::Connected;
149 *self.last_heartbeat.write().await = chrono::Utc::now();
150 info!("Successfully connected to Spine");
151 Ok(())
152 }
153
154 #[instrument(skip(self))]
156 pub async fn Disconnect(&mut self) -> Result<()> {
157 info!("Disconnecting from Spine");
158
159 #[cfg(feature = "grove_echo")]
160 {
161 self.echo_client = None;
162 }
163
164 *self.state.write().await = ConnectionState::Disconnected;
165 info!("Successfully disconnected from Spine");
166 Ok(())
167 }
168
169 pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
171
172 #[instrument(skip(self, _payload))]
179 pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
180 if self.GetState().await != ConnectionState::Connected {
181 return Err(anyhow::anyhow!("Not connected to Spine"));
182 }
183
184 debug!("Sending request: {}", method);
185
186 let mut metrics = self.metrics.write().await;
187 metrics.total_requests += 1;
188 metrics.successful_requests += 1;
189 Ok(Vec::new())
190 }
191
192 pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
194
195 pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
197}
198
#[cfg(feature = "grove_echo")]
impl SpineConnectionImpl {
    /// Connects the EchoAction gRPC client to the configured
    /// `mountain_endpoint` and stores it on this connection.
    ///
    /// # Errors
    ///
    /// Returns an error when the endpoint is not a valid URI or when the
    /// transport fails to connect.
    #[instrument(skip(self))]
    pub async fn ConnectEchoClient(&mut self) -> Result<()> {
        // Scoped trait import: `.context(..)` needs `anyhow::Context`, and an
        // underscore import cannot clash with any file-level import.
        use anyhow::Context as _;

        // The read guard is a temporary, released at the end of the statement.
        let url = self.config.read().await.mountain_endpoint.clone();

        info!("Connecting EchoAction client to: {}", url);

        let channel = tonic::transport::Channel::from_shared(url)
            .context("Invalid Mountain URL")?
            .connect()
            .await
            .context("Failed to connect EchoAction client")?;

        self.echo_client = Some(EchoActionServiceClient::new(channel));
        info!("EchoAction client connected");
        Ok(())
    }

    /// Sends one `EchoAction` and returns the response.
    ///
    /// # Errors
    ///
    /// Returns an error when the connection state is not `Connected`, when no
    /// EchoAction client has been connected, when the RPC itself fails, or
    /// when the response reports `success == false`.
    #[instrument(skip(self, action))]
    pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
        use anyhow::Context as _;

        if self.GetState().await != ConnectionState::Connected {
            anyhow::bail!("Not connected to Spine");
        }

        // tonic-generated RPC methods take `&mut self`, so a shared reference
        // obtained through `as_ref()` cannot issue the call. Cloning the client
        // (a cheap handle clone for `Channel`) keeps this method callable
        // through `&self`.
        let mut client = self
            .echo_client
            .clone()
            .ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;

        let response = client
            .send_echo_action(action)
            .await
            .context("Failed to send EchoAction")?
            .into_inner();

        if !response.success {
            anyhow::bail!("EchoAction failed: {}", response.error);
        }

        Ok(response)
    }

    /// Sends an RPC call wrapped in an `EchoAction` of type `"rpc"`.
    ///
    /// `method` is added to the headers under `"rpc_method"`; the response's
    /// `result` bytes are returned.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::SendEchoAction`].
    pub async fn SendRpcViaEcho(
        &self,
        method:&str,
        payload:Vec<u8>,
        metadata:std::collections::HashMap<String, String>,
    ) -> Result<Vec<u8>> {
        let action = Self::BuildEchoAction("rpc", "rpc_method", method, payload, metadata);
        let response = self.SendEchoAction(action).await?;
        Ok(response.result)
    }

    /// Sends an event wrapped in an `EchoAction` of type `"event"`.
    ///
    /// `event_name` is added to the headers under `"event_name"`; the response
    /// body is discarded.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::SendEchoAction`].
    pub async fn SendEventViaEcho(
        &self,
        event_name:&str,
        payload:Vec<u8>,
        metadata:std::collections::HashMap<String, String>,
    ) -> Result<()> {
        let action = Self::BuildEchoAction("event", "event_name", event_name, payload, metadata);
        self.SendEchoAction(action).await?;
        Ok(())
    }

    /// Reports whether an EchoAction client is currently stored.
    pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }

    /// Builds a grove-to-mountain `EchoAction` with a fresh UUID and the
    /// current UTC timestamp, adding `header_key -> header_value` to the
    /// caller's metadata (overwriting an existing entry with the same key).
    fn BuildEchoAction(
        action_type:&str,
        header_key:&str,
        header_value:&str,
        payload:Vec<u8>,
        metadata:std::collections::HashMap<String, String>,
    ) -> EchoAction {
        let mut headers = metadata;
        headers.insert(header_key.to_string(), header_value.to_string());

        EchoAction {
            action_id:uuid::Uuid::new_v4().to_string(),
            source:"grove".to_string(),
            target:"mountain".to_string(),
            action_type:action_type.to_string(),
            payload,
            headers,
            timestamp:chrono::Utc::now().timestamp(),
            nested_actions:vec![],
        }
    }
}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// Two values of the same `ConnectionState` variant compare equal.
    #[test]
    fn test_connection_state() {
        let expected = ConnectionState::Connected;
        let actual = ConnectionState::Connected;
        assert_eq!(actual, expected);
    }

    /// The default heartbeat configuration uses a 30-second interval and is
    /// enabled.
    #[test]
    fn test_heartbeat_config_default() {
        let HeartbeatConfig { interval_seconds, enabled, .. } = HeartbeatConfig::default();
        assert_eq!(interval_seconds, 30);
        assert!(enabled);
    }

    /// A freshly created connection reports `Disconnected`.
    #[tokio::test]
    async fn test_spine_connection_creation() {
        let endpoint = "http://127.0.0.1:50051".to_string();
        let connection = SpineConnectionImpl::new(ProtocolConfig {
            mountain_endpoint:endpoint,
            ..Default::default()
        });

        let initial = connection.GetState().await;
        assert_eq!(initial, ConnectionState::Disconnected);
    }
}