Skip to main content

Vine/Server/Notification/
OutputChannelCoalesce.rs

1//! Per-channel coalescing buffer for `outputChannel.append` notifications.
2//!
3//! Cocoon's Git extension emits 30+ `append` notifications per `git status`
4//! (one per `[trace] [OperationManager][...]` line, one per executed
5//! sub-command). Each one previously crossed the gRPC boundary, fired its
6//! own renderer event, and wrote its own dev_log entry. For a workspace
7//! with the Git extension actively probing on file changes, this alone
8//! accounted for ~1.9k lines of one 28k-line session log.
9//!
10//! This atom buffers appends per-channel for a short window
11//! (`COALESCE_WINDOW`) and flushes the concatenated payload as a single
12//! renderer emit + a single dev_log line. The downstream Output panel
13//! still sees identical text - just delivered in larger chunks.
14//!
15//! ## Why this is safe
16//!
17//! - Per-channel buffer means ordering is preserved within a channel.
18//! - Append-only semantics mean partial-payload visibility cannot expose torn
19//!   writes - the buffered text is always a prefix of the eventual full
20//!   payload.
21//! - The flusher task running on the tokio runtime keeps the same back-pressure
22//!   shape the per-call path had.
23//!
24//! ## Disable hook
25//!
//! `OutputCoalesce=0` (or `OutputCoalesce=false`) reverts to per-append
//! emit - useful when debugging synchronisation issues where a single
//! append must be flushed immediately to disk.
29
30use std::{
31	collections::HashMap,
32	sync::{Arc, Mutex as StandardMutex, OnceLock},
33	time::Duration,
34};
35
36use serde_json::{Value, json};
37use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
38
39use crate::{Host::RendererEmitter, dev_log};
40
/// Length of the coalescing window: the intended upper bound on how long
/// an append may sit in its channel buffer before being flushed to the
/// renderer.
///
/// 50 ms spans roughly three ticks of the 16 ms FSEvents / Git-extension
/// cadence, which leaves room for a `git status` burst to accumulate
/// while staying short enough that scrolling does not visibly lag.
///
/// NOTE(review): earlier wording described this window as a single
/// frame, which does not match the 50 ms value - confirm which of the
/// two was intended.
const COALESCE_WINDOW:Duration = Duration::from_millis(50);
46
/// Per-channel size threshold, in bytes (64 KiB), at which buffered text
/// is flushed without waiting for the coalescing window to elapse.
///
/// Bounds memory for channels that emit text without pause - for
/// example a build extension piping `cargo build` stdout. The check runs
/// after the triggering append has been added, so a flushed payload may
/// be larger than this value.
const MAX_BUFFERED_BYTES:usize = 64 * 1024;
51
/// One `outputChannel.append` payload waiting to be merged into its
/// channel's buffer.
struct PendingAppend {
	/// Name of the output channel the text belongs to; used as the
	/// buffer key.
	Channel:String,

