1use std::{
7 collections::{HashMap, VecDeque},
8 sync::Arc,
9 time::{Duration, Instant, SystemTime},
10};
11
12use log::{debug, error, info, trace, warn};
13use serde::{Deserialize, Serialize};
14use tokio::{
15 sync::{Mutex as AsyncMutex, RwLock},
16 time::interval,
17};
18
/// Tunable knobs for the performance dashboard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
    /// How often the background sampler collects system metrics, in ms.
    pub update_interval_ms: u64,
    /// Age limit (hours) for stored metrics/traces/alerts before cleanup.
    pub metrics_retention_hours: u64,
    /// Message-processing-time threshold that triggers an alert, in ms.
    pub alert_threshold_ms: u64,
    /// Fraction of operations to trace, 0.0–1.0.
    /// NOTE(review): not consulted by `start_trace_span` in this file —
    /// every span is currently recorded; confirm intended behavior.
    pub trace_sampling_rate: f64,
    /// Hard cap on the number of trace spans kept in memory.
    pub max_traces_stored: usize,
}
28
29impl Default for DashboardConfig {
30 fn default() -> Self {
31 Self {
32 update_interval_ms:5000,
34 metrics_retention_hours:24,
36 alert_threshold_ms:1000,
38 trace_sampling_rate:0.1,
40 max_traces_stored:1000,
42 }
43 }
44}
45
/// Categories of performance samples the dashboard tracks.
/// Units per category are those used by `format_metric_value`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricType {
    /// Time to process one message (ms).
    MessageProcessingTime,
    /// Connection round-trip latency (ms).
    ConnectionLatency,
    /// Process memory footprint (MB).
    MemoryUsage,
    /// CPU utilization (percent).
    CpuUsage,
    /// Message throughput (msg/s).
    NetworkThroughput,
    /// Error rate (percent).
    ErrorRate,
    /// Pending queue depth (count).
    QueueSize,
}
57
/// A single timestamped sample of one metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetric {
    pub metric_type: MetricType,
    /// Raw value; units depend on `metric_type` (see `format_metric_value`).
    pub value: f64,
    /// Unix timestamp in milliseconds (`create_metric` uses `as_millis`).
    pub timestamp: u64,
    /// Optional channel this sample was observed on.
    pub channel: Option<String>,
    /// Free-form key/value labels.
    pub tags: HashMap<String, String>,
}
67
/// One operation in a trace, stored in the dashboard keyed by `span_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceSpan {
    /// Identifier shared by all spans belonging to the same trace.
    pub trace_id: String,
    /// Unique identifier of this span (also the storage key).
    pub span_id: String,
    /// Parent span if this is a child operation; `start_trace_span` in this
    /// file always leaves it `None` (root spans only).
    pub parent_span_id: Option<String>,
    pub operation_name: String,
    /// Unix timestamp in milliseconds when the span was opened.
    pub start_time: u64,
    /// Unix timestamp in milliseconds when closed; `None` while in flight.
    pub end_time: Option<u64>,
    /// `end_time - start_time`, filled in by `end_trace_span`.
    pub duration_ms: Option<u64>,
    pub tags: HashMap<String, String>,
    /// Structured log entries attached via `add_trace_log`.
    pub logs: Vec<TraceLog>,
}
81
/// A structured log entry attached to a trace span.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceLog {
    /// Unix timestamp in milliseconds (see `create_trace_log`).
    pub timestamp: u64,
    pub message: String,
    pub level: LogLevel,
    /// Structured key/value payload accompanying the message.
    pub fields: HashMap<String, String>,
}
90
/// Severity level for a `TraceLog` entry, lowest to highest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}
99
/// An alert raised when a metric sample exceeds its threshold
/// (see `check_alerts`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceAlert {
    /// Unique identifier for this alert occurrence.
    pub alert_id: String,
    /// The metric category that triggered the alert.
    pub metric_type: MetricType,
    /// The threshold that was exceeded.
    pub threshold: f64,
    /// The offending sample's value.
    pub current_value: f64,
    /// Copied from the triggering metric's timestamp (milliseconds).
    pub timestamp: u64,
    /// Channel of the triggering metric, if any.
    pub channel: Option<String>,
    pub severity: AlertSeverity,
    /// Human-readable description of the violation.
    pub message: String,
}
112
/// How far past its threshold a metric was; graded by the value/threshold
/// ratio in `check_alerts` (>2x Medium, >3x High, >5x Critical).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
    Low,
    Medium,
    High,
    Critical,
}
121
/// In-memory performance monitoring hub: stores metrics, traces, and alerts
/// behind async locks and runs periodic background tasks for sampling,
/// alert checks, and data cleanup.
pub struct PerformanceDashboard {
    config: DashboardConfig,
    /// Rolling buffer of samples, oldest at the front.
    metrics: Arc<RwLock<VecDeque<PerformanceMetric>>>,
    /// Trace spans keyed by `span_id`.
    traces: Arc<RwLock<HashMap<String, TraceSpan>>>,
    /// Alert history, oldest at the front.
    alerts: Arc<RwLock<VecDeque<PerformanceAlert>>>,
    /// Aggregates recomputed by `update_statistics`.
    statistics: Arc<RwLock<DashboardStatistics>>,
    /// Polled by the background loops; toggled by `start`/`stop`.
    is_running: Arc<AsyncMutex<bool>>,
}
131
/// Aggregate view over the buffered metrics, recomputed periodically.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardStatistics {
    /// Lifetime count of recorded metrics.
    pub total_metrics_collected: u64,
    /// Lifetime count of started trace spans.
    pub total_traces_collected: u64,
    /// Lifetime count of triggered alerts.
    pub total_alerts_triggered: u64,
    /// Mean of buffered `MessageProcessingTime` samples (ms).
    pub average_processing_time_ms: f64,
    /// Max of buffered `MessageProcessingTime` samples, truncated to ms.
    pub peak_processing_time_ms: u64,
    /// Mean of buffered `ErrorRate` samples (percent).
    pub error_rate_percentage: f64,
    /// Mean of buffered `NetworkThroughput` samples (msg/s).
    pub throughput_messages_per_second: f64,
    /// Mean of buffered `MemoryUsage` samples (MB).
    pub memory_usage_mb: f64,
    /// Unix timestamp in *seconds* of the last statistics refresh.
    pub last_update: u64,
}
145
146impl PerformanceDashboard {
147 pub fn new(config:DashboardConfig) -> Self {
149 let config_clone = config.clone();
150 let dashboard = Self {
151 config,
152 metrics:Arc::new(RwLock::new(VecDeque::new())),
153 traces:Arc::new(RwLock::new(HashMap::new())),
154 alerts:Arc::new(RwLock::new(VecDeque::new())),
155 statistics:Arc::new(RwLock::new(DashboardStatistics {
156 total_metrics_collected:0,
157 total_traces_collected:0,
158 total_alerts_triggered:0,
159 average_processing_time_ms:0.0,
160 peak_processing_time_ms:0,
161 error_rate_percentage:0.0,
162 throughput_messages_per_second:0.0,
163 memory_usage_mb:0.0,
164 last_update:SystemTime::now()
165 .duration_since(SystemTime::UNIX_EPOCH)
166 .unwrap_or_default()
167 .as_secs(),
168 })),
169 is_running:Arc::new(AsyncMutex::new(false)),
170 };
171
172 info!(
173 "[PerformanceDashboard] Created dashboard with {}ms update interval",
174 config_clone.update_interval_ms
175 );
176
177 dashboard
178 }
179
    /// Starts the background loops (metrics collection, alert monitoring,
    /// data cleanup). Idempotent: calling `start` while already running
    /// returns `Ok(())` without spawning anything.
    pub async fn start(&self) -> Result<(), String> {
        // Flip the running flag inside a short-lived lock scope so the lock
        // is not held while spawning tasks.
        {
            let mut running = self.is_running.lock().await;
            if *running {
                return Ok(());
            }
            *running = true;
        }

        self.start_metrics_collection().await;

        self.start_alert_monitoring().await;

        self.start_data_cleanup().await;

        info!("[PerformanceDashboard] Performance dashboard started");
        Ok(())
    }
203
    /// Stops the dashboard and discards all buffered metrics, traces, and
    /// alerts. Idempotent: stopping a stopped dashboard is a no-op.
    /// Clears the flag that the background loops poll each tick.
    pub async fn stop(&self) -> Result<(), String> {
        {
            let mut running = self.is_running.lock().await;
            if !*running {
                return Ok(());
            }
            *running = false;
        }

        // Each buffer is cleared in its own short lock scope to avoid holding
        // multiple write locks at once.
        {
            let mut metrics = self.metrics.write().await;
            metrics.clear();
        }

        {
            let mut traces = self.traces.write().await;
            traces.clear();
        }

        {
            let mut alerts = self.alerts.write().await;
            alerts.clear();
        }

        info!("[PerformanceDashboard] Performance dashboard stopped");
        Ok(())
    }
234
235 pub async fn record_metric(&self, metric:PerformanceMetric) {
237 let mut metrics = self.metrics.write().await;
238 metrics.push_back(metric.clone());
239
240 self.update_statistics().await;
242
243 self.check_alerts(&metric).await;
245
246 trace!("[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
247 }
248
    /// Opens and registers a new root span for `operation_name`, returning a
    /// snapshot of it. The authoritative copy lives in the `traces` map,
    /// keyed by `span_id`.
    ///
    /// NOTE(review): `config.trace_sampling_rate` is not consulted here —
    /// every call stores a span. Confirm whether sampling was intended.
    pub async fn start_trace_span(&self, operation_name: String) -> TraceSpan {
        let trace_id = Self::generate_trace_id();
        let span_id = Self::generate_span_id();

        let span = TraceSpan {
            trace_id: trace_id.clone(),
            span_id: span_id.clone(),
            // Only root spans are created here; no parent linkage.
            parent_span_id: None,
            operation_name,
            // Unix timestamp in milliseconds.
            start_time: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,
            end_time: None,
            duration_ms: None,
            tags: HashMap::new(),
            logs: Vec::new(),
        };

        // Register the span under its span id.
        {
            let mut traces = self.traces.write().await;
            traces.insert(span_id.clone(), span.clone());
        }

        // Count it toward the lifetime trace total.
        {
            let mut stats = self.statistics.write().await;
            stats.total_traces_collected += 1;
        }

        span
    }
283
284 pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
286 let mut traces = self.traces.write().await;
287
288 if let Some(mut span) = traces.get_mut(span_id) {
289 let end_time = SystemTime::now()
290 .duration_since(SystemTime::UNIX_EPOCH)
291 .unwrap_or_default()
292 .as_millis() as u64;
293
294 span.end_time = Some(end_time);
295 span.duration_ms = Some(end_time.saturating_sub(span.start_time));
296
297 trace!(
298 "[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
299 span.operation_name,
300 span.duration_ms.unwrap_or(0)
301 );
302
303 Ok(())
304 } else {
305 Err(format!("Trace span not found: {}", span_id))
306 }
307 }
308
309 pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
311 let mut traces = self.traces.write().await;
312
313 if let Some(span) = traces.get_mut(span_id) {
314 span.logs.push(log);
315 Ok(())
316 } else {
317 Err(format!("Trace span not found: {}", span_id))
318 }
319 }
320
    /// Spawns the periodic system-metrics sampler, ticking at
    /// `config.update_interval_ms`.
    ///
    /// NOTE(review): the task operates on `self.clone()` and polls
    /// `is_running` each tick — verify the manual `Clone` impl shares the
    /// `is_running` Arc; if a clone gets a fresh `false` flag, this loop's
    /// condition is immediately false and the task exits without working.
    async fn start_metrics_collection(&self) {
        let dashboard = Arc::new(self.clone());

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));

            // Loop until `stop()` clears the shared running flag.
            while *dashboard.is_running.lock().await {
                interval.tick().await;

                dashboard.collect_system_metrics().await;

                dashboard.update_statistics().await;
            }
        });
    }
