Skip to main content

Mountain/Vine/Server/Notification/
RegisterCommand.rs

1
2//! Cocoon → Mountain `registerCommand` notification.
3//! Stores the command as a `Proxied` handler in Mountain's
4//! `CommandRegistry` so subsequent `commands.executeCommand` calls get
5//! routed back to Cocoon via `$executeContributedCommand` gRPC.
6//!
7//! ## Batching
8//!
9//! Extension boot fires 1000+ `registerCommand` notifications in a tight
10//! burst. Rather than spawning one short-lived tokio task per call (and
11//! always sleeping 16 ms even for the last item), we push into a
12//! `mpsc::unbounded_channel` and a single long-lived flusher task drains
13//! it: it wakes immediately when the first item arrives, collects
14//! everything already queued via `recv_many`, then sleeps 16 ms and
15//! drains a second time to catch stragglers - then emits one batch event.
16//! The net effect is identical to the old coalescer but avoids 1000+
17//! task spawns and reduces the minimum latency to sub-millisecond for
18//! isolated commands registered after boot.
19
20use std::sync::OnceLock;
21
22use serde_json::{Value, json};
23use tauri::{AppHandle, Emitter};
24use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
25
26use crate::{
27	Environment::CommandProvider::CommandHandler,
28	Vine::Server::MountainVinegRPCService::MountainVinegRPCService,
29	dev_log,
30};
31
32struct CommandBatchChannel {
33	Sender:UnboundedSender<(AppHandle, Value)>,
34}
35
36static CMD_CHANNEL:OnceLock<CommandBatchChannel> = OnceLock::new();
37
38fn GetOrInitChannel(Handle:&AppHandle) -> &'static CommandBatchChannel {
39	CMD_CHANNEL.get_or_init(|| {
40		let (Tx, mut Rx) = unbounded_channel::<(AppHandle, Value)>();
41
42		tokio::spawn(async move {
43			let mut Buf:Vec<(AppHandle, Value)> = Vec::with_capacity(128);
44
45			loop {
46				// Block until at least one item arrives.
47				match Rx.recv().await {
48					None => break,
49					Some(First) => Buf.push(First),
50				}
51
52				// Drain everything already queued without blocking.
53				Rx.recv_many(&mut Buf, 4096).await;
54
55				// One frame - let stragglers accumulate.
56				tokio::time::sleep(std::time::Duration::from_millis(16)).await;
57
58				// Drain again after the frame window.
59				Rx.recv_many(&mut Buf, 4096).await;
60
61				if Buf.is_empty() {
62					continue;
63				}
64
65				// Emit single batch; all items share the same AppHandle.
66				let Handle = Buf[0].0.clone();
67
68				let Commands:Vec<Value> = Buf.drain(..).map(|(_, V)| V).collect();
69
70				let Count = Commands.len();
71
72				match Handle.emit("sky://command/register", json!({ "commands": Commands })) {
73					Ok(()) => {
74						dev_log!("sky-emit", "[SkyEmit] ok channel=sky://command/register batch={}", Count);
75
76						// Summary line at the default-visible `commands` tag
77						// so `Trace=short` still surfaces the boot burst as
78						// `RegisterCommand batch=N` per 16ms window instead
79						// of N hidden per-command lines under
80						// `command-register`. One line per batch is the
81						// natural granularity - matches the rate of the
82						// downstream Sky emit.
83						dev_log!("commands", "[RegisterCommand] batch={}", Count);
84					},
85					Err(E) => {
86						dev_log!(
87							"sky-emit",
88							"[SkyEmit] fail channel=sky://command/register batch={} error={}",
89							Count,
90							E
91						);
92					},
93				}
94			}
95		});
96
97		CommandBatchChannel { Sender:Tx }
98	})
99}
100
101pub async fn RegisterCommand(Service:&MountainVinegRPCService, Parameter:&Value) {
102	let CommandId = Parameter.get("commandId").and_then(Value::as_str).unwrap_or("");
103
104	dev_log!(
105		"command-register",
106		"[MountainVinegRPCService] Cocoon registered command: {}",
107		CommandId
108	);
109
110	if CommandId.is_empty() {
111		return;
112	}
113
114	let Kind = Parameter.get("kind").and_then(Value::as_str).unwrap_or("command").to_string();
115
116	if let Ok(mut Registry) = Service
117		.RunTime()
118		.Environment
119		.ApplicationState
120		.Extension
121		.Registry
122		.CommandRegistry
123		.lock()
124	{
125		Registry.insert(
126			CommandId.to_string(),
127			CommandHandler::Proxied {
128				SideCarIdentifier:"cocoon-main".to_string(),
129				CommandIdentifier:CommandId.to_string(),
130			},
131		);
132	}
133
134	let Ch = GetOrInitChannel(Service.ApplicationHandle());
135
136	let _ = Ch.Sender.send((
137		Service.ApplicationHandle().clone(),
138		json!({ "id": CommandId, "commandId": CommandId, "kind": Kind }),
139	));
140}