Mountain/Vine/Client/
SendRequest.rs1#![allow(non_snake_case)]
2
3use std::time::Duration;
11
12use serde_json::{Value, from_slice, to_vec};
13use tokio::time::timeout;
14
15use crate::{
16 Vine::{
17 Client::{
18 IsShuttingDown,
19 Shared::{
20 DEFAULT_TIMEOUT_MS,
21 RecordSideCarFailure,
22 SIDECAR_CLIENTS,
23 UpdateSideCarActivity,
24 ValidateMessageSize,
25 },
26 },
27 Error::VineError,
28 Generated::GenericRequest,
29 },
30 dev_log,
31};
32
33pub async fn Fn(
34 SideCarIdentifier:&str,
35 Method:String,
36 Parameters:Value,
37 TimeoutMilliseconds:u64,
38) -> Result<Value, VineError> {
39 if IsShuttingDown::Fn() {
40 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
41 }
42 if Method.is_empty() || Method.len() > 128 {
43 return Err(VineError::RPCError(
44 "Method name must be between 1 and 128 characters".to_string(),
45 ));
46 }
47
48 let TimeoutDuration =
49 Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
50
51 if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
52 if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
53 if !Mux.IsClosed() {
54 match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
55 Ok(Result_) => {
56 UpdateSideCarActivity(SideCarIdentifier);
57 return Ok(Result_);
58 },
59 Err(VineError::RequestTimeout { .. }) => {
60 return Err(VineError::RequestTimeout {
61 SideCarIdentifier:SideCarIdentifier.to_string(),
62 MethodName:Method,
63 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
64 });
65 },
66 Err(Error) => {
67 dev_log!(
68 "grpc",
69 "warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
70 unary",
71 SideCarIdentifier,
72 Method,
73 Error
74 );
75 },
76 }
77 }
78 }
79 }
80
81 let ParameterBytes =
82 to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
83 ValidateMessageSize(&ParameterBytes)?;
84
85 let Client = {
86 let Pool = SIDECAR_CLIENTS.lock();
87 Pool.get(SideCarIdentifier).cloned()
88 };
89
90 let Some(mut Client) = Client else {
91 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
92 };
93
94 let RequestIdentifier = std::time::SystemTime::now()
95 .duration_since(std::time::UNIX_EPOCH)
96 .unwrap()
97 .as_nanos() as u64;
98 let MethodForLog = Method.clone();
99 let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
100
101 let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
102
103 match Result_ {
104 Ok(Ok(Response)) => {
105 UpdateSideCarActivity(SideCarIdentifier);
106 dev_log!(
107 "grpc",
108 "[VineClient] Request sent successfully to sidecar '{}': method='{}'",
109 SideCarIdentifier,
110 MethodForLog
111 );
112
113 let InnerResponse = Response.into_inner();
114 let ResultBytes = InnerResponse.result;
115 let ResultValue:Value = from_slice(&ResultBytes)
116 .map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
117
118 if let Some(ErrorData) = InnerResponse.error {
119 return Err(VineError::RPCError(format!(
120 "RPC error from sidecar: code={}, message={}",
121 ErrorData.code, ErrorData.message
122 )));
123 }
124
125 Ok(ResultValue)
126 },
127 Ok(Err(Status)) => {
128 RecordSideCarFailure(SideCarIdentifier);
129 Err(VineError::RPCError(format!("gRPC error: {}", Status)))
130 },
131 Err(_) => {
132 RecordSideCarFailure(SideCarIdentifier);
133 Err(VineError::RequestTimeout {
134 SideCarIdentifier:SideCarIdentifier.to_string(),
135 MethodName:MethodForLog,
136 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
137 })
138 },
139 }
140}