//! Async emitter tasks for T2 (uni streams) and T3 (bi streams + ack). //! //! Each emitter ticks at its own rate, opens a fresh stream per event, and //! shares a `Connection` with the rest of the simulator. T1 (datagrams) is //! driven inline by the main loop so the foreground task owns the progress //! reporting; the reliable tiers run as `tokio::spawn`ed background tasks. use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::Context; use substrate::transport::QuicMessage; use tokio::time::MissedTickBehavior; use crate::profile::{SensorSlot, generate_value}; /// UNIX-epoch microseconds — the wall-clock timestamp the simulator stamps /// into every outgoing `QuicMessage`. Substrate-side latency is computed as /// `substrate_now_us - msg.timestamp_us`, so this needs to be a real wall /// clock both ends share (NTP for two-machine; loopback otherwise). pub fn now_us() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_micros() as u64) .unwrap_or(0) } /// T2 emitter — opens a fresh uni stream per event, writes one /// `QuicMessage`, and `finish`es. Returns the count of events successfully /// delivered when `interrupted` is raised. pub async fn run_t2_emitter( conn: quinn::Connection, mut slot: SensorSlot, rate_hz: f64, interrupted: Arc, counter: Arc, ) -> u64 { let period = Duration::from_nanos((1.0e9 / rate_hz) as u64); let mut ticker = tokio::time::interval(period); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut sent: u64 = 0; let mut send = match conn.open_uni().await { Ok(s) => s, Err(e) => { tracing::warn!(error = %e, "T2 open_uni failed; emitter exiting"); return 0; } }; loop { ticker.tick().await; if interrupted.load(Ordering::SeqCst) { break; } let msg = QuicMessage { device_id: slot.device_id, sensor_id: slot.sensor_id, raw_value: generate_value(slot.sensor_type, slot.seq), timestamp_us: now_us(), sequence_number: slot.seq, sensor_type: slot.sensor_type.as_u8(), }; slot.seq = slot.seq.wrapping_add(1); if let Err(e) = send.write_all(&msg.to_bytes()).await { tracing::warn!(error = %e, "T2 write_all failed; stream closed?"); break; } sent += 1; counter.store(sent, Ordering::Relaxed); } if let Err(e) = send.finish() { tracing::warn!(error = %e, "T2 finish failed"); } sent } /// T3 emitter — opens a fresh bi-stream per command, writes the command, /// awaits the ack with a bounded timeout. Returns `(acks_received, timeouts)`. pub async fn run_t3_emitter( conn: quinn::Connection, mut slot: SensorSlot, rate_hz: f64, timeout: Duration, interrupted: Arc, sent_counter: Arc, timeout_counter: Arc, ) -> (u64, u64) { let period = Duration::from_nanos((1.0e9 / rate_hz) as u64); let mut ticker = tokio::time::interval(period); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut sent: u64 = 0; let mut timeouts: u64 = 0; loop { ticker.tick().await; if interrupted.load(Ordering::SeqCst) { break; } let cmd = QuicMessage { device_id: slot.device_id, sensor_id: slot.sensor_id, raw_value: generate_value(slot.sensor_type, slot.seq), timestamp_us: now_us(), sequence_number: slot.seq, sensor_type: slot.sensor_type.as_u8(), }; slot.seq = slot.seq.wrapping_add(1); match tokio::time::timeout(timeout, t3_one_request(&conn, &cmd)).await { Ok(Ok(_ack)) => { sent += 1; sent_counter.store(sent, Ordering::Relaxed); } Ok(Err(e)) => { tracing::warn!(error = %e, "T3 request failed"); } Err(_) => { timeouts += 1; timeout_counter.store(timeouts, Ordering::Relaxed); tracing::warn!(?timeout, "T3 ack timed out"); } } } (sent, timeouts) } /// Single T3 round-trip: open bi-stream, write 38 B command, `finish` the /// send half, read 38 B ack. Used by `run_t3_emitter`. async fn t3_one_request( conn: &quinn::Connection, cmd: &QuicMessage, ) -> anyhow::Result { let (mut send, mut recv) = conn.open_bi().await.context("T3 open_bi")?; send.write_all(&cmd.to_bytes()) .await .context("T3 write command")?; send.finish().context("T3 finish send half")?; let mut buf = [0u8; QuicMessage::WIRE_SIZE]; recv.read_exact(&mut buf).await.context("T3 read ack")?; QuicMessage::decode(&buf).context("T3 decode ack") }