339
    /// Spawns the periodic alert sweep (fixed 10 s cadence).
    ///
    /// NOTE(review): same caveat as `start_metrics_collection` — the loop
    /// only runs if the `Clone` impl shares the `is_running` flag.
    async fn start_alert_monitoring(&self) {
        let dashboard = Arc::new(self.clone());

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(10));

            while *dashboard.is_running.lock().await {
                interval.tick().await;

                dashboard.check_performance_alerts().await;
            }
        });
    }
355
    /// Spawns the hourly data-retention cleanup task.
    ///
    /// NOTE(review): same caveat as `start_metrics_collection` — the loop
    /// only runs if the `Clone` impl shares the `is_running` flag.
    async fn start_data_cleanup(&self) {
        let dashboard = Arc::new(self.clone());

        tokio::spawn(async move {
            // One sweep per hour.
            let mut interval = interval(Duration::from_secs(3600));

            while *dashboard.is_running.lock().await {
                interval.tick().await;

                dashboard.cleanup_old_data().await;
            }
        });
    }
372
373 async fn collect_system_metrics(&self) {
375 if let Ok(memory_usage) = Self::get_memory_usage() {
377 let metric = PerformanceMetric {
378 metric_type:MetricType::MemoryUsage,
379 value:memory_usage,
380 timestamp:SystemTime::now()
381 .duration_since(SystemTime::UNIX_EPOCH)
382 .unwrap_or_default()
383 .as_millis() as u64,
384 channel:None,
385 tags:HashMap::new(),
386 };
387
388 self.record_metric(metric).await;
389 }
390
391 if let Ok(cpu_usage) = Self::get_cpu_usage() {
393 let metric = PerformanceMetric {
394 metric_type:MetricType::CpuUsage,
395 value:cpu_usage,
396 timestamp:SystemTime::now()
397 .duration_since(SystemTime::UNIX_EPOCH)
398 .unwrap_or_default()
399 .as_millis() as u64,
400 channel:None,
401 tags:HashMap::new(),
402 };
403
404 self.record_metric(metric).await;
405 }
406 }
407
408 async fn update_statistics(&self) {
410 let metrics = self.metrics.read().await;
411 let mut stats = self.statistics.write().await;
412
413 let processing_metrics:Vec<&PerformanceMetric> = metrics
415 .iter()
416 .filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
417 .collect();
418
419 if !processing_metrics.is_empty() {
420 let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();
421 stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;
422
423 stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
424 }
425
426 let error_metrics:Vec<&PerformanceMetric> = metrics
428 .iter()
429 .filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
430 .collect();
431
432 if !error_metrics.is_empty() {
433 let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();
434 stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
435 }
436
437 let throughput_metrics:Vec<&PerformanceMetric> = metrics
439 .iter()
440 .filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
441 .collect();
442
443 if !throughput_metrics.is_empty() {
444 let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();
445 stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
446 }
447
448 let memory_metrics:Vec<&PerformanceMetric> = metrics
450 .iter()
451 .filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
452 .collect();
453
454 if !memory_metrics.is_empty() {
455 let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();
456 stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
457 }
458
459 stats.last_update = SystemTime::now()
460 .duration_since(SystemTime::UNIX_EPOCH)
461 .unwrap_or_default()
462 .as_secs();
463 }
464
    /// Compares a single sample against its category's threshold and records
    /// an alert when exceeded.
    ///
    /// Thresholds: processing time uses `config.alert_threshold_ms`; error
    /// rate (5%), memory (1024 MB), and CPU (90%) are hard-coded. All other
    /// metric types are never alerted on.
    async fn check_alerts(&self, metric: &PerformanceMetric) {
        let threshold = match metric.metric_type {
            MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,
            MetricType::ErrorRate => 5.0,
            MetricType::MemoryUsage => 1024.0,
            MetricType::CpuUsage => 90.0,
            // No alerting policy for the remaining metric types.
            _ => return,
        };

        if metric.value > threshold {
            // Severity scales with how far past the threshold the value is.
            let severity = match metric.value / threshold {
                ratio if ratio > 5.0 => AlertSeverity::Critical,
                ratio if ratio > 3.0 => AlertSeverity::High,
                ratio if ratio > 2.0 => AlertSeverity::Medium,
                _ => AlertSeverity::Low,
            };

            let alert = PerformanceAlert {
                alert_id: Self::generate_alert_id(),
                metric_type: metric.metric_type.clone(),
                threshold,
                current_value: metric.value,
                // The alert carries the triggering sample's timestamp (ms).
                timestamp: metric.timestamp,
                channel: metric.channel.clone(),
                severity,
                message: format!(
                    "{} exceeded threshold: {} > {}",
                    Self::metric_type_name(&metric.metric_type),
                    metric.value,
                    threshold
                ),
            };

            // Store the alert and bump the counter in separate, short lock
            // scopes so neither write lock is held longer than needed.
            {
                let mut alerts = self.alerts.write().await;
                alerts.push_back(alert.clone());
            }

            {
                let mut stats = self.statistics.write().await;
                stats.total_alerts_triggered += 1;
            }

            warn!("[PerformanceDashboard] Alert triggered: {}", alert.message);
        }
    }
