First test kinda working

This commit is contained in:
Valère Plantevin
2026-05-12 11:21:40 -04:00
parent cac6c9ac02
commit d3f09ee062
36 changed files with 3903 additions and 102 deletions

View File

@@ -11,7 +11,12 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
quinn = { version = "0.11" }
rustls = { version = "0.23" }
rustls-pemfile = "2"
rustls-pki-types = "1"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1.23", features = ["v4"] }
figment = { version = "0.10", features = ["toml", "env"] }
serde = { version = "1", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
metrics = "0.24"
metrics-exporter-prometheus = "0.17"
memory-stats = "1"

View File

@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
pub struct AppConfig {
pub network: QuicConfig,
pub simulation: SimulationConfig,
pub observability: ObservabilityConfig,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -23,6 +24,15 @@ pub struct QuicConfig {
pub server_key: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ObservabilityConfig {
/// When true, install the Prometheus exporter at startup. Disable for
/// environments where the metrics port collides or scraping is undesired.
pub metrics_enabled: bool,
/// Bind address for the `/metrics` HTTP listener.
pub metrics_listen: String,
}
impl Default for AppConfig {
fn default() -> Self {
Self {
@@ -36,6 +46,10 @@ impl Default for AppConfig {
tick_rate_hz: 60,
max_entities: 10000,
},
observability: ObservabilityConfig {
metrics_enabled: true,
metrics_listen: "0.0.0.0:9100".to_string(),
},
}
}
}

4
substrate/src/lib.rs Normal file
View File

@@ -0,0 +1,4 @@
pub mod config;
pub mod observability;
pub mod transport;
pub mod world;

View File

@@ -1,9 +1,10 @@
mod transport;
mod config;
use bevy::prelude::*;
use tracing_subscriber::EnvFilter;
use crate::config::AppConfig;
use substrate::config::AppConfig;
use substrate::observability::ObservabilityPlugin;
use substrate::transport;
use substrate::world::WorldPlugin;
fn main() {
tracing_subscriber::fmt()
@@ -12,12 +13,22 @@ fn main() {
)
.init();
// rustls 0.23 requires an explicit default crypto provider. Quinn's
// ServerConfig::with_single_cert otherwise panics at first use.
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("install rustls default crypto provider");
let config = AppConfig::load("config.toml").expect("Failed to load config");
tracing::info!(?config, "substrate starting");
// Plugin order matters: EcsQuicTransportPlugin inserts the TokioHandle
// resource ObservabilityPlugin reads in its `build()`.
App::new()
.insert_resource(config)
.add_plugins(MinimalPlugins)
.add_plugins(transport::ecs::EcsQuicTransportPlugin {})
.add_plugins(transport::ecs::EcsQuicTransportPlugin)
.add_plugins(WorldPlugin)
.add_plugins(ObservabilityPlugin)
.run();
}

View File

@@ -0,0 +1,116 @@
//! M5 — Prometheus-format `/metrics` exporter installation and counter
//! pre-registration.
//!
//! Counters and histograms are emitted from the demux path
//! ([`crate::transport::server`]) and the world systems
//! ([`crate::world::ingest_system`], [`crate::world::simulation_system`],
//! [`crate::world::export_system`]). This module's only job is:
//!
//! 1. Install the global metrics recorder + HTTP listener on the existing
//! tokio runtime, once at startup.
//! 2. Pre-register every counter at value 0 so panels render "0" rather than
//! "No data" before the first event of a given kind fires.
//!
//! ## Runtime telemetry
//!
//! - `substrate_received_total{tier=t1|t2|t3}` — counter
//! - `substrate_dropped_total{tier=t1}` — counter (T1 lossy)
//! - `substrate_decode_errors_total{tier=t1|t2|t3}` — counter
//! - `substrate_t3_no_handler_total` — counter
//! - `substrate_latency_us{tier=t1|t2|t3}` — histogram
//! - `substrate_tick_hz` — gauge
//! - `substrate_entities` — gauge
//! - `substrate_channel_depth{tier=t1|t2|t3}` — gauge
//! - `substrate_channel_capacity{tier=t1|t2|t3}` — gauge
//! - `substrate_rss_bytes` — gauge
//!
//! ## Digital-twin surface (operator dashboard)
//!
//! - `sensor_aggregate{type=…, stat=count|mean|min|max}` — gauge
//! - `substrate_threshold_crossings_total{type, direction}` — counter
use std::net::SocketAddr;
use bevy::prelude::*;
use metrics::counter;
use metrics_exporter_prometheus::PrometheusBuilder;
use crate::config::AppConfig;
use crate::transport::SensorType;
use crate::transport::ecs::TokioHandle;
pub struct ObservabilityPlugin;
impl Plugin for ObservabilityPlugin {
fn build(&self, app: &mut App) {
let config = app
.world()
.get_resource::<AppConfig>()
.expect("AppConfig must be inserted before ObservabilityPlugin");
if !config.observability.metrics_enabled {
tracing::info!("metrics exporter disabled by config");
return;
}
let listen: SocketAddr = config
.observability
.metrics_listen
.parse()
.expect("invalid metrics_listen address in config");
let runtime_handle = app
.world()
.get_resource::<TokioHandle>()
.expect("TokioHandle must be inserted before ObservabilityPlugin (load order: transport plugin first)")
.0
.clone();
// PrometheusBuilder::install spawns the HTTP listener via tokio::spawn,
// which requires being inside a runtime context.
let _guard = runtime_handle.enter();
PrometheusBuilder::new()
.with_http_listener(listen)
.install()
.expect("install prometheus exporter");
drop(_guard);
tracing::info!(?listen, "metrics exporter installed");
pre_register_counters();
}
}
/// Pre-register every counter at value 0 so Grafana sees a series to plot
/// even before the first event of that kind. Without this, the Prometheus
/// exporter omits any counter that has never been incremented, and panels
/// render "No data" — confusing when the metric exists, the counter is just
/// genuinely zero (e.g., `substrate_t3_no_handler_total` in normal operation).
fn pre_register_counters() {
for tier in ["t1", "t2", "t3"] {
counter!("substrate_received_total", "tier" => tier).increment(0);
counter!("substrate_decode_errors_total", "tier" => tier).increment(0);
}
counter!("substrate_dropped_total", "tier" => "t1").increment(0);
counter!("substrate_t3_no_handler_total").increment(0);
// Threshold crossings — bounded `|SensorType| × 2` cardinality, all
// pre-registered so dashboard panels show "0" instead of "No data".
for t in [
SensorType::Generic,
SensorType::Temperature,
SensorType::Humidity,
SensorType::Pressure,
SensorType::Voltage,
SensorType::Current,
] {
for direction in ["up", "down"] {
counter!(
"substrate_threshold_crossings_total",
"type" => t.label_str(),
"direction" => direction
)
.increment(0);
}
}
}

View File

@@ -1,68 +1,111 @@
use std::sync::Mutex;
use bevy::prelude::*;
use bevy::state::app::StatesPlugin;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use crate::transport::QuicMessage;
use crate::transport::server::run_substrate_server;
use crate::config::AppConfig;
use crate::transport::{QuicMessage, T1Sender, T2Sender, T3Inbound, T3Sender};
use crate::transport::server::{accept_loop, bind_endpoint};
use crate::transport::state::ServerState;
const T1_CAPACITY: usize = 1024;
const T2_CAPACITY: usize = 512;
const T3_CAPACITY: usize = 256;
pub struct EcsQuicTransportPlugin{}
pub struct EcsQuicTransportPlugin;
/// Receive halves of the three tier channels, wrapped so they can sit in a
/// Bevy `Resource`. The `world` module's ingest system is the sole reader.
#[derive(Resource)]
struct BridgeReceivers {
t1: Mutex<mpsc::Receiver<QuicMessage>>,
t2: Mutex<mpsc::Receiver<QuicMessage>>,
t3: Mutex<mpsc::Receiver<QuicMessage>>,
pub(crate) struct BridgeReceivers {
pub(crate) t1: Mutex<mpsc::Receiver<QuicMessage>>,
pub(crate) t2: Mutex<mpsc::Receiver<QuicMessage>>,
pub(crate) t3: Mutex<mpsc::Receiver<T3Inbound>>,
}
fn ingest_system(bridge: Res<BridgeReceivers>){
let mut t1 = bridge.t1.lock().unwrap();
// Tier 1: drain up to N messages, drop the rest
for _ in 0..T1_CAPACITY {
match t1.try_recv() {
Ok(msg) => { /* write RawSensorData */ }
Err(_) => break,
}
}
// T2/T3: drain completely, these are low volume
let mut t2 = bridge.t2.lock().unwrap();
while let Ok(msg) = t2.try_recv() { /* ... */ }
let mut t3 = bridge.t3.lock().unwrap();
while let Ok(msg) = t3.try_recv() { /* ... */ }
#[derive(Resource, Clone)]
pub(crate) struct BridgeSenders {
pub(crate) t1: T1Sender,
pub(crate) t2: T2Sender,
pub(crate) t3: T3Sender,
}
impl Plugin for EcsQuicTransportPlugin{
#[derive(Resource, Clone)]
pub(crate) struct TokioHandle(pub(crate) Handle);
/// Bring up the QUIC listener using the loaded `AppConfig` and transition to
/// `ServerState::Started`. Runs once via `OnEnter(ServerState::Starting)`.
fn start_quic_server(
config: Res<AppConfig>,
senders: Res<BridgeSenders>,
runtime: Res<TokioHandle>,
mut next: ResMut<NextState<ServerState>>,
) {
tracing::info!("entering ServerState::Starting — bringing up QUIC listener");
// `Endpoint::server` is sync but needs a tokio runtime context for
// `Handle::current()`; entering the runtime is enough — no async block
// required.
let _guard = runtime.0.enter();
let endpoint = bind_endpoint(&config.network).expect("failed to bind QUIC endpoint");
drop(_guard);
tracing::info!(local = ?endpoint.local_addr().ok(), "QUIC listener bound");
let s = senders.clone();
runtime.0.spawn(accept_loop(endpoint, s.t1, s.t2, s.t3));
next.set(ServerState::Started);
tracing::info!("ServerState::Started");
}
impl Plugin for EcsQuicTransportPlugin {
fn build(&self, app: &mut App) {
// Create the channels for multi-thread communication
let (t1_tx, t1_rx) =
mpsc::channel::<QuicMessage>(T1_CAPACITY);
let (t2_tx, t2_rx) =
mpsc::channel::<QuicMessage>(T2_CAPACITY);
let (t3_tx, t3_rx) =
mpsc::channel::<QuicMessage>(T3_CAPACITY);
// Three-tier bridge between the tokio-side QUIC accept loop and the
// ECS PreUpdate ingest system (in the `world` module).
let (t1_tx, t1_rx) = mpsc::channel::<QuicMessage>(T1_CAPACITY);
let (t2_tx, t2_rx) = mpsc::channel::<QuicMessage>(T2_CAPACITY);
let (t3_tx, t3_rx) = mpsc::channel::<T3Inbound>(T3_CAPACITY);
let quic_handle = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap();
// Spawn a tokio runtime on a dedicated OS thread, ship its Handle back
// to the ECS, and keep the runtime alive for the lifetime of the app
// by parking on `pending()`.
let (handle_tx, handle_rx) = std::sync::mpsc::sync_channel::<Handle>(1);
std::thread::Builder::new()
.name("quic-runtime".to_string())
.spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.thread_name("quic-worker")
.build()
.expect("build tokio runtime");
handle_tx
.send(rt.handle().clone())
.expect("send tokio Handle to ECS");
rt.block_on(std::future::pending::<()>());
})
.expect("spawn quic-runtime thread");
rt.block_on(async move {
run_substrate_server(t1_tx, t2_tx, t3_tx).await;
});
});
let handle = handle_rx.recv().expect("receive tokio Handle");
app.insert_resource(BridgeReceivers {
t1: Mutex::new(t1_rx),
t2: Mutex::new(t2_rx),
t3: Mutex::new(t3_rx),
});
app.add_systems(PreUpdate, ingest_system);
// Bevy 0.18 split state machinery into its own plugin; under
// MinimalPlugins it isn't installed by default.
app.add_plugins(StatesPlugin)
.init_state::<ServerState>()
.insert_resource(TokioHandle(handle))
.insert_resource(BridgeSenders {
t1: T1Sender::new(t1_tx),
t2: T2Sender::new(t2_tx),
t3: T3Sender::new(t3_tx),
})
.insert_resource(BridgeReceivers {
t1: Mutex::new(t1_rx),
t2: Mutex::new(t2_rx),
t3: Mutex::new(t3_rx),
})
.add_systems(OnEnter(ServerState::Starting), start_quic_server);
}
}
}

View File

@@ -1,27 +1,100 @@
pub mod ecs;
mod server;
pub mod server;
pub mod state;
/// One sensor sample on the wire.
use tokio::sync::{mpsc, oneshot};
/// Logical type of a sensor reading. Travels in `QuicMessage::sensor_type`
/// so the substrate (and any downstream dashboard) knows which units / range
/// / visualisation applies to the `raw_value`.
///
/// Fixed 38-byte little-endian layout — same on x86_64 and aarch64 (the two
/// Forward compat: unknown discriminants decode as `Generic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[repr(u8)]
pub enum SensorType {
#[default]
Generic = 0,
Temperature = 1,
Humidity = 2,
Pressure = 3,
Voltage = 4,
Current = 5,
}
impl SensorType {
pub fn from_u8(b: u8) -> Self {
match b {
1 => Self::Temperature,
2 => Self::Humidity,
3 => Self::Pressure,
4 => Self::Voltage,
5 => Self::Current,
_ => Self::Generic,
}
}
pub fn as_u8(self) -> u8 {
self as u8
}
/// Lowercase label used as a Prometheus label value.
pub fn label_str(self) -> &'static str {
match self {
Self::Generic => "generic",
Self::Temperature => "temperature",
Self::Humidity => "humidity",
Self::Pressure => "pressure",
Self::Voltage => "voltage",
Self::Current => "current",
}
}
/// SI / engineering unit string for Grafana axis labels.
pub fn unit_str(self) -> &'static str {
match self {
Self::Generic => "",
Self::Temperature => "°C",
Self::Humidity => "%",
Self::Pressure => "hPa",
Self::Voltage => "V",
Self::Current => "A",
}
}
}
/// One sample (T1/T2 sensor reading or T3 actuator command/ack) on the wire.
///
/// Fixed 39-byte little-endian layout — same on x86_64 and aarch64 (the two
/// evaluation hosts), so encode/decode is effectively a memcpy.
///
/// ```text
/// offset size field
/// ------ ---- --------------------------
/// 0 16 device_id (UUID)
/// 16 2 data_stream_id (u16)
/// 16 2 sensor_id (u16)
/// 18 8 raw_value (f64)
/// 26 8 timestamp_us (u64)
/// 34 4 sequence_number (u32)
/// 38 1 sensor_type (u8 — `SensorType` discriminant)
/// ```
///
/// Field semantics:
/// - `device_id` — UUID of the originating device (or target, for T3 commands).
/// - `sensor_id` — logical sensor/actuator on that device (per-device index).
/// - `raw_value` — sensor reading (T1/T2) or actuator setpoint/feedback (T3).
/// - `timestamp_us` — capture time on the device clock for T1/T2; server-side
/// ack time on T3 replies.
/// - `sequence_number` — monotonic counter per `(device_id, sensor_id)` for
/// T1/T2; correlation id linking T3 command and ack.
/// - `sensor_type` — `SensorType` discriminant, decoded via `SensorType::from_u8`.
#[derive(Debug, Clone, Default, Copy, PartialEq)]
pub struct QuicMessage {
pub device_id: uuid::Uuid,
pub data_stream_id: u16,
pub sensor_id: u16,
pub raw_value: f64,
pub timestamp_us: u64,
pub sequence_number: u32,
pub sensor_type: u8,
}
#[derive(Debug, thiserror::Error)]
@@ -32,7 +105,7 @@ pub enum WireError {
impl QuicMessage {
/// Bytes on the wire — fixed-size, no length prefix.
pub const WIRE_SIZE: usize = 38;
pub const WIRE_SIZE: usize = 39;
pub fn encode_to(&self, buf: &mut [u8]) -> Result<(), WireError> {
if buf.len() != Self::WIRE_SIZE {
@@ -42,10 +115,11 @@ impl QuicMessage {
});
}
buf[0..16].copy_from_slice(self.device_id.as_bytes());
buf[16..18].copy_from_slice(&self.data_stream_id.to_le_bytes());
buf[16..18].copy_from_slice(&self.sensor_id.to_le_bytes());
buf[18..26].copy_from_slice(&self.raw_value.to_le_bytes());
buf[26..34].copy_from_slice(&self.timestamp_us.to_le_bytes());
buf[34..38].copy_from_slice(&self.sequence_number.to_le_bytes());
buf[38] = self.sensor_type;
Ok(())
}
@@ -66,12 +140,113 @@ impl QuicMessage {
id_bytes.copy_from_slice(&buf[0..16]);
Ok(Self {
device_id: uuid::Uuid::from_bytes(id_bytes),
data_stream_id: u16::from_le_bytes(buf[16..18].try_into().unwrap()),
sensor_id: u16::from_le_bytes(buf[16..18].try_into().unwrap()),
raw_value: f64::from_le_bytes(buf[18..26].try_into().unwrap()),
timestamp_us: u64::from_le_bytes(buf[26..34].try_into().unwrap()),
sequence_number: u32::from_le_bytes(buf[34..38].try_into().unwrap()),
sensor_type: buf[38],
})
}
/// Convenience accessor — decodes `sensor_type` to the typed enum.
pub fn typ(&self) -> SensorType {
SensorType::from_u8(self.sensor_type)
}
}
// --- Per-tier bridge senders -----------------------------------------------
//
// Three newtypes encode the paper's tier semantics into the type system so
// the demux can't mix them up:
//
// * T1 (datagrams) — lossy; `try_send` drops on full
// * T2 (uni streams) — reliable, ordered; `send().await` backpressures
// * T3 (bi streams) — reliable command + per-command oneshot reply
/// Tier 1 — high-frequency telemetry over QUIC datagrams. Full channel drops.
#[derive(Clone)]
pub struct T1Sender {
inner: mpsc::Sender<QuicMessage>,
}
impl T1Sender {
pub fn new(inner: mpsc::Sender<QuicMessage>) -> Self {
Self { inner }
}
/// Returns `true` if queued, `false` if dropped (channel full or closed).
pub fn send_lossy(&self, msg: QuicMessage) -> bool {
self.inner.try_send(msg).is_ok()
}
/// Currently queued messages — used for channel-depth gauges.
pub fn depth(&self) -> usize {
self.inner.max_capacity().saturating_sub(self.inner.capacity())
}
pub fn capacity(&self) -> usize {
self.inner.max_capacity()
}
}
/// Tier 2 — ordered events over a QUIC unidirectional stream. Awaits on full.
#[derive(Clone)]
pub struct T2Sender {
inner: mpsc::Sender<QuicMessage>,
}
impl T2Sender {
pub fn new(inner: mpsc::Sender<QuicMessage>) -> Self {
Self { inner }
}
pub async fn send(
&self,
msg: QuicMessage,
) -> Result<(), mpsc::error::SendError<QuicMessage>> {
self.inner.send(msg).await
}
pub fn depth(&self) -> usize {
self.inner.max_capacity().saturating_sub(self.inner.capacity())
}
pub fn capacity(&self) -> usize {
self.inner.max_capacity()
}
}
/// Tier 3 — actuator command on a QUIC bidirectional stream, paired with a
/// `oneshot` channel the ECS uses to write the ack back over the same stream.
pub struct T3Inbound {
pub command: QuicMessage,
pub reply: oneshot::Sender<QuicMessage>,
}
#[derive(Clone)]
pub struct T3Sender {
inner: mpsc::Sender<T3Inbound>,
}
impl T3Sender {
pub fn new(inner: mpsc::Sender<T3Inbound>) -> Self {
Self { inner }
}
pub async fn send(
&self,
inbound: T3Inbound,
) -> Result<(), mpsc::error::SendError<T3Inbound>> {
self.inner.send(inbound).await
}
pub fn depth(&self) -> usize {
self.inner.max_capacity().saturating_sub(self.inner.capacity())
}
pub fn capacity(&self) -> usize {
self.inner.max_capacity()
}
}
#[cfg(test)]
@@ -80,33 +255,35 @@ mod tests {
#[test]
fn wire_size_matches_fields() {
assert_eq!(QuicMessage::WIRE_SIZE, 16 + 2 + 8 + 8 + 4);
assert_eq!(QuicMessage::WIRE_SIZE, 16 + 2 + 8 + 8 + 4 + 1);
}
#[test]
fn roundtrip_preserves_all_fields() {
let msg = QuicMessage {
device_id: uuid::Uuid::from_u128(0x0123456789abcdef_fedcba9876543210),
data_stream_id: 0xBEEF,
sensor_id: 0xBEEF,
raw_value: -273.15,
timestamp_us: 1_700_000_000_000_001,
sequence_number: 42,
sensor_type: SensorType::Temperature.as_u8(),
};
let bytes = msg.to_bytes();
assert_eq!(bytes.len(), QuicMessage::WIRE_SIZE);
let decoded = QuicMessage::decode(&bytes).unwrap();
assert_eq!(msg, decoded);
assert_eq!(decoded.typ(), SensorType::Temperature);
}
#[test]
fn decode_rejects_wrong_length() {
assert!(matches!(
QuicMessage::decode(&[0u8; 37]),
Err(WireError::BadLength { expected: 38, got: 37 })
QuicMessage::decode(&[0u8; 38]),
Err(WireError::BadLength { expected: 39, got: 38 })
));
assert!(matches!(
QuicMessage::decode(&[0u8; 39]),
Err(WireError::BadLength { expected: 38, got: 39 })
QuicMessage::decode(&[0u8; 40]),
Err(WireError::BadLength { expected: 39, got: 40 })
));
}
@@ -114,13 +291,22 @@ mod tests {
fn encode_layout_is_little_endian() {
let msg = QuicMessage {
device_id: uuid::Uuid::nil(),
data_stream_id: 0x0102,
sensor_id: 0x0102,
raw_value: 0.0,
timestamp_us: 0,
sequence_number: 0x04030201,
sensor_type: SensorType::Humidity.as_u8(),
};
let bytes = msg.to_bytes();
assert_eq!(&bytes[16..18], &[0x02, 0x01]);
assert_eq!(&bytes[34..38], &[0x01, 0x02, 0x03, 0x04]);
assert_eq!(bytes[38], SensorType::Humidity.as_u8());
}
#[test]
fn unknown_sensor_type_decodes_as_generic() {
assert_eq!(SensorType::from_u8(0), SensorType::Generic);
assert_eq!(SensorType::from_u8(99), SensorType::Generic);
assert_eq!(SensorType::from_u8(255), SensorType::Generic);
}
}

View File

@@ -1,8 +1,350 @@
use tokio::sync::mpsc::Sender;
use crate::transport::QuicMessage;
use std::net::SocketAddr;
use std::sync::Arc;
pub async fn run_substrate_server(t1_tx: Sender<QuicMessage>,
t2_tx: Sender<QuicMessage>,
t3_tx: Sender<QuicMessage>) {
}
use anyhow::{Context, anyhow};
use metrics::counter;
use quinn::{
Connection, Endpoint, Incoming, RecvStream, SendStream, ServerConfig, StreamId, TransportConfig,
};
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use tokio::sync::oneshot;
use crate::config::QuicConfig;
use crate::transport::{QuicMessage, T1Sender, T2Sender, T3Inbound, T3Sender};
/// Datagram receive buffer in bytes. Sized to absorb microbursts at the
/// telemetry rates.
const DATAGRAM_RECV_BUFFER_BYTES: usize = 256 * 1024;
/// Load the cert chain + private key from disk and build a Quinn `ServerConfig`.
pub fn build_server_config(cfg: &QuicConfig) -> anyhow::Result<ServerConfig> {
let cert_pem = std::fs::read(&cfg.server_cert)
.with_context(|| format!("read server_cert at {}", cfg.server_cert))?;
let key_pem = std::fs::read(&cfg.server_key)
.with_context(|| format!("read server_key at {}", cfg.server_key))?;
let certs: Vec<CertificateDer<'static>> = rustls_pemfile::certs(&mut cert_pem.as_slice())
.collect::<Result<_, _>>()
.with_context(|| format!("parse PEM certs at {}", cfg.server_cert))?;
if certs.is_empty() {
return Err(anyhow!("no certificates found in {}", cfg.server_cert));
}
let key: PrivateKeyDer<'static> = rustls_pemfile::private_key(&mut key_pem.as_slice())
.with_context(|| format!("parse PEM key at {}", cfg.server_key))?
.ok_or_else(|| anyhow!("no private key found in {}", cfg.server_key))?;
let mut server_config =
ServerConfig::with_single_cert(certs, key).context("build Quinn ServerConfig")?;
// Explicit transport config so the values driving evaluation are visible
// in source and at startup, not buried in Quinn's defaults.
let mut transport = TransportConfig::default();
transport.datagram_receive_buffer_size(Some(DATAGRAM_RECV_BUFFER_BYTES));
server_config.transport = Arc::new(transport);
tracing::info!(
datagram_recv_buffer_bytes = DATAGRAM_RECV_BUFFER_BYTES,
"Quinn TransportConfig tuned"
);
Ok(server_config)
}
/// Bind the listener. Must be called from inside a tokio runtime context
/// (Quinn relies on `Handle::current()` internally).
pub fn bind_endpoint(cfg: &QuicConfig) -> anyhow::Result<Endpoint> {
let server_config = build_server_config(cfg)?;
let addr: SocketAddr = format!("{}:{}", cfg.server_interface, cfg.server_port)
.parse()
.with_context(|| {
format!(
"invalid bind address {}:{}",
cfg.server_interface, cfg.server_port
)
})?;
Endpoint::server(server_config, addr).context("Endpoint::server bind")
}
/// Accept loop: per-connection senders are cloned from the tier handles and
/// shipped into `handle_incoming` for orchestration.
pub async fn accept_loop(endpoint: Endpoint, t1: T1Sender, t2: T2Sender, t3: T3Sender) {
tracing::info!(local = ?endpoint.local_addr().ok(), "QUIC accept loop running");
while let Some(incoming) = endpoint.accept().await {
let t1 = t1.clone();
let t2 = t2.clone();
let t3 = t3.clone();
tokio::spawn(handle_incoming(incoming, t1, t2, t3));
}
tracing::info!("QUIC accept loop exited");
}
/// Per-connection orchestrator. Performs the handshake and spawns one reader
/// per tier, then waits for the connection to close and joins the readers.
async fn handle_incoming(incoming: Incoming, t1: T1Sender, t2: T2Sender, t3: T3Sender) {
let conn = match incoming.await {
Ok(c) => c,
Err(e) => {
tracing::warn!(error = %e, "handshake failed");
return;
}
};
let remote = conn.remote_address();
tracing::info!(?remote, "connection established");
// One task per tier — fully wired across T1/T2/T3.
let dgram_task = tokio::spawn(read_datagrams(conn.clone(), t1));
let uni_task = tokio::spawn(read_uni_streams(conn.clone(), t2));
let bi_task = tokio::spawn(accept_bi_streams(conn.clone(), t3));
let _ = conn.closed().await;
if let Err(e) = dgram_task.await {
tracing::warn!(?remote, error = %e, "T1 datagram task ended unexpectedly");
}
if let Err(e) = uni_task.await {
tracing::warn!(?remote, error = %e, "T2 uni stream task ended unexpectedly");
}
if let Err(e) = bi_task.await {
tracing::warn!(?remote, error = %e, "T3 bi stream task ended unexpectedly");
}
tracing::info!(?remote, "connection closed");
}
/// T1 — read QUIC datagrams, decode each as a fixed-size `QuicMessage`, push
/// into the lossy T1 channel.
async fn read_datagrams(conn: Connection, t1: T1Sender) {
let remote = conn.remote_address();
let mut received: u64 = 0;
let mut dropped: u64 = 0;
let mut decode_errors: u64 = 0;
loop {
match conn.read_datagram().await {
Ok(bytes) => match QuicMessage::decode(&bytes[..]) {
Ok(msg) => {
received += 1;
counter!("substrate_received_total", "tier" => "t1").increment(1);
if !t1.send_lossy(msg) {
dropped += 1;
counter!("substrate_dropped_total", "tier" => "t1").increment(1);
tracing::trace!(?remote, "T1 channel full, datagram dropped");
}
}
Err(e) => {
decode_errors += 1;
counter!("substrate_decode_errors_total", "tier" => "t1").increment(1);
tracing::warn!(
?remote,
len = bytes.len(),
error = %e,
"T1 datagram decode failed"
);
}
},
Err(e) => {
tracing::debug!(
?remote,
received,
dropped,
decode_errors,
error = %e,
"T1 datagram reader ended"
);
return;
}
}
}
}
/// T2 — accept unidirectional streams. Each accepted stream gets its own task
/// reading 38-byte chunks until EOF (one stream may carry one event or many).
/// Cross-stream interleaving is allowed; ordering is only guaranteed *within*
/// a stream, matching QUIC's stream semantics.
async fn read_uni_streams(conn: Connection, t2: T2Sender) {
let remote = conn.remote_address();
let mut streams_accepted: u64 = 0;
loop {
let recv = match conn.accept_uni().await {
Ok(s) => s,
Err(e) => {
tracing::debug!(
?remote,
streams_accepted,
error = %e,
"T2 uni accept loop ended"
);
return;
}
};
streams_accepted += 1;
let t2 = t2.clone();
tokio::spawn(read_one_uni_stream(remote, recv, t2));
}
}
/// Per-stream worker for T2. Reads fixed-size `QuicMessage`s back-to-back,
/// awaits backpressure on the T2 channel, and resets the stream on a decode
/// failure (one corrupt stream shouldn't take down the whole connection).
async fn read_one_uni_stream(remote: SocketAddr, mut recv: RecvStream, t2: T2Sender) {
let stream_id: StreamId = recv.id();
let mut buf = [0u8; QuicMessage::WIRE_SIZE];
let mut count: u64 = 0;
loop {
match recv.read_exact(&mut buf).await {
Ok(()) => match QuicMessage::decode(&buf) {
Ok(msg) => {
count += 1;
counter!("substrate_received_total", "tier" => "t2").increment(1);
if t2.send(msg).await.is_err() {
// T2 receiver dropped (substrate shutting down).
tracing::warn!(
?remote,
?stream_id,
count,
"T2 channel closed; abandoning stream"
);
return;
}
}
Err(e) => {
counter!("substrate_decode_errors_total", "tier" => "t2").increment(1);
tracing::warn!(
?remote,
?stream_id,
count,
error = %e,
"T2 decode failed; resetting stream"
);
let _ = recv.stop(0u32.into());
return;
}
},
Err(e) => {
tracing::trace!(
?remote,
?stream_id,
count,
error = %e,
"T2 uni stream ended"
);
return;
}
}
}
}
/// T3 — accept bidirectional streams. Each stream is one command/ack
/// exchange, modeled per the paper's "per-command oneshot channels": the
/// reader pushes a `T3Inbound { command, reply }` to the ECS, awaits the
/// response on `reply_rx`, and writes it back on the same stream.
async fn accept_bi_streams(conn: Connection, t3: T3Sender) {
let remote = conn.remote_address();
let mut streams_accepted: u64 = 0;
loop {
let (send, recv) = match conn.accept_bi().await {
Ok(s) => s,
Err(e) => {
tracing::debug!(
?remote,
streams_accepted,
error = %e,
"T3 bi accept loop ended"
);
return;
}
};
streams_accepted += 1;
let t3 = t3.clone();
tokio::spawn(read_one_bi_stream(remote, send, recv, t3));
}
}
/// Per-stream worker for T3. Reads exactly one command, ships it with a
/// `oneshot::Sender` to the ECS, awaits the reply, writes it back. If the
/// ECS drops the oneshot (no handler installed), the stream is reset so the
/// client sees an explicit reset instead of a half-open stream.
async fn read_one_bi_stream(
remote: SocketAddr,
mut send: SendStream,
mut recv: RecvStream,
t3: T3Sender,
) {
let stream_id: StreamId = recv.id();
let mut buf = [0u8; QuicMessage::WIRE_SIZE];
if let Err(e) = recv.read_exact(&mut buf).await {
tracing::trace!(
?remote,
?stream_id,
error = %e,
"T3: incomplete command read; closing"
);
return;
}
let command = match QuicMessage::decode(&buf) {
Ok(m) => m,
Err(e) => {
counter!("substrate_decode_errors_total", "tier" => "t3").increment(1);
tracing::warn!(
?remote,
?stream_id,
error = %e,
"T3 command decode failed; resetting stream"
);
let _ = recv.stop(0u32.into());
let _ = send.reset(0u32.into());
return;
}
};
counter!("substrate_received_total", "tier" => "t3").increment(1);
let (reply_tx, reply_rx) = oneshot::channel::<QuicMessage>();
let inbound = T3Inbound {
command,
reply: reply_tx,
};
if t3.send(inbound).await.is_err() {
tracing::warn!(?remote, ?stream_id, "T3 channel closed; abandoning command");
let _ = send.reset(0u32.into());
return;
}
let response = match reply_rx.await {
Ok(msg) => msg,
Err(_) => {
// ECS dropped the oneshot. With M4's handler installed this
// shouldn't happen normally; if it does, the stream is reset so
// the client sees a clean signal.
counter!("substrate_t3_no_handler_total").increment(1);
tracing::debug!(
?remote,
?stream_id,
"T3: no handler for command, resetting stream"
);
let _ = send.reset(0u32.into());
return;
}
};
if let Err(e) = send.write_all(&response.to_bytes()).await {
tracing::warn!(
?remote,
?stream_id,
error = %e,
"T3 ack write failed"
);
return;
}
if let Err(e) = send.finish() {
tracing::warn!(
?remote,
?stream_id,
error = %e,
"T3 ack finish failed"
);
}
}

View File

@@ -0,0 +1,13 @@
use bevy::prelude::States;
/// Lifecycle of the QUIC listener inside the ECS schedule.
///
/// `Starting` is the default; `OnEnter(Starting)` performs the bind and, on
/// success, transitions to `Started`. A `Failed` variant will join when we
/// add proper error surfacing — for now a bind failure panics the app.
#[derive(States, Debug, Clone, Copy, Default, Eq, PartialEq, Hash)]
pub enum ServerState {
#[default]
Starting,
Started,
}

View File

@@ -0,0 +1,97 @@
//! Components attached to per-sensor entities, plus the per-type threshold
//! table used by `simulation_system`'s crossing detection.
//!
//! Each (device, sensor) pair becomes one entity tagged with `Asset` and
//! carrying `DeviceId` + `SensorId` + `SensorTypeTag` + `RawSensorData` +
//! `SmoothedValue`.
use bevy::prelude::*;
use crate::transport::SensorType;
/// Marker — every (device, sensor) pair becomes one entity tagged `Asset`.
#[derive(Component, Debug, Default, Clone, Copy)]
pub struct Asset;
#[derive(Component, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DeviceId(pub uuid::Uuid);
#[derive(Component, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SensorId(pub u16);
/// Sensor type — set on entity creation from the first message that names
/// the (device, sensor) pair, then immutable. We don't track type changes:
/// a given (device_id, sensor_id) is one logical sensor with one type for
/// the lifetime of the run.
#[derive(Component, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SensorTypeTag(pub SensorType);
/// Latest reading from this (device, sensor). Updated in place by
/// `ingest_system`; read by simulation/export/diagnostics.
#[derive(Component, Debug, Default, Clone, Copy, PartialEq)]
pub struct RawSensorData {
pub raw_value: f64,
pub timestamp_us: u64,
pub sequence_number: u32,
}
pub const SMOOTHED_WINDOW: usize = 16;
/// Rolling-window mean of the last `SMOOTHED_WINDOW` raw readings, plus a
/// hysteresis flag for threshold-crossing detection. Maintained by
/// `simulation_system` — this is the bit of the ECS that does honest
/// digital-twin transform work, not just write-through of incoming samples.
#[derive(Component, Debug, Clone, Copy)]
pub struct SmoothedValue {
ring: [f64; SMOOTHED_WINDOW],
head: usize,
filled: u16,
pub mean: f64,
pub above_threshold: bool,
}
impl Default for SmoothedValue {
fn default() -> Self {
Self {
ring: [0.0; SMOOTHED_WINDOW],
head: 0,
filled: 0,
mean: 0.0,
above_threshold: false,
}
}
}
impl SmoothedValue {
/// Push a new sample. Non-finite values (NaN / ±∞) are ignored — the
/// smoothed state stays whatever it was. This matters because T3 acks
/// can carry NaN when the substrate has never seen the target sensor.
pub fn push(&mut self, v: f64) {
if !v.is_finite() {
return;
}
self.ring[self.head] = v;
self.head = (self.head + 1) % SMOOTHED_WINDOW;
if (self.filled as usize) < SMOOTHED_WINDOW {
self.filled += 1;
}
let n = self.filled as usize;
let sum: f64 = self.ring.iter().take(n).sum();
self.mean = sum / n as f64;
}
}
/// Per-type threshold for `simulation_system`'s crossing detection. Chosen
/// mid-band against the simulator's waveforms so crossings actually fire
/// during a demo; in a real deployment these would be alarm thresholds
/// supplied by config.
pub(super) fn threshold_for(t: SensorType) -> f64 {
match t {
SensorType::Generic => 0.0,
SensorType::Temperature => 22.0, // °C — simulator oscillates 15..25
SensorType::Humidity => 55.0, // % — 30..70
SensorType::Pressure => 1014.0, // hPa — 1008..1018
SensorType::Voltage => 230.2, // V — 229.5..230.5
SensorType::Current => 10.5, // A — 8..12
}
}

View File

@@ -0,0 +1,52 @@
//! ECS world: the five paper-named systems plus the components and resources
//! they operate on.
//!
//! ```text
//! components.rs ── per-sensor components + per-type threshold table
//! resources.rs ── SensorRegistry, DiagnosticsState, ExportSampleState
//! systems.rs ── ingest / fault_injection / simulation / export / diagnostics
//! tests.rs ── unit tests (#[cfg(test)] only)
//! ```
//!
//! Each (device, sensor) pair becomes one entity with `Asset` + `DeviceId` +
//! `SensorId` + `SensorTypeTag` + `RawSensorData` + `SmoothedValue`.
//! `ingest_system` upserts on every incoming `QuicMessage`; the registry maps
//! `(Uuid, u16) → Entity` for O(1) lookup.
mod components;
mod resources;
mod systems;
#[cfg(test)]
mod tests;
use bevy::prelude::*;
use bevy::state::condition::in_state;
use crate::transport::state::ServerState;
pub use components::{
Asset, DeviceId, RawSensorData, SMOOTHED_WINDOW, SensorId, SensorTypeTag, SmoothedValue,
};
pub use resources::SensorRegistry;
pub struct WorldPlugin;
impl Plugin for WorldPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<SensorRegistry>()
.init_resource::<resources::DiagnosticsState>()
.init_resource::<resources::ExportSampleState>()
.add_systems(
PreUpdate,
(systems::fault_injection_system, systems::ingest_system)
.chain()
.run_if(in_state(ServerState::Started)),
)
.add_systems(Update, systems::simulation_system)
.add_systems(
PostUpdate,
(systems::export_system, systems::diagnostics_system).chain(),
);
}
}

