//! DSP and Voice Activity Detection (VAD) thread.
//!
//! Pulls audio from the lock-free ringbuffer, applies WebRTC noise suppression
//! and echo cancellation, then checks for voice activity before signalling
//! the UI via a `tokio::sync::watch` channel.
//!
//! This thread is a dedicated `std::thread` (not a Tokio task) because
//! real-time audio processing must never be at the mercy of a cooperative
//! async scheduler.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

use ringbuf::HeapCons;
use ringbuf::traits::{Consumer, Observer};
use tokio::sync::watch;
use tracing::info;
use webrtc_audio_processing::Processor;
use webrtc_audio_processing_config::{
    Config, EchoCanceller, NoiseSuppression, NoiseSuppressionLevel,
};

use super::{FRAME_SIZE, SAMPLE_RATE};

/// RMS threshold below which a frame is considered silence.
/// This provides a simple amplitude-based VAD since the WebRTC v2 API
/// removed the standalone voice detection configuration.
const VAD_RMS_THRESHOLD: f32 = 0.01;

/// Spawns the dedicated background DSP thread.
///
/// Reads 960-sample frames from the ringbuffer, applies WebRTC
/// noise suppression + echo cancellation, and updates the active
/// speaker state via the provided watch channel.
pub fn spawn_dsp_thread(
    mut consumer: HeapCons<f32>,
    ptt_flag: Arc<AtomicBool>,
    active_speaker_tx: watch::Sender<bool>,
) {
    thread::spawn(move || {
        info!("DSP thread started.");

        let mut ap = match Processor::new(SAMPLE_RATE) {
            Ok(ap) => ap,
            Err(e) => {
                tracing::error!("Failed to initialize WebRTC APM: {:?}", e);
                return;
            }
        };

        let config = Config {
            echo_canceller: Some(EchoCanceller::default()),
            noise_suppression: Some(NoiseSuppression {
                level: NoiseSuppressionLevel::High,
                analyze_linear_aec_output: false,
            }),
            ..Default::default()
        };
        ap.set_config(config);

        // Mono capture: one channel with FRAME_SIZE samples.
        let mut frame_buf = vec![vec![0.0f32; FRAME_SIZE]];

        loop {
            // Wait until we have a full 20 ms frame (960 samples at 48 kHz).
            if consumer.occupied_len() >= FRAME_SIZE {
                let _ = consumer.pop_slice(&mut frame_buf[0]);

                let is_transmitting = ptt_flag.load(Ordering::Relaxed);

                // Run the WebRTC DSP pipeline on the capture frame.
                if let Err(e) = ap.process_capture_frame(&mut frame_buf) {
                    tracing::warn!("APM processing failed: {:?}", e);
                }

                // Simple RMS-based VAD since webrtc-audio-processing v2
                // removed the dedicated VoiceDetection config field.
                let rms = compute_rms(&frame_buf[0]);
                let has_voice = rms > VAD_RMS_THRESHOLD;
                let should_transmit = is_transmitting && has_voice;

                // Only update the watch channel when the state actually changes
                // to avoid unnecessary UI repaints.
                if *active_speaker_tx.borrow() != should_transmit {
                    let _ = active_speaker_tx.send(should_transmit);
                }
            } else {
                thread::sleep(Duration::from_millis(2));
            }
        }
    });
}

/// Computes the Root Mean Square (RMS) of a sample buffer.
///
/// Used as a lightweight VAD: if the RMS is below a threshold,
/// the frame is considered silence.
fn compute_rms(samples: &[f32]) -> f32 {
    if samples.is_empty() {
        return 0.0;
    }
    let sum_sq: f32 = samples.iter().map(|s| s * s).sum();
    // FRAME_SIZE (960) is well within f32's 23-bit mantissa.
    #[allow(clippy::cast_precision_loss)]
    let divisor = samples.len() as f32;
    (sum_sq / divisor).sqrt()
}
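
// A minimal sanity-check sketch for the RMS-based VAD helper above. These
// tests only illustrate the arithmetic the threshold relies on: the RMS of a
// constant-amplitude frame equals that amplitude, so digital silence falls
// below VAD_RMS_THRESHOLD while a 0.5-amplitude frame clears it. The 0.5
// amplitude is an illustrative value, not taken from the module above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn silence_is_below_threshold() {
        // An all-zero frame has an RMS of exactly 0.0, under the threshold.
        let frame = vec![0.0f32; FRAME_SIZE];
        assert!(compute_rms(&frame) < VAD_RMS_THRESHOLD);
    }

    #[test]
    fn constant_signal_rms_equals_amplitude() {
        // sqrt(mean(0.5^2)) == 0.5, so the frame registers as voice.
        let frame = vec![0.5f32; FRAME_SIZE];
        assert!((compute_rms(&frame) - 0.5).abs() < 1e-6);
        assert!(compute_rms(&frame) > VAD_RMS_THRESHOLD);
    }

    #[test]
    fn empty_buffer_is_silent() {
        // The guard clause must return 0.0 rather than divide by zero.
        assert_eq!(compute_rms(&[]), 0.0);
    }
}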