Skip to main content

Vine/Client/
SendRequest.rs

1//! Send a request and await a response. Validates method-name length and
2//! message size, prefers the streaming multiplexer when
3//! `LAND_VINE_STREAMING=1` is on and the `multiplexer` cargo feature is
4//! enabled (falls through to unary on any failure except the authoritative
5//! streaming-path timeout), enforces a per-call timeout via
6//! `tokio::time::timeout`, and updates per-connection activity / failure
7//! metadata on completion.
8//!
9//! The hard upper bound on per-call timeouts is
10//! [`crate::DefaultRequestTimeoutMs`] (15 000 ms); when the caller passes
11//! `0` for `TimeoutMilliseconds` the unary path falls back to
12//! [`Shared::DEFAULT_TIMEOUT_MS`] (5 000 ms).
13
14use std::time::Duration;
15
16use serde_json::{Value, from_slice, to_vec};
17use tokio::time::timeout;
18
19use crate::{
20	Client::{
21		IsShuttingDown,
22		Shared::{
23			DEFAULT_TIMEOUT_MS,
24			RecordSideCarFailure,
25			SIDECAR_CLIENTS,
26			UpdateSideCarActivity,
27			ValidateMessageSize,
28		},
29	},
30	Error::VineError,
31	Generated::GenericRequest,
32	dev_log,
33};
34
35pub async fn Fn(
36	SideCarIdentifier:&str,
37
38	Method:String,
39
40	Parameters:Value,
41
42	TimeoutMilliseconds:u64,
43) -> Result<Value, VineError> {
44	if IsShuttingDown::Fn() {
45		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
46	}
47
48	if Method.is_empty() || Method.len() > 128 {
49		return Err(VineError::RPCError(
50			"Method name must be between 1 and 128 characters".to_string(),
51		));
52	}
53
54	let TimeoutDuration =
55		Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
56
57	#[cfg(feature = "multiplexer")]
58	{
59		if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
60			if let Some(Mux) = crate::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
61				if !Mux.IsClosed() {
62					match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
63						Ok(Result_) => {
64							UpdateSideCarActivity(SideCarIdentifier);
65
66							return Ok(Result_);
67						},
68
69						Err(VineError::RequestTimeout { .. }) => {
70							return Err(VineError::RequestTimeout {
71								SideCarIdentifier:SideCarIdentifier.to_string(),
72								MethodName:Method,
73								TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
74							});
75						},
76
77						Err(Error) => {
78							dev_log!(
79								"grpc",
80								"warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling \
81								 back to unary",
82								SideCarIdentifier,
83								Method,
84								Error
85							);
86						},
87					}
88				}
89			}
90		}
91	}
92
93	let ParameterBytes =
94		to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
95
96	ValidateMessageSize(&ParameterBytes)?;
97
98	let Client = {
99		let Pool = SIDECAR_CLIENTS.lock();
100
101		Pool.get(SideCarIdentifier).cloned()
102	};
103
104	let Some(mut Client) = Client else {
105		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
106	};
107
108	use std::sync::atomic::{AtomicU64, Ordering as AO};
109
110	static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
111
112	let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
113
114	let MethodForLog = Method.clone();
115
116	let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
117
118	let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
119
120	match Result_ {
121		Ok(Ok(Response)) => {
122			UpdateSideCarActivity(SideCarIdentifier);
123
124			dev_log!(
125				"grpc",
126				"[VineClient] Request sent successfully to sidecar '{}': method='{}'",
127				SideCarIdentifier,
128				MethodForLog
129			);
130
131			let InnerResponse = Response.into_inner();
132
133			let ResultBytes = InnerResponse.result;
134
135			let ResultValue:Value = from_slice(&ResultBytes)
136				.map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
137
138			if let Some(ErrorData) = InnerResponse.error {
139				return Err(VineError::RPCError(format!(
140					"RPC error from sidecar: code={}, message={}",
141					ErrorData.code, ErrorData.message
142				)));
143			}
144
145			Ok(ResultValue)
146		},
147
148		Ok(Err(Status)) => {
149			RecordSideCarFailure(SideCarIdentifier);
150
151			Err(VineError::RPCError(format!("gRPC error: {}", Status)))
152		},
153
154		Err(_) => {
155			RecordSideCarFailure(SideCarIdentifier);
156
157			Err(VineError::RequestTimeout {
158				SideCarIdentifier:SideCarIdentifier.to_string(),
159				MethodName:MethodForLog,
160				TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
161			})
162		},
163	}
164}