use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use metrics::counter;
use quinn::{
    Connection, Endpoint, Incoming, RecvStream, SendStream, ServerConfig, StreamId,
    TransportConfig,
};
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use tokio::sync::oneshot;

use crate::config::QuicConfig;
use crate::transport::{QuicMessage, T1Sender, T2Sender, T3Inbound, T3Sender};

/// Datagram receive buffer in bytes. Sized to absorb microbursts at the
/// telemetry rates.
const DATAGRAM_RECV_BUFFER_BYTES: usize = 256 * 1024;

/// Load the cert chain + private key from disk and build a Quinn `ServerConfig`.
///
/// Reads the PEM files named by `cfg.server_cert` and `cfg.server_key` with
/// blocking `std::fs::read`, installs them as the single server certificate,
/// and applies an explicit `TransportConfig` whose datagram receive buffer is
/// [`DATAGRAM_RECV_BUFFER_BYTES`].
///
/// # Errors
///
/// Returns an error if either file cannot be read, if the PEM contents fail
/// to parse, if the cert file contains no certificates, if the key file
/// contains no private key, or if Quinn rejects the cert/key pair.
pub fn build_server_config(cfg: &QuicConfig) -> anyhow::Result<ServerConfig> {
    let cert_pem = std::fs::read(&cfg.server_cert)
        .with_context(|| format!("read server_cert at {}", cfg.server_cert))?;
    let key_pem = std::fs::read(&cfg.server_key)
        .with_context(|| format!("read server_key at {}", cfg.server_key))?;

    // Collect fallibly: the first malformed PEM section aborts the whole parse.
    let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut cert_pem.as_slice())
        .collect::<Result<Vec<_>, _>>()
        .with_context(|| format!("parse PEM certs at {}", cfg.server_cert))?;
    if certs.is_empty() {
        return Err(anyhow!("no certificates found in {}", cfg.server_cert));
    }

    // `private_key` yields `Ok(None)` for a well-formed file with no key in
    // it, so the parse failure and the "nothing found" case are reported
    // separately.
    let key: PrivateKeyDer<'static> = rustls_pemfile::private_key(&mut key_pem.as_slice())
        .with_context(|| format!("parse PEM key at {}", cfg.server_key))?
        .ok_or_else(|| anyhow!("no private key found in {}", cfg.server_key))?;

    let mut server_config =
        ServerConfig::with_single_cert(certs, key).context("build Quinn ServerConfig")?;

    // Explicit transport config so the values driving evaluation are visible
    // in source and at startup, not buried in Quinn's defaults.
    let mut transport = TransportConfig::default();
    transport.datagram_receive_buffer_size(Some(DATAGRAM_RECV_BUFFER_BYTES));
    server_config.transport = Arc::new(transport);

    tracing::info!(
        datagram_recv_buffer_bytes = DATAGRAM_RECV_BUFFER_BYTES,
        "Quinn TransportConfig tuned"
    );

    Ok(server_config)
}

