Skip to main content

Vine/Client/
SendNotification.rs

1//! Fire-and-forget notification to a sidecar. No response, no per-call
2//! timeout. Prefers the streaming multiplexer under
3//! `LAND_VINE_STREAMING=1` when the `multiplexer` cargo feature is
4//! enabled; falls through to unary on any failure. After a successful
5//! wire send, fans out via `PublishNotification::Fn` so broadcast
6//! subscribers (Effect-TS fibers, OTel emitters, future Mist-WS bridge,
7//! dev log) can observe the same flow concurrently.
8
9use serde_json::{Value, to_vec};
10
11use crate::{
12	Client::{
13		IsShuttingDown,
14		PublishNotification,
15		Shared::{RecordSideCarFailure, SIDECAR_CLIENTS, UpdateSideCarActivity, ValidateMessageSize},
16	},
17	Error::VineError,
18	Generated::GenericNotification,
19	dev_log,
20};
21
22pub async fn Fn(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
23	if IsShuttingDown::Fn() {
24		return Ok(());
25	}
26
27	if Method.is_empty() || Method.len() > 128 {
28		return Err(VineError::RPCError(
29			"Method name must be between 1 and 128 characters".to_string(),
30		));
31	}
32
33	#[cfg(feature = "multiplexer")]
34	{
35		if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
36			if let Some(Mux) = crate::Multiplexer::Multiplexer::Lookup(&SideCarIdentifier) {
37				if !Mux.IsClosed() {
38					let MethodForPublish = Method.clone();
39
40					let ParametersForPublish = Parameters.clone();
41
42					match Mux.Notify(Method.clone(), Parameters.clone()).await {
43						Ok(()) => {
44							UpdateSideCarActivity(&SideCarIdentifier);
45
46							PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &ParametersForPublish);
47
48							return Ok(());
49						},
50
51						Err(Error) => {
52							dev_log!(
53								"grpc",
54								"warn: [VineClient::SendNotification] streaming send failed for '{}' ({}); falling \
55								 back to unary",
56								SideCarIdentifier,
57								Error
58							);
59						},
60					}
61				}
62			}
63		}
64	}
65
66	let ParameterBytes = to_vec(&Parameters)?;
67
68	ValidateMessageSize(&ParameterBytes)?;
69
70	let mut Client = {
71		let Pool = SIDECAR_CLIENTS.lock();
72
73		Pool.get(&SideCarIdentifier).cloned()
74	};
75
76	if let Some(ref mut Client) = Client {
77		let MethodForPublish = Method.clone();
78
79		let Request = GenericNotification { method:Method, parameter:ParameterBytes };
80
81		match Client.send_mountain_notification(Request).await {
82			Ok(_) => {
83				UpdateSideCarActivity(&SideCarIdentifier);
84
85				dev_log!(
86					"grpc",
87					"[VineClient] Notification sent successfully to sidecar '{}'",
88					SideCarIdentifier
89				);
90
91				PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &Parameters);
92
93				Ok(())
94			},
95
96			Err(Status) => {
97				RecordSideCarFailure(&SideCarIdentifier);
98
99				dev_log!(
100					"grpc",
101					"error: [VineClient] Failed to send notification to sidecar '{}': {}",
102					SideCarIdentifier,
103					Status
104				);
105
106				Err(VineError::from(Status))
107			},
108		}
109	} else {
110		Err(VineError::ClientNotConnected(SideCarIdentifier))
111	}
112}