Skip to main content

Mountain/Vine/Client/
Shared.rs

1
2//! Module-private state for the Vine client: connection pool, per-
3//! connection metadata, the broadcast fan-out, the shutdown flag, plus
4//! the constants and message-size validator that every entry-point shares.
5
6use std::{
7	collections::HashMap,
8	sync::{
9		Arc,
10		OnceLock,
11		atomic::{AtomicBool, Ordering},
12	},
13	time::Instant,
14};
15
16use lazy_static::lazy_static;
17use parking_lot::Mutex;
18use tokio::sync::Notify;
19
20use crate::Vine::{Client::NotificationFrame, Error::VineError, Generated::cocoon_service_client::CocoonServiceClient};
21
22/// Cocoon gRPC client over a tonic transport channel.
23pub type CocoonClient = CocoonServiceClient<tonic::transport::Channel>;
24
25/// Default timeout for RPC calls.
26pub const DEFAULT_TIMEOUT_MS:u64 = 5000;
27
28/// Maximum number of retry attempts for failed connections.
29pub const MAX_RETRY_ATTEMPTS:usize = 3;
30
31/// Base delay between retry attempts.
32pub const RETRY_BASE_DELAY_MS:u64 = 100;
33
34/// Maximum message size for validation (4 MB to match the tonic default).
35pub const MAX_MESSAGE_SIZE_BYTES:usize = 4 * 1024 * 1024;
36
37/// Health-check interval.
38pub const HEALTH_CHECK_INTERVAL_MS:u64 = 30000;
39
40/// Connection timeout (currently unused - kept for the streaming variant).
41#[allow(dead_code)]
42pub const CONNECTION_TIMEOUT_MS:u64 = 10000;
43
44/// Notification broadcast capacity (drop-oldest when full). 4096 covers
45/// the worst-case storms (sky://diagnostics/changed at 50-200/s during
46/// rust-analyzer cargo-check) with margin.
47pub const NOTIFICATION_BROADCAST_CAPACITY:usize = 4096;
48
49/// Connection metadata tracking health and last activity.
50pub struct ConnectionMetadata {
51	pub LastActivity:Instant,
52
53	pub FailureCount:usize,
54
55	pub IsHealthy:bool,
56}
57
58lazy_static! {
59	pub static ref SIDECAR_CLIENTS: Arc<Mutex<HashMap<String, CocoonClient>>> = Arc::new(Mutex::new(HashMap::new()));
60	pub static ref CONNECTION_METADATA: Arc<Mutex<HashMap<String, ConnectionMetadata>>> =
61		Arc::new(Mutex::new(HashMap::new()));
62	pub static ref NOTIFICATION_BROADCAST: tokio::sync::broadcast::Sender<NotificationFrame::Struct> = {
63		let (Sender, _) = tokio::sync::broadcast::channel(NOTIFICATION_BROADCAST_CAPACITY);
64
65		Sender
66	};
67}
68
69/// Per-sidecar connection-ready notifiers. Keyed by the sidecar identifier
70/// (e.g. `"cocoon-main"`). Callers that need the connection before issuing an
71/// RPC can `await` the `Notify` instead of polling `IsClientConnected`.
72/// The `Notify` is created lazily on first `GetConnectionNotify` call and
73/// fired (`notify_waiters`) once in `ConnectToSideCar` after a successful
74/// handshake. Subsequent calls see a pre-fired notifier and wake immediately.
75static CONNECTION_NOTIFIERS:OnceLock<Arc<parking_lot::RwLock<HashMap<String, Arc<Notify>>>>> = OnceLock::new();
76
77pub fn GetConnectionNotify(SideCarIdentifier:&str) -> Arc<Notify> {
78	let Map = CONNECTION_NOTIFIERS.get_or_init(|| Arc::new(parking_lot::RwLock::new(HashMap::new())));
79
80	{
81		let Read = Map.read();
82
83		if let Some(Notify) = Read.get(SideCarIdentifier) {
84			return Notify.clone();
85		}
86	}
87
88	let mut Write = Map.write();
89
90	Write
91		.entry(SideCarIdentifier.to_string())
92		.or_insert_with(|| Arc::new(Notify::new()))
93		.clone()
94}
95
96pub fn FireConnectionNotify(SideCarIdentifier:&str) {
97	if let Some(Map) = CONNECTION_NOTIFIERS.get() {
98		if let Some(Notifier) = Map.read().get(SideCarIdentifier) {
99			Notifier.notify_waiters();
100		}
101	}
102}
103
104/// Process-wide shutdown flag. Set to `true` once Mountain has issued
105/// `$shutdown` (or SIGKILL'd) Cocoon. After that point all
106/// `SendNotification` / `SendRequest` calls short-circuit.
107pub static SHUTDOWN_FLAG:AtomicBool = AtomicBool::new(false);
108
109pub fn ShutdownFlagStore(Value:bool) { SHUTDOWN_FLAG.store(Value, Ordering::Relaxed); }
110
111pub fn ShutdownFlagLoad() -> bool { SHUTDOWN_FLAG.load(Ordering::Relaxed) }
112
113/// Increment the failure counter and mark the connection unhealthy.
114pub fn RecordSideCarFailure(SideCarIdentifier:&str) {
115	let mut Metadata = CONNECTION_METADATA.lock();
116
117	if let Some(Connection) = Metadata.get_mut(SideCarIdentifier) {
118		Connection.FailureCount += 1;
119
120		Connection.IsHealthy = false;
121	}
122}
123
124/// Refresh the last-activity timestamp and reset the failure counter.
125pub fn UpdateSideCarActivity(SideCarIdentifier:&str) {
126	let mut Metadata = CONNECTION_METADATA.lock();
127
128	if let Some(Connection) = Metadata.get_mut(SideCarIdentifier) {
129		Connection.LastActivity = Instant::now();
130
131		Connection.FailureCount = 0;
132
133		Connection.IsHealthy = true;
134	}
135}
136
137/// Reject messages above `MAX_MESSAGE_SIZE_BYTES` to bound the worst-case
138/// gRPC frame. Mirrors tonic's own check so we don't pay the codec round-
139/// trip for an oversize payload.
140pub fn ValidateMessageSize(Data:&[u8]) -> Result<(), VineError> {
141	if Data.len() > MAX_MESSAGE_SIZE_BYTES {
142		Err(VineError::MessageTooLarge { ActualSize:Data.len(), MaxSize:MAX_MESSAGE_SIZE_BYTES })
143	} else {
144		Ok(())
145	}
146}