Mountain/Vine/Client/
SendNotification.rs1
2use serde_json::{Value, to_vec};
10
11use crate::{
12 Vine::{
13 Client::{
14 IsShuttingDown,
15 PublishNotification,
16 Shared::{RecordSideCarFailure, SIDECAR_CLIENTS, UpdateSideCarActivity, ValidateMessageSize},
17 },
18 Error::VineError,
19 Generated::GenericNotification,
20 },
21 dev_log,
22};
23
24pub async fn Fn(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
25 if IsShuttingDown::Fn() {
26 return Ok(());
27 }
28
29 if Method.is_empty() || Method.len() > 128 {
30 return Err(VineError::RPCError(
31 "Method name must be between 1 and 128 characters".to_string(),
32 ));
33 }
34
35 if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
36 if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(&SideCarIdentifier) {
37 if !Mux.IsClosed() {
38 let MethodForPublish = Method.clone();
39
40 let ParametersForPublish = Parameters.clone();
41
42 match Mux.Notify(Method.clone(), Parameters.clone()).await {
43 Ok(()) => {
44 UpdateSideCarActivity(&SideCarIdentifier);
45
46 PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &ParametersForPublish);
47
48 return Ok(());
49 },
50
51 Err(Error) => {
52 dev_log!(
53 "grpc",
54 "warn: [VineClient::SendNotification] streaming send failed for '{}' ({}); falling back \
55 to unary",
56 SideCarIdentifier,
57 Error
58 );
59 },
60 }
61 }
62 }
63 }
64
65 let ParameterBytes = to_vec(&Parameters)?;
66
67 ValidateMessageSize(&ParameterBytes)?;
68
69 let mut Client = {
70 let Pool = SIDECAR_CLIENTS.lock();
71
72 Pool.get(&SideCarIdentifier).cloned()
73 };
74
75 if let Some(ref mut Client) = Client {
76 let MethodForPublish = Method.clone();
77
78 let Request = GenericNotification { method:Method, parameter:ParameterBytes };
79
80 match Client.send_mountain_notification(Request).await {
81 Ok(_) => {
82 UpdateSideCarActivity(&SideCarIdentifier);
83
84 dev_log!(
85 "grpc",
86 "[VineClient] Notification sent successfully to sidecar '{}'",
87 SideCarIdentifier
88 );
89
90 PublishNotification::Fn(&SideCarIdentifier, &MethodForPublish, &Parameters);
91
92 Ok(())
93 },
94
95 Err(Status) => {
96 RecordSideCarFailure(&SideCarIdentifier);
97
98 dev_log!(
99 "grpc",
100 "error: [VineClient] Failed to send notification to sidecar '{}': {}",
101 SideCarIdentifier,
102 Status
103 );
104
105 Err(VineError::from(Status))
106 },
107 }
108 } else {
109 Err(VineError::ClientNotConnected(SideCarIdentifier))
110 }
111}