516
    /// Periodic alert sweep invoked by `start_alert_monitoring`.
    /// Currently a placeholder: per-sample alerting happens inline in
    /// `check_alerts`, so this only logs that the sweep ran.
    async fn check_performance_alerts(&self) {
        debug!("[PerformanceDashboard] Checking performance alerts");
    }
523
524 async fn cleanup_old_data(&self) {
526 let retention_threshold = SystemTime::now()
527 .duration_since(SystemTime::UNIX_EPOCH)
528 .unwrap_or_default()
529 .as_secs()
530 - (self.config.metrics_retention_hours * 3600);
531
532 {
534 let mut metrics = self.metrics.write().await;
535 metrics.retain(|m| m.timestamp >= retention_threshold);
536 }
537
538 {
540 let mut traces = self.traces.write().await;
541 traces.retain(|_, span| span.start_time >= retention_threshold);
542
543 if traces.len() > self.config.max_traces_stored {
545 let excess = traces.len() - self.config.max_traces_stored;
546 let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();
547
548 for key in keys_to_remove {
549 traces.remove(&key);
550 }
551 }
552 }
553
554 {
556 let mut alerts = self.alerts.write().await;
557 alerts.retain(|a| a.timestamp >= retention_threshold);
558 }
559
560 debug!("[PerformanceDashboard] Cleaned up old data");
561 }
562
563 fn get_memory_usage() -> Result<f64, String> {
565 Ok(100.0)
570 }
571
572 fn get_cpu_usage() -> Result<f64, String> {
574 Ok(25.0)
579 }
580
    /// Fresh UUIDv4 trace identifier (uses the `uuid` crate).
    fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }

    /// Fresh UUIDv4 span identifier.
    fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }

    /// Fresh UUIDv4 alert identifier.
    fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }
