143 lines
4.7 KiB
Rust
143 lines
4.7 KiB
Rust
//! End-to-end T1 datagram test: spin up substrate's listener in-process with
//! channels the test owns, drive a `SimulatorClient` against it, and assert
//! that the datagram arrives, decoded, on the T1 receiver.
//!
//! Run with `cargo test -p simulator`.
|
|
|
|
use std::net::SocketAddr;
|
|
use std::path::PathBuf;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::Result;
|
|
use simulator::client::SimulatorClient;
|
|
use substrate::config::QuicConfig;
|
|
use substrate::transport::server::{accept_loop, bind_endpoint};
|
|
use substrate::transport::{QuicMessage, SensorType, T1Sender, T2Sender, T3Sender};
|
|
use tokio::sync::mpsc;
|
|
use uuid::Uuid;
|
|
|
|
fn cert_path(name: &str) -> PathBuf {
|
|
[env!("CARGO_MANIFEST_DIR"), "..", "certs", name].iter().collect()
|
|
}
|
|
|
|
fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
|
|
QuicConfig {
|
|
// Port 0 lets the OS pick a free ephemeral port — tests can run in
|
|
// parallel without colliding on a fixed bind.
|
|
server_port: 0,
|
|
server_interface: "127.0.0.1".to_string(),
|
|
server_cert: cert.to_string_lossy().into_owned(),
|
|
server_key: key.to_string_lossy().into_owned(),
|
|
t1_capacity: 1024,
|
|
t2_capacity: 512,
|
|
t3_capacity: 256,
|
|
}
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn t1_datagram_decoded_into_ecs_channel() -> Result<()> {
|
|
simulator::install_crypto_provider();
|
|
|
|
let cert = cert_path("server.crt");
|
|
let key = cert_path("server.key");
|
|
let cfg = loopback_config(cert.clone(), key);
|
|
|
|
// Bind the substrate's listener on an ephemeral port.
|
|
let endpoint = bind_endpoint(&cfg)?;
|
|
let server_addr: SocketAddr = endpoint.local_addr()?;
|
|
|
|
// Channels the test owns — gives us direct visibility into what the T1
|
|
// demux pushes into the ECS bridge.
|
|
let (t1_tx, mut t1_rx) = mpsc::channel(64);
|
|
let (t2_tx, _t2_rx) = mpsc::channel(64);
|
|
let (t3_tx, _t3_rx) = mpsc::channel(64);
|
|
|
|
let server_task = tokio::spawn(accept_loop(
|
|
endpoint,
|
|
T1Sender::new(t1_tx),
|
|
T2Sender::new(t2_tx),
|
|
T3Sender::new(t3_tx),
|
|
));
|
|
|
|
// Connect a client and send one datagram.
|
|
let client = SimulatorClient::connect(server_addr, "localhost", &cert).await?;
|
|
|
|
let sent = QuicMessage {
|
|
device_id: Uuid::from_u128(0xdead_beef_cafe_f00d_1234_5678_90ab_cdef),
|
|
sensor_id: 7,
|
|
raw_value: 42.0,
|
|
timestamp_us: 1_700_000_000_000_001,
|
|
sequence_number: 1,
|
|
sensor_type: SensorType::Temperature.as_u8(),
|
|
};
|
|
client.send_datagram(&sent)?;
|
|
|
|
// Wait for the substrate's read_datagrams reader to push it into T1.
|
|
let received = tokio::time::timeout(Duration::from_secs(2), t1_rx.recv())
|
|
.await
|
|
.expect("did not observe T1 datagram within 2s")
|
|
.expect("T1 channel closed unexpectedly");
|
|
|
|
assert_eq!(received, sent);
|
|
|
|
client.close().await;
|
|
server_task.abort();
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn t1_burst_preserves_order_and_count() -> Result<()> {
|
|
simulator::install_crypto_provider();
|
|
|
|
let cert = cert_path("server.crt");
|
|
let key = cert_path("server.key");
|
|
let cfg = loopback_config(cert.clone(), key);
|
|
|
|
let endpoint = bind_endpoint(&cfg)?;
|
|
let server_addr: SocketAddr = endpoint.local_addr()?;
|
|
|
|
// T1 capacity 64 ≥ burst size 32 so nothing is dropped under loopback.
|
|
let (t1_tx, mut t1_rx) = mpsc::channel(64);
|
|
let (t2_tx, _t2_rx) = mpsc::channel(8);
|
|
let (t3_tx, _t3_rx) = mpsc::channel(8);
|
|
|
|
let server_task = tokio::spawn(accept_loop(
|
|
endpoint,
|
|
T1Sender::new(t1_tx),
|
|
T2Sender::new(t2_tx),
|
|
T3Sender::new(t3_tx),
|
|
));
|
|
|
|
let client = SimulatorClient::connect(server_addr, "localhost", &cert).await?;
|
|
|
|
let device = Uuid::from_u128(0xa1a2_a3a4_b5b6_b7b8_c9ca_cbcc_cdce_cfd0);
|
|
const BURST: u32 = 32;
|
|
for seq in 0..BURST {
|
|
let msg = QuicMessage {
|
|
device_id: device,
|
|
sensor_id: 0,
|
|
raw_value: f64::from(seq),
|
|
timestamp_us: 1_700_000_000_000_000 + u64::from(seq),
|
|
sequence_number: seq,
|
|
sensor_type: SensorType::Generic.as_u8(),
|
|
};
|
|
client.send_datagram(&msg)?;
|
|
}
|
|
|
|
// Drain BURST messages with a per-message timeout. Loopback shouldn't
|
|
// reorder QUIC datagrams within a single connection.
|
|
for expected_seq in 0..BURST {
|
|
let msg = tokio::time::timeout(Duration::from_secs(2), t1_rx.recv())
|
|
.await
|
|
.unwrap_or_else(|_| panic!("missed datagram seq={expected_seq}"))
|
|
.expect("T1 channel closed");
|
|
assert_eq!(msg.sequence_number, expected_seq);
|
|
assert_eq!(msg.device_id, device);
|
|
assert_eq!(msg.raw_value, f64::from(expected_seq));
|
|
}
|
|
|
|
client.close().await;
|
|
server_task.abort();
|
|
Ok(())
|
|
}
|