/// Bind the listener.
Must be called from inside a tokio runtime context /// (Quinn relies on `Handle::current()` internally). pub fn bind_endpoint(cfg: &QuicConfig) -> anyhow::Result { let server_config = build_server_config(cfg)?; let addr: SocketAddr = format!("{}:{}", cfg.server_interface, cfg.server_port) .parse() .with_context(|| { format!( "invalid bind address {}:{}", cfg.server_interface, cfg.server_port ) })?; Endpoint::server(server_config, addr).context("Endpoint::server bind") } /// Accept loop: per-connection senders are cloned from the tier handles and /// shipped into `handle_incoming` for orchestration. pub async fn accept_loop(endpoint: Endpoint, t1: T1Sender, t2: T2Sender, t3: T3Sender) { tracing::info!(local = ?endpoint.local_addr().ok(), "QUIC accept loop running"); while let Some(incoming) = endpoint.accept().await { let t1 = t1.clone(); let t2 = t2.clone(); let t3 = t3.clone(); tokio::spawn(handle_incoming(incoming, t1, t2, t3)); } tracing::info!("QUIC accept loop exited"); } /// Per-connection orchestrator. Performs the handshake and spawns one reader /// per tier, then waits for the connection to close and joins the readers. async fn handle_incoming(incoming: Incoming, t1: T1Sender, t2: T2Sender, t3: T3Sender) { let conn = match incoming.await { Ok(c) => c, Err(e) => { tracing::warn!(error = %e, "handshake failed"); return; } }; let remote = conn.remote_address(); tracing::info!(?remote, "connection established"); // One task per tier — fully wired across T1/T2/T3. 
let dgram_task = tokio::spawn(read_datagrams(conn.clone(), t1)); let uni_task = tokio::spawn(read_uni_streams(conn.clone(), t2)); let bi_task = tokio::spawn(accept_bi_streams(conn.clone(), t3)); let _ = conn.closed().await; if let Err(e) = dgram_task.await { tracing::warn!(?remote, error = %e, "T1 datagram task ended unexpectedly"); } if let Err(e) = uni_task.await { tracing::warn!(?remote, error = %e, "T2 uni stream task ended unexpectedly"); } if let Err(e) = bi_task.await { tracing::warn!(?remote, error = %e, "T3 bi stream task ended unexpectedly"); } tracing::info!(?remote, "connection closed"); } /// T1 — read QUIC datagrams, decode each as a fixed-size `QuicMessage`, push /// into the lossy T1 channel. async fn read_datagrams(conn: Connection, t1: T1Sender) { let remote = conn.remote_address(); let mut received: u64 = 0; let mut dropped: u64 = 0; let mut decode_errors: u64 = 0; loop { match conn.read_datagram().await { Ok(bytes) => match QuicMessage::decode(&bytes[..]) { Ok(msg) => { received += 1; counter!("substrate_received_total", "tier" => "t1").increment(1); if !t1.send_lossy(msg) { dropped += 1; counter!("substrate_dropped_total", "tier" => "t1").increment(1); tracing::trace!(?remote, "T1 channel full, datagram dropped"); } } Err(e) => { decode_errors += 1; counter!("substrate_decode_errors_total", "tier" => "t1").increment(1); tracing::warn!( ?remote, len = bytes.len(), error = %e, "T1 datagram decode failed" ); } }, Err(e) => { tracing::debug!( ?remote, received, dropped, decode_errors, error = %e, "T1 datagram reader ended" ); return; } } } } /// T2 — accept unidirectional streams. Each accepted stream gets its own task /// reading 38-byte chunks until EOF (one stream may carry one event or many). /// Cross-stream interleaving is allowed; ordering is only guaranteed *within* /// a stream, matching QUIC's stream semantics. 
async fn read_uni_streams(conn: Connection, t2: T2Sender) {
    let remote = conn.remote_address();
    let mut streams_accepted: u64 = 0;
    loop {
        let recv = match conn.accept_uni().await {
            Ok(s) => s,
            Err(e) => {
                tracing::debug!(
                    ?remote,
                    streams_accepted,
                    error = %e,
                    "T2 uni accept loop ended"
                );
                return;
            }
        };
        streams_accepted += 1;
        // Each stream is drained on its own detached task so a stream stalled
        // on T2 backpressure does not hold up accepting the next one.
        let t2 = t2.clone();
        tokio::spawn(read_one_uni_stream(remote, recv, t2));
    }
}

