Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Server/Notification/
OutputChannelCoalesce.rs

1#![allow(non_snake_case)]
2
3//! Per-channel coalescing buffer for `outputChannel.append` notifications.
4//!
5//! Cocoon's Git extension emits 30+ `append` notifications per `git status`
6//! (one per `[trace] [OperationManager][...]` line, one per executed
7//! sub-command). Each one previously:
8//!
9//!   1. Crossed the Cocoon → Mountain gRPC notification boundary.
10//!   2. Fired its own `Tauri::Emitter::emit("sky://output/append")` round-trip.
11//!   3. Wrote its own dev_log entry.
12//!
13//! For a workspace with the Git extension actively probing repo state on
14//! file changes, the volume of `[OperationManager]` traces alone accounted
15//! for ~1.9k lines of one 28k-line session log.
16//!
17//! This atom buffers appends per-channel for a short window
18//! (`COALESCE_WINDOW`) and flushes the concatenated payload as a single
19//! Sky emit + a single dev_log line. The downstream Output panel still
20//! sees identical text - just delivered in larger chunks - which matches
21//! the user-perceived UX of an output channel (it scrolls in chunks, not
22//! character-by-character).
23//!
24//! ## Why this is safe
25//!
26//! - Per-channel buffer means ordering is preserved within a channel.
27//! - Append-only semantics mean partial-payload visibility cannot expose torn
28//!   writes - the buffered text is always a prefix of the eventual full
29//!   payload.
30//! - `Tauri::Emitter` serialises emits per channel; the flush task running on
31//!   the tokio runtime keeps the same back-pressure shape the per-call path
32//!   had.
33//!
34//! ## Disable hook
35//!
36//! `OutputCoalesce=0` reverts to per-append emit (debugging
37//! synchronisation issues where a single append must be flushed
38//! immediately to disk).
39
40use std::{
41	collections::HashMap,
42	sync::{Mutex as StandardMutex, OnceLock},
43	time::Duration,
44};
45
46use serde_json::{Value, json};
47use tauri::{AppHandle, Emitter};
48use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
49
50use crate::dev_log;
51
52/// Maximum delay between an append arriving and its flush to Sky. Tuned
53/// against the FSEvents / Git-extension 16ms tick - one frame is enough
54/// for a `git status` burst to fully accumulate without introducing a
55/// human-perceptible scroll lag.
56const COALESCE_WINDOW:Duration = Duration::from_millis(50);
57
58/// Maximum buffered bytes per channel before a forced flush. Caps memory
59/// for any channel emitting unbounded text (a build extension piping
60/// `cargo build` stdout) before the timer fires.
61const MAX_BUFFERED_BYTES:usize = 64 * 1024;
62
63struct PendingAppend {
64	Channel:String,
65
66	Value:String,
67}
68
69struct CoalesceChannel {
70	Sender:UnboundedSender<(AppHandle, PendingAppend)>,
71}
72
73static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
74
75fn IsDisabled() -> bool { matches!(std::env::var("OutputCoalesce").as_deref(), Ok("0") | Ok("false")) }
76
77fn GetOrInitChannel() -> &'static CoalesceChannel {
78	COALESCE_CHANNEL.get_or_init(|| {
79		let (Tx, mut Rx) = unbounded_channel::<(AppHandle, PendingAppend)>();
80
81		tokio::spawn(async move {
82			// Per-channel pending buffer. Two-level map: channel name →
83			// accumulated text. AppHandle is shared across channels (one
84			// per process) so we stash it alongside.
85			let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
86
87			let mut HandleSlot:Option<AppHandle> = None;
88
89			loop {
90				let Received = Rx.recv().await;
91
92				let (Handle, First) = match Received {
93					None => break,
94					Some(Pair) => Pair,
95				};
96
97				HandleSlot = Some(Handle.clone());
98
99				// Append to per-channel buffer.
100				{
101					let mut Guard = match Buffers.lock() {
102						Ok(G) => G,
103						Err(_) => continue,
104					};
105
106					let Slot = Guard.entry(First.Channel.clone()).or_default();
107
108					Slot.push_str(&First.Value);
109
110					if Slot.len() >= MAX_BUFFERED_BYTES {
111						let Payload = std::mem::take(Slot);
112
113						drop(Guard);
114
115						FlushOne(&Handle, &First.Channel, &Payload);
116
117						continue;
118					}
119				}
120
121				// Drain everything already queued without blocking.
122				let mut Drain:Vec<(AppHandle, PendingAppend)> = Vec::new();
123
124				let Drained = Rx.recv_many(&mut Drain, 4096).await;
125
126				let _ = Drained;
127
128				for (_, Pending) in Drain.drain(..) {
129					if let Ok(mut Guard) = Buffers.lock() {
130						let Slot = Guard.entry(Pending.Channel).or_default();
131						Slot.push_str(&Pending.Value);
132					}
133				}
134
135				// Window pause - let stragglers accumulate inside the
136				// 50 ms frame. Anything received during the sleep is
137				// drained again before flush.
138				tokio::time::sleep(COALESCE_WINDOW).await;
139
140				let mut LateDrain:Vec<(AppHandle, PendingAppend)> = Vec::new();
141
142				let _ = Rx.recv_many(&mut LateDrain, 4096).await;
143
144				for (_, Pending) in LateDrain.drain(..) {
145					if let Ok(mut Guard) = Buffers.lock() {
146						let Slot = Guard.entry(Pending.Channel).or_default();
147						Slot.push_str(&Pending.Value);
148					}
149				}
150
151				// Flush every non-empty channel.
152				let HandleForFlush = HandleSlot.clone().unwrap_or_else(|| Handle.clone());
153
154				let Snapshots = {
155					match Buffers.lock() {
156						Ok(mut Guard) => {
157							Guard
158								.iter_mut()
159								.filter(|(_, V)| !V.is_empty())
160								.map(|(K, V)| (K.clone(), std::mem::take(V)))
161								.collect::<Vec<_>>()
162						},
163						Err(_) => continue,
164					}
165				};
166
167				for (Channel, Payload) in Snapshots {
168					FlushOne(&HandleForFlush, &Channel, &Payload);
169				}
170			}
171		});
172
173		CoalesceChannel { Sender:Tx }
174	})
175}
176
177fn FlushOne(Handle:&AppHandle, Channel:&str, Payload:&str) {
178	let _ = Handle.emit(
179		"sky://output/append",
180		json!({
181			"channel": Channel,
182			"value": Payload,
183		}),
184	);
185
186	// One log line per flush instead of one per append. The git
187	// channel keeps its `grpc`-tag visibility for SCM activation
188	// diagnosis; other channels stay under `output-verbose`.
189	let IsGitFamily = Channel.eq_ignore_ascii_case("git")
190		|| Channel.eq_ignore_ascii_case("source control")
191		|| Channel.eq_ignore_ascii_case("scm");
192
193	let LineCount = Payload.matches('\n').count();
194
195	if IsGitFamily {
196		dev_log!(
197			"grpc",
198			"[OutputChannel:{}] flush bytes={} lines~{}",
199			Channel,
200			Payload.len(),
201			LineCount
202		);
203	} else {
204		dev_log!(
205			"output-verbose",
206			"[OutputChannel] flush channel={} bytes={} lines~{}",
207			Channel,
208			Payload.len(),
209			LineCount
210		);
211	}
212}
213
214/// Submit a pending append for coalescing. Returns `true` when the
215/// item was enqueued (the coalescer will flush within `COALESCE_WINDOW`),
216/// `false` when coalescing is disabled and the caller must flush
217/// inline.
218pub fn TryEnqueue(Handle:&AppHandle, Channel:String, Value:String) -> bool {
219	if IsDisabled() {
220		return false;
221	}
222
223	let Ch = GetOrInitChannel();
224
225	let _ = Ch.Sender.send((Handle.clone(), PendingAppend { Channel, Value }));
226
227	true
228}