View File

@@ -0,0 +1,48 @@
//! Bevy `Resource`s consumed by the world's systems.
use std::collections::HashMap;
use std::time::Instant;
use bevy::prelude::{Entity, Resource};
/// O(1) lookup `(device_id, sensor_id) → Entity`. Populated lazily by the
/// ingest system; queried by export/diagnostics.
#[derive(Resource, Default)]
pub struct SensorRegistry {
pub(crate) map: HashMap<(uuid::Uuid, u16), Entity>,
}
impl SensorRegistry {
pub fn entity_count(&self) -> usize {
self.map.len()
}
}
/// Rolling counter of ticks since the last `diagnostics` log line was emitted.
#[derive(Resource)]
pub(super) struct DiagnosticsState {
pub(super) last_log: Instant,
pub(super) ticks_since_log: u64,
}
impl Default for DiagnosticsState {
fn default() -> Self {
Self {
last_log: Instant::now(),
ticks_since_log: 0,
}
}
}
/// Rate-limiter for `export_system` — runs at the ECS tick rate but only
/// emits gauges once per second.
#[derive(Resource)]
pub(super) struct ExportSampleState {
pub(super) last_sample: Instant,
}
impl Default for ExportSampleState {
fn default() -> Self {
Self { last_sample: Instant::now() }
}
}

