156 lines
5.0 KiB
Rust
156 lines
5.0 KiB
Rust
//! End-to-end T3 (bidirectional stream + oneshot ack) tests. Same shape as
|
|
//! the T1/T2 harnesses: spin up substrate's listener with channels owned by
|
|
//! the test, run a "fake ECS" task that drains the T3 receiver and either
|
|
//! replies or drops the oneshot, and assert the client observes the right
|
|
//! behaviour.
|
|
//!
|
|
//! Run with `cargo test -p simulator`.
|
|
|
|
use std::net::SocketAddr;
|
|
use std::path::PathBuf;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::Result;
|
|
use simulator::client::SimulatorClient;
|
|
use substrate::config::QuicConfig;
|
|
use substrate::transport::server::{accept_loop, bind_endpoint};
|
|
use substrate::transport::{QuicMessage, SensorType, T1Sender, T2Sender, T3Sender};
|
|
use tokio::sync::mpsc;
|
|
use uuid::Uuid;
|
|
|
|
fn cert_path(name: &str) -> PathBuf {
|
|
[env!("CARGO_MANIFEST_DIR"), "..", "certs", name].iter().collect()
|
|
}
|
|
|
|
fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
|
|
QuicConfig {
|
|
server_port: 0,
|
|
server_interface: "127.0.0.1".to_string(),
|
|
server_cert: cert.to_string_lossy().into_owned(),
|
|
server_key: key.to_string_lossy().into_owned(),
|
|
t1_capacity: 1024,
|
|
t2_capacity: 512,
|
|
t3_capacity: 256,
|
|
}
|
|
}
|
|
|
|
/// Marker `timestamp_us` the fake ECS stamps onto every ack so the test can
|
|
/// distinguish a real reply from any echo of the command's own timestamp.
|
|
const ACK_MARKER_TS: u64 = 999_999_999_999;
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn t3_round_trip_with_fake_handler() -> Result<()> {
|
|
simulator::install_crypto_provider();
|
|
|
|
let cert = cert_path("server.crt");
|
|
let key = cert_path("server.key");
|
|
let cfg = loopback_config(cert.clone(), key);
|
|
|
|
let endpoint = bind_endpoint(&cfg)?;
|
|
let server_addr: SocketAddr = endpoint.local_addr()?;
|
|
|
|
let (t1_tx, _t1_rx) = mpsc::channel(64);
|
|
let (t2_tx, _t2_rx) = mpsc::channel(64);
|
|
let (t3_tx, mut t3_rx) = mpsc::channel(64);
|
|
|
|
let server_task = tokio::spawn(accept_loop(
|
|
endpoint,
|
|
T1Sender::new(t1_tx),
|
|
T2Sender::new(t2_tx),
|
|
T3Sender::new(t3_tx),
|
|
));
|
|
|
|
// Fake ECS handler: drain T3 inbounds, mark the timestamp, send back.
|
|
let handler = tokio::spawn(async move {
|
|
while let Some(inbound) = t3_rx.recv().await {
|
|
let mut ack = inbound.command;
|
|
ack.timestamp_us = ACK_MARKER_TS;
|
|
// Ignore send error (client may have disconnected before listening).
|
|
let _ = inbound.reply.send(ack);
|
|
}
|
|
});
|
|
|
|
let client = SimulatorClient::connect(server_addr, "localhost", &cert).await?;
|
|
|
|
let cmd = QuicMessage {
|
|
device_id: Uuid::from_u128(0xa5a5_a5a5_5a5a_5a5a_a5a5_5a5a_a5a5_5a5a),
|
|
sensor_id: 3,
|
|
raw_value: 1.5,
|
|
timestamp_us: 1_700_000_000_000_000,
|
|
sequence_number: 7,
|
|
sensor_type: SensorType::Voltage.as_u8(),
|
|
};
|
|
|
|
let ack = tokio::time::timeout(Duration::from_secs(2), client.request(&cmd))
|
|
.await
|
|
.expect("T3 ack timed out")?;
|
|
|
|
assert_eq!(ack.device_id, cmd.device_id, "ack should preserve device_id");
|
|
assert_eq!(ack.sensor_id, cmd.sensor_id, "ack should preserve sensor_id");
|
|
assert_eq!(
|
|
ack.sequence_number, cmd.sequence_number,
|
|
"ack should preserve sequence_number for correlation"
|
|
);
|
|
assert_eq!(ack.timestamp_us, ACK_MARKER_TS, "fake ECS should stamp the marker");
|
|
|
|
client.close().await;
|
|
handler.abort();
|
|
server_task.abort();
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn t3_no_handler_resets_stream() -> Result<()> {
|
|
simulator::install_crypto_provider();
|
|
|
|
let cert = cert_path("server.crt");
|
|
let key = cert_path("server.key");
|
|
let cfg = loopback_config(cert.clone(), key);
|
|
|
|
let endpoint = bind_endpoint(&cfg)?;
|
|
let server_addr: SocketAddr = endpoint.local_addr()?;
|
|
|
|
let (t1_tx, _t1_rx) = mpsc::channel(64);
|
|
let (t2_tx, _t2_rx) = mpsc::channel(64);
|
|
let (t3_tx, mut t3_rx) = mpsc::channel(64);
|
|
|
|
let server_task = tokio::spawn(accept_loop(
|
|
endpoint,
|
|
T1Sender::new(t1_tx),
|
|
T2Sender::new(t2_tx),
|
|
T3Sender::new(t3_tx),
|
|
));
|
|
|
|
// Fake ECS that *drops* every oneshot — simulates "no handler installed",
|
|
// which is the placeholder state in `ingest_system` until M4 lands.
|
|
let handler = tokio::spawn(async move {
|
|
while let Some(inbound) = t3_rx.recv().await {
|
|
drop(inbound);
|
|
}
|
|
});
|
|
|
|
let client = SimulatorClient::connect(server_addr, "localhost", &cert).await?;
|
|
|
|
let cmd = QuicMessage {
|
|
device_id: Uuid::new_v4(),
|
|
sensor_id: 0,
|
|
raw_value: 0.0,
|
|
timestamp_us: 0,
|
|
sequence_number: 0,
|
|
sensor_type: SensorType::Generic.as_u8(),
|
|
};
|
|
|
|
let result = tokio::time::timeout(Duration::from_secs(2), client.request(&cmd)).await;
|
|
let inner = result.expect("client.request should not hang when stream is reset");
|
|
assert!(
|
|
inner.is_err(),
|
|
"expected request to fail when substrate resets the stream, got Ok({:?})",
|
|
inner.ok()
|
|
);
|
|
|
|
client.close().await;
|
|
handler.abort();
|
|
server_task.abort();
|
|
Ok(())
|
|
}
|