Vine/Client/
SendRequest.rs1use std::time::Duration;
15
16use serde_json::{Value, from_slice, to_vec};
17use tokio::time::timeout;
18
19use crate::{
20 Client::{
21 IsShuttingDown,
22 Shared::{
23 DEFAULT_TIMEOUT_MS,
24 RecordSideCarFailure,
25 SIDECAR_CLIENTS,
26 UpdateSideCarActivity,
27 ValidateMessageSize,
28 },
29 },
30 Error::VineError,
31 Generated::GenericRequest,
32 dev_log,
33};
34
35pub async fn Fn(
36 SideCarIdentifier:&str,
37
38 Method:String,
39
40 Parameters:Value,
41
42 TimeoutMilliseconds:u64,
43) -> Result<Value, VineError> {
44 if IsShuttingDown::Fn() {
45 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
46 }
47
48 if Method.is_empty() || Method.len() > 128 {
49 return Err(VineError::RPCError(
50 "Method name must be between 1 and 128 characters".to_string(),
51 ));
52 }
53
54 let TimeoutDuration =
55 Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
56
57 #[cfg(feature = "multiplexer")]
58 {
59 if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
60 if let Some(Mux) = crate::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
61 if !Mux.IsClosed() {
62 match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
63 Ok(Result_) => {
64 UpdateSideCarActivity(SideCarIdentifier);
65
66 return Ok(Result_);
67 },
68
69 Err(VineError::RequestTimeout { .. }) => {
70 return Err(VineError::RequestTimeout {
71 SideCarIdentifier:SideCarIdentifier.to_string(),
72 MethodName:Method,
73 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
74 });
75 },
76
77 Err(Error) => {
78 dev_log!(
79 "grpc",
80 "warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling \
81 back to unary",
82 SideCarIdentifier,
83 Method,
84 Error
85 );
86 },
87 }
88 }
89 }
90 }
91 }
92
93 let ParameterBytes =
94 to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
95
96 ValidateMessageSize(&ParameterBytes)?;
97
98 let Client = {
99 let Pool = SIDECAR_CLIENTS.lock();
100
101 Pool.get(SideCarIdentifier).cloned()
102 };
103
104 let Some(mut Client) = Client else {
105 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
106 };
107
108 use std::sync::atomic::{AtomicU64, Ordering as AO};
109
110 static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
111
112 let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
113
114 let MethodForLog = Method.clone();
115
116 let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
117
118 let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
119
120 match Result_ {
121 Ok(Ok(Response)) => {
122 UpdateSideCarActivity(SideCarIdentifier);
123
124 dev_log!(
125 "grpc",
126 "[VineClient] Request sent successfully to sidecar '{}': method='{}'",
127 SideCarIdentifier,
128 MethodForLog
129 );
130
131 let InnerResponse = Response.into_inner();
132
133 let ResultBytes = InnerResponse.result;
134
135 let ResultValue:Value = from_slice(&ResultBytes)
136 .map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
137
138 if let Some(ErrorData) = InnerResponse.error {
139 return Err(VineError::RPCError(format!(
140 "RPC error from sidecar: code={}, message={}",
141 ErrorData.code, ErrorData.message
142 )));
143 }
144
145 Ok(ResultValue)
146 },
147
148 Ok(Err(Status)) => {
149 RecordSideCarFailure(SideCarIdentifier);
150
151 Err(VineError::RPCError(format!("gRPC error: {}", Status)))
152 },
153
154 Err(_) => {
155 RecordSideCarFailure(SideCarIdentifier);
156
157 Err(VineError::RequestTimeout {
158 SideCarIdentifier:SideCarIdentifier.to_string(),
159 MethodName:MethodForLog,
160 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
161 })
162 },
163 }
164}