Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Client/
SendRequest.rs

1#![allow(non_snake_case)]
2
3//! Send a request and await a response. Validates method-name length
4//! and message size, prefers the streaming multiplexer when
5//! `LAND_VINE_STREAMING=1` is on (falls through to unary on any failure
6//! except the authoritative streaming-path timeout), enforces a per-call
7//! timeout via `tokio::time::timeout`, and updates per-connection
8//! activity / failure metadata on completion.
9
10use std::time::Duration;
11
12use serde_json::{Value, from_slice, to_vec};
13use tokio::time::timeout;
14
15use crate::{
16	Vine::{
17		Client::{
18			IsShuttingDown,
19			Shared::{
20				DEFAULT_TIMEOUT_MS,
21				RecordSideCarFailure,
22				SIDECAR_CLIENTS,
23				UpdateSideCarActivity,
24				ValidateMessageSize,
25			},
26		},
27		Error::VineError,
28		Generated::GenericRequest,
29	},
30	dev_log,
31};
32
33pub async fn Fn(
34	SideCarIdentifier:&str,
35
36	Method:String,
37
38	Parameters:Value,
39
40	TimeoutMilliseconds:u64,
41) -> Result<Value, VineError> {
42	if IsShuttingDown::Fn() {
43		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
44	}
45
46	if Method.is_empty() || Method.len() > 128 {
47		return Err(VineError::RPCError(
48			"Method name must be between 1 and 128 characters".to_string(),
49		));
50	}
51
52	let TimeoutDuration =
53		Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
54
55	if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
56		if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
57			if !Mux.IsClosed() {
58				match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
59					Ok(Result_) => {
60						UpdateSideCarActivity(SideCarIdentifier);
61
62						return Ok(Result_);
63					},
64
65					Err(VineError::RequestTimeout { .. }) => {
66						return Err(VineError::RequestTimeout {
67							SideCarIdentifier:SideCarIdentifier.to_string(),
68							MethodName:Method,
69							TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
70						});
71					},
72
73					Err(Error) => {
74						dev_log!(
75							"grpc",
76							"warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
77							 unary",
78							SideCarIdentifier,
79							Method,
80							Error
81						);
82					},
83				}
84			}
85		}
86	}
87
88	let ParameterBytes =
89		to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
90
91	ValidateMessageSize(&ParameterBytes)?;
92
93	let Client = {
94		let Pool = SIDECAR_CLIENTS.lock();
95
96		Pool.get(SideCarIdentifier).cloned()
97	};
98
99	let Some(mut Client) = Client else {
100		return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
101	};
102
103	use std::sync::atomic::{AtomicU64, Ordering as AO};
104
105	static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
106
107	let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
108
109	let MethodForLog = Method.clone();
110
111	let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
112
113	let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
114
115	match Result_ {
116		Ok(Ok(Response)) => {
117			UpdateSideCarActivity(SideCarIdentifier);
118
119			dev_log!(
120				"grpc",
121				"[VineClient] Request sent successfully to sidecar '{}': method='{}'",
122				SideCarIdentifier,
123				MethodForLog
124			);
125
126			let InnerResponse = Response.into_inner();
127
128			let ResultBytes = InnerResponse.result;
129
130			let ResultValue:Value = from_slice(&ResultBytes)
131				.map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
132
133			if let Some(ErrorData) = InnerResponse.error {
134				return Err(VineError::RPCError(format!(
135					"RPC error from sidecar: code={}, message={}",
136					ErrorData.code, ErrorData.message
137				)));
138			}
139
140			Ok(ResultValue)
141		},
142
143		Ok(Err(Status)) => {
144			RecordSideCarFailure(SideCarIdentifier);
145
146			Err(VineError::RPCError(format!("gRPC error: {}", Status)))
147		},
148
149		Err(_) => {
150			RecordSideCarFailure(SideCarIdentifier);
151
152			Err(VineError::RequestTimeout {
153				SideCarIdentifier:SideCarIdentifier.to_string(),
154				MethodName:MethodForLog,
155				TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
156			})
157		},
158	}
159}