589
    /// Human-readable display name for a metric category, used in alert
    /// messages.
    fn metric_type_name(metric_type: &MetricType) -> &'static str {
        match metric_type {
            MetricType::MessageProcessingTime => "Message Processing Time",
            MetricType::ConnectionLatency => "Connection Latency",
            MetricType::MemoryUsage => "Memory Usage",
            MetricType::CpuUsage => "CPU Usage",
            MetricType::NetworkThroughput => "Network Throughput",
            MetricType::ErrorRate => "Error Rate",
            MetricType::QueueSize => "Queue Size",
        }
    }
602
603 pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }
605
606 pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
608 let metrics = self.metrics.read().await;
609 metrics.iter().rev().take(limit).cloned().collect()
610 }
611
612 pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
614 let alerts = self.alerts.read().await;
615 alerts.iter().rev().cloned().collect()
616 }
617
618 pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
620 let traces = self.traces.read().await;
621 traces.values().find(|span| span.trace_id == trace_id).cloned()
622 }
623
624 pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }
626
    /// Preset tuned for short-lived, high-resolution monitoring: 1 s updates,
    /// 1 h retention, 500 ms alert threshold, full trace sampling, and room
    /// for 5000 spans.
    pub fn high_frequency_dashboard() -> Self {
        Self::new(DashboardConfig {
            update_interval_ms: 1000,
            metrics_retention_hours: 1,
            alert_threshold_ms: 500,
            trace_sampling_rate: 1.0,
            max_traces_stored: 5000,
        })
    }
