use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use anyhow::{Context, anyhow};
use metrics::{counter, histogram};
use quinn::{
    Connection, Endpoint, Incoming, RecvStream, ServerConfig, StreamId, TransportConfig,
};
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::config::QuicConfig;
use crate::transport::{OutboundT3, QuicMessage, SensorType, T1Sender, T2Sender};

/// Maps each known device UUID to the QUIC `Connection` that hosts it.
/// Several UUIDs typically point at the same `Connection` (one simulator
/// process commonly represents multiple virtual devices). `quinn::Connection`
/// is internally `Arc`-backed so cloning is cheap.
///
/// Held inside an `Arc<RwLock<_>>` so the tokio readers can register on first
/// message and `drain_outbound_t3` can look up routes at automation cadence.
/// Critical sections are tiny sync map ops — no `.await` while the lock is
/// held — so `std::sync::RwLock` is the right choice over `tokio::sync::*`.
pub type ConnectionRegistry = Arc<RwLock<HashMap<Uuid, Connection>>>;

/// Create an empty [`ConnectionRegistry`] ready to be cloned into tasks.
pub fn new_connection_registry() -> ConnectionRegistry {
    Arc::new(RwLock::new(HashMap::new()))
}

/// Insert (device → connection) if absent. Idempotent so it can be called
/// per-message without measurable cost on the hot ingest path.
///
/// An existing entry is never overwritten: the first connection registered
/// for a device keeps the route until that entry is removed.
///
/// # Panics
///
/// Panics if the registry lock is poisoned.
fn ensure_registered(registry: &ConnectionRegistry, device_id: Uuid, conn: &Connection) {
    // Fast path: the common case is "already registered", which only needs
    // the shared read lock. The guard is dropped at the end of this block,
    // before the write lock is requested.
    let need_insert = {
        let guard = registry.read().unwrap();
        !guard.contains_key(&device_id)
    };
    if need_insert {
        // Another task may have inserted between the two lock acquisitions;
        // `entry().or_insert_with` re-checks under the write lock.
        registry
            .write()
            .unwrap()
            .entry(device_id)
            .or_insert_with(|| conn.clone());
    }
}

/// Datagram receive buffer in bytes. Sized to absorb microbursts at the
/// telemetry rates.
const DATAGRAM_RECV_BUFFER_BYTES: usize = 256 * 1024;

