//! Reliable TCP routing and connection management. //! //! This module implements the reliable control lane for client communication. //! It handles the initial authentication handshake and routes incoming TCP events //! from the length-delimited codec to the broader application state. use anyhow::{Context, Result}; use core_protocol::tcp_events::TcpEvent; use futures::{SinkExt, StreamExt}; use std::sync::Arc; use tokio::net::TcpStream; use tokio_serde::formats::Bincode; use tokio_serde::SymmetricallyFramed; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{error, info, instrument, warn}; use crate::state::AppState; use crate::state::UserState; /// A type alias for the heavily-nested framed stream type, combining `LengthDelimitedCodec` and `Bincode`. type FramedStream = SymmetricallyFramed< Framed, TcpEvent, Bincode, >; /// Handles the lifecycle of a newly connected client's TCP stream. /// /// This spans an instrumented task for the connection, setting up the necessary /// framers and codecs before entering the event loop. #[instrument(skip(stream, state))] pub async fn handle_connection(stream: TcpStream, state: Arc) { let peer_addr = match stream.peer_addr() { Ok(addr) => addr, Err(e) => { error!("Failed to get peer address: {:?}", e); return; } }; info!("New connection from {}", peer_addr); // We pad the TCP stream with a length-delimited codec to guarantee frame boundaries, // avoiding fragmentation issues common in raw TCP sockets. let length_delimited = Framed::new(stream, LengthDelimitedCodec::new()); let mut framed: FramedStream = SymmetricallyFramed::new( length_delimited, Bincode::::default(), ); if let Err(e) = process_connection(&mut framed, state).await { warn!("Connection closed with error: {:?}", e); } else { info!("Connection closed cleanly"); } } /// The inner event loop that processes deserialized `TcpEvent`s from the client. /// /// # Errors /// Returns an `anyhow::Result` if deserialization fails, the connection drops unexpectedly, /// or a serialization error occurs when transmitting a response. async fn process_connection(framed: &mut FramedStream, state: Arc) -> Result<()> { while let Some(event) = framed.next().await { let event = event.context("Failed to deserialize event")?; match event { TcpEvent::AuthRequest { username } => { // REDACTED standard: we might log the username, but this is a reminder // for future sensitive items to use [REDACTED]. info!("AuthRequest received for user: {}", username); let session_token = state.generate_token(); state.active_users.insert( session_token, UserState { username: username.clone(), }, ); framed .send(TcpEvent::AuthResponse { session_token }) .await .context("Failed to send AuthResponse")?; info!("AuthResponse sent to {}", username); } _ => { warn!("Received unhandled event before auth or unsupported event"); } } } Ok(()) }