//! End-to-end T2 (unidirectional stream) tests. Mirrors the T1 harness: //! spin up substrate's listener with channels owned by the test, drive a //! `SimulatorClient` against it, assert what arrives on the T2 receiver. //! //! Run with `cargo test -p simulator`. use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::time::Duration; use anyhow::Result; use simulator::client::SimulatorClient; use substrate::config::QuicConfig; use substrate::transport::server::{accept_loop, bind_endpoint}; use substrate::transport::{QuicMessage, SensorType, T1Sender, T2Sender, T3Sender}; use tokio::sync::mpsc; use uuid::Uuid; fn cert_path(name: &str) -> PathBuf { [env!("CARGO_MANIFEST_DIR"), "..", "certs", name].iter().collect() } fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig { QuicConfig { server_port: 0, server_interface: "127.0.0.1".to_string(), server_cert: cert.to_string_lossy().into_owned(), server_key: key.to_string_lossy().into_owned(), t1_capacity: 1024, t2_capacity: 512, t3_capacity: 256, } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn t2_single_stream_preserves_order() -> Result<()> { simulator::install_crypto_provider(); let cert = cert_path("server.crt"); let key = cert_path("server.key"); let cfg = loopback_config(cert.clone(), key); let endpoint = bind_endpoint(&cfg)?; let server_addr: SocketAddr = endpoint.local_addr()?; let (t1_tx, _t1_rx) = mpsc::channel(64); let (t2_tx, mut t2_rx) = mpsc::channel(64); let (t3_tx, _t3_rx) = mpsc::channel(64); let server_task = tokio::spawn(accept_loop( endpoint, T1Sender::new(t1_tx), T2Sender::new(t2_tx), T3Sender::new(t3_tx), )); let client = SimulatorClient::connect(server_addr, "localhost", &cert).await?; let device = Uuid::from_u128(0x0011_2233_4455_6677_8899_aabb_ccdd_eeff); const N: u32 = 10; let msgs: Vec = (0..N) .map(|i| QuicMessage { device_id: device, sensor_id: 1, raw_value: f64::from(i), timestamp_us: 1_700_000_000_000_000 + u64::from(i), sequence_number: i, sensor_type: SensorType::Pressure.as_u8(), }) .collect(); client.send_uni_stream(&msgs).await?; for expected in &msgs { let received = tokio::time::timeout(Duration::from_secs(2), t2_rx.recv()) .await .expect("missed T2 message") .expect("T2 channel closed unexpectedly"); assert_eq!(received, *expected); } client.close().await; server_task.abort(); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn t2_concurrent_streams_each_internally_ordered() -> Result<()> { simulator::install_crypto_provider(); let cert = cert_path("server.crt"); let key = cert_path("server.key"); let cfg = loopback_config(cert.clone(), key); let endpoint = bind_endpoint(&cfg)?; let server_addr: SocketAddr = endpoint.local_addr()?; let (t1_tx, _t1_rx) = mpsc::channel(64); let (t2_tx, mut t2_rx) = mpsc::channel(256); let (t3_tx, _t3_rx) = mpsc::channel(64); let server_task = tokio::spawn(accept_loop( endpoint, T1Sender::new(t1_tx), T2Sender::new(t2_tx), T3Sender::new(t3_tx), )); let client = SimulatorClient::connect(server_addr, "localhost", &cert).await?; // 4 devices × 8 messages each on independent uni streams. Cross-stream // ordering may interleave; per-stream ordering must be strict. const DEVICES: usize = 4; const PER_DEVICE: u32 = 8; let device_ids: Vec = (0..DEVICES).map(|_| Uuid::new_v4()).collect(); let mut handles = Vec::with_capacity(DEVICES); for &device in &device_ids { let conn = client.conn.clone(); handles.push(tokio::spawn(async move { let msgs: Vec = (0..PER_DEVICE) .map(|i| QuicMessage { device_id: device, sensor_id: 0, raw_value: f64::from(i), timestamp_us: 1_700_000_000_000_000 + u64::from(i), sequence_number: i, sensor_type: SensorType::Generic.as_u8(), }) .collect(); // Use the connection directly so each task owns its own stream // — same wire pattern as `SimulatorClient::send_uni_stream`. let mut send = conn.open_uni().await.expect("open_uni"); for m in &msgs { send.write_all(&m.to_bytes()).await.expect("write_all"); } send.finish().expect("finish"); })); } for h in handles { h.await?; } // Drain DEVICES × PER_DEVICE messages, group by device, assert per-device // sequence numbers are strictly increasing from 0. let total = DEVICES * PER_DEVICE as usize; let mut by_device: HashMap> = HashMap::new(); for _ in 0..total { let msg = tokio::time::timeout(Duration::from_secs(2), t2_rx.recv()) .await .expect("missed T2 message") .expect("T2 channel closed unexpectedly"); by_device.entry(msg.device_id).or_default().push(msg.sequence_number); } assert_eq!(by_device.len(), DEVICES, "expected one entry per device"); for (dev, seqs) in &by_device { let expected: Vec = (0..PER_DEVICE).collect(); assert_eq!(seqs, &expected, "out-of-order or missing sequence for {dev}"); } client.close().await; server_task.abort(); Ok(()) }