Mountain/Vine/Client/
Shared.rs1
2use std::{
7 collections::HashMap,
8 sync::{
9 Arc,
10 OnceLock,
11 atomic::{AtomicBool, Ordering},
12 },
13 time::Instant,
14};
15
16use lazy_static::lazy_static;
17use parking_lot::Mutex;
18use tokio::sync::Notify;
19
20use crate::Vine::{Client::NotificationFrame, Error::VineError, Generated::cocoon_service_client::CocoonServiceClient};
21
22pub type CocoonClient = CocoonServiceClient<tonic::transport::Channel>;
24
25pub const DEFAULT_TIMEOUT_MS:u64 = 5000;
27
28pub const MAX_RETRY_ATTEMPTS:usize = 3;
30
31pub const RETRY_BASE_DELAY_MS:u64 = 100;
33
34pub const MAX_MESSAGE_SIZE_BYTES:usize = 4 * 1024 * 1024;
36
37pub const HEALTH_CHECK_INTERVAL_MS:u64 = 30000;
39
40#[allow(dead_code)]
42pub const CONNECTION_TIMEOUT_MS:u64 = 10000;
43
44pub const NOTIFICATION_BROADCAST_CAPACITY:usize = 4096;
48
49pub struct ConnectionMetadata {
51 pub LastActivity:Instant,
52
53 pub FailureCount:usize,
54
55 pub IsHealthy:bool,
56}
57
58lazy_static! {
59 pub static ref SIDECAR_CLIENTS: Arc<Mutex<HashMap<String, CocoonClient>>> = Arc::new(Mutex::new(HashMap::new()));
60 pub static ref CONNECTION_METADATA: Arc<Mutex<HashMap<String, ConnectionMetadata>>> =
61 Arc::new(Mutex::new(HashMap::new()));
62 pub static ref NOTIFICATION_BROADCAST: tokio::sync::broadcast::Sender<NotificationFrame::Struct> = {
63 let (Sender, _) = tokio::sync::broadcast::channel(NOTIFICATION_BROADCAST_CAPACITY);
64
65 Sender
66 };
67}
68
69static CONNECTION_NOTIFIERS:OnceLock<Arc<parking_lot::RwLock<HashMap<String, Arc<Notify>>>>> = OnceLock::new();
76
77pub fn GetConnectionNotify(SideCarIdentifier:&str) -> Arc<Notify> {
78 let Map = CONNECTION_NOTIFIERS.get_or_init(|| Arc::new(parking_lot::RwLock::new(HashMap::new())));
79
80 {
81 let Read = Map.read();
82
83 if let Some(Notify) = Read.get(SideCarIdentifier) {
84 return Notify.clone();
85 }
86 }
87
88 let mut Write = Map.write();
89
90 Write
91 .entry(SideCarIdentifier.to_string())
92 .or_insert_with(|| Arc::new(Notify::new()))
93 .clone()
94}
95
96pub fn FireConnectionNotify(SideCarIdentifier:&str) {
97 if let Some(Map) = CONNECTION_NOTIFIERS.get() {
98 if let Some(Notifier) = Map.read().get(SideCarIdentifier) {
99 Notifier.notify_waiters();
100 }
101 }
102}
103
104pub static SHUTDOWN_FLAG:AtomicBool = AtomicBool::new(false);
108
109pub fn ShutdownFlagStore(Value:bool) { SHUTDOWN_FLAG.store(Value, Ordering::Relaxed); }
110
111pub fn ShutdownFlagLoad() -> bool { SHUTDOWN_FLAG.load(Ordering::Relaxed) }
112
113pub fn RecordSideCarFailure(SideCarIdentifier:&str) {
115 let mut Metadata = CONNECTION_METADATA.lock();
116
117 if let Some(Connection) = Metadata.get_mut(SideCarIdentifier) {
118 Connection.FailureCount += 1;
119
120 Connection.IsHealthy = false;
121 }
122}
123
124pub fn UpdateSideCarActivity(SideCarIdentifier:&str) {
126 let mut Metadata = CONNECTION_METADATA.lock();
127
128 if let Some(Connection) = Metadata.get_mut(SideCarIdentifier) {
129 Connection.LastActivity = Instant::now();
130
131 Connection.FailureCount = 0;
132
133 Connection.IsHealthy = true;
134 }
135}
136
137pub fn ValidateMessageSize(Data:&[u8]) -> Result<(), VineError> {
141 if Data.len() > MAX_MESSAGE_SIZE_BYTES {
142 Err(VineError::MessageTooLarge { ActualSize:Data.len(), MaxSize:MAX_MESSAGE_SIZE_BYTES })
143 } else {
144 Ok(())
145 }
146}