Vine/Server/Notification/
OutputChannelCoalesce.rs1use std::{
31 collections::HashMap,
32 sync::{Arc, Mutex as StandardMutex, OnceLock},
33 time::Duration,
34};
35
36use serde_json::{Value, json};
37use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
38
39use crate::{Host::RendererEmitter, dev_log};
40
/// Length of the batching window (50 ms): how long the worker task started by
/// `GetOrInitChannel` waits, once a batch has begun, before it flushes the
/// accumulated output to the renderer.
const COALESCE_WINDOW:Duration = Duration::from_millis(50);
46
/// Size threshold (64 KiB) for a single channel's buffered text. The worker
/// compares a buffer's length against this value and flushes that channel
/// immediately, without waiting for `COALESCE_WINDOW`, once it is reached.
const MAX_BUFFERED_BYTES:usize = 64 * 1024;
51
/// One queued output append, as handed to the coalescing worker.
struct PendingAppend {
	/// Name of the output channel; used as the buffer key and sent as the
	/// `"channel"` field of the emitted event.
	Channel:String,

	/// Text to append; concatenated onto the channel's buffer.
	Value:String,
}
57
/// Handle to the coalescing worker.
struct CoalesceChannel {
	/// Sending half of the unbounded queue created in `GetOrInitChannel`. Each
	/// message pairs the emitter to flush through with the append to buffer.
	Sender:UnboundedSender<(Arc<dyn RendererEmitter>, PendingAppend)>,
}
61
/// Process-wide handle to the coalescing worker. Empty until the first call to
/// `GetOrInitChannel`, which initialises it exactly once.
static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
63
/// Reports whether coalescing has been switched off through the environment.
///
/// Coalescing counts as disabled only when the `OutputCoalesce` variable is
/// set to exactly `0` or `false` (case-sensitive). An unset variable, a value
/// that is not valid Unicode, or any other value leaves it enabled. The
/// variable is read on every call.
fn IsDisabled() -> bool {
	match std::env::var("OutputCoalesce") {
		Ok(Setting) => Setting == "0" || Setting == "false",
		Err(_) => false,
	}
}
65
66fn GetOrInitChannel() -> &'static CoalesceChannel {
67 COALESCE_CHANNEL.get_or_init(|| {
68 let (Tx, mut Rx) = unbounded_channel::<(Arc<dyn RendererEmitter>, PendingAppend)>();
69
70 tokio::spawn(async move {
71 let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
72
73 loop {
74 let Received = Rx.recv().await;
75
76 let (Emitter, First) = match Received {
77 None => break,
78 Some(Pair) => Pair,
79 };
80
81 {
82 let mut Guard = match Buffers.lock() {
83 Ok(G) => G,
84 Err(_) => continue,
85 };
86
87 let Slot = Guard.entry(First.Channel.clone()).or_default();
88
89 Slot.push_str(&First.Value);
90
91 if Slot.len() >= MAX_BUFFERED_BYTES {
92 let Payload = std::mem::take(Slot);
93
94 drop(Guard);
95
96 FlushOne(&Emitter, &First.Channel, &Payload);
97
98 continue;
99 }
100 }
101
102 let mut Drain:Vec<(Arc<dyn RendererEmitter>, PendingAppend)> = Vec::new();
103
104 let _ = Rx.recv_many(&mut Drain, 4096).await;
105
106 for (_, Pending) in Drain.drain(..) {
107 if let Ok(mut Guard) = Buffers.lock() {
108 let Slot = Guard.entry(Pending.Channel).or_default();
109
110 Slot.push_str(&Pending.Value);
111 }
112 }
113
114 tokio::time::sleep(COALESCE_WINDOW).await;
115
116 let mut LateDrain:Vec<(Arc<dyn RendererEmitter>, PendingAppend)> = Vec::new();
117
118 let _ = Rx.recv_many(&mut LateDrain, 4096).await;
119
120 for (_, Pending) in LateDrain.drain(..) {
121 if let Ok(mut Guard) = Buffers.lock() {
122 let Slot = Guard.entry(Pending.Channel).or_default();
123
124 Slot.push_str(&Pending.Value);
125 }
126 }
127
128 let EmitterForFlush = Emitter.clone();
129
130 let Snapshots = {
131 match Buffers.lock() {
132 Ok(mut Guard) => {
133 Guard
134 .iter_mut()
135 .filter(|(_, V)| !V.is_empty())
136 .map(|(K, V)| (K.clone(), std::mem::take(V)))
137 .collect::<Vec<_>>()
138 },
139 Err(_) => continue,
140 }
141 };
142
143 for (Channel, Payload) in Snapshots {
144 FlushOne(&EmitterForFlush, &Channel, &Payload);
145 }
146 }
147 });
148
149 CoalesceChannel { Sender:Tx }
150 })
151}
152
153fn FlushOne(Emitter:&Arc<dyn RendererEmitter>, Channel:&str, Payload:&str) {
154 Emitter.Emit(
155 "sky://output/append",
156 json!({
157 "channel": Channel,
158 "value": Payload,
159 }),
160 );
161
162 let IsGitFamily = Channel.eq_ignore_ascii_case("git")
163 || Channel.eq_ignore_ascii_case("source control")
164 || Channel.eq_ignore_ascii_case("scm");
165
166 let LineCount = Payload.matches('\n').count();
167
168 if IsGitFamily {
169 dev_log!(
170 "grpc",
171 "[OutputChannel:{}] flush bytes={} lines~{}",
172 Channel,
173 Payload.len(),
174 LineCount
175 );
176 } else {
177 dev_log!(
178 "output-verbose",
179 "[OutputChannel] flush channel={} bytes={} lines~{}",
180 Channel,
181 Payload.len(),
182 LineCount
183 );
184 }
185}
186
187pub fn TryEnqueue(Emitter:Arc<dyn RendererEmitter>, Channel:String, Value:String) -> bool {
192 if IsDisabled() {
193 return false;
194 }
195
196 let Ch = GetOrInitChannel();
197
198 let _ = Ch.Sender.send((Emitter, PendingAppend { Channel, Value }));
199
200 true
201}