DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Server/Notification/
OutputChannelCoalesce.rs1#![allow(non_snake_case)]
2
3use std::{
41 collections::HashMap,
42 sync::{Mutex as StandardMutex, OnceLock},
43 time::Duration,
44};
45
46use serde_json::{Value, json};
47use tauri::{AppHandle, Emitter};
48use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
49
50use crate::dev_log;
51
52const COALESCE_WINDOW:Duration = Duration::from_millis(50);
57
58const MAX_BUFFERED_BYTES:usize = 64 * 1024;
62
63struct PendingAppend {
64 Channel:String,
65
66 Value:String,
67}
68
69struct CoalesceChannel {
70 Sender:UnboundedSender<(AppHandle, PendingAppend)>,
71}
72
73static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
74
75fn IsDisabled() -> bool { matches!(std::env::var("OutputCoalesce").as_deref(), Ok("0") | Ok("false")) }
76
77fn GetOrInitChannel() -> &'static CoalesceChannel {
78 COALESCE_CHANNEL.get_or_init(|| {
79 let (Tx, mut Rx) = unbounded_channel::<(AppHandle, PendingAppend)>();
80
81 tokio::spawn(async move {
82 let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
86
87 let mut HandleSlot:Option<AppHandle> = None;
88
89 loop {
90 let Received = Rx.recv().await;
91
92 let (Handle, First) = match Received {
93 None => break,
94 Some(Pair) => Pair,
95 };
96
97 HandleSlot = Some(Handle.clone());
98
99 {
101 let mut Guard = match Buffers.lock() {
102 Ok(G) => G,
103 Err(_) => continue,
104 };
105
106 let Slot = Guard.entry(First.Channel.clone()).or_default();
107
108 Slot.push_str(&First.Value);
109
110 if Slot.len() >= MAX_BUFFERED_BYTES {
111 let Payload = std::mem::take(Slot);
112
113 drop(Guard);
114
115 FlushOne(&Handle, &First.Channel, &Payload);
116
117 continue;
118 }
119 }
120
121 let mut Drain:Vec<(AppHandle, PendingAppend)> = Vec::new();
123
124 let Drained = Rx.recv_many(&mut Drain, 4096).await;
125
126 let _ = Drained;
127
128 for (_, Pending) in Drain.drain(..) {
129 if let Ok(mut Guard) = Buffers.lock() {
130 let Slot = Guard.entry(Pending.Channel).or_default();
131 Slot.push_str(&Pending.Value);
132 }
133 }
134
135 tokio::time::sleep(COALESCE_WINDOW).await;
139
140 let mut LateDrain:Vec<(AppHandle, PendingAppend)> = Vec::new();
141
142 let _ = Rx.recv_many(&mut LateDrain, 4096).await;
143
144 for (_, Pending) in LateDrain.drain(..) {
145 if let Ok(mut Guard) = Buffers.lock() {
146 let Slot = Guard.entry(Pending.Channel).or_default();
147 Slot.push_str(&Pending.Value);
148 }
149 }
150
151 let HandleForFlush = HandleSlot.clone().unwrap_or_else(|| Handle.clone());
153
154 let Snapshots = {
155 match Buffers.lock() {
156 Ok(mut Guard) => {
157 Guard
158 .iter_mut()
159 .filter(|(_, V)| !V.is_empty())
160 .map(|(K, V)| (K.clone(), std::mem::take(V)))
161 .collect::<Vec<_>>()
162 },
163 Err(_) => continue,
164 }
165 };
166
167 for (Channel, Payload) in Snapshots {
168 FlushOne(&HandleForFlush, &Channel, &Payload);
169 }
170 }
171 });
172
173 CoalesceChannel { Sender:Tx }
174 })
175}
176
177fn FlushOne(Handle:&AppHandle, Channel:&str, Payload:&str) {
178 let _ = Handle.emit(
179 "sky://output/append",
180 json!({
181 "channel": Channel,
182 "value": Payload,
183 }),
184 );
185
186 let IsGitFamily = Channel.eq_ignore_ascii_case("git")
190 || Channel.eq_ignore_ascii_case("source control")
191 || Channel.eq_ignore_ascii_case("scm");
192
193 let LineCount = Payload.matches('\n').count();
194
195 if IsGitFamily {
196 dev_log!(
197 "grpc",
198 "[OutputChannel:{}] flush bytes={} lines~{}",
199 Channel,
200 Payload.len(),
201 LineCount
202 );
203 } else {
204 dev_log!(
205 "output-verbose",
206 "[OutputChannel] flush channel={} bytes={} lines~{}",
207 Channel,
208 Payload.len(),
209 LineCount
210 );
211 }
212}
213
214pub fn TryEnqueue(Handle:&AppHandle, Channel:String, Value:String) -> bool {
219 if IsDisabled() {
220 return false;
221 }
222
223 let Ch = GetOrInitChannel();
224
225 let _ = Ch.Sender.send((Handle.clone(), PendingAppend { Channel, Value }));
226
227 true
228}