/// Load the cert chain + private key from disk and build a Quinn `ServerConfig`.
pub fn build_server_config(cfg: &QuicConfig) -> anyhow::Result { let cert_pem = std::fs::read(&cfg.server_cert) .with_context(|| format!("read server_cert at {}", cfg.server_cert))?; let key_pem = std::fs::read(&cfg.server_key) .with_context(|| format!("read server_key at {}", cfg.server_key))?; let certs: Vec> = rustls_pemfile::certs(&mut cert_pem.as_slice()) .collect::>() .with_context(|| format!("parse PEM certs at {}", cfg.server_cert))?; if certs.is_empty() { return Err(anyhow!("no certificates found in {}", cfg.server_cert)); } let key: PrivateKeyDer<'static> = rustls_pemfile::private_key(&mut key_pem.as_slice()) .with_context(|| format!("parse PEM key at {}", cfg.server_key))? .ok_or_else(|| anyhow!("no private key found in {}", cfg.server_key))?; let mut server_config = ServerConfig::with_single_cert(certs, key).context("build Quinn ServerConfig")?; // Explicit transport config so the values driving evaluation are visible // in source and at startup, not buried in Quinn's defaults. let mut transport = TransportConfig::default(); transport.datagram_receive_buffer_size(Some(DATAGRAM_RECV_BUFFER_BYTES)); server_config.transport = Arc::new(transport); tracing::info!( datagram_recv_buffer_bytes = DATAGRAM_RECV_BUFFER_BYTES, "Quinn TransportConfig tuned" ); Ok(server_config) } /// Bind the listener. Must be called from inside a tokio runtime context /// (Quinn relies on `Handle::current()` internally). pub fn bind_endpoint(cfg: &QuicConfig) -> anyhow::Result { let server_config = build_server_config(cfg)?; let addr: SocketAddr = format!("{}:{}", cfg.server_interface, cfg.server_port) .parse() .with_context(|| { format!( "invalid bind address {}:{}", cfg.server_interface, cfg.server_port ) })?; Endpoint::server(server_config, addr).context("Endpoint::server bind") } /// Accept loop. Owns the outbound-T3 drain task and the connection registry, /// then clones per-connection state into `handle_incoming` for orchestration. 
///
/// The drain task is spawned exactly once for the lifetime of the listener;
/// it routes ECS-issued `OutboundT3` commands to the right connection by
/// looking up `target_device` in the registry that `handle_incoming` populates.
///
/// Tier semantics: T1 datagrams + T2 uni streams come *in* from devices;
/// T3 bi streams are server-initiated for actuator commands and go *out*
/// via `drain_outbound_t3`. Devices never open bi streams to the substrate.
///
/// If `synthetic_t3_rate_hz > 0`, a bench-only task drives toggling Relay
/// commands at that rate through the same outbound channel — used by the
/// cross-tier isolation benchmark.
///
/// Returns once `endpoint.accept()` yields `None`. Tasks spawned here are
/// detached and are not joined on return.
pub async fn accept_loop(
    endpoint: Endpoint,
    t1: T1Sender,
    t2: T2Sender,
    registry: ConnectionRegistry,
    outbound_rx: mpsc::Receiver<OutboundT3>,
    outbound_tx: mpsc::Sender<OutboundT3>,
    synthetic_t3_rate_hz: f64,
) {
    tracing::info!(local = ?endpoint.local_addr().ok(), "QUIC accept loop running");

    tokio::spawn(drain_outbound_t3(registry.clone(), outbound_rx));

    if synthetic_t3_rate_hz > 0.0 {
        tracing::info!(rate_hz = synthetic_t3_rate_hz, "synthetic T3 driver enabled");
        tokio::spawn(synthetic_t3_driver(
            registry.clone(),
            outbound_tx.clone(),
            synthetic_t3_rate_hz,
        ));
    }
    // Release this function's sender now instead of holding it for the whole
    // accept loop: the drain task's `rx.recv()` only returns `None` once
    // every sender has been dropped.
    drop(outbound_tx);

    while let Some(incoming) = endpoint.accept().await {
        // Each connection gets its own task and its own handles to the
        // shared senders and registry.
        let t1 = t1.clone();
        let t2 = t2.clone();
        let registry = registry.clone();
        tokio::spawn(handle_incoming(incoming, t1, t2, registry));
    }

    tracing::info!("QUIC accept loop exited");
}

