Skip to main content

Mountain/Vine/Client/
SendNotification.rs

1
2//! Fire-and-forget notification to a sidecar. No response, no per-call
3//! timeout. Prefers the streaming multiplexer under
4//! `LAND_VINE_STREAMING=1`; falls through to unary on any failure.
5//! After a successful wire send, fans out via `PublishNotification::Fn`
6//! so broadcast subscribers (Effect-TS fibers, OTel emitters, future
7//! Mist-WS bridge, dev log) can observe the same flow concurrently.
8
9use serde_json::{Value, to_vec};
10
11use crate::{
12	Vine::{
13		Client::{
14			IsShuttingDown,
15			PublishNotification,
16			Shared::{RecordSideCarFailure, SIDECAR_CLIENTS, UpdateSideCarActivity, ValidateMessageSize},
17		},
18		Error::VineError,
19		Generated::GenericNotification,
20	},
21	dev_log,
22};
23
24pub async fn Fn(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
25	if IsShuttingDown::Fn() {
26		return Ok(());
27	}
28
29	if Method.is_empty() || Method.len() > 128 {
30		return Err(VineError::RPCError(
31			"Method name must be between 1 and 128 characters".to_string(),
32		));
33	}
34
35	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
36		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(&SideCarIdentifier) {
37			if !Mux.IsClosed() {
38				let MethodForPublish = Method.clone();
39
40				let ParametersForPublish = Parameters.clone();
41
42				match Mux.Notify(Method.clone(), Parameters.clone()).await {
43					Ok(()) => {
44						UpdateSideCarActivity(&SideCarIdentifier);
45
46						PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &ParametersForPublish);
47
48						return Ok(());
49					},
50
51					Err(Error) => {
52						dev_log!(
53							"grpc",
54							"warn: [VineClient::SendNotification] streaming send failed for '{}' ({}); falling back \
55							 to unary",
56							SideCarIdentifier,
57							Error
58						);
59					},
60				}
61			}
62		}
63	}
64
65	let ParameterBytes = to_vec(&Parameters)?;
66
67	ValidateMessageSize(&ParameterBytes)?;
68
69	let mut Client = {
70		let Pool = SIDECAR_CLIENTS.lock();
71
72		Pool.get(&SideCarIdentifier).cloned()
73	};
74
75	if let Some(ref mut Client) = Client {
76		let MethodForPublish = Method.clone();
77
78		let Request = GenericNotification { method:Method, parameter:ParameterBytes };
79
80		match Client.send_mountain_notification(Request).await {
81			Ok(_) => {
82				UpdateSideCarActivity(&SideCarIdentifier);
83
84				dev_log!(
85					"grpc",
86					"[VineClient] Notification sent successfully to sidecar '{}'",
87					SideCarIdentifier
88				);
89
90				PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &Parameters);
91
92				Ok(())
93			},
94
95			Err(Status) => {
96				RecordSideCarFailure(&SideCarIdentifier);
97
98				dev_log!(
99					"grpc",
100					"error: [VineClient] Failed to send notification to sidecar '{}': {}",
101					SideCarIdentifier,
102					Status
103				);
104
105				Err(VineError::from(Status))
106			},
107		}
108	} else {
109		Err(VineError::ClientNotConnected(SideCarIdentifier))
110	}
111}