Skip to main content

Mountain/Vine/Server/Notification/
OutputChannelCoalesce.rs

1
2//! Per-channel coalescing buffer for `outputChannel.append` notifications.
3//!
4//! Cocoon's Git extension emits 30+ `append` notifications per `git status`
5//! (one per `[trace] [OperationManager][...]` line, one per executed
6//! sub-command). Each one previously:
7//!
8//!   1. Crossed the Cocoon → Mountain gRPC notification boundary.
9//!   2. Fired its own `Tauri::Emitter::emit("sky://output/append")` round-trip.
10//!   3. Wrote its own dev_log entry.
11//!
12//! For a workspace with the Git extension actively probing repo state on
13//! file changes, the volume of `[OperationManager]` traces alone accounted
14//! for ~1.9k lines of one 28k-line session log.
15//!
16//! This atom buffers appends per-channel for a short window
17//! (`COALESCE_WINDOW`) and flushes the concatenated payload as a single
18//! Sky emit + a single dev_log line. The downstream Output panel still
19//! sees identical text - just delivered in larger chunks - which matches
20//! the user-perceived UX of an output channel (it scrolls in chunks, not
21//! character-by-character).
22//!
23//! ## Why this is safe
24//!
25//! - Per-channel buffer means ordering is preserved within a channel.
26//! - Append-only semantics mean partial-payload visibility cannot expose torn
27//!   writes - the buffered text is always a prefix of the eventual full
28//!   payload.
29//! - `Tauri::Emitter` serialises emits per channel; the flush task running on
30//!   the tokio runtime keeps the same back-pressure shape the per-call path
31//!   had.
32//!
33//! ## Disable hook
34//!
//! Setting the environment variable `OutputCoalesce=0` (or
//! `OutputCoalesce=false`) reverts to per-append emit. Use it when debugging
//! synchronisation issues where a single append must be flushed
//! immediately to disk.
38
39use std::{
40	collections::HashMap,
41	sync::{Mutex as StandardMutex, OnceLock},
42	time::Duration,
43};
44
45use serde_json::{Value, json};
46use tauri::{AppHandle, Emitter};
47use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
48
49use crate::dev_log;
50
/// Pause the coalescer takes after picking up the first append of a burst,
/// giving the rest of the burst time to accumulate before everything is
/// flushed to Sky. Tuned against the FSEvents / Git-extension 16 ms tick:
/// 50 ms covers roughly three such ticks, which is enough for a `git status`
/// burst to fully accumulate without introducing a human-perceptible scroll
/// lag.
const COALESCE_WINDOW:Duration = Duration::from_millis(50);

