Skip to main content

Mountain/Vine/Client/
SendRequest.rs

1#![allow(non_snake_case)]
2
3//! Send a request and await a response. Validates method-name length
4//! and message size, prefers the streaming multiplexer when
5//! `LAND_VINE_STREAMING=1` is on (falls through to unary on any failure
6//! except the authoritative streaming-path timeout), enforces a per-call
7//! timeout via `tokio::time::timeout`, and updates per-connection
8//! activity / failure metadata on completion.
9
10use std::time::Duration;
11
12use serde_json::{Value, from_slice, to_vec};
13use tokio::time::timeout;
14
15use crate::{
16	Vine::{
17		Client::{
18			IsShuttingDown,
19			Shared::{
20				DEFAULT_TIMEOUT_MS,
21				RecordSideCarFailure,
22				SIDECAR_CLIENTS,
23				UpdateSideCarActivity,
24				ValidateMessageSize,
25			},
26		},
27		Error::VineError,
28		Generated::GenericRequest,
29	},
30	dev_log,
31};
32
33pub async fn Fn(
34	SideCarIdentifier:&str,
35	Method:String,
36	Parameters:Value,
37	TimeoutMilliseconds:u64,
38) -> Result<Value, VineError> {
39	if IsShuttingDown::Fn() {
40		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
41	}
42	if Method.is_empty() || Method.len() > 128 {
43		return Err(VineError::RPCError(
44			"Method name must be between 1 and 128 characters".to_string(),
45		));
46	}
47
48	let TimeoutDuration =
49		Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
50
51	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
52		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
53			if !Mux.IsClosed() {
54				match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
55					Ok(Result_) => {
56						UpdateSideCarActivity(SideCarIdentifier);
57						return Ok(Result_);
58					},
59					Err(VineError::RequestTimeout { .. }) => {
60						return Err(VineError::RequestTimeout {
61							SideCarIdentifier:SideCarIdentifier.to_string(),
62							MethodName:Method,
63							TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
64						});
65					},
66					Err(Error) => {
67						dev_log!(
68							"grpc",
69							"warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
70							 unary",
71							SideCarIdentifier,
72							Method,
73							Error
74						);
75					},
76				}
77			}
78		}
79	}
80
81	let ParameterBytes =
82		to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
83	ValidateMessageSize(&ParameterBytes)?;
84
85	let Client = {
86		let Pool = SIDECAR_CLIENTS.lock();
87		Pool.get(SideCarIdentifier).cloned()
88	};
89
90	let Some(mut Client) = Client else {
91		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
92	};
93
94	let RequestIdentifier = std::time::SystemTime::now()
95		.duration_since(std::time::UNIX_EPOCH)
96		.unwrap()
97		.as_nanos() as u64;
98	let MethodForLog = Method.clone();
99	let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
100
101	let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
102
103	match Result_ {
104		Ok(Ok(Response)) => {
105			UpdateSideCarActivity(SideCarIdentifier);
106			dev_log!(
107				"grpc",
108				"[VineClient] Request sent successfully to sidecar '{}': method='{}'",
109				SideCarIdentifier,
110				MethodForLog
111			);
112
113			let InnerResponse = Response.into_inner();
114			let ResultBytes = InnerResponse.result;
115			let ResultValue:Value = from_slice(&ResultBytes)
116				.map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
117
118			if let Some(ErrorData) = InnerResponse.error {
119				return Err(VineError::RPCError(format!(
120					"RPC error from sidecar: code={}, message={}",
121					ErrorData.code, ErrorData.message
122				)));
123			}
124
125			Ok(ResultValue)
126		},
127		Ok(Err(Status)) => {
128			RecordSideCarFailure(SideCarIdentifier);
129			Err(VineError::RPCError(format!("gRPC error: {}", Status)))
130		},
131		Err(_) => {
132			RecordSideCarFailure(SideCarIdentifier);
133			Err(VineError::RequestTimeout {
134				SideCarIdentifier:SideCarIdentifier.to_string(),
135				MethodName:MethodForLog,
136				TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
137			})
138		},
139	}
140}