Mountain/Vine/
Client.rs

1//! # Vine Client
2//!
3//! Provides a simplified, thread-safe client for communicating with a `Cocoon`
4//! sidecar process via gRPC. It manages a shared pool of connections.
5
6use std::{
7	collections::{HashMap, hash_map::DefaultHasher},
8	hash::{Hash, Hasher},
9	sync::Arc,
10	time::Duration,
11};
12
13use lazy_static::lazy_static;
14use log::{debug, error, info};
15use parking_lot::Mutex;
16use serde_json::{Value, from_slice, to_vec};
17use tokio::time::timeout;
18use tonic::transport::Channel;
19
20use super::{
21	Error::VineError,
22	Generated::{GenericNotification, GenericRequest, cocoon_service_client::CocoonServiceClient},
23};
24
25type CocoonClient = CocoonServiceClient<Channel>;
26
27lazy_static! {
28	static ref SIDECAR_CLIENTS: Arc<Mutex<HashMap<String, CocoonClient>>> = Arc::new(Mutex::new(HashMap::new()));
29}
30
31/// Establishes a gRPC connection to a sidecar process.
32pub async fn ConnectToSideCar(SideCarIdentifier:String, Address:String) -> Result<(), VineError> {
33	info!("[VineClient] Connecting to sidecar '{}' at '{}'...", SideCarIdentifier, Address);
34
35	let endpoint = format!("http://{}", Address);
36
37	let channel = Channel::from_shared(endpoint)?.connect().await?;
38
39	let client = CocoonServiceClient::new(channel);
40
41	SIDECAR_CLIENTS.lock().insert(SideCarIdentifier.clone(), client);
42
43	info!("[VineClient] Successfully connected to sidecar '{}'.", SideCarIdentifier);
44
45	Ok(())
46}
47
48/// Sends a fire-and-forget notification to a sidecar.
49pub async fn SendNotification(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
50	let mut client = {
51		let guard = SIDECAR_CLIENTS.lock();
52
53		guard.get(&SideCarIdentifier).cloned()
54	};
55
56	if let Some(ref mut client) = client {
57		let request = GenericNotification { method:Method, parameter:to_vec(&Parameters)? };
58
59		client.send_mountain_notification(request).await?;
60
61		Ok(())
62	} else {
63		Err(VineError::ClientNotConnected(SideCarIdentifier))
64	}
65}
66
67/// Sends a request to a sidecar and awaits a response.
68pub async fn SendRequest(
69	SideCarIdentifier:&str,
70
71	Method:String,
72
73	Parameters:Value,
74
75	TimeoutMilliseconds:u64,
76) -> Result<Value, VineError> {
77	debug!(
78		"[VineClient] Sending request '{}' to sidecar '{}'...",
79		Method, SideCarIdentifier
80	);
81
82	let mut client = {
83		let guard = SIDECAR_CLIENTS.lock();
84
85		guard.get(SideCarIdentifier).cloned()
86	};
87
88	if let Some(ref mut client) = client {
89		let mut hasher = DefaultHasher::new();
90
91		uuid::Uuid::new_v4().hash(&mut hasher);
92
93		let RequestIdentifier = hasher.finish();
94
95		let request = GenericRequest {
96			request_identifier:RequestIdentifier,
97
98			method:Method.clone(),
99
100			parameter:to_vec(&Parameters)?,
101		};
102
103		let future = client.process_mountain_request(request);
104
105		match timeout(Duration::from_millis(TimeoutMilliseconds), future).await {
106			Ok(Ok(response)) => {
107				let response_data = response.into_inner();
108
109				if let Some(rpc_error) = response_data.error {
110					error!(
111						"[VineClient] Received RPC error from sidecar '{}': {}",
112						SideCarIdentifier, rpc_error.message
113					);
114
115					Err(VineError::RPCError(rpc_error.message))
116				} else {
117					let deserialized_value = from_slice(&response_data.result)?;
118
119					Ok(deserialized_value)
120				}
121			},
122
123			Ok(Err(status)) => {
124				error!(
125					"[VineClient] gRPC status error from sidecar '{}': {}",
126					SideCarIdentifier, status
127				);
128
129				Err(VineError::from(status))
130			},
131
132			Err(_) => {
133				error!("[VineClient] Request to sidecar '{}' timed out.", SideCarIdentifier);
134
135				Err(VineError::RequestTimeout {
136					SideCarIdentifier:SideCarIdentifier.to_string(),
137
138					MethodName:Method,
139
140					TimeoutMilliseconds,
141				})
142			},
143		}
144	} else {
145		Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()))
146	}
147}