Skip to main content

DevelopmentNodeEnvironment_MicrosoftVSCodeDependency_22NodeVersion_Bundle_Clean_Debug_ElectronProfile_EsbuildCompiler_Mountain/Vine/Server/Notification/
RegisterCommand.rs

1#![allow(non_snake_case)]
2
3//! Cocoon → Mountain `registerCommand` notification.
4//! Stores the command as a `Proxied` handler in Mountain's
5//! `CommandRegistry` so subsequent `commands.executeCommand` calls get
6//! routed back to Cocoon via `$executeContributedCommand` gRPC.
7//!
8//! ## Batching
9//!
10//! Extension boot fires 1000+ `registerCommand` notifications in a tight
11//! burst. Rather than spawning one short-lived tokio task per call (and
12//! always sleeping 16 ms even for the last item), we push into a
13//! `mpsc::unbounded_channel` and a single long-lived flusher task drains
14//! it: it wakes immediately when the first item arrives, collects
15//! everything already queued via `recv_many`, then sleeps 16 ms and
16//! drains a second time to catch stragglers - then emits one batch event.
17//! The net effect is identical to the old coalescer but avoids 1000+
18//! task spawns. Every batch, including an isolated command registered
19//! after boot, is still emitted only after the 16 ms straggler window.
20
21use std::sync::OnceLock;
22
23use serde_json::{Value, json};
24use tauri::{AppHandle, Emitter};
25use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
26
27use crate::{
28	Environment::CommandProvider::CommandHandler,
29	Vine::Server::MountainVinegRPCService::MountainVinegRPCService,
30	dev_log,
31};
32
/// Wrapper around the sending half of the command batching queue that is
/// stored in the `CMD_CHANNEL` static.
33struct CommandBatchChannel {
	/// Sender for the unbounded queue; each item pairs an `AppHandle` with
	/// the JSON payload of one registered command.
34	Sender:UnboundedSender<(AppHandle, Value)>,
35}
36
/// Lazily-initialised batch channel; filled in by the first
/// `GetOrInitChannel` call and shared by every later call.
37static CMD_CHANNEL:OnceLock<CommandBatchChannel> = OnceLock::new();
38
39fn GetOrInitChannel(Handle:&AppHandle) -> &'static CommandBatchChannel {
40	CMD_CHANNEL.get_or_init(|| {
41		let (Tx, mut Rx) = unbounded_channel::<(AppHandle, Value)>();
42
43		tokio::spawn(async move {
44			let mut Buf:Vec<(AppHandle, Value)> = Vec::with_capacity(128);
45
46			loop {
47				// Block until at least one item arrives.
48				match Rx.recv().await {
49					None => break,
50					Some(First) => Buf.push(First),
51				}
52
53				// Drain everything already queued without blocking.
54				Rx.recv_many(&mut Buf, 4096).await;
55
56				// One frame - let stragglers accumulate.
57				tokio::time::sleep(std::time::Duration::from_millis(16)).await;
58
59				// Drain again after the frame window.
60				Rx.recv_many(&mut Buf, 4096).await;
61
62				if Buf.is_empty() {
63					continue;
64				}
65
66				// Emit single batch; all items share the same AppHandle.
67				let Handle = Buf[0].0.clone();
68
69				let Commands:Vec<Value> = Buf.drain(..).map(|(_, V)| V).collect();
70
71				let Count = Commands.len();
72
73				match Handle.emit("sky://command/register", json!({ "commands": Commands })) {
74					Ok(()) => {
75						dev_log!("sky-emit", "[SkyEmit] ok channel=sky://command/register batch={}", Count);
76
77						// Summary line at the default-visible `commands` tag
78						// so `Trace=short` still surfaces the boot burst as
79						// `RegisterCommand batch=N` per 16ms window instead
80						// of N hidden per-command lines under
81						// `command-register`. One line per batch is the
82						// natural granularity - matches the rate of the
83						// downstream Sky emit.
84						dev_log!("commands", "[RegisterCommand] batch={}", Count);
85					},
86					Err(E) => {
87						dev_log!(
88							"sky-emit",
89							"[SkyEmit] fail channel=sky://command/register batch={} error={}",
90							Count,
91							E
92						);
93					},
94				}
95			}
96		});
97
98		CommandBatchChannel { Sender:Tx }
99	})
100}
101
102pub async fn RegisterCommand(Service:&MountainVinegRPCService, Parameter:&Value) {
103	let CommandId = Parameter.get("commandId").and_then(Value::as_str).unwrap_or("");
104
105	dev_log!(
106		"command-register",
107		"[MountainVinegRPCService] Cocoon registered command: {}",
108		CommandId
109	);
110
111	if CommandId.is_empty() {
112		return;
113	}
114
115	let Kind = Parameter.get("kind").and_then(Value::as_str).unwrap_or("command").to_string();
116
117	if let Ok(mut Registry) = Service
118		.RunTime()
119		.Environment
120		.ApplicationState
121		.Extension
122		.Registry
123		.CommandRegistry
124		.lock()
125	{
126		Registry.insert(
127			CommandId.to_string(),
128			CommandHandler::Proxied {
129				SideCarIdentifier:"cocoon-main".to_string(),
130				CommandIdentifier:CommandId.to_string(),
131			},
132		);
133	}
134
135	let Ch = GetOrInitChannel(Service.ApplicationHandle());
136
137	let _ = Ch.Sender.send((
138		Service.ApplicationHandle().clone(),
139		json!({ "id": CommandId, "commandId": CommandId, "kind": Kind }),
140	));
141}