1#![allow(non_snake_case)]
2
3use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU64, Ordering},
21 },
22 time::Duration,
23};
24
25use dashmap::DashMap;
26use lazy_static::lazy_static;
27use parking_lot::Mutex;
28use serde_json::Value;
29use tokio::sync::{mpsc, oneshot};
30use tokio_stream::wrappers::ReceiverStream;
31use tonic::Streaming;
32
33use super::{
34 Error::VineError,
35 Generated::{
36 CancelOperationRequest,
37 Envelope,
38 GenericNotification,
39 GenericRequest,
40 GenericResponse,
41 RpcError,
42 cocoon_service_client::CocoonServiceClient,
43 envelope::Payload,
44 },
45};
46use crate::dev_log;
47
/// Capacity of the bounded outbound `Envelope` queue created in
/// `Multiplexer::Open`; senders wait once this many frames are queued.
const SINK_CAPACITY:usize = 1024;
52
/// State shared between the callers of one sidecar stream and the task that
/// reads that stream.
///
/// Instances are built by `Multiplexer::Open`, which also stores them in the
/// module-level `MULTIPLEXERS` registry under `SideCarIdentifier`.
pub struct Multiplexer {
	/// Name of the sidecar this stream belongs to; also the registry key.
	SideCarIdentifier:String,
	/// Sending half of the bounded outbound frame queue (`SINK_CAPACITY` slots).
	Sink:mpsc::Sender<Envelope>,
	/// Requests still waiting for a response, keyed by request identifier.
	Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
	/// Counter used to allocate request identifiers; initialised to 1 in `Open`.
	NextRequestIdentifier:AtomicU64,
	/// Set to `true` by the read pump once the inbound stream has ended.
	Closed:AtomicBool,
}
63
64lazy_static! {
65 static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
69}
70
71impl Multiplexer {
72 pub async fn Open(
77 SideCarIdentifier:String,
78 mut Client:CocoonServiceClient<tonic::transport::Channel>,
79 ) -> Result<Arc<Self>, VineError> {
80 let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
81 let OutboundStream = ReceiverStream::new(OutboundReceiver);
82
83 let Response = Client
84 .open_channel_from_mountain(OutboundStream)
85 .await
86 .map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
87
88 let InboundStream:Streaming<Envelope> = Response.into_inner();
89
90 let SelfReference = Arc::new(Self {
91 SideCarIdentifier:SideCarIdentifier.clone(),
92 Sink,
93 Pending:Arc::new(DashMap::new()),
94 NextRequestIdentifier:AtomicU64::new(1),
95 Closed:AtomicBool::new(false),
96 });
97
98 let SelfForReadPump = SelfReference.clone();
100 tokio::spawn(async move {
101 ReadPump(InboundStream, SelfForReadPump).await;
102 });
103
104 MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
106
107 Ok(SelfReference)
108 }
109
110 pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
114
115 pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
118
119 pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
122 if self.Closed.load(Ordering::Relaxed) {
123 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
124 }
125 let Bytes = serde_json::to_vec(&Parameters)?;
126 let Frame = Envelope {
127 payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
128 };
129 self.Sink
130 .send(Frame)
131 .await
132 .map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
133 }
134
135 pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
140 if self.Closed.load(Ordering::Relaxed) {
141 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
142 }
143 let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
144 let (Tx, Rx) = oneshot::channel();
145 self.Pending.insert(Identifier, Tx);
146
147 let Bytes = serde_json::to_vec(&Parameters)?;
148 let MethodForError = Method.clone();
149 let Frame = Envelope {
150 payload:Some(Payload::Request(GenericRequest {
151 request_identifier:Identifier,
152 method:Method,
153 parameter:Bytes,
154 })),
155 };
156
157 if self.Sink.send(Frame).await.is_err() {
158 self.Pending.remove(&Identifier);
159 return Err(VineError::RPCError(format!(
160 "Sink closed for sidecar {}",
161 self.SideCarIdentifier
162 )));
163 }
164
165 match tokio::time::timeout(Timeout, Rx).await {
166 Ok(Ok(Response)) => {
167 if let Some(Error) = Response.error {
168 return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
169 }
170 if Response.result.is_empty() {
171 return Ok(Value::Null);
172 }
173 serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
174 },
175 Ok(Err(_)) => {
176 self.Pending.remove(&Identifier);
177 Err(VineError::RPCError(
178 "response sender closed (peer disconnect mid-request)".into(),
179 ))
180 },
181 Err(_) => {
182 self.Pending.remove(&Identifier);
183 Err(VineError::RequestTimeout {
184 SideCarIdentifier:self.SideCarIdentifier.clone(),
185 MethodName:MethodForError,
186 TimeoutMilliseconds:Timeout.as_millis() as u64,
187 })
188 },
189 }
190 }
191
192 pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
196 if self.Closed.load(Ordering::Relaxed) {
197 return Ok(());
198 }
199 let Frame = Envelope {
200 payload:Some(Payload::Cancel(CancelOperationRequest {
201 request_identifier_to_cancel:RequestIdentifier,
202 })),
203 };
204 let _ = self.Sink.send(Frame).await;
205 Ok(())
206 }
207
208 pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
209
210 pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
211}
212
213async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
218 use futures_util::StreamExt;
219
220 while let Some(FrameResult) = Stream.next().await {
221 let Frame = match FrameResult {
222 Ok(F) => F,
223 Err(Status) => {
224 dev_log!(
225 "grpc",
226 "[Vine::Multiplexer] read err on {}: {}",
227 State.SideCarIdentifier,
228 Status
229 );
230 break;
231 },
232 };
233 let Payload = match Frame.payload {
234 Some(P) => P,
235 None => continue,
236 };
237
238 match Payload {
239 Payload::Notification(N) => {
240 let Parameters:Value = if N.parameter.is_empty() {
241 Value::Null
242 } else {
243 serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
244 };
245 super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
246 },
247 Payload::Response(R) => {
248 let Identifier = R.request_identifier;
249 if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
250 let _ = Sender.send(R);
251 }
252 },
255 Payload::Request(_) => {
256 },
263 Payload::Cancel(_) => {
264 },
268 }
269 }
270
271 State.Closed.store(true, Ordering::Relaxed);
272
273 let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
276 for Key in Keys {
277 if let Some((_, Sender)) = State.Pending.remove(&Key) {
278 let _ = Sender.send(GenericResponse {
279 request_identifier:Key,
280 result:Vec::new(),
281 error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
282 });
283 }
284 }
285
286 Multiplexer::Deregister(&State.SideCarIdentifier);
287 dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
288}