Skip to main content

Mountain/Vine/Server/Notification/
ProgressReport.rs

1
2//! Cocoon → Mountain `progress.report` notification.
3//!
4//! The git extension alone fires 6000+ of these per session. We push into
5//! an `mpsc::unbounded_channel`; a single long-lived flusher task wakes on
6//! the first item, drains everything queued, sleeps 16 ms (one frame), drains
7//! again, then emits one batched Tauri event per progress handle with the
8//! accumulated `increment` and latest non-empty `message`. Zero spawns per
9//! call; sub-millisecond first-wake; single event per handle per frame.
10
11use std::sync::OnceLock;
12
13use serde_json::{Value, json};
14use tauri::{AppHandle, Emitter};
15use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
16
17use crate::{Vine::Server::MountainVinegRPCService::MountainVinegRPCService, dev_log};
18
19struct ProgressItem {
20	Handle:AppHandle,
21
22	ProgressHandle:String,
23
24	Message:String,
25
26	Increment:f64,
27}
28
29struct ProgressChannel {
30	Sender:UnboundedSender<ProgressItem>,
31}
32
33static PROGRESS_CH:OnceLock<ProgressChannel> = OnceLock::new();
34
35fn GetOrInitChannel(Handle:&AppHandle) -> &'static ProgressChannel {
36	PROGRESS_CH.get_or_init(|| {
37		let (Tx, mut Rx) = unbounded_channel::<ProgressItem>();
38
39		tokio::spawn(async move {
40			let mut Buf:Vec<ProgressItem> = Vec::with_capacity(64);
41
42			loop {
43				match Rx.recv().await {
44					None => break,
45					Some(Item) => Buf.push(Item),
46				}
47
48				Rx.recv_many(&mut Buf, 4096).await;
49
50				tokio::time::sleep(std::time::Duration::from_millis(16)).await;
51
52				Rx.recv_many(&mut Buf, 4096).await;
53
54				if Buf.is_empty() {
55					continue;
56				}
57
58				// Merge per-handle: latest non-empty message, summed increments.
59				let mut ByHandle:std::collections::HashMap<String, (AppHandle, String, f64)> =
60					std::collections::HashMap::new();
61
62				for Item in Buf.drain(..) {
63					let Entry = ByHandle
64						.entry(Item.ProgressHandle.clone())
65						.or_insert_with(|| (Item.Handle.clone(), String::new(), 0.0));
66
67					if !Item.Message.is_empty() {
68						Entry.1 = Item.Message;
69					}
70
71					Entry.2 += Item.Increment;
72				}
73
74				for (ProgressHandleId, (AppHandle, Message, Increment)) in ByHandle {
75					if let Err(E) = AppHandle.emit(
76						"sky://notification/progress-update",
77						json!({
78							"id": ProgressHandleId,
79							"message": Message,
80							"increment": Increment,
81						}),
82					) {
83						dev_log!(
84							"grpc",
85							"warn: [ProgressReport] emit failed handle={} error={}",
86							ProgressHandleId,
87							E
88						);
89					}
90				}
91			}
92		});
93
94		ProgressChannel { Sender:Tx }
95	})
96}
97
98pub async fn ProgressReport(Service:&MountainVinegRPCService, Parameter:&Value) {
99	let ProgressHandle = Parameter.get("handle").and_then(Value::as_str).unwrap_or("").to_string();
100
101	let Message = Parameter.get("message").and_then(Value::as_str).unwrap_or("").to_string();
102
103	let Increment = Parameter.get("increment").and_then(Value::as_f64).unwrap_or(0.0);
104
105	let Ch = GetOrInitChannel(Service.ApplicationHandle());
106
107	let _ =
108		Ch.Sender
109			.send(ProgressItem { Handle:Service.ApplicationHandle().clone(), ProgressHandle, Message, Increment });
110}