View File

@@ -0,0 +1,278 @@
//! The five paper-named ECS systems and their private helpers.
//!
//! Scheduler placement (configured in [`super::WorldPlugin`]):
//!
//! | Schedule | Systems |
//! |-----------|--------------------------------------|
//! | PreUpdate | fault_injection → ingest |
//! | Update | simulation |
//! | PostUpdate| export → diagnostics |
use std::collections::HashMap;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use bevy::prelude::*;
use metrics::{counter, gauge, histogram};
use crate::transport::ecs::{BridgeReceivers, BridgeSenders};
use crate::transport::{QuicMessage, SensorType};
use super::components::{
Asset, DeviceId, RawSensorData, SensorId, SensorTypeTag, SmoothedValue, threshold_for,
};
use super::resources::{DiagnosticsState, ExportSampleState, SensorRegistry};
/// T1 batch limit per tick. Anything beyond this stays in the channel and
/// either drains next tick or gets dropped on full (T1's contract is lossy).
const T1_INGEST_BATCH: usize = 1024;
/// Drain the three tier channels into ECS state.
///
/// T1: bounded batch (lossy); T2: full drain (reliable); T3: full drain, with
/// each command answered by an ack carrying the device's current sensor value.
pub(super) fn ingest_system(
bridge: Res<BridgeReceivers>,
mut registry: ResMut<SensorRegistry>,
mut commands: Commands,
mut q: Query<&mut RawSensorData>,
) {
let now = now_us();
// T1 — datagrams.
{
let mut t1 = bridge.t1.lock().unwrap();
for _ in 0..T1_INGEST_BATCH {
match t1.try_recv() {
Ok(msg) => {
histogram!("substrate_latency_us", "tier" => "t1")
.record(now.saturating_sub(msg.timestamp_us) as f64);
upsert_reading(&mut registry, &mut commands, &mut q, msg);
}
Err(_) => break,
}
}
}
// T2 — uni streams.
{
let mut t2 = bridge.t2.lock().unwrap();
while let Ok(msg) = t2.try_recv() {
histogram!("substrate_latency_us", "tier" => "t2")
.record(now.saturating_sub(msg.timestamp_us) as f64);
upsert_reading(&mut registry, &mut commands, &mut q, msg);
}
}
// T3 — bidirectional commands. Reply with the device's most recent
// sensor value (NaN if we've never seen this (device, sensor) before).
{
let mut t3 = bridge.t3.lock().unwrap();
while let Ok(inbound) = t3.try_recv() {
histogram!("substrate_latency_us", "tier" => "t3")
.record(now.saturating_sub(inbound.command.timestamp_us) as f64);
let key = (inbound.command.device_id, inbound.command.sensor_id);
let current_value = registry
.map
.get(&key)
.and_then(|&e| q.get(e).ok())
.map(|d| d.raw_value)
.unwrap_or(f64::NAN);
let ack = QuicMessage {
device_id: inbound.command.device_id,
sensor_id: inbound.command.sensor_id,
raw_value: current_value,
timestamp_us: now_us(),
sequence_number: inbound.command.sequence_number,
sensor_type: inbound.command.sensor_type,
};
// Ignore send errors: the demux task may have given up if the
// connection died while we were processing.
let _ = inbound.reply.send(ack);
}
}
}
fn upsert_reading(
registry: &mut SensorRegistry,
commands: &mut Commands,
q: &mut Query<&mut RawSensorData>,
msg: QuicMessage,
) {
let key = (msg.device_id, msg.sensor_id);
let data = RawSensorData {
raw_value: msg.raw_value,
timestamp_us: msg.timestamp_us,
sequence_number: msg.sequence_number,
};
if let Some(&entity) = registry.map.get(&key) {
// Common case: existing entity, mutate in place.
if let Ok(mut existing) = q.get_mut(entity) {
*existing = data;
} else {
// Edge case: entity was registered earlier in *this* tick via
// `commands.spawn`, so the components aren't in the archetype
// yet (`Commands` is deferred). Queue another insert; last write
// wins when Commands flushes.
commands.entity(entity).insert(data);
}
return;
}
let entity = commands
.spawn((
Asset,
DeviceId(msg.device_id),
SensorId(msg.sensor_id),
SensorTypeTag(SensorType::from_u8(msg.sensor_type)),
SmoothedValue::default(),
data,
))
.id();
registry.map.insert(key, entity);
}
/// Stub — M6 inserts loss/delay here for benchmark scenarios.
pub(super) fn fault_injection_system() {}
/// Per-sensor digital-twin transform. Pulls each entity's latest
/// `RawSensorData` into a sliding-window mean (`SmoothedValue`), and emits
/// `substrate_threshold_crossings_total{type, direction}` when that mean
/// transitions across the per-type threshold. The `Changed<RawSensorData>`
/// filter restricts the scan to entities updated *this tick*, so the cost
/// scales with ingress rate, not fleet size.
pub(super) fn simulation_system(
mut q: Query<(&SensorTypeTag, &RawSensorData, &mut SmoothedValue), Changed<RawSensorData>>,
) {
for (st, raw, mut smoothed) in q.iter_mut() {
smoothed.push(raw.raw_value);
let now_above = smoothed.mean > threshold_for(st.0);
if now_above != smoothed.above_threshold {
smoothed.above_threshold = now_above;
let dir = if now_above { "up" } else { "down" };
counter!(
"substrate_threshold_crossings_total",
"type" => st.0.label_str(),
"direction" => dir
)
.increment(1);
}
}
}
/// Sample ECS-side gauges into the Prometheus exporter. Runs every tick but
/// only emits once per second to keep cost negligible. This is the system
/// the paper's §Architecture diagram calls `ExportSystem`.
pub(super) fn export_system(
senders: Res<BridgeSenders>,
registry: Res<SensorRegistry>,
sensors_q: Query<(&SensorTypeTag, &RawSensorData)>,
mut state: ResMut<ExportSampleState>,
) {
let now = Instant::now();
if now.duration_since(state.last_sample) < Duration::from_secs(1) {
return;
}
state.last_sample = now;
// ---- runtime telemetry ----
gauge!("substrate_entities").set(registry.entity_count() as f64);
gauge!("substrate_channel_depth", "tier" => "t1").set(senders.t1.depth() as f64);
gauge!("substrate_channel_depth", "tier" => "t2").set(senders.t2.depth() as f64);
gauge!("substrate_channel_depth", "tier" => "t3").set(senders.t3.depth() as f64);
gauge!("substrate_channel_capacity", "tier" => "t1").set(senders.t1.capacity() as f64);
gauge!("substrate_channel_capacity", "tier" => "t2").set(senders.t2.capacity() as f64);
gauge!("substrate_channel_capacity", "tier" => "t3").set(senders.t3.capacity() as f64);
if let Some(stats) = memory_stats::memory_stats() {
gauge!("substrate_rss_bytes").set(stats.physical_mem as f64);
}
// ---- sensor data aggregates (per type) ----
let mut by_type: HashMap<&'static str, Aggregate> = HashMap::new();
for (st, data) in &sensors_q {
by_type
.entry(st.0.label_str())
.or_insert_with(Aggregate::new)
.push(data.raw_value);
}
for (label, agg) in &by_type {
gauge!("sensor_aggregate", "type" => *label, "stat" => "count").set(agg.count as f64);
if agg.count > 0 {
gauge!("sensor_aggregate", "type" => *label, "stat" => "mean").set(agg.mean());
gauge!("sensor_aggregate", "type" => *label, "stat" => "min").set(agg.min);
gauge!("sensor_aggregate", "type" => *label, "stat" => "max").set(agg.max);
}
}
}
pub(super) fn diagnostics_system(
mut state: ResMut<DiagnosticsState>,
registry: Res<SensorRegistry>,
) {
state.ticks_since_log += 1;
let now = Instant::now();
let elapsed = now.duration_since(state.last_log);
if elapsed >= Duration::from_secs(1) {
let tick_hz = state.ticks_since_log as f64 / elapsed.as_secs_f64();
gauge!("substrate_tick_hz").set(tick_hz);
tracing::info!(
tick_hz = format_args!("{:.1}", tick_hz),
entities = registry.entity_count(),
"diagnostics"
);
state.last_log = now;
state.ticks_since_log = 0;
}
}
fn now_us() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_micros() as u64)
.unwrap_or(0)
}
/// Per-type accumulator for `export_system`'s sensor aggregates. NaN-safe.
#[derive(Debug, Clone, Copy)]
struct Aggregate {
count: u64,
sum: f64,
min: f64,
max: f64,
}
impl Aggregate {
fn new() -> Self {
Self {
count: 0,
sum: 0.0,
min: f64::INFINITY,
max: f64::NEG_INFINITY,
}
}
fn push(&mut self, v: f64) {
if !v.is_finite() {
return;
}
self.count += 1;
self.sum += v;
if v < self.min {
self.min = v;
}
if v > self.max {
self.max = v;
}
}
fn mean(&self) -> f64 {
if self.count == 0 {
f64::NAN
} else {
self.sum / self.count as f64
}
}
}