642}
643
644impl Clone for PerformanceDashboard {
645 fn clone(&self) -> Self {
646 Self {
647 config:self.config.clone(),
648 metrics:self.metrics.clone(),
649 traces:self.traces.clone(),
650 alerts:self.alerts.clone(),
651 statistics:self.statistics.clone(),
652 is_running:Arc::new(AsyncMutex::new(false)),
653 }
654 }
655}
656
657impl PerformanceDashboard {
    /// Builds a metric sample stamped with the current Unix time in
    /// milliseconds, ready to pass to `record_metric`.
    pub fn create_metric(
        metric_type: MetricType,
        value: f64,
        channel: Option<String>,
        tags: HashMap<String, String>,
    ) -> PerformanceMetric {
        PerformanceMetric {
            metric_type,
            value,
            // Millisecond resolution, consistent with trace span stamps.
            timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,
            channel,
            tags,
        }
    }
677
    /// Builds a trace log entry stamped with the current Unix time in
    /// milliseconds, ready to pass to `add_trace_log`.
    pub fn create_trace_log(message: String, level: LogLevel, fields: HashMap<String, String>) -> TraceLog {
        TraceLog {
            timestamp: SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,
            message,
            level,
            fields,
        }
    }
690
691 pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
693 let time_score = 100.0 / (1.0 + average_processing_time / 100.0);
695 let error_score = 100.0 * (1.0 - error_rate / 100.0);
696 let throughput_score = throughput / 1000.0;
699
700 (time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2)
701 .max(0.0)
702 .min(100.0)
703 }
704
705 pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
707 match metric_type {
708 MetricType::MessageProcessingTime => format!("{:.2}ms", value),
709 MetricType::ConnectionLatency => format!("{:.2}ms", value),
710 MetricType::MemoryUsage => format!("{:.2}MB", value),
711 MetricType::CpuUsage => format!("{:.2}%", value),
712 MetricType::NetworkThroughput => format!("{:.2} msg/s", value),
713 MetricType::ErrorRate => format!("{:.2}%", value),
714 MetricType::QueueSize => format!("{:.0}", value),
715 }
716 }
717}
718
#[cfg(test)]
mod tests {
    use super::*;