/// Per-stream worker for T2. Reads fixed-size `QuicMessage`s back-to-back,
/// awaits backpressure on the T2 channel, and resets the stream on a decode
/// failure (one corrupt stream shouldn't take down the whole connection).
async fn read_one_uni_stream(remote: SocketAddr, mut recv: RecvStream, t2: T2Sender) {
    let stream_id: StreamId = recv.id();
    // One buffer reused for every message on this stream.
    let mut buf = [0u8; QuicMessage::WIRE_SIZE];
    let mut count: u64 = 0;
    loop {
        match recv.read_exact(&mut buf).await {
            Ok(()) => match QuicMessage::decode(&buf) {
                Ok(msg) => {
                    count += 1;
                    counter!("substrate_received_total", "tier" => "t2").increment(1);
                    if t2.send(msg).await.is_err() {
                        // T2 receiver dropped (substrate shutting down).
                        tracing::warn!(
                            ?remote,
                            ?stream_id,
                            count,
                            "T2 channel closed; abandoning stream"
                        );
                        return;
                    }
                }
                Err(e) => {
                    counter!("substrate_decode_errors_total", "tier" => "t2").increment(1);
                    tracing::warn!(
                        ?remote,
                        ?stream_id,
                        count,
                        error = %e,
                        "T2 decode failed; resetting stream"
                    );
                    // Best effort: the result of `stop` is deliberately
                    // ignored because the stream is being abandoned anyway.
                    let _ = recv.stop(0u32.into());
                    return;
                }
            },
            Err(e) => {
                // Any `read_exact` failure ends this stream's worker; the
                // error is only logged at trace level.
                tracing::trace!(
                    ?remote,
                    ?stream_id,
                    count,
                    error = %e,
                    "T2 uni stream ended"
                );
                return;
            }
        }
    }
}

/// T3 — accept bidirectional streams. Each stream is one command/ack
/// exchange, modeled per the paper's "per-command oneshot channels": the
/// reader pushes a `T3Inbound { command, reply }` to the ECS, awaits the
/// response on `reply_rx`, and writes it back on the same stream.
async fn accept_bi_streams(conn: Connection, t3: T3Sender) { let remote = conn.remote_address(); let mut streams_accepted: u64 = 0; loop { let (send, recv) = match conn.accept_bi().await { Ok(s) => s, Err(e) => { tracing::debug!( ?remote, streams_accepted, error = %e, "T3 bi accept loop ended" ); return; } }; streams_accepted += 1; let t3 = t3.clone(); tokio::spawn(read_one_bi_stream(remote, send, recv, t3)); } } /// Per-stream worker for T3. Reads exactly one command, ships it with a /// `oneshot::Sender` to the ECS, awaits the reply, writes it back. If the /// ECS drops the oneshot (no handler installed), the stream is reset so the /// client sees an explicit reset instead of a half-open stream. async fn read_one_bi_stream( remote: SocketAddr, mut send: SendStream, mut recv: RecvStream, t3: T3Sender, ) { let stream_id: StreamId = recv.id(); let mut buf = [0u8; QuicMessage::WIRE_SIZE]; if let Err(e) = recv.read_exact(&mut buf).await { tracing::trace!( ?remote, ?stream_id, error = %e, "T3: incomplete command read; closing" ); return; } let command = match QuicMessage::decode(&buf) { Ok(m) => m, Err(e) => { counter!("substrate_decode_errors_total", "tier" => "t3").increment(1); tracing::warn!( ?remote, ?stream_id, error = %e, "T3 command decode failed; resetting stream" ); let _ = recv.stop(0u32.into()); let _ = send.reset(0u32.into()); return; } }; counter!("substrate_received_total", "tier" => "t3").increment(1); let (reply_tx, reply_rx) = oneshot::channel::(); let inbound = T3Inbound { command, reply: reply_tx, }; if t3.send(inbound).await.is_err() { tracing::warn!(?remote, ?stream_id, "T3 channel closed; abandoning command"); let _ = send.reset(0u32.into()); return; } let response = match reply_rx.await { Ok(msg) => msg, Err(_) => { // ECS dropped the oneshot. With M4's handler installed this // shouldn't happen normally; if it does, the stream is reset so // the client sees a clean signal. 
counter!("substrate_t3_no_handler_total").increment(1); tracing::debug!( ?remote, ?stream_id, "T3: no handler for command, resetting stream" ); let _ = send.reset(0u32.into()); return; } }; if let Err(e) = send.write_all(&response.to_bytes()).await { tracing::warn!( ?remote, ?stream_id, error = %e, "T3 ack write failed" ); return; } if let Err(e) = send.finish() { tracing::warn!( ?remote, ?stream_id, error = %e, "T3 ack finish failed" ); } }