View File

@@ -0,0 +1,294 @@
//! Unit tests for the world's components and systems.
//!
//! Lives as a child module so it can poke at `pub(super)` items (the
//! internal resources, `threshold_for`, etc.) without enlarging the
//! public API.
use std::sync::Mutex;
use bevy::prelude::*;
use bevy::state::app::StatesPlugin;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::transport::ecs::{BridgeReceivers, BridgeSenders};
use crate::transport::state::ServerState;
use crate::transport::{QuicMessage, SensorType, T1Sender, T2Sender, T3Inbound, T3Sender};
use super::WorldPlugin;
use super::components::{RawSensorData, SMOOTHED_WINDOW, SmoothedValue, threshold_for};
use super::resources::SensorRegistry;
/// Build a Bevy app with just enough plugins/resources to run the world
/// systems against test-owned channels. No QUIC, no tokio runtime.
fn make_test_app() -> (
App,
mpsc::Sender<QuicMessage>,
mpsc::Sender<QuicMessage>,
mpsc::Sender<T3Inbound>,
) {
let (t1_tx, t1_rx) = mpsc::channel::<QuicMessage>(64);
let (t2_tx, t2_rx) = mpsc::channel::<QuicMessage>(64);
let (t3_tx, t3_rx) = mpsc::channel::<T3Inbound>(64);
let bridge = BridgeReceivers {
t1: Mutex::new(t1_rx),
t2: Mutex::new(t2_rx),
t3: Mutex::new(t3_rx),
};
// export_system samples channel depth/capacity from the senders; it
// requires the resource even when the test pushes via the raw senders
// directly (which is what the rest of the test does).
let senders = BridgeSenders {
t1: T1Sender::new(t1_tx.clone()),
t2: T2Sender::new(t2_tx.clone()),
t3: T3Sender::new(t3_tx.clone()),
};
let mut app = App::new();
app.add_plugins(MinimalPlugins)
.add_plugins(StatesPlugin)
.init_state::<ServerState>()
.insert_resource(bridge)
.insert_resource(senders)
.add_plugins(WorldPlugin);
// Force the state machine into Started so the run_if guard passes.
app.world_mut()
.resource_mut::<NextState<ServerState>>()
.set(ServerState::Started);
// Process the state transition before tests push messages.
app.update();
(app, t1_tx, t2_tx, t3_tx)
}
// ---- ingest_system: entity lifecycle and T3 ack semantics ----
#[test]
fn ingest_t1_creates_entity_and_writes_raw_data() {
let (mut app, t1_tx, _t2_tx, _t3_tx) = make_test_app();
let device = Uuid::from_u128(0xa1a2_a3a4_a5a6_a7a8_a9aa_abac_adae_afb0);
let msg = QuicMessage {
device_id: device,
sensor_id: 5,
raw_value: 3.14,
timestamp_us: 1_700_000_000_000_001,
sequence_number: 1,
sensor_type: SensorType::Temperature.as_u8(),
};
t1_tx.try_send(msg).expect("channel cap");
// Tick 1: ingest drains the channel and spawns via Commands.
app.update();
// Tick 2: Commands have flushed into the archetype.
app.update();
let registry = app.world().resource::<SensorRegistry>();
assert_eq!(registry.map.len(), 1);
let entity = *registry
.map
.get(&(device, 5))
.expect("entity not registered");
let data = app
.world()
.get::<RawSensorData>(entity)
.expect("RawSensorData missing");
assert_eq!(data.raw_value, 3.14);
assert_eq!(data.sequence_number, 1);
assert_eq!(data.timestamp_us, 1_700_000_000_000_001);
}
#[test]
fn ingest_t1_repeated_messages_update_in_place() {
let (mut app, t1_tx, _t2_tx, _t3_tx) = make_test_app();
let device = Uuid::new_v4();
// First reading.
t1_tx
.try_send(QuicMessage {
device_id: device,
sensor_id: 0,
raw_value: 1.0,
timestamp_us: 1,
sequence_number: 1,
sensor_type: SensorType::Generic.as_u8(),
})
.unwrap();
app.update();
app.update();
// Second reading on the same (device, sensor).
t1_tx
.try_send(QuicMessage {
device_id: device,
sensor_id: 0,
raw_value: 2.0,
timestamp_us: 2,
sequence_number: 2,
sensor_type: SensorType::Generic.as_u8(),
})
.unwrap();
app.update();
let registry = app.world().resource::<SensorRegistry>();
assert_eq!(registry.map.len(), 1, "should reuse the same entity");
let entity = *registry.map.get(&(device, 0)).unwrap();
let data = app.world().get::<RawSensorData>(entity).unwrap();
assert_eq!(data.raw_value, 2.0);
assert_eq!(data.sequence_number, 2);
}
#[test]
fn ingest_t3_replies_with_current_sensor_value() {
let (mut app, t1_tx, _t2_tx, t3_tx) = make_test_app();
let device = Uuid::new_v4();
// Seed a T1 reading so the (device, sensor) entity exists.
t1_tx
.try_send(QuicMessage {
device_id: device,
sensor_id: 9,
raw_value: 42.0,
timestamp_us: 1,
sequence_number: 1,
sensor_type: SensorType::Temperature.as_u8(),
})
.unwrap();
app.update();
app.update();
// Send a T3 command and capture the ack via the oneshot.
let (reply_tx, reply_rx) = oneshot::channel();
t3_tx
.try_send(T3Inbound {
command: QuicMessage {
device_id: device,
sensor_id: 9,
raw_value: 0.0,
timestamp_us: 0,
sequence_number: 7,
sensor_type: SensorType::Temperature.as_u8(),
},
reply: reply_tx,
})
.unwrap();
app.update();
let ack = reply_rx
.blocking_recv()
.expect("ECS handler should have replied");
assert_eq!(ack.device_id, device);
assert_eq!(ack.sensor_id, 9);
assert_eq!(ack.sequence_number, 7, "ack preserves correlation id");
assert_eq!(ack.raw_value, 42.0, "ack carries the latest sensor reading");
assert_eq!(
ack.typ(),
SensorType::Temperature,
"ack preserves sensor type"
);
assert!(ack.timestamp_us > 0, "ack stamped with server time");
}
// ---- SmoothedValue unit tests ----
#[test]
fn smoothed_value_first_push_sets_mean() {
let mut s = SmoothedValue::default();
s.push(10.0);
assert_eq!(s.mean, 10.0);
assert!(!s.above_threshold);
}
#[test]
fn smoothed_value_averages_filled_window() {
let mut s = SmoothedValue::default();
for v in [1.0, 2.0, 3.0, 4.0] {
s.push(v);
}
assert!((s.mean - 2.5).abs() < 1e-9);
}
#[test]
fn smoothed_value_rolls_after_window_fills() {
let mut s = SmoothedValue::default();
for _ in 0..SMOOTHED_WINDOW {
s.push(0.0);
}
assert!((s.mean - 0.0).abs() < 1e-9);
for _ in 0..SMOOTHED_WINDOW {
s.push(10.0);
}
assert!((s.mean - 10.0).abs() < 1e-9, "ring should fully roll over");
}
#[test]
fn smoothed_value_ignores_nonfinite() {
let mut s = SmoothedValue::default();
s.push(5.0);
let before = s.mean;
s.push(f64::NAN);
s.push(f64::INFINITY);
s.push(f64::NEG_INFINITY);
assert_eq!(s.mean, before, "non-finite values should not perturb the mean");
}
// ---- simulation_system: end-to-end threshold-crossing transition ----
#[test]
fn simulation_smoothes_and_detects_threshold_crossing() {
let (mut app, t1_tx, _t2_tx, _t3_tx) = make_test_app();
let device = Uuid::new_v4();
let threshold = threshold_for(SensorType::Temperature); // 22.0 °C
// Below-threshold readings: smoothed mean stays under, no crossing.
for seq in 0..SMOOTHED_WINDOW as u32 {
t1_tx
.try_send(QuicMessage {
device_id: device,
sensor_id: 0,
raw_value: 18.0,
timestamp_us: u64::from(seq),
sequence_number: seq,
sensor_type: SensorType::Temperature.as_u8(),
})
.unwrap();
app.update();
app.update();
}
let registry = app.world().resource::<SensorRegistry>();
let entity = *registry.map.get(&(device, 0)).unwrap();
let smoothed = app
.world()
.get::<SmoothedValue>(entity)
.expect("SmoothedValue should be on every sensor entity");
assert!(smoothed.mean < threshold);
assert!(!smoothed.above_threshold, "should not have crossed up yet");
// Above-threshold readings: enough samples to drag the mean above
// the threshold (window = 16; pushing 30°C for 16 ticks lands mean ≈ 30).
for seq in (SMOOTHED_WINDOW as u32)..(SMOOTHED_WINDOW as u32 * 2) {
t1_tx
.try_send(QuicMessage {
device_id: device,
sensor_id: 0,
raw_value: 30.0,
timestamp_us: u64::from(seq),
sequence_number: seq,
sensor_type: SensorType::Temperature.as_u8(),
})
.unwrap();
app.update();
}
let smoothed = app.world().get::<SmoothedValue>(entity).unwrap();
assert!(smoothed.mean > threshold);
assert!(
smoothed.above_threshold,
"smoothed mean should have crossed up through {threshold}"
);
}