Skip to main content

Mountain/Vine/
Multiplexer.rs

1#![allow(non_snake_case)]
2
3//! Bidirectional streaming multiplexer for the Vine gRPC bus.
4//!
5//! Owns one bidirectional h2 stream per sidecar. Inbound notifications
6//! fan out to the process-wide broadcast
7//! (`Vine::Client::SubscribeNotifications`); inbound responses route to the
8//! matching pending-request `oneshot` sender. Inbound reverse-RPC requests and
9//! cancellations are TODO for a follow-up phase.
10//!
11//! This is the P14.1 foundation of Patch 14 - it lands the Open(),
12//! Notify(), Request(), and ReadPump skeleton so subsequent phases can
13//! wire `SendNotification` / `SendRequest` to consult the multiplexer
14//! when `LAND_VINE_STREAMING=1` is set.
15
16use std::{
17	collections::HashMap,
18	sync::{
19		Arc,
20		atomic::{AtomicBool, AtomicU64, Ordering},
21	},
22	time::Duration,
23};
24
25use dashmap::DashMap;
26use lazy_static::lazy_static;
27use parking_lot::Mutex;
28use serde_json::Value;
29use tokio::sync::{mpsc, oneshot};
30use tokio_stream::wrappers::ReceiverStream;
31use tonic::Streaming;
32
33use super::{
34	Error::VineError,
35	Generated::{
36		CancelOperationRequest,
37		Envelope,
38		GenericNotification,
39		GenericRequest,
40		GenericResponse,
41		RpcError,
42		cocoon_service_client::CocoonServiceClient,
43		envelope::Payload,
44	},
45};
46use crate::dev_log;
47
/// Capacity of each multiplexer's outbound `mpsc` queue. Because the
/// queue is bounded, producers wait (backpressure) once a stalled
/// sidecar stops draining frames instead of growing the heap without
/// limit.
const SINK_CAPACITY:usize = 1 << 10;
52
53/// One multiplexer per sidecar connection. Holds the outbound sink,
54/// the pending-request correlation map, and a shared-state shutdown
55/// flag.
56pub struct Multiplexer {
57	SideCarIdentifier:String,
58	Sink:mpsc::Sender<Envelope>,
59	Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
60	NextRequestIdentifier:AtomicU64,
61	Closed:AtomicBool,
62}
63
64lazy_static! {
65	/// Process-wide registry, one entry per sidecar identifier.
66	/// Lookup site for `SendNotification` / `SendRequest` to consult
67	/// when `LAND_VINE_STREAMING=1`.
68	static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
69}
70
71impl Multiplexer {
72	/// Open a bidirectional streaming channel against an existing
73	/// `CocoonServiceClient`. Spawns the read pump as a detached
74	/// tokio task and registers the multiplexer in the global
75	/// registry. Returns once the stream is established.
76	pub async fn Open(
77		SideCarIdentifier:String,
78		mut Client:CocoonServiceClient<tonic::transport::Channel>,
79	) -> Result<Arc<Self>, VineError> {
80		let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
81		let OutboundStream = ReceiverStream::new(OutboundReceiver);
82
83		let Response = Client
84			.open_channel_from_mountain(OutboundStream)
85			.await
86			.map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
87
88		let InboundStream:Streaming<Envelope> = Response.into_inner();
89
90		let SelfReference = Arc::new(Self {
91			SideCarIdentifier:SideCarIdentifier.clone(),
92			Sink,
93			Pending:Arc::new(DashMap::new()),
94			NextRequestIdentifier:AtomicU64::new(1),
95			Closed:AtomicBool::new(false),
96		});
97
98		// Spawn the read pump.
99		let SelfForReadPump = SelfReference.clone();
100		tokio::spawn(async move {
101			ReadPump(InboundStream, SelfForReadPump).await;
102		});
103
104		// Register globally so consumers can look us up.
105		MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
106
107		Ok(SelfReference)
108	}
109
110	/// Look up the multiplexer for a sidecar. Returns `None` if no
111	/// streaming connection has been opened for that sidecar (the
112	/// caller should fall back to the unary path).
113	pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
114
115	/// Drop the registry entry. Called by the read-pump when the
116	/// stream closes.
117	pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
118
119	/// Send a notification frame (fire-and-forget). Non-blocking
120	/// modulo Sink backpressure (capacity `SINK_CAPACITY`).
121	pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
122		if self.Closed.load(Ordering::Relaxed) {
123			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
124		}
125		let Bytes = serde_json::to_vec(&Parameters)?;
126		let Frame = Envelope {
127			payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
128		};
129		self.Sink
130			.send(Frame)
131			.await
132			.map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
133	}
134
135	/// Send a request and await the matching response. Cancels the
136	/// pending entry on timeout. The future is `Send + 'static`-clean
137	/// so callers can drive it inside `tokio::select!` for finer-
138	/// grained cancellation.
139	pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
140		if self.Closed.load(Ordering::Relaxed) {
141			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
142		}
143		let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
144		let (Tx, Rx) = oneshot::channel();
145		self.Pending.insert(Identifier, Tx);
146
147		let Bytes = serde_json::to_vec(&Parameters)?;
148		let MethodForError = Method.clone();
149		let Frame = Envelope {
150			payload:Some(Payload::Request(GenericRequest {
151				request_identifier:Identifier,
152				method:Method,
153				parameter:Bytes,
154			})),
155		};
156
157		if self.Sink.send(Frame).await.is_err() {
158			self.Pending.remove(&Identifier);
159			return Err(VineError::RPCError(format!(
160				"Sink closed for sidecar {}",
161				self.SideCarIdentifier
162			)));
163		}
164
165		match tokio::time::timeout(Timeout, Rx).await {
166			Ok(Ok(Response)) => {
167				if let Some(Error) = Response.error {
168					return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
169				}
170				if Response.result.is_empty() {
171					return Ok(Value::Null);
172				}
173				serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
174			},
175			Ok(Err(_)) => {
176				self.Pending.remove(&Identifier);
177				Err(VineError::RPCError(
178					"response sender closed (peer disconnect mid-request)".into(),
179				))
180			},
181			Err(_) => {
182				self.Pending.remove(&Identifier);
183				Err(VineError::RequestTimeout {
184					SideCarIdentifier:self.SideCarIdentifier.clone(),
185					MethodName:MethodForError,
186					TimeoutMilliseconds:Timeout.as_millis() as u64,
187				})
188			},
189		}
190	}
191
192	/// Send a Cancel frame asking the peer to abort an in-flight
193	/// request matching `RequestIdentifier`. Best-effort; the peer
194	/// chooses whether to honour it.
195	pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
196		if self.Closed.load(Ordering::Relaxed) {
197			return Ok(());
198		}
199		let Frame = Envelope {
200			payload:Some(Payload::Cancel(CancelOperationRequest {
201				request_identifier_to_cancel:RequestIdentifier,
202			})),
203		};
204		let _ = self.Sink.send(Frame).await;
205		Ok(())
206	}
207
208	pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
209
210	pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
211}
212
213/// Drain the inbound side of the bidirectional stream. Notifications
214/// fan out to the process-wide broadcast; responses wake the parked
215/// `Request` future. Reverse-RPC requests and cancellations are
216/// recorded for a follow-up phase.
217async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
218	use futures_util::StreamExt;
219
220	while let Some(FrameResult) = Stream.next().await {
221		let Frame = match FrameResult {
222			Ok(F) => F,
223			Err(Status) => {
224				dev_log!(
225					"grpc",
226					"[Vine::Multiplexer] read err on {}: {}",
227					State.SideCarIdentifier,
228					Status
229				);
230				break;
231			},
232		};
233		let Payload = match Frame.payload {
234			Some(P) => P,
235			None => continue,
236		};
237
238		match Payload {
239			Payload::Notification(N) => {
240				let Parameters:Value = if N.parameter.is_empty() {
241					Value::Null
242				} else {
243					serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
244				};
245				super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
246			},
247			Payload::Response(R) => {
248				let Identifier = R.request_identifier;
249				if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
250					let _ = Sender.send(R);
251				}
252				// A Response with no matching pending entry is a
253				// duplicate or post-cancel arrival; drop silently.
254			},
255			Payload::Request(_) => {
256				// TODO P14.1.1: dispatch the inbound (reverse-RPC)
257				// request to the same handler tree the unary path
258				// uses, then enqueue the GenericResponse onto Sink.
259				// For now we drop, which is safe: the unary path is
260				// still authoritative until phase P14.4 lands the
261				// streaming handler tree on Cocoon side.
262			},
263			Payload::Cancel(_) => {
264				// TODO P14.1.2: signal abort for the in-flight
265				// handler. For now no-op (the unary path doesn't
266				// support cancel either).
267			},
268		}
269	}
270
271	State.Closed.store(true, Ordering::Relaxed);
272
273	// Drain pending senders with disconnect errors so awaiting
274	// fibers don't hang forever.
275	let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
276	for Key in Keys {
277		if let Some((_, Sender)) = State.Pending.remove(&Key) {
278			let _ = Sender.send(GenericResponse {
279				request_identifier:Key,
280				result:Vec::new(),
281				error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
282			});
283		}
284	}
285
286	Multiplexer::Deregister(&State.SideCarIdentifier);
287	dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
288}