351 lines
12 KiB
Rust
351 lines
12 KiB
Rust
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::{Context, anyhow};
|
|
use metrics::counter;
|
|
use quinn::{
|
|
Connection, Endpoint, Incoming, RecvStream, SendStream, ServerConfig, StreamId, TransportConfig,
|
|
};
|
|
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
|
|
use tokio::sync::oneshot;
|
|
|
|
use crate::config::QuicConfig;
|
|
use crate::transport::{QuicMessage, T1Sender, T2Sender, T3Inbound, T3Sender};
|
|
|
|
/// Capacity of the QUIC datagram receive buffer, in bytes (256 KiB).
///
/// Chosen so that microbursts at the telemetry rates can be absorbed. The
/// value is handed to Quinn's `TransportConfig::datagram_receive_buffer_size`
/// when the server config is built.
const DATAGRAM_RECV_BUFFER_BYTES: usize = 256 << 10;
|
|
|
|
/// Load the cert chain + private key from disk and build a Quinn `ServerConfig`.
|
|
pub fn build_server_config(cfg: &QuicConfig) -> anyhow::Result<ServerConfig> {
|
|
let cert_pem = std::fs::read(&cfg.server_cert)
|
|
.with_context(|| format!("read server_cert at {}", cfg.server_cert))?;
|
|
let key_pem = std::fs::read(&cfg.server_key)
|
|
.with_context(|| format!("read server_key at {}", cfg.server_key))?;
|
|
|
|
let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut cert_pem.as_slice())
|
|
.collect::<Result<_, _>>()
|
|
.with_context(|| format!("parse PEM certs at {}", cfg.server_cert))?;
|
|
if certs.is_empty() {
|
|
return Err(anyhow!("no certificates found in {}", cfg.server_cert));
|
|
}
|
|
|
|
let key: PrivateKeyDer<'static> = rustls_pemfile::private_key(&mut key_pem.as_slice())
|
|
.with_context(|| format!("parse PEM key at {}", cfg.server_key))?
|
|
.ok_or_else(|| anyhow!("no private key found in {}", cfg.server_key))?;
|
|
|
|
let mut server_config =
|
|
ServerConfig::with_single_cert(certs, key).context("build Quinn ServerConfig")?;
|
|
|
|
// Explicit transport config so the values driving evaluation are visible
|
|
// in source and at startup, not buried in Quinn's defaults.
|
|
let mut transport = TransportConfig::default();
|
|
transport.datagram_receive_buffer_size(Some(DATAGRAM_RECV_BUFFER_BYTES));
|
|
server_config.transport = Arc::new(transport);
|
|
|
|
tracing::info!(
|
|
datagram_recv_buffer_bytes = DATAGRAM_RECV_BUFFER_BYTES,
|
|
"Quinn TransportConfig tuned"
|
|
);
|
|
|
|
Ok(server_config)
|
|
}
|
|
|
|
/// Bind the listener. Must be called from inside a tokio runtime context
|
|
/// (Quinn relies on `Handle::current()` internally).
|
|
pub fn bind_endpoint(cfg: &QuicConfig) -> anyhow::Result<Endpoint> {
|
|
let server_config = build_server_config(cfg)?;
|
|
let addr: SocketAddr = format!("{}:{}", cfg.server_interface, cfg.server_port)
|
|
.parse()
|
|
.with_context(|| {
|
|
format!(
|
|
"invalid bind address {}:{}",
|
|
cfg.server_interface, cfg.server_port
|
|
)
|
|
})?;
|
|
Endpoint::server(server_config, addr).context("Endpoint::server bind")
|
|
}
|
|
|
|
/// Accept loop: per-connection senders are cloned from the tier handles and
|
|
/// shipped into `handle_incoming` for orchestration.
|
|
pub async fn accept_loop(endpoint: Endpoint, t1: T1Sender, t2: T2Sender, t3: T3Sender) {
|
|
tracing::info!(local = ?endpoint.local_addr().ok(), "QUIC accept loop running");
|
|
while let Some(incoming) = endpoint.accept().await {
|
|
let t1 = t1.clone();
|
|
let t2 = t2.clone();
|
|
let t3 = t3.clone();
|
|
tokio::spawn(handle_incoming(incoming, t1, t2, t3));
|
|
}
|
|
tracing::info!("QUIC accept loop exited");
|
|
}
|
|
|
|
/// Per-connection orchestrator. Performs the handshake and spawns one reader
|
|
/// per tier, then waits for the connection to close and joins the readers.
|
|
async fn handle_incoming(incoming: Incoming, t1: T1Sender, t2: T2Sender, t3: T3Sender) {
|
|
let conn = match incoming.await {
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "handshake failed");
|
|
return;
|
|
}
|
|
};
|
|
let remote = conn.remote_address();
|
|
tracing::info!(?remote, "connection established");
|
|
|
|
// One task per tier — fully wired across T1/T2/T3.
|
|
let dgram_task = tokio::spawn(read_datagrams(conn.clone(), t1));
|
|
let uni_task = tokio::spawn(read_uni_streams(conn.clone(), t2));
|
|
let bi_task = tokio::spawn(accept_bi_streams(conn.clone(), t3));
|
|
|
|
let _ = conn.closed().await;
|
|
|
|
if let Err(e) = dgram_task.await {
|
|
tracing::warn!(?remote, error = %e, "T1 datagram task ended unexpectedly");
|
|
}
|
|
if let Err(e) = uni_task.await {
|
|
tracing::warn!(?remote, error = %e, "T2 uni stream task ended unexpectedly");
|
|
}
|
|
if let Err(e) = bi_task.await {
|
|
tracing::warn!(?remote, error = %e, "T3 bi stream task ended unexpectedly");
|
|
}
|
|
tracing::info!(?remote, "connection closed");
|
|
}
|
|
|
|
/// T1 — read QUIC datagrams, decode each as a fixed-size `QuicMessage`, push
|
|
/// into the lossy T1 channel.
|
|
async fn read_datagrams(conn: Connection, t1: T1Sender) {
|
|
let remote = conn.remote_address();
|
|
let mut received: u64 = 0;
|
|
let mut dropped: u64 = 0;
|
|
let mut decode_errors: u64 = 0;
|
|
|
|
loop {
|
|
match conn.read_datagram().await {
|
|
Ok(bytes) => match QuicMessage::decode(&bytes[..]) {
|
|
Ok(msg) => {
|
|
received += 1;
|
|
counter!("substrate_received_total", "tier" => "t1").increment(1);
|
|
if !t1.send_lossy(msg) {
|
|
dropped += 1;
|
|
counter!("substrate_dropped_total", "tier" => "t1").increment(1);
|
|
tracing::trace!(?remote, "T1 channel full, datagram dropped");
|
|
}
|
|
}
|
|
Err(e) => {
|
|
decode_errors += 1;
|
|
counter!("substrate_decode_errors_total", "tier" => "t1").increment(1);
|
|
tracing::warn!(
|
|
?remote,
|
|
len = bytes.len(),
|
|
error = %e,
|
|
"T1 datagram decode failed"
|
|
);
|
|
}
|
|
},
|
|
Err(e) => {
|
|
tracing::debug!(
|
|
?remote,
|
|
received,
|
|
dropped,
|
|
decode_errors,
|
|
error = %e,
|
|
"T1 datagram reader ended"
|
|
);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// T2 — accept unidirectional streams. Each accepted stream gets its own task
|
|
/// reading 38-byte chunks until EOF (one stream may carry one event or many).
|
|
/// Cross-stream interleaving is allowed; ordering is only guaranteed *within*
|
|
/// a stream, matching QUIC's stream semantics.
|
|
async fn read_uni_streams(conn: Connection, t2: T2Sender) {
|
|
let remote = conn.remote_address();
|
|
let mut streams_accepted: u64 = 0;
|
|
|
|
loop {
|
|
let recv = match conn.accept_uni().await {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
tracing::debug!(
|
|
?remote,
|
|
streams_accepted,
|
|
error = %e,
|
|
"T2 uni accept loop ended"
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
streams_accepted += 1;
|
|
let t2 = t2.clone();
|
|
tokio::spawn(read_one_uni_stream(remote, recv, t2));
|
|
}
|
|
}
|
|
|
|
/// Per-stream worker for T2. Reads fixed-size `QuicMessage`s back-to-back,
|
|
/// awaits backpressure on the T2 channel, and resets the stream on a decode
|
|
/// failure (one corrupt stream shouldn't take down the whole connection).
|
|
async fn read_one_uni_stream(remote: SocketAddr, mut recv: RecvStream, t2: T2Sender) {
|
|
let stream_id: StreamId = recv.id();
|
|
let mut buf = [0u8; QuicMessage::WIRE_SIZE];
|
|
let mut count: u64 = 0;
|
|
|
|
loop {
|
|
match recv.read_exact(&mut buf).await {
|
|
Ok(()) => match QuicMessage::decode(&buf) {
|
|
Ok(msg) => {
|
|
count += 1;
|
|
counter!("substrate_received_total", "tier" => "t2").increment(1);
|
|
if t2.send(msg).await.is_err() {
|
|
// T2 receiver dropped (substrate shutting down).
|
|
tracing::warn!(
|
|
?remote,
|
|
?stream_id,
|
|
count,
|
|
"T2 channel closed; abandoning stream"
|
|
);
|
|
return;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
counter!("substrate_decode_errors_total", "tier" => "t2").increment(1);
|
|
tracing::warn!(
|
|
?remote,
|
|
?stream_id,
|
|
count,
|
|
error = %e,
|
|
"T2 decode failed; resetting stream"
|
|
);
|
|
let _ = recv.stop(0u32.into());
|
|
return;
|
|
}
|
|
},
|
|
Err(e) => {
|
|
tracing::trace!(
|
|
?remote,
|
|
?stream_id,
|
|
count,
|
|
error = %e,
|
|
"T2 uni stream ended"
|
|
);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// T3 — accept bidirectional streams. Each stream is one command/ack
|
|
/// exchange, modeled per the paper's "per-command oneshot channels": the
|
|
/// reader pushes a `T3Inbound { command, reply }` to the ECS, awaits the
|
|
/// response on `reply_rx`, and writes it back on the same stream.
|
|
async fn accept_bi_streams(conn: Connection, t3: T3Sender) {
|
|
let remote = conn.remote_address();
|
|
let mut streams_accepted: u64 = 0;
|
|
|
|
loop {
|
|
let (send, recv) = match conn.accept_bi().await {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
tracing::debug!(
|
|
?remote,
|
|
streams_accepted,
|
|
error = %e,
|
|
"T3 bi accept loop ended"
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
streams_accepted += 1;
|
|
let t3 = t3.clone();
|
|
tokio::spawn(read_one_bi_stream(remote, send, recv, t3));
|
|
}
|
|
}
|
|
|
|
/// Per-stream worker for T3. Reads exactly one command, ships it with a
|
|
/// `oneshot::Sender` to the ECS, awaits the reply, writes it back. If the
|
|
/// ECS drops the oneshot (no handler installed), the stream is reset so the
|
|
/// client sees an explicit reset instead of a half-open stream.
|
|
async fn read_one_bi_stream(
|
|
remote: SocketAddr,
|
|
mut send: SendStream,
|
|
mut recv: RecvStream,
|
|
t3: T3Sender,
|
|
) {
|
|
let stream_id: StreamId = recv.id();
|
|
|
|
let mut buf = [0u8; QuicMessage::WIRE_SIZE];
|
|
if let Err(e) = recv.read_exact(&mut buf).await {
|
|
tracing::trace!(
|
|
?remote,
|
|
?stream_id,
|
|
error = %e,
|
|
"T3: incomplete command read; closing"
|
|
);
|
|
return;
|
|
}
|
|
let command = match QuicMessage::decode(&buf) {
|
|
Ok(m) => m,
|
|
Err(e) => {
|
|
counter!("substrate_decode_errors_total", "tier" => "t3").increment(1);
|
|
tracing::warn!(
|
|
?remote,
|
|
?stream_id,
|
|
error = %e,
|
|
"T3 command decode failed; resetting stream"
|
|
);
|
|
let _ = recv.stop(0u32.into());
|
|
let _ = send.reset(0u32.into());
|
|
return;
|
|
}
|
|
};
|
|
counter!("substrate_received_total", "tier" => "t3").increment(1);
|
|
|
|
let (reply_tx, reply_rx) = oneshot::channel::<QuicMessage>();
|
|
let inbound = T3Inbound {
|
|
command,
|
|
reply: reply_tx,
|
|
};
|
|
if t3.send(inbound).await.is_err() {
|
|
tracing::warn!(?remote, ?stream_id, "T3 channel closed; abandoning command");
|
|
let _ = send.reset(0u32.into());
|
|
return;
|
|
}
|
|
|
|
let response = match reply_rx.await {
|
|
Ok(msg) => msg,
|
|
Err(_) => {
|
|
// ECS dropped the oneshot. With M4's handler installed this
|
|
// shouldn't happen normally; if it does, the stream is reset so
|
|
// the client sees a clean signal.
|
|
counter!("substrate_t3_no_handler_total").increment(1);
|
|
tracing::debug!(
|
|
?remote,
|
|
?stream_id,
|
|
"T3: no handler for command, resetting stream"
|
|
);
|
|
let _ = send.reset(0u32.into());
|
|
return;
|
|
}
|
|
};
|
|
|
|
if let Err(e) = send.write_all(&response.to_bytes()).await {
|
|
tracing::warn!(
|
|
?remote,
|
|
?stream_id,
|
|
error = %e,
|
|
"T3 ack write failed"
|
|
);
|
|
return;
|
|
}
|
|
if let Err(e) = send.finish() {
|
|
tracing::warn!(
|
|
?remote,
|
|
?stream_id,
|
|
error = %e,
|
|
"T3 ack finish failed"
|
|
);
|
|
}
|
|
}
|