/// Bench-only synthetic T3 driver. Round-robins over every registered device,
/// pushing a toggling Relay setpoint through the outbound channel at the
/// configured rate. Exercises the same code path as `automation_system`, so
/// the cross-tier-isolation bench measures the real path.
async fn synthetic_t3_driver( registry: ConnectionRegistry, tx: mpsc::Sender, rate_hz: f64, ) { let period = std::time::Duration::from_nanos((1.0e9 / rate_hz) as u64); let mut ticker = tokio::time::interval(period); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut next_value = 1.0; loop { ticker.tick().await; // Snapshot device list under read lock; release before doing async work. let devices: Vec = registry.read().unwrap().keys().copied().collect(); if devices.is_empty() { continue; } for device in devices { let cmd = OutboundT3 { target_device: device, sensor_id: 6, raw_value: next_value, sensor_type: SensorType::Relay.as_u8(), }; if tx.try_send(cmd).is_err() { counter!("substrate_t3_outbound_dropped_total").increment(1); } } // Toggle for the next round so we exercise both setpoints. next_value = if next_value > 0.5 { 0.0 } else { 1.0 }; } } /// Per-connection orchestrator. Performs the handshake and spawns the T1 /// datagram + T2 uni-stream readers; T3 outbound is handled connection-wide /// by `drain_outbound_t3`. Waits for the connection to close, then purges /// the registry and joins the inbound readers. async fn handle_incoming( incoming: Incoming, t1: T1Sender, t2: T2Sender, registry: ConnectionRegistry, ) { let conn = match incoming.await { Ok(c) => c, Err(e) => { tracing::warn!(error = %e, "handshake failed"); return; } }; let remote = conn.remote_address(); let stable_id = conn.stable_id(); tracing::info!(?remote, stable_id, "connection established"); let dgram_task = tokio::spawn(read_datagrams(conn.clone(), t1, registry.clone())); let uni_task = tokio::spawn(read_uni_streams(conn.clone(), t2, registry.clone())); let _ = conn.closed().await; // Purge every device UUID that pointed at this connection. Cheap: 7 entries // for an industrial-profile simulator, occasional disconnect. 
registry .write() .unwrap() .retain(|_, c| c.stable_id() != stable_id); if let Err(e) = dgram_task.await { tracing::warn!(?remote, error = %e, "T1 datagram task ended unexpectedly"); } if let Err(e) = uni_task.await { tracing::warn!(?remote, error = %e, "T2 uni stream task ended unexpectedly"); } tracing::info!(?remote, "connection closed"); } /// T1 — read QUIC datagrams, decode each as a fixed-size `QuicMessage`, push /// into the lossy T1 channel. Registers the sending device in the connection /// registry on first sight so outbound T3 commands can find this connection. async fn read_datagrams(conn: Connection, t1: T1Sender, registry: ConnectionRegistry) { let remote = conn.remote_address(); let mut received: u64 = 0; let mut dropped: u64 = 0; let mut decode_errors: u64 = 0; loop { match conn.read_datagram().await { Ok(bytes) => match QuicMessage::decode(&bytes[..]) { Ok(msg) => { received += 1; counter!("substrate_received_total", "tier" => "t1").increment(1); ensure_registered(®istry, msg.device_id, &conn); if !t1.send_lossy(msg) { dropped += 1; counter!("substrate_dropped_total", "tier" => "t1").increment(1); tracing::trace!(?remote, "T1 channel full, datagram dropped"); } } Err(e) => { decode_errors += 1; counter!("substrate_decode_errors_total", "tier" => "t1").increment(1); tracing::warn!( ?remote, len = bytes.len(), error = %e, "T1 datagram decode failed" ); } }, Err(e) => { tracing::debug!( ?remote, received, dropped, decode_errors, error = %e, "T1 datagram reader ended" ); return; } } } } /// T2 — accept unidirectional streams. Each accepted stream gets its own task /// reading 38-byte chunks until EOF (one stream may carry one event or many). /// Cross-stream interleaving is allowed; ordering is only guaranteed *within* /// a stream, matching QUIC's stream semantics. 
///
/// Returns when `accept_uni` fails. The per-stream tasks it spawned are
/// detached and are not joined here.
async fn read_uni_streams(conn: Connection, t2: T2Sender, registry: ConnectionRegistry) {
    let remote = conn.remote_address();
    let mut streams_accepted: u64 = 0;

    loop {
        let recv = match conn.accept_uni().await {
            Ok(s) => s,
            Err(e) => {
                tracing::debug!(
                    ?remote,
                    streams_accepted,
                    error = %e,
                    "T2 uni accept loop ended"
                );
                return;
            }
        };
        streams_accepted += 1;

        let t2 = t2.clone();
        let conn = conn.clone();
        let registry = registry.clone();
        tokio::spawn(read_one_uni_stream(remote, recv, t2, conn, registry));
    }
}

/// Per-stream worker for T2. Reads fixed-size `QuicMessage`s back-to-back,
/// awaits backpressure on the T2 channel, and stops the stream (error code 0)
/// on a decode failure (one corrupt stream shouldn't take down the whole
/// connection).
///
/// Returns when the stream can no longer supply a full message, when a
/// message fails to decode, or when the T2 channel is closed.
async fn read_one_uni_stream(
    remote: SocketAddr,
    mut recv: RecvStream,
    t2: T2Sender,
    conn: Connection,
    registry: ConnectionRegistry,
) {
    let stream_id: StreamId = recv.id();
    let mut buf = [0u8; QuicMessage::WIRE_SIZE];
    let mut count: u64 = 0;

    loop {
        match recv.read_exact(&mut buf).await {
            Ok(()) => match QuicMessage::decode(&buf) {
                Ok(msg) => {
                    count += 1;
                    counter!("substrate_received_total", "tier" => "t2").increment(1);
                    ensure_registered(&registry, msg.device_id, &conn);
                    if t2.send(msg).await.is_err() {
                        // T2 receiver dropped (substrate shutting down).
                        tracing::warn!(
                            ?remote,
                            ?stream_id,
                            count,
                            "T2 channel closed; abandoning stream"
                        );
                        return;
                    }
                }
                Err(e) => {
                    counter!("substrate_decode_errors_total", "tier" => "t2").increment(1);
                    tracing::warn!(
                        ?remote,
                        ?stream_id,
                        count,
                        error = %e,
                        "T2 decode failed; resetting stream"
                    );
                    // Best effort: the result of `stop` is ignored.
                    let _ = recv.stop(0u32.into());
                    return;
                }
            },
            Err(e) => {
                // Every `read_exact` failure lands here and is logged the
                // same way, at trace level.
                tracing::trace!(
                    ?remote,
                    ?stream_id,
                    count,
                    error = %e,
                    "T2 uni stream ended"
                );
                return;
            }
        }
    }
}

/// T3 outbound drain — the substrate side of the actuator-command path.
/// /// Pops `OutboundT3` items the ECS produced, looks up the target device's /// connection in the registry, and **spawns one tokio task per command** to /// do the actual `open_bi() → write → finish → read_ack` round-trip. The /// drain task itself never blocks on a per-command await, so a single stuck /// `read_exact` (e.g. peer dropping mid-stream while Quinn's idle timeout /// counts down) cannot stall the pipeline. /// /// Per-stream task records `substrate_latency_us{tier="t3"}` from /// `open_bi()` start to ack-receipt and increments /// `substrate_received_total{tier="t3"}` on success. /// /// Per-`(device, sensor)` sequence numbers are owned here so the wire-level /// concerns stay out of the ECS. async fn drain_outbound_t3(registry: ConnectionRegistry, mut rx: mpsc::Receiver) { let mut seq_by_target: HashMap<(Uuid, u16), u32> = HashMap::new(); while let Some(cmd) = rx.recv().await { let conn = match registry.read().unwrap().get(&cmd.target_device).cloned() { Some(c) => c, None => { counter!("substrate_t3_outbound_no_route_total").increment(1); tracing::debug!( device = %cmd.target_device, "outbound T3: no route, dropping" ); continue; } }; let key = (cmd.target_device, cmd.sensor_id); let seq = { let s = seq_by_target.entry(key).or_insert(0); let v = *s; *s = s.wrapping_add(1); v }; let msg = QuicMessage { device_id: cmd.target_device, sensor_id: cmd.sensor_id, raw_value: cmd.raw_value, timestamp_us: now_us(), sequence_number: seq, sensor_type: cmd.sensor_type, }; // One task per command. Concurrent in-flight bi-streams are // first-class in QUIC, and this keeps the channel-drain loop hot. 
tokio::spawn(async move { let started = Instant::now(); match send_outbound_t3(&conn, &msg).await { Ok(ack) => { let elapsed_us = started.elapsed().as_micros() as f64; histogram!("substrate_latency_us", "tier" => "t3").record(elapsed_us); counter!("substrate_received_total", "tier" => "t3").increment(1); tracing::trace!( device = %msg.device_id, sensor_id = msg.sensor_id, raw = msg.raw_value, ack_raw = ack.raw_value, elapsed_us, "outbound T3 completed" ); } Err(e) => { counter!("substrate_t3_outbound_errors_total").increment(1); tracing::warn!( device = %msg.device_id, sensor_id = msg.sensor_id, error = %e, "outbound T3 failed" ); } } }); } tracing::info!("outbound T3 drain task exited"); } /// Single substrate-initiated T3 round-trip: open bi-stream, write command, /// finish send half, read 39-byte ack, decode. async fn send_outbound_t3(conn: &Connection, cmd: &QuicMessage) -> anyhow::Result { let (mut send, mut recv) = conn.open_bi().await.context("open_bi for outbound T3")?; send.write_all(&cmd.to_bytes()) .await .context("write outbound T3 command")?; send.finish().context("finish outbound T3 send half")?; let mut buf = [0u8; QuicMessage::WIRE_SIZE]; recv.read_exact(&mut buf) .await .context("read outbound T3 ack")?; QuicMessage::decode(&buf).context("decode outbound T3 ack") } fn now_us() -> u64 { use std::time::{SystemTime, UNIX_EPOCH}; SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_micros() as u64) .unwrap_or(0) }