Mountain/Vine/Client/
SendRequest.rs1
2use std::time::Duration;
10
11use serde_json::{Value, from_slice, to_vec};
12use tokio::time::timeout;
13
14use crate::{
15 Vine::{
16 Client::{
17 IsShuttingDown,
18 Shared::{
19 DEFAULT_TIMEOUT_MS,
20 RecordSideCarFailure,
21 SIDECAR_CLIENTS,
22 UpdateSideCarActivity,
23 ValidateMessageSize,
24 },
25 },
26 Error::VineError,
27 Generated::GenericRequest,
28 },
29 dev_log,
30};
31
32pub async fn Fn(
33 SideCarIdentifier:&str,
34
35 Method:String,
36
37 Parameters:Value,
38
39 TimeoutMilliseconds:u64,
40) -> Result<Value, VineError> {
41 if IsShuttingDown::Fn() {
42 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
43 }
44
45 if Method.is_empty() || Method.len() > 128 {
46 return Err(VineError::RPCError(
47 "Method name must be between 1 and 128 characters".to_string(),
48 ));
49 }
50
51 let TimeoutDuration =
52 Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
53
54 if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
55 if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
56 if !Mux.IsClosed() {
57 match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
58 Ok(Result_) => {
59 UpdateSideCarActivity(SideCarIdentifier);
60
61 return Ok(Result_);
62 },
63
64 Err(VineError::RequestTimeout { .. }) => {
65 return Err(VineError::RequestTimeout {
66 SideCarIdentifier:SideCarIdentifier.to_string(),
67 MethodName:Method,
68 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
69 });
70 },
71
72 Err(Error) => {
73 dev_log!(
74 "grpc",
75 "warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
76 unary",
77 SideCarIdentifier,
78 Method,
79 Error
80 );
81 },
82 }
83 }
84 }
85 }
86
87 let ParameterBytes =
88 to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
89
90 ValidateMessageSize(&ParameterBytes)?;
91
92 let Client = {
93 let Pool = SIDECAR_CLIENTS.lock();
94
95 Pool.get(SideCarIdentifier).cloned()
96 };
97
98 let Some(mut Client) = Client else {
99 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
100 };
101
102 use std::sync::atomic::{AtomicU64, Ordering as AO};
103
104 static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
105
106 let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
107
108 let MethodForLog = Method.clone();
109
110 let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
111
112 let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
113
114 match Result_ {
115 Ok(Ok(Response)) => {
116 UpdateSideCarActivity(SideCarIdentifier);
117
118 dev_log!(
119 "grpc",
120 "[VineClient] Request sent successfully to sidecar '{}': method='{}'",
121 SideCarIdentifier,
122 MethodForLog
123 );
124
125 let InnerResponse = Response.into_inner();
126
127 let ResultBytes = InnerResponse.result;
128
129 let ResultValue:Value = from_slice(&ResultBytes)
130 .map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
131
132 if let Some(ErrorData) = InnerResponse.error {
133 return Err(VineError::RPCError(format!(
134 "RPC error from sidecar: code={}, message={}",
135 ErrorData.code, ErrorData.message
136 )));
137 }
138
139 Ok(ResultValue)
140 },
141
142 Ok(Err(Status)) => {
143 RecordSideCarFailure(SideCarIdentifier);
144
145 Err(VineError::RPCError(format!("gRPC error: {}", Status)))
146 },
147
148 Err(_) => {
149 RecordSideCarFailure(SideCarIdentifier);
150
151 Err(VineError::RequestTimeout {
152 SideCarIdentifier:SideCarIdentifier.to_string(),
153 MethodName:MethodForLog,
154 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
155 })
156 },
157 }
158}