diff --git a/config.toml b/config.toml index 1d4a3b9..2a51e49 100644 --- a/config.toml +++ b/config.toml @@ -12,6 +12,9 @@ server_port = 9000 server_interface = "0.0.0.0" server_cert = "certs/server.crt" server_key = "certs/server.key" +t1_capacity = 1024 +t2_capacity = 512 +t3_capacity = 256 [simulation] tick_rate_hz = 60 diff --git a/simulator/src/emitters.rs b/simulator/src/emitters.rs index 861d5ec..0288d34 100644 --- a/simulator/src/emitters.rs +++ b/simulator/src/emitters.rs @@ -38,9 +38,17 @@ pub async fn run_t2_emitter( ) -> u64 { let period = Duration::from_nanos((1.0e9 / rate_hz) as u64); let mut ticker = tokio::time::interval(period); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut sent: u64 = 0; + let mut send = match conn.open_uni().await { + Ok(s) => s, + Err(e) => { + tracing::warn!(error = %e, "T2 open_uni failed; emitter exiting"); + return 0; + } + }; + loop { ticker.tick().await; if interrupted.load(Ordering::SeqCst) { @@ -57,25 +65,19 @@ pub async fn run_t2_emitter( }; slot.seq = slot.seq.wrapping_add(1); - match conn.open_uni().await { - Ok(mut send) => { - if let Err(e) = send.write_all(&msg.to_bytes()).await { - tracing::warn!(error = %e, "T2 write_all failed"); - continue; - } - if let Err(e) = send.finish() { - tracing::warn!(error = %e, "T2 finish failed"); - continue; - } - sent += 1; - counter.store(sent, Ordering::Relaxed); - } - Err(e) => { - tracing::warn!(error = %e, "T2 open_uni failed; emitter exiting"); - break; - } + if let Err(e) = send.write_all(&msg.to_bytes()).await { + tracing::warn!(error = %e, "T2 write_all failed; stream closed?"); + break; } + + sent += 1; + counter.store(sent, Ordering::Relaxed); } + + if let Err(e) = send.finish() { + tracing::warn!(error = %e, "T2 finish failed"); + } + sent } @@ -92,7 +94,7 @@ pub async fn run_t3_emitter( ) -> (u64, u64) { let period = Duration::from_nanos((1.0e9 / rate_hz) as u64); let mut ticker = tokio::time::interval(period); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut sent: u64 = 0; let mut timeouts: u64 = 0; diff --git a/simulator/src/main.rs b/simulator/src/main.rs index 0a232ec..cb54c82 100644 --- a/simulator/src/main.rs +++ b/simulator/src/main.rs @@ -221,7 +221,7 @@ async fn main() -> anyhow::Result<()> { if cli.rate_hz > 0.0 { let period = Duration::from_nanos((1.0e9 / cli.rate_hz) as u64); let mut ticker = tokio::time::interval(period); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); let unlimited = cli.count == 0; let mut last_progress = started; diff --git a/simulator/tests/end_to_end_t1.rs b/simulator/tests/end_to_end_t1.rs index c64209f..0db8432 100644 --- a/simulator/tests/end_to_end_t1.rs +++ b/simulator/tests/end_to_end_t1.rs @@ -28,6 +28,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig { server_interface: "127.0.0.1".to_string(), server_cert: cert.to_string_lossy().into_owned(), server_key: key.to_string_lossy().into_owned(), + t1_capacity: 1024, + t2_capacity: 512, + t3_capacity: 256, } } diff --git a/simulator/tests/end_to_end_t2.rs b/simulator/tests/end_to_end_t2.rs index 881dabe..96a7f69 100644 --- a/simulator/tests/end_to_end_t2.rs +++ b/simulator/tests/end_to_end_t2.rs @@ -27,6 +27,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig { server_interface: "127.0.0.1".to_string(), server_cert: cert.to_string_lossy().into_owned(), server_key: key.to_string_lossy().into_owned(), + t1_capacity: 1024, + t2_capacity: 512, + t3_capacity: 256, } } diff --git a/simulator/tests/end_to_end_t3.rs b/simulator/tests/end_to_end_t3.rs index d86e6e0..a9153b3 100644 --- a/simulator/tests/end_to_end_t3.rs +++ b/simulator/tests/end_to_end_t3.rs @@ -28,6 +28,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig { server_interface: "127.0.0.1".to_string(), server_cert: cert.to_string_lossy().into_owned(), server_key: key.to_string_lossy().into_owned(), + t1_capacity: 1024, + t2_capacity: 512, + t3_capacity: 256, } } diff --git a/substrate/src/config.rs b/substrate/src/config.rs index 9972982..47649ca 100644 --- a/substrate/src/config.rs +++ b/substrate/src/config.rs @@ -22,6 +22,9 @@ pub struct QuicConfig { pub server_interface: String, pub server_cert: String, pub server_key: String, + pub t1_capacity: usize, + pub t2_capacity: usize, + pub t3_capacity: usize, } #[derive(Debug, Serialize, Deserialize)] @@ -41,6 +44,9 @@ impl Default for AppConfig { server_interface: "0.0.0.0".to_string(), server_cert: "certs/server.crt".to_string(), server_key: "certs/server.key".to_string(), + t1_capacity: 1024, + t2_capacity: 512, + t3_capacity: 256, }, simulation: SimulationConfig { tick_rate_hz: 60, diff --git a/substrate/src/transport/ecs.rs b/substrate/src/transport/ecs.rs index 72e295e..542a4a7 100644 --- a/substrate/src/transport/ecs.rs +++ b/substrate/src/transport/ecs.rs @@ -10,10 +10,6 @@ use crate::transport::{QuicMessage, T1Sender, T2Sender, T3Inbound, T3Sender}; use crate::transport::server::{accept_loop, bind_endpoint}; use crate::transport::state::ServerState; -const T1_CAPACITY: usize = 1024; -const T2_CAPACITY: usize = 512; -const T3_CAPACITY: usize = 256; - pub struct EcsQuicTransportPlugin; /// Receive halves of the three tier channels, wrapped so they can sit in a @@ -63,11 +59,12 @@ fn start_quic_server( impl Plugin for EcsQuicTransportPlugin { fn build(&self, app: &mut App) { + let config = app.world_mut().resource::(); // Three-tier bridge between the tokio-side QUIC accept loop and the // ECS PreUpdate ingest system (in the `world` module). - let (t1_tx, t1_rx) = mpsc::channel::(T1_CAPACITY); - let (t2_tx, t2_rx) = mpsc::channel::(T2_CAPACITY); - let (t3_tx, t3_rx) = mpsc::channel::(T3_CAPACITY); + let (t1_tx, t1_rx) = mpsc::channel::(config.network.t1_capacity); + let (t2_tx, t2_rx) = mpsc::channel::(config.network.t2_capacity); + let (t3_tx, t3_rx) = mpsc::channel::(config.network.t3_capacity); // Spawn a tokio runtime on a dedicated OS thread, ship its Handle back // to the ECS, and keep the runtime alive for the lifetime of the app diff --git a/substrate/src/world/systems.rs b/substrate/src/world/systems.rs index 14b800a..d6f00d7 100644 --- a/substrate/src/world/systems.rs +++ b/substrate/src/world/systems.rs @@ -25,6 +25,8 @@ use super::resources::{DiagnosticsState, ExportSampleState, SensorRegistry}; /// T1 batch limit per tick. Anything beyond this stays in the channel and /// either drains next tick or gets dropped on full (T1's contract is lossy). const T1_INGEST_BATCH: usize = 1024; +const T2_INGEST_BATCH: usize = 512; +const T3_INGEST_BATCH: usize = 256; /// Drain the three tier channels into ECS state. /// @@ -56,10 +58,15 @@ pub(super) fn ingest_system( // T2 — uni streams. { let mut t2 = bridge.t2.lock().unwrap(); - while let Ok(msg) = t2.try_recv() { - histogram!("substrate_latency_us", "tier" => "t2") - .record(now.saturating_sub(msg.timestamp_us) as f64); - upsert_reading(&mut registry, &mut commands, &mut q, msg); + for _ in 0..T2_INGEST_BATCH { + match t2.try_recv() { + Ok(msg) => { + histogram!("substrate_latency_us", "tier" => "t2") + .record(now.saturating_sub(msg.timestamp_us) as f64); + upsert_reading(&mut registry, &mut commands, &mut q, msg); + } + Err(_) => break, + } } } @@ -67,27 +74,32 @@ pub(super) fn ingest_system( // sensor value (NaN if we've never seen this (device, sensor) before). { let mut t3 = bridge.t3.lock().unwrap(); - while let Ok(inbound) = t3.try_recv() { - histogram!("substrate_latency_us", "tier" => "t3") - .record(now.saturating_sub(inbound.command.timestamp_us) as f64); - let key = (inbound.command.device_id, inbound.command.sensor_id); - let current_value = registry - .map - .get(&key) - .and_then(|&e| q.get(e).ok()) - .map(|d| d.raw_value) - .unwrap_or(f64::NAN); - let ack = QuicMessage { - device_id: inbound.command.device_id, - sensor_id: inbound.command.sensor_id, - raw_value: current_value, - timestamp_us: now_us(), - sequence_number: inbound.command.sequence_number, - sensor_type: inbound.command.sensor_type, - }; - // Ignore send errors: the demux task may have given up if the - // connection died while we were processing. - let _ = inbound.reply.send(ack); + for _ in 0..T3_INGEST_BATCH { + match t3.try_recv() { + Ok(inbound) => { + histogram!("substrate_latency_us", "tier" => "t3") + .record(now.saturating_sub(inbound.command.timestamp_us) as f64); + let key = (inbound.command.device_id, inbound.command.sensor_id); + let current_value = registry + .map + .get(&key) + .and_then(|&e| q.get(e).ok()) + .map(|d| d.raw_value) + .unwrap_or(f64::NAN); + let ack = QuicMessage { + device_id: inbound.command.device_id, + sensor_id: inbound.command.sensor_id, + raw_value: current_value, + timestamp_us: now_us(), + sequence_number: inbound.command.sequence_number, + sensor_type: inbound.command.sensor_type, + }; + // Ignore send errors: the demux task may have given up if the + // connection died while we were processing. + let _ = inbound.reply.send(ack); + } + Err(_) => break, + } } } }