/// Per-channel buffer size, in bytes, at which the coalescer flushes a
/// channel right away rather than waiting for the window to elapse. Caps
/// memory for any channel emitting unbounded text (a build extension piping
/// `cargo build` stdout) before the timer fires.
const MAX_BUFFERED_BYTES:usize = 64 * 1024;
61
/// One `outputChannel.append` payload queued for coalescing.
struct PendingAppend {
	/// Name of the output channel the text belongs to; used as the key of the
	/// per-channel buffer.
	Channel:String,
	/// Text to append to that channel's buffer.
	Value:String,
}
67
68struct CoalesceChannel {
69	Sender:UnboundedSender<(AppHandle, PendingAppend)>,
70}
71
72static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
73
/// Reports whether coalescing has been switched off through the environment.
///
/// Returns `true` only when `OutputCoalesce` is set to exactly `0` or
/// `false`. An unset variable, a value that is not valid Unicode, or any
/// other value leaves coalescing enabled.
fn IsDisabled() -> bool {
	match std::env::var("OutputCoalesce") {
		Ok(Flag) => Flag == "0" || Flag == "false",
		Err(_) => false,
	}
}
75
76fn GetOrInitChannel() -> &'static CoalesceChannel {
77	COALESCE_CHANNEL.get_or_init(|| {
78		let (Tx, mut Rx) = unbounded_channel::<(AppHandle, PendingAppend)>();
79
80		tokio::spawn(async move {
81			// Per-channel pending buffer. Two-level map: channel name →
82			// accumulated text. AppHandle is shared across channels (one
83			// per process) so we stash it alongside.
84			let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
85
86			loop {
87				let Received = Rx.recv().await;
88
89				let (Handle, First) = match Received {
90					None => break,
91					Some(Pair) => Pair,
92				};
93
94				// Append to per-channel buffer.
95				{
96					let mut Guard = match Buffers.lock() {
97						Ok(G) => G,
98						Err(_) => continue,
99					};
100
101					let Slot = Guard.entry(First.Channel.clone()).or_default();
102
103					Slot.push_str(&First.Value);
104
105					if Slot.len() >= MAX_BUFFERED_BYTES {
106						let Payload = std::mem::take(Slot);
107
108						drop(Guard);
109
110						FlushOne(&Handle, &First.Channel, &Payload);
111
112						continue;
113					}
114				}
115
116				// Drain everything already queued without blocking.
117				let mut Drain:Vec<(AppHandle, PendingAppend)> = Vec::new();
118
119				let Drained = Rx.recv_many(&mut Drain, 4096).await;
120
121				let _ = Drained;
122
123				for (_, Pending) in Drain.drain(..) {
124					if let Ok(mut Guard) = Buffers.lock() {
125						let Slot = Guard.entry(Pending.Channel).or_default();
126						Slot.push_str(&Pending.Value);
127					}
128				}
129
130				// Window pause - let stragglers accumulate inside the
131				// 50 ms frame. Anything received during the sleep is
132				// drained again before flush.
133				tokio::time::sleep(COALESCE_WINDOW).await;
134
135				let mut LateDrain:Vec<(AppHandle, PendingAppend)> = Vec::new();
136
137				let _ = Rx.recv_many(&mut LateDrain, 4096).await;
138
139				for (_, Pending) in LateDrain.drain(..) {
140					if let Ok(mut Guard) = Buffers.lock() {
141						let Slot = Guard.entry(Pending.Channel).or_default();
142						Slot.push_str(&Pending.Value);
143					}
144				}
145
146				// Flush every non-empty channel.
147				let HandleForFlush = Handle.clone();
148
149				let Snapshots = {
150					match Buffers.lock() {
151						Ok(mut Guard) => {
152							Guard
153								.iter_mut()
154								.filter(|(_, V)| !V.is_empty())
155								.map(|(K, V)| (K.clone(), std::mem::take(V)))
156								.collect::<Vec<_>>()
157						},
158						Err(_) => continue,
159					}
160				};
161
162				for (Channel, Payload) in Snapshots {
163					FlushOne(&HandleForFlush, &Channel, &Payload);
164				}
165			}
166		});
167
168		CoalesceChannel { Sender:Tx }
169	})
170}
171
172fn FlushOne(Handle:&AppHandle, Channel:&str, Payload:&str) {
173	let _ = Handle.emit(
174		"sky://output/append",
175		json!({
176			"channel": Channel,
177			"value": Payload,
178		}),
179	);
180
181	// One log line per flush instead of one per append. The git
182	// channel keeps its `grpc`-tag visibility for SCM activation
183	// diagnosis; other channels stay under `output-verbose`.
184	let IsGitFamily = Channel.eq_ignore_ascii_case("git")
185		|| Channel.eq_ignore_ascii_case("source control")
186		|| Channel.eq_ignore_ascii_case("scm");
187
188	let LineCount = Payload.matches('\n').count();
189
190	if IsGitFamily {
191		dev_log!(
192			"grpc",
193			"[OutputChannel:{}] flush bytes={} lines~{}",
194			Channel,
195			Payload.len(),
196			LineCount
197		);
198	} else {
199		dev_log!(
200			"output-verbose",
201			"[OutputChannel] flush channel={} bytes={} lines~{}",
202			Channel,
203			Payload.len(),
204			LineCount
205		);
206	}
207}
208
209/// Submit a pending append for coalescing. Returns `true` when the
210/// item was enqueued (the coalescer will flush within `COALESCE_WINDOW`),
211/// `false` when coalescing is disabled and the caller must flush
212/// inline.
213pub fn TryEnqueue(Handle:&AppHandle, Channel:String, Value:String) -> bool {
214	if IsDisabled() {
215		return false;
216	}
217
218	let Ch = GetOrInitChannel();
219
220	let _ = Ch.Sender.send((Handle.clone(), PendingAppend { Channel, Value }));
221
222	true
223}