Skip to main content

Mountain/Vine/Client/
SendRequest.rs

1
2//! Send a request and await a response. Validates method-name length
3//! and message size, prefers the streaming multiplexer when
4//! `LAND_VINE_STREAMING=1` is on (falls through to unary on any failure
5//! except the authoritative streaming-path timeout), enforces a per-call
6//! timeout via `tokio::time::timeout`, and updates per-connection
7//! activity / failure metadata on completion.
8
9use std::time::Duration;
10
11use serde_json::{Value, from_slice, to_vec};
12use tokio::time::timeout;
13
14use crate::{
15	Vine::{
16		Client::{
17			IsShuttingDown,
18			Shared::{
19				DEFAULT_TIMEOUT_MS,
20				RecordSideCarFailure,
21				SIDECAR_CLIENTS,
22				UpdateSideCarActivity,
23				ValidateMessageSize,
24			},
25		},
26		Error::VineError,
27		Generated::GenericRequest,
28	},
29	dev_log,
30};
31
32pub async fn Fn(
33	SideCarIdentifier:&str,
34
35	Method:String,
36
37	Parameters:Value,
38
39	TimeoutMilliseconds:u64,
40) -> Result<Value, VineError> {
41	if IsShuttingDown::Fn() {
42		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
43	}
44
45	if Method.is_empty() || Method.len() > 128 {
46		return Err(VineError::RPCError(
47			"Method name must be between 1 and 128 characters".to_string(),
48		));
49	}
50
51	let TimeoutDuration =
52		Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
53
54	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
55		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
56			if !Mux.IsClosed() {
57				match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
58					Ok(Result_) => {
59						UpdateSideCarActivity(SideCarIdentifier);
60
61						return Ok(Result_);
62					},
63
64					Err(VineError::RequestTimeout { .. }) => {
65						return Err(VineError::RequestTimeout {
66							SideCarIdentifier:SideCarIdentifier.to_string(),
67							MethodName:Method,
68							TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
69						});
70					},
71
72					Err(Error) => {
73						dev_log!(
74							"grpc",
75							"warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
76							 unary",
77							SideCarIdentifier,
78							Method,
79							Error
80						);
81					},
82				}
83			}
84		}
85	}
86
87	let ParameterBytes =
88		to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
89
90	ValidateMessageSize(&ParameterBytes)?;
91
92	let Client = {
93		let Pool = SIDECAR_CLIENTS.lock();
94
95		Pool.get(SideCarIdentifier).cloned()
96	};
97
98	let Some(mut Client) = Client else {
99		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
100	};
101
102	use std::sync::atomic::{AtomicU64, Ordering as AO};
103
104	static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
105
106	let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
107
108	let MethodForLog = Method.clone();
109
110	let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
111
112	let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
113
114	match Result_ {
115		Ok(Ok(Response)) => {
116			UpdateSideCarActivity(SideCarIdentifier);
117
118			dev_log!(
119				"grpc",
120				"[VineClient] Request sent successfully to sidecar '{}': method='{}'",
121				SideCarIdentifier,
122				MethodForLog
123			);
124
125			let InnerResponse = Response.into_inner();
126
127			let ResultBytes = InnerResponse.result;
128
129			let ResultValue:Value = from_slice(&ResultBytes)
130				.map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
131
132			if let Some(ErrorData) = InnerResponse.error {
133				return Err(VineError::RPCError(format!(
134					"RPC error from sidecar: code={}, message={}",
135					ErrorData.code, ErrorData.message
136				)));
137			}
138
139			Ok(ResultValue)
140		},
141
142		Ok(Err(Status)) => {
143			RecordSideCarFailure(SideCarIdentifier);
144
145			Err(VineError::RPCError(format!("gRPC error: {}", Status)))
146		},
147
148		Err(_) => {
149			RecordSideCarFailure(SideCarIdentifier);
150
151			Err(VineError::RequestTimeout {
152				SideCarIdentifier:SideCarIdentifier.to_string(),
153				MethodName:MethodForLog,
154				TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
155			})
156		},
157	}
158}