DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Client/
SendRequest.rs1#![allow(non_snake_case)]
2
3use std::time::Duration;
11
12use serde_json::{Value, from_slice, to_vec};
13use tokio::time::timeout;
14
15use crate::{
16 Vine::{
17 Client::{
18 IsShuttingDown,
19 Shared::{
20 DEFAULT_TIMEOUT_MS,
21 RecordSideCarFailure,
22 SIDECAR_CLIENTS,
23 UpdateSideCarActivity,
24 ValidateMessageSize,
25 },
26 },
27 Error::VineError,
28 Generated::GenericRequest,
29 },
30 dev_log,
31};
32
33pub async fn Fn(
34 SideCarIdentifier:&str,
35
36 Method:String,
37
38 Parameters:Value,
39
40 TimeoutMilliseconds:u64,
41) -> Result<Value, VineError> {
42 if IsShuttingDown::Fn() {
43 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
44 }
45
46 if Method.is_empty() || Method.len() > 128 {
47 return Err(VineError::RPCError(
48 "Method name must be between 1 and 128 characters".to_string(),
49 ));
50 }
51
52 let TimeoutDuration =
53 Duration::from_millis(if TimeoutMilliseconds > 0 { TimeoutMilliseconds } else { DEFAULT_TIMEOUT_MS });
54
55 if std::env::var("LAND_VINE_STREAMING").as_deref() == Ok("1") {
56 if let Some(Mux) = crate::Vine::Multiplexer::Multiplexer::Lookup(SideCarIdentifier) {
57 if !Mux.IsClosed() {
58 match Mux.Request(Method.clone(), Parameters.clone(), TimeoutDuration).await {
59 Ok(Result_) => {
60 UpdateSideCarActivity(SideCarIdentifier);
61
62 return Ok(Result_);
63 },
64
65 Err(VineError::RequestTimeout { .. }) => {
66 return Err(VineError::RequestTimeout {
67 SideCarIdentifier:SideCarIdentifier.to_string(),
68 MethodName:Method,
69 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
70 });
71 },
72
73 Err(Error) => {
74 dev_log!(
75 "grpc",
76 "warn: [VineClient::SendRequest] streaming send failed for '{}::{}' ({}); falling back to \
77 unary",
78 SideCarIdentifier,
79 Method,
80 Error
81 );
82 },
83 }
84 }
85 }
86 }
87
88 let ParameterBytes =
89 to_vec(&Parameters).map_err(|E| VineError::RPCError(format!("Failed to serialize parameters: {}", E)))?;
90
91 ValidateMessageSize(&ParameterBytes)?;
92
93 let Client = {
94 let Pool = SIDECAR_CLIENTS.lock();
95
96 Pool.get(SideCarIdentifier).cloned()
97 };
98
99 let Some(mut Client) = Client else {
100 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
101 };
102
103 use std::sync::atomic::{AtomicU64, Ordering as AO};
104
105 static REQ_SEQ:AtomicU64 = AtomicU64::new(1);
106
107 let RequestIdentifier = REQ_SEQ.fetch_add(1, AO::Relaxed);
108
109 let MethodForLog = Method.clone();
110
111 let Request = GenericRequest { request_identifier:RequestIdentifier, method:Method, parameter:ParameterBytes };
112
113 let Result_ = timeout(TimeoutDuration, Client.process_mountain_request(Request)).await;
114
115 match Result_ {
116 Ok(Ok(Response)) => {
117 UpdateSideCarActivity(SideCarIdentifier);
118
119 dev_log!(
120 "grpc",
121 "[VineClient] Request sent successfully to sidecar '{}': method='{}'",
122 SideCarIdentifier,
123 MethodForLog
124 );
125
126 let InnerResponse = Response.into_inner();
127
128 let ResultBytes = InnerResponse.result;
129
130 let ResultValue:Value = from_slice(&ResultBytes)
131 .map_err(|E| VineError::RPCError(format!("Failed to deserialize response: {}", E)))?;
132
133 if let Some(ErrorData) = InnerResponse.error {
134 return Err(VineError::RPCError(format!(
135 "RPC error from sidecar: code={}, message={}",
136 ErrorData.code, ErrorData.message
137 )));
138 }
139
140 Ok(ResultValue)
141 },
142
143 Ok(Err(Status)) => {
144 RecordSideCarFailure(SideCarIdentifier);
145
146 Err(VineError::RPCError(format!("gRPC error: {}", Status)))
147 },
148
149 Err(_) => {
150 RecordSideCarFailure(SideCarIdentifier);
151
152 Err(VineError::RequestTimeout {
153 SideCarIdentifier:SideCarIdentifier.to_string(),
154 MethodName:MethodForLog,
155 TimeoutMilliseconds:TimeoutDuration.as_millis() as u64,
156 })
157 },
158 }
159}