Skip to main content

Vine/Server/Notification/
ProgressReport.rs

1//! Cocoon `progress.report` notification.
2//!
3//! The git extension alone fires 6000+ of these per session. Items are
4//! pushed into an `mpsc::unbounded_channel`; a single long-lived flusher
5//! task wakes on the first item, drains everything queued, sleeps one
6//! frame (16 ms), drains again, then emits one merged
7//! `sky://notification/progress-update` per progress handle. Per-handle
8//! merge: latest non-empty `message`, summed `increment`. Zero spawns
9//! per call; one renderer event per handle per frame.
10
11use std::sync::{Arc, OnceLock};
12
13use serde_json::{Value, json};
14use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
15
16use crate::{
17	Host::{RendererEmitter, VineHost},
18	dev_log,
19};
20
21struct ProgressItem {
22	Emitter:Arc<dyn RendererEmitter>,
23
24	ProgressHandle:String,
25
26	Message:String,
27
28	Increment:f64,
29}
30
31struct ProgressChannel {
32	Sender:UnboundedSender<ProgressItem>,
33}
34
35static PROGRESS_CH:OnceLock<ProgressChannel> = OnceLock::new();
36
37fn GetOrInitChannel() -> &'static ProgressChannel {
38	PROGRESS_CH.get_or_init(|| {
39		let (Tx, mut Rx) = unbounded_channel::<ProgressItem>();
40
41		tokio::spawn(async move {
42			let mut Buf:Vec<ProgressItem> = Vec::with_capacity(64);
43
44			loop {
45				match Rx.recv().await {
46					None => break,
47					Some(Item) => Buf.push(Item),
48				}
49
50				Rx.recv_many(&mut Buf, 4096).await;
51
52				tokio::time::sleep(std::time::Duration::from_millis(16)).await;
53
54				Rx.recv_many(&mut Buf, 4096).await;
55
56				if Buf.is_empty() {
57					continue;
58				}
59
60				let mut ByHandle:std::collections::HashMap<String, (Arc<dyn RendererEmitter>, String, f64)> =
61					std::collections::HashMap::new();
62
63				for Item in Buf.drain(..) {
64					let Entry = ByHandle
65						.entry(Item.ProgressHandle.clone())
66						.or_insert_with(|| (Item.Emitter.clone(), String::new(), 0.0));
67
68					if !Item.Message.is_empty() {
69						Entry.1 = Item.Message;
70					}
71
72					Entry.2 += Item.Increment;
73				}
74
75				for (ProgressHandleId, (Emitter, Message, Increment)) in ByHandle {
76					Emitter.Emit(
77						"sky://notification/progress-update",
78						json!({
79							"id": ProgressHandleId,
80							"message": Message,
81							"increment": Increment,
82						}),
83					);
84
85					dev_log!(
86						"sky-emit",
87						"[ProgressReport] emit handle={} increment={}",
88						ProgressHandleId,
89						Increment
90					);
91				}
92			}
93		});
94
95		ProgressChannel { Sender:Tx }
96	})
97}
98
99pub async fn ProgressReport(Host:&dyn VineHost, Parameter:&Value) {
100	let ProgressHandle = Parameter.get("handle").and_then(Value::as_str).unwrap_or("").to_string();
101
102	let Message = Parameter.get("message").and_then(Value::as_str).unwrap_or("").to_string();
103
104	let Increment = Parameter.get("increment").and_then(Value::as_f64).unwrap_or(0.0);
105
106	let Ch = GetOrInitChannel();
107
108	let _ = Ch
109		.Sender
110		.send(ProgressItem { Emitter:Host.RendererEmitter(), ProgressHandle, Message, Increment });
111}