    /// The default preset should carry the documented 5 s update interval.
    #[tokio::test]
    async fn test_performance_dashboard_creation() {
        let dashboard = PerformanceDashboard::default_dashboard();
        assert_eq!(dashboard.config.update_interval_ms, 5000);
    }

    /// Recording a metric should make it visible via `get_recent_metrics`.
    /// NOTE(review): `record_metric` as written takes the `metrics` write
    /// lock and then calls `update_statistics`, which takes a read lock on
    /// the same RwLock — verify this test does not deadlock.
    #[tokio::test]
    async fn test_metric_recording() {
        let dashboard = PerformanceDashboard::default_dashboard();
        dashboard.start().await.unwrap();

        let metric = PerformanceDashboard::create_metric(
            MetricType::MessageProcessingTime,
            150.0,
            Some("test_channel".to_string()),
            HashMap::new(),
        );

        dashboard.record_metric(metric.clone()).await;

        let recent_metrics = dashboard.get_recent_metrics(10).await;
        assert!(!recent_metrics.is_empty());

        dashboard.stop().await.unwrap();
    }

    /// A started span should be retrievable by trace id after being ended.
    #[tokio::test]
    async fn test_trace_span_management() {
        let dashboard = PerformanceDashboard::default_dashboard();
        dashboard.start().await.unwrap();

        let span = dashboard.start_trace_span("test_operation".to_string()).await;
        assert_eq!(span.operation_name, "test_operation");

        dashboard.end_trace_span(&span.span_id).await.unwrap();

        let trace = dashboard.get_trace(&span.trace_id).await;
        assert!(trace.is_some());

        dashboard.stop().await.unwrap();
    }

    /// A 2000 ms processing time exceeds the default 1000 ms threshold and
    /// must produce an alert.
    #[tokio::test]
    async fn test_alert_generation() {
        let dashboard = PerformanceDashboard::default_dashboard();
        dashboard.start().await.unwrap();

        let metric = PerformanceDashboard::create_metric(
            MetricType::MessageProcessingTime,
            2000.0,
            None,
            HashMap::new(),
        );

        dashboard.record_metric(metric).await;

        let alerts = dashboard.get_active_alerts().await;
        assert!(!alerts.is_empty());

        dashboard.stop().await.unwrap();
    }

    /// The composite score is always clamped to [0, 100].
    #[test]
    fn test_performance_score_calculation() {
        let score = PerformanceDashboard::calculate_performance_score(50.0, 2.0, 500.0);
        assert!(score >= 0.0 && score <= 100.0);
    }

    /// Processing-time values render with two decimals and an "ms" suffix.
    #[test]
    fn test_metric_value_formatting() {
        let formatted = PerformanceDashboard::format_metric_value(&MetricType::MessageProcessingTime, 123.456);
        assert_eq!(formatted, "123.46ms");
    }
}