Skip to main content

Mountain/Vine/Client/
SendNotification.rs

1#![allow(non_snake_case)]
2
3//! Fire-and-forget notification to a sidecar. No response, no per-call
4//! timeout. Prefers the streaming multiplexer under
5//! `LAND_VINE_STREAMING=1`; falls through to unary on any failure.
6//! After a successful wire send, fans out via `PublishNotification::Fn`
7//! so broadcast subscribers (Effect-TS fibers, OTel emitters, future
8//! Mist-WS bridge, dev log) can observe the same flow concurrently.
9
10use serde_json::{Value, to_vec};
11
12use crate::{
13	Vine::{
14		Client::{
15			IsShuttingDown,
16			PublishNotification,
17			Shared::{RecordSideCarFailure, SIDECAR_CLIENTS, UpdateSideCarActivity, ValidateMessageSize},
18		},
19		Error::VineError,
20		Generated::GenericNotification,
21	},
22	dev_log,
23};
24
25pub async fn Fn(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
26	if IsShuttingDown::Fn() {
27		return Ok(());
28	}
29	if Method.is_empty() || Method.len() > 128 {
30		return Err(VineError::RPCError(
31			"Method name must be between 1 and 128 characters".to_string(),
32		));
33	}
34
35	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
36		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(&SideCarIdentifier) {
37			if !Mux.IsClosed() {
38				let MethodForPublish = Method.clone();
39				let ParametersForPublish = Parameters.clone();
40				match Mux.Notify(Method.clone(), Parameters.clone()).await {
41					Ok(()) => {
42						UpdateSideCarActivity(&SideCarIdentifier);
43						PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &ParametersForPublish);
44						return Ok(());
45					},
46					Err(Error) => {
47						dev_log!(
48							"grpc",
49							"warn: [VineClient::SendNotification] streaming send failed for '{}' ({}); falling back \
50							 to unary",
51							SideCarIdentifier,
52							Error
53						);
54					},
55				}
56			}
57		}
58	}
59
60	let ParameterBytes = to_vec(&Parameters)?;
61	ValidateMessageSize(&ParameterBytes)?;
62
63	let mut Client = {
64		let Pool = SIDECAR_CLIENTS.lock();
65		Pool.get(&SideCarIdentifier).cloned()
66	};
67
68	if let Some(ref mut Client) = Client {
69		let MethodForPublish = Method.clone();
70		let Request = GenericNotification { method:Method, parameter:ParameterBytes };
71
72		match Client.send_mountain_notification(Request).await {
73			Ok(_) => {
74				UpdateSideCarActivity(&SideCarIdentifier);
75				dev_log!(
76					"grpc",
77					"[VineClient] Notification sent successfully to sidecar '{}'",
78					SideCarIdentifier
79				);
80				PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &Parameters);
81				Ok(())
82			},
83			Err(Status) => {
84				RecordSideCarFailure(&SideCarIdentifier);
85				dev_log!(
86					"grpc",
87					"error: [VineClient] Failed to send notification to sidecar '{}': {}",
88					SideCarIdentifier,
89					Status
90				);
91				Err(VineError::from(Status))
92			},
93		}
94	} else {
95		Err(VineError::ClientNotConnected(SideCarIdentifier))
96	}
97}