	/// Text to append to that channel.
	Value:String,
}
57
58struct CoalesceChannel {
59	Sender:UnboundedSender<(Arc<dyn RendererEmitter>, PendingAppend)>,
60}
61
62static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
63
/// Reports whether coalescing has been switched off through the
/// `OutputCoalesce` environment variable.
///
/// Only the exact values `0` and `false` disable it; an unset variable,
/// a non-Unicode value, or any other text leaves coalescing enabled. The
/// variable is read on every call.
fn IsDisabled() -> bool {
	match std::env::var("OutputCoalesce") {
		Ok(Setting) => Setting == "0" || Setting == "false",
		Err(_) => false,
	}
}
65
66fn GetOrInitChannel() -> &'static CoalesceChannel {
67	COALESCE_CHANNEL.get_or_init(|| {
68		let (Tx, mut Rx) = unbounded_channel::<(Arc<dyn RendererEmitter>, PendingAppend)>();
69
70		tokio::spawn(async move {
71			let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
72
73			loop {
74				let Received = Rx.recv().await;
75
76				let (Emitter, First) = match Received {
77					None => break,
78					Some(Pair) => Pair,
79				};
80
81				{
82					let mut Guard = match Buffers.lock() {
83						Ok(G) => G,
84						Err(_) => continue,
85					};
86
87					let Slot = Guard.entry(First.Channel.clone()).or_default();
88
89					Slot.push_str(&First.Value);
90
91					if Slot.len() >= MAX_BUFFERED_BYTES {
92						let Payload = std::mem::take(Slot);
93
94						drop(Guard);
95
96						FlushOne(&Emitter, &First.Channel, &Payload);
97
98						continue;
99					}
100				}
101
102				let mut Drain:Vec<(Arc<dyn RendererEmitter>, PendingAppend)> = Vec::new();
103
104				let _ = Rx.recv_many(&mut Drain, 4096).await;
105
106				for (_, Pending) in Drain.drain(..) {
107					if let Ok(mut Guard) = Buffers.lock() {
108						let Slot = Guard.entry(Pending.Channel).or_default();
109
110						Slot.push_str(&Pending.Value);
111					}
112				}
113
114				tokio::time::sleep(COALESCE_WINDOW).await;
115
116				let mut LateDrain:Vec<(Arc<dyn RendererEmitter>, PendingAppend)> = Vec::new();
117
118				let _ = Rx.recv_many(&mut LateDrain, 4096).await;
119
120				for (_, Pending) in LateDrain.drain(..) {
121					if let Ok(mut Guard) = Buffers.lock() {
122						let Slot = Guard.entry(Pending.Channel).or_default();
123
124						Slot.push_str(&Pending.Value);
125					}
126				}
127
128				let EmitterForFlush = Emitter.clone();
129
130				let Snapshots = {
131					match Buffers.lock() {
132						Ok(mut Guard) => {
133							Guard
134								.iter_mut()
135								.filter(|(_, V)| !V.is_empty())
136								.map(|(K, V)| (K.clone(), std::mem::take(V)))
137								.collect::<Vec<_>>()
138						},
139						Err(_) => continue,
140					}
141				};
142
143				for (Channel, Payload) in Snapshots {
144					FlushOne(&EmitterForFlush, &Channel, &Payload);
145				}
146			}
147		});
148
149		CoalesceChannel { Sender:Tx }
150	})
151}
152
153fn FlushOne(Emitter:&Arc<dyn RendererEmitter>, Channel:&str, Payload:&str) {
154	Emitter.Emit(
155		"sky://output/append",
156		json!({
157			"channel": Channel,
158			"value": Payload,
159		}),
160	);
161
162	let IsGitFamily = Channel.eq_ignore_ascii_case("git")
163		|| Channel.eq_ignore_ascii_case("source control")
164		|| Channel.eq_ignore_ascii_case("scm");
165
166	let LineCount = Payload.matches('\n').count();
167
168	if IsGitFamily {
169		dev_log!(
170			"grpc",
171			"[OutputChannel:{}] flush bytes={} lines~{}",
172			Channel,
173			Payload.len(),
174			LineCount
175		);
176	} else {
177		dev_log!(
178			"output-verbose",
179			"[OutputChannel] flush channel={} bytes={} lines~{}",
180			Channel,
181			Payload.len(),
182			LineCount
183		);
184	}
185}
186
187/// Submit a pending append for coalescing. Returns `true` when the
188/// item was enqueued (the coalescer will flush within
189/// `COALESCE_WINDOW`), `false` when coalescing is disabled and the
190/// caller must flush inline.
191pub fn TryEnqueue(Emitter:Arc<dyn RendererEmitter>, Channel:String, Value:String) -> bool {
192	if IsDisabled() {
193		return false;
194	}
195
196	let Ch = GetOrInitChannel();
197
198	let _ = Ch.Sender.send((Emitter, PendingAppend { Channel, Value }));
199
200	true
201}