Compare commits

2 Commits

Author | SHA1 | Message | Date
Valère Plantevin | 20d59ed0ba | Getting ready for the final test | 2026-05-12 13:24:03 -04:00
Valère Plantevin | 5d2552efb5 | Enhance substrate ingest limits and optimize simulator stream reuse | 2026-05-12 11:44:01 -04:00
14 changed files with 279 additions and 64 deletions

@@ -58,7 +58,7 @@ quic_ecs_dt/
| TLS / self-signed cert | Done (M1) — `certs/server.{crt,key}` via `make certs`, gitignored. PEM loader in [substrate/src/transport/server.rs:15](substrate/src/transport/server.rs#L15); rustls `aws-lc-rs` default provider installed in [substrate/src/main.rs](substrate/src/main.rs) |
| Wire codec for `QuicMessage` (39 B fixed LE, incl. `sensor_type: u8`) | Done — [substrate/src/transport/mod.rs](substrate/src/transport/mod.rs); 5 unit tests passing. `SensorType` enum: `Generic / Temperature / Humidity / Pressure / Voltage / Current` |
| `tracing-subscriber` init w/ `RUST_LOG` | Done (M1) — [substrate/src/main.rs:8-12](substrate/src/main.rs#L8-L12) |
| ECS components (`RawSensorData`, `SmoothedValue`) + 5 systems (Ingest/Sim/Export/FaultInjection/Diagnostics) | Done — entities = `(DeviceId, SensorId, SensorTypeTag, RawSensorData, SmoothedValue, Asset)` per (device, sensor); `SensorRegistry` upserts via `HashMap<(Uuid, u16), Entity>` in [substrate/src/world.rs](substrate/src/world.rs). `IngestSystem` drains all three tiers; T3 ack preserves command's `sensor_type` and returns the device's most recent `raw_value`. `SimulationSystem` maintains a 16-sample rolling mean per entity and emits `substrate_threshold_crossings_total{type, direction}` when the smoothed mean crosses a per-type threshold (`Changed<RawSensorData>` query so cost scales with ingress, not fleet size). `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `FaultInjection` is still a stub awaiting M6. `Diagnostics` logs `tick_hz` once per second |
| ECS components (`RawSensorData`, `SmoothedValue`) + 4 systems (Ingest/Sim/Export/Diagnostics) | Done — entities = `(DeviceId, SensorId, SensorTypeTag, RawSensorData, SmoothedValue, Asset)` per (device, sensor); `SensorRegistry` upserts via `HashMap<(Uuid, u16), Entity>` in [substrate/src/world.rs](substrate/src/world.rs). `IngestSystem` drains all three tiers; T3 ack preserves command's `sensor_type` and returns the device's most recent `raw_value`. `SimulationSystem` maintains a 16-sample rolling mean per entity and emits `substrate_threshold_crossings_total{type, direction}` when the smoothed mean crosses a per-type threshold (`Changed<RawSensorData>` query so cost scales with ingress, not fleet size). `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `Diagnostics` logs `tick_hz` once per second |
| Schedule rate-gating | Done (M4) — `MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(1/tick_rate_hz))` in [substrate/src/main.rs](substrate/src/main.rs); replaces the default busy-loop with the configured period |
| Prometheus exporter + Grafana dashboards | Done (M5) — `ObservabilityPlugin` in [substrate/src/observability.rs](substrate/src/observability.rs) installs `metrics-exporter-prometheus` on the existing tokio runtime. **Runtime surface** (paper §Evaluation): counters `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`; latency histograms `substrate_latency_us{tier}`; gauges `substrate_tick_hz`, `substrate_entities`, `substrate_channel_depth{tier}`, `substrate_channel_capacity{tier}`, `substrate_rss_bytes`. **Sensor data surface** (operator dashboard): per-type aggregates `sensor_aggregate{type, stat=count|mean|min|max}` computed once per second over the live world, cardinality bounded by `\|SensorType\| × 4` so it scales to thousands of sensors. Two dashboards: [dashboards/runtime.json](dashboards/runtime.json) and [dashboards/sensors.json](dashboards/sensors.json) (thermometer/gauge/stat panels per type) |
| Simulator (Quinn client + sensor generators) | `SimulatorClient` lib in [simulator/src/client.rs](simulator/src/client.rs) — connects, trusts the substrate's PEM cert via custom `ServerCertVerifier` (sidesteps `CaUsedAsEndEntity`); `send_datagram(QuicMessage)` for T1, `send_uni_stream(&[QuicMessage])` for T2, `request(&QuicMessage) -> QuicMessage` for T3. CLI driver in [simulator/src/main.rs](simulator/src/main.rs) with clap flags (`--addr`, `--rate-hz`, `--t2-rate-hz`, `--t3-rate-hz`, `--t3-timeout-ms`, `--count`, `--devices`, `--sensor-id`, `--sensor-type`, `--profile`, `--cert`, `--server-name`); parallel T1+T2+T3 emitters, per-(device,sensor) sequence counters, type-appropriate waveform generators (sin/cos curves centred on realistic sensor ranges), 1-Hz combined progress logs, Ctrl-C drain. `--profile industrial` fans out to 5 sensors per device (Temperature/Humidity/Pressure/Voltage/Current). Bevy-driven sensor generator still pending |
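The rolling-mean and threshold-crossing behaviour described for `SimulationSystem` in the table above can be sketched in plain Rust. This is a minimal illustration, not the project's code: `RollingMean`, `Crossing`, and `crossing` are hypothetical names, the window is a `VecDeque`, and ignoring non-finite samples is an assumption (the README only mentions a non-finite unit test).

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a `SmoothedValue`-style component:
/// a fixed-capacity window with a running sum.
pub struct RollingMean {
    window: VecDeque<f64>,
    capacity: usize,
    sum: f64,
}

impl RollingMean {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "window capacity must be non-zero");
        Self { window: VecDeque::with_capacity(capacity), capacity, sum: 0.0 }
    }

    /// Push a sample and return the current mean.
    /// Assumption: non-finite samples (NaN, inf) are ignored.
    pub fn push(&mut self, sample: f64) -> Option<f64> {
        if sample.is_finite() {
            if self.window.len() == self.capacity {
                // Window is full: evict the oldest sample first.
                if let Some(old) = self.window.pop_front() {
                    self.sum -= old;
                }
            }
            self.window.push_back(sample);
            self.sum += sample;
        }
        self.mean()
    }

    /// Mean of the samples currently in the window, or `None` if empty.
    pub fn mean(&self) -> Option<f64> {
        if self.window.is_empty() {
            None
        } else {
            Some(self.sum / self.window.len() as f64)
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Crossing {
    Rising,
    Falling,
}

/// Report a crossing only when the mean moves from one side of the
/// threshold to the other between two consecutive updates.
pub fn crossing(prev: f64, curr: f64, threshold: f64) -> Option<Crossing> {
    if prev <= threshold && curr > threshold {
        Some(Crossing::Rising)
    } else if prev > threshold && curr <= threshold {
        Some(Crossing::Falling)
    } else {
        None
    }
}
```

With a 16-sample window, a counter labelled by direction would be incremented each time `crossing` returns `Some`; the running sum keeps each update O(1) at the cost of slow floating-point drift over very long runs.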
@@ -76,13 +76,13 @@ Each milestone has one verification gate. Update Status here as we go.
- **M1 — Wire codec & root config.** ✅ Done 2026-05-04. Hand-rolled little-endian codec on `QuicMessage` (38 B fixed: 16 UUID + 2 stream_id + 8 f64 + 8 ts_us + 4 seq) with roundtrip + layout + length-error tests; `config.toml` at repo root; dev TLS via `make certs`; structured `tracing-subscriber` init reads `RUST_LOG` (default `info`).
- **M2 — Quinn server + self-signed TLS.** ✅ Done 2026-05-06. Listener up under `ServerState::Starting/Started`; type-system tier semantics + T3 oneshot ack protocol; per-connection `handle_incoming` orchestrator joining T1 datagram, T2 uni-stream, and T3 bi-stream readers. T1 has dropped/decoded counters; T2 resets a stream on decode failure without killing the connection; T3 ships `T3Inbound { command, reply }` to the ECS and resets the stream when no handler answers. End-to-end coverage: 6 integration tests in [simulator/tests/](simulator/tests/) plus 4 codec unit tests, all green.
- **M3 — Simulator client.** Replace [simulator/src/main.rs](simulator/src/main.rs) with a Bevy app: Quinn client, N synthetic devices, configurable per-tier rates. *Verify:* end-to-end loopback drains messages on all three tiers. **Status (2026-05-05):** simulator made into a lib + bin; `SimulatorClient::{connect,send_datagram,close}` plus a manual smoke runner in `simulator/src/main.rs`. Two integration tests in `simulator/tests/end_to_end_t1.rs` exercise the full T1 path against an in-process substrate. Bevy-driven generator + T2/T3 helpers + load profiles still pending.
- **M4 — ECS world.** ✅ Done. `Asset` + `DeviceId` + `SensorId` + `SensorTypeTag` + `RawSensorData` + `SmoothedValue` components in [substrate/src/world.rs](substrate/src/world.rs); `SensorRegistry` resource for O(1) `(Uuid, u16) → Entity`. `IngestSystem` drains all three tiers (T1 batched, T2/T3 fully); T3 handler returns the latest sensor value as ack. `SimulationSystem` runs a per-entity 16-sample rolling mean and emits `substrate_threshold_crossings_total{type, direction}` on per-type threshold crossings — gives the ECS observable digital-twin work, not just write-through ingest. `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `FaultInjection` still a stub (M6). `DiagnosticsSystem` logs tick rate once per second. Schedule rate-gated via `ScheduleRunnerPlugin::run_loop(1/tick_rate_hz)`. 8 unit tests passing (entity create, in-place update, T3 ack, SmoothedValue push/window/non-finite/full-roll, threshold-crossing transition).
- **M4 — ECS world.** ✅ Done. `Asset` + `DeviceId` + `SensorId` + `SensorTypeTag` + `RawSensorData` + `SmoothedValue` components in [substrate/src/world.rs](substrate/src/world.rs); `SensorRegistry` resource for O(1) `(Uuid, u16) → Entity`. `IngestSystem` drains all three tiers (T1 batched, T2/T3 fully); T3 handler returns the latest sensor value as ack. `SimulationSystem` runs a per-entity 16-sample rolling mean and emits `substrate_threshold_crossings_total{type, direction}` on per-type threshold crossings — gives the ECS observable digital-twin work, not just write-through ingest. `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `DiagnosticsSystem` logs tick rate once per second. Schedule rate-gated via `ScheduleRunnerPlugin::run_loop(1/tick_rate_hz)`. 8 unit tests passing (entity create, in-place update, T3 ack, SmoothedValue push/window/non-finite/full-roll, threshold-crossing transition).
- **M5 — Observability (VictoriaMetrics + Grafana).** ✅ Done. Wire format extended to carry `sensor_type: u8` (38 → 39 B, decoded into `SensorType` enum). Two metric surfaces over `metrics-exporter-prometheus`:
- **Runtime** (paper §Evaluation): `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`, `latency_us{tier}` histograms, `tick_hz` / `entities` / `channel_depth{tier}` / `rss_bytes` gauges.
- **Sensor data** (operator surface): `sensor_aggregate{type, stat=count|mean|min|max}` aggregated per second across the live ECS world. Cardinality bounded to `\|SensorType\| × 4` series independent of physical sensor count.
- Dashboards: [dashboards/runtime.json](dashboards/runtime.json) + [dashboards/sensors.json](dashboards/sensors.json).
- Verified: `--profile industrial --devices 2 --count 200` yields 10 entities and all 5 type aggregates with realistic values (T=20.5°C, RH=51%, P=1018 hPa, V=230.2 V, I=12 A).
- **M6 — Benchmark harness.** Sweep `entity_count ∈ {10k, 50k, 100k, 200k}` × `loss_rate ∈ {0%, 1%, 5%}` with 2k warmup + 5k measurement ticks. Loss via `tc netem` or in-app injection. Writes `data/loopback/final_table.csv`. *Verify:* one full sweep on M4 Max produces a CSV the Quarto figures consume.
- **M6 — Benchmark harness.** Sweep `entity_count ∈ {10k, 50k, 100k, 200k}` × `loss_rate ∈ {0%, 1%, 5%}` with 2k warmup + 5k measurement ticks. Loss via `tc netem`. Writes `data/loopback/final_table.csv`. *Verify:* one full sweep on M4 Max produces a CSV the Quarto figures consume.
- **M7 — CM5 cross-compile & deploy.** Exercise [Makefile:30](Makefile#L30) (`build-cm5`, `deploy-cm5`); set real `CM5_HOST`. *Verify:* binary runs on CM5 with a feed from M4 Max over 1 Gbps Ethernet.
- **M8 — Two-machine run + paper render.** Sweep with simulator on M4 Max → substrate on CM5; populate `data/two_machine/final_table.csv`; `make render` produces a PDF. **Update §Evaluation prose to reflect actual numbers.** Current paper figures (241 Hz, 64 µs / 15.8 ms P99, 2.6 µs jitter, 1.02 MB/1k, R²=0.9999) are **aspirational placeholders** — they may move and the conclusions may shift; that's expected.
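The fixed little-endian wire layout described in M1 and M5 (16 B UUID + 2 B sensor id + 8 B `f64` + 8 B timestamp + 4 B sequence + 1 B sensor type = 39 B) can be sketched as below. This is an illustration under assumptions: `WireMessage` and `take` are hypothetical names, the UUID is held as raw bytes, and placing `sensor_type` in the final byte is a guess at the field order rather than something the README states.

```rust
/// Total encoded size in bytes.
pub const WIRE_LEN: usize = 39;

/// Hypothetical mirror of the message fields named in the diff.
#[derive(Debug, Clone, PartialEq)]
pub struct WireMessage {
    pub device_id: [u8; 16], // UUID as raw bytes (assumption)
    pub sensor_id: u16,
    pub raw_value: f64,
    pub timestamp_us: u64,
    pub sequence_number: u32,
    pub sensor_type: u8,
}

/// Copy `N` bytes starting at `at` into a fixed-size array.
fn take<const N: usize>(bytes: &[u8], at: usize) -> [u8; N] {
    let mut out = [0u8; N];
    out.copy_from_slice(&bytes[at..at + N]);
    out
}

impl WireMessage {
    /// Encode into the fixed 39-byte little-endian layout.
    pub fn to_bytes(&self) -> [u8; WIRE_LEN] {
        let mut buf = [0u8; WIRE_LEN];
        buf[0..16].copy_from_slice(&self.device_id);
        buf[16..18].copy_from_slice(&self.sensor_id.to_le_bytes());
        buf[18..26].copy_from_slice(&self.raw_value.to_le_bytes());
        buf[26..34].copy_from_slice(&self.timestamp_us.to_le_bytes());
        buf[34..38].copy_from_slice(&self.sequence_number.to_le_bytes());
        buf[38] = self.sensor_type; // assumed position: last byte
        buf
    }

    /// Decode, rejecting any input that is not exactly 39 bytes long.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != WIRE_LEN {
            return Err(format!("expected {} bytes, got {}", WIRE_LEN, bytes.len()));
        }
        Ok(Self {
            device_id: take(bytes, 0),
            sensor_id: u16::from_le_bytes(take(bytes, 16)),
            raw_value: f64::from_le_bytes(take(bytes, 18)),
            timestamp_us: u64::from_le_bytes(take(bytes, 26)),
            sequence_number: u32::from_le_bytes(take(bytes, 34)),
            sensor_type: bytes[38],
        })
    }
}
```

A fixed-size layout lets the receiver validate a message with one length check, which matches the roundtrip, layout, and length-error tests listed under M1.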
@@ -105,7 +105,7 @@ Each milestone has one verification gate. Update Status here as we go.
- **No graceful shutdown.** The `quic-runtime` thread is parked on `pending()`; spawned tasks (accept loop, per-conn demux) are orphaned at process exit. Fine for research runs; we'll need an `OnExit(Started)` (or a `Stopping` state) when M5 observability needs clean drain or M8 wants finalised CSV writes.
- **Bind failure is fatal.** `OnEnter(Starting)` panics if `bind_endpoint` fails. A `ServerState::Failed` variant joins when we wire proper error surfacing.
- **T3 ack semantics are minimal.** The current handler echoes the device's most recent `raw_value` with a server timestamp — adequate for "read sensor" commands, not for actuator-write semantics. A future iteration may introduce an `ActuatorState` component and a setpoint-apply path; for now T3 is best framed as "reliable read/query RPC" in the paper.
- **`FaultInjectionSystem` is still empty.** Runs on schedule but does nothing. M6 fills it with rate-controlled in-app drop so loss sweeps don't depend on external `tc netem`.
- **Schedule rate-gating is approximate.** `ScheduleRunnerPlugin::run_loop(period)` honours `period` as a minimum; observed `tick_hz` runs ~85% of target on macOS dev (target 60 → ~50). Should be tighter on the CM5; revisit if M6 sweeps depend on a steady tick.
## Run / verify

@@ -12,6 +12,9 @@ server_port = 9000
server_interface = "0.0.0.0"
server_cert = "certs/server.crt"
server_key = "certs/server.key"
t1_capacity = 1024
t2_capacity = 512
t3_capacity = 256
[simulation]
tick_rate_hz = 60
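To make the role of `t1_capacity` concrete, here is a small sketch of a bounded channel whose sender drops on full, which is the lossy contract the source comments describe for T1. It uses `std::sync::mpsc::sync_channel` as a stand-in for the project's tokio channel; `LossySender`, `lossy_channel`, and `send_or_drop` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical sender wrapper that counts messages it had to drop.
pub struct LossySender<T> {
    tx: SyncSender<T>,
    pub dropped: u64,
}

impl<T> LossySender<T> {
    /// Try to enqueue without blocking. Returns `false` and bumps the
    /// drop counter when the queue is full or the receiver is gone.
    pub fn send_or_drop(&mut self, msg: T) -> bool {
        match self.tx.try_send(msg) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped += 1;
                false
            }
        }
    }
}

/// Build a bounded channel; `capacity` plays the role of `t1_capacity`.
/// Note: a capacity of 0 would make this a rendezvous channel.
pub fn lossy_channel<T>(capacity: usize) -> (LossySender<T>, Receiver<T>) {
    let (tx, rx) = sync_channel(capacity);
    (LossySender { tx, dropped: 0 }, rx)
}
```

Raising the capacity trades memory for fewer drops during bursts; it does not help if the consumer is persistently slower than the producer.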

@@ -1,2 +1,10 @@
rate_hz,t3_rate_hz,devices,tick_rate_hz,window_s,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t3_received,t3_no_handler,t3_p50_us,t3_p99_us,t3_p999_us,tick_hz,rss_mb,channel_depth_max
100,100,100,0,25,2646,0,118.99720565324648,202.0065277946852,245.99224556720532,2646,0,120.98904580793433,199.99652925270829,238.0069829199846,15833.3,28.0,0
100,100,100,0,25,2641,0,114.99630654141735,183.99342299596765,233.99506214353187,2641,0,115.98953956135134,181.0005395731182,227.98959982186395,15726.4,27.7,1
500,100,100,0,25,13172,0,98.99803587754256,164.00550789726537,216.00466678084967,2634,0,112.99007813673754,176.99119972210946,226.98864928535784,15775.8,28.9,1
1000,100,100,0,25,26259,0,94.00049142147152,146.01363556268566,193.00189597134016,2626,0,102.99701533183928,155.01160914305248,209.99842124823599,15550.5,29.5,1
5000,100,100,0,25,131395,0,91.99185219896138,143.00791974278306,198.00653053045428,2628,0,99.99298268244951,150.00970795504614,195.99712928020054,15635.5,30.3,5
10000,100,100,0,25,263310,0,91.99185219896138,155.01160914305248,243.0093726834088,2633,0,104.99366452846704,169.9832685850933,241.99087365988592,15516.1,30.9,0
25000,100,100,0,25,657000,0,94.00049142147152,166.9843360750187,245.99224556720532,2628,0,107.00761611528327,178.9846436133428,260.00477949916575,15672.3,32.1,25
50000,100,100,0,25,1316100,0,96.99893608515958,155.01160914305248,197.01896883616524,2632,0,106.00645733856791,164.00550789726537,198.00653053045428,15376.8,33.2,50
100000,100,100,0,25,2625900,0,98.99803587754256,173.00145626474986,219.0061940968233,2626,0,110.00216095757669,185.99132120176222,231.01901268757703,15085.7,35.5,100
250000,100,100,0,25,6580250,0,103.99054800886718,200.99901603074525,251.01181592403498,2632,0,115.98953956135134,220.0159432355299,314.977124065739,14190.8,42.0,96

@@ -0,0 +1,13 @@
entities,loss_pct,devices,rate_hz,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t2_p99_us,t3_rtt_us,hz,rss_mb
10000,0,2000,100,201,0,152.00296264568095,231.99133298742527,245.01024189833885,0,0,15254.7,26.5
10000,1,2000,100,202,0,153.00950012244246,251.01181592403498,261.98834382686925,0,0,14916.7,26.8
10000,5,2000,100,202,0,148.01298577790973,245.01024189833885,262.98579349083377,0,0,15108.5,27.1
50000,0,10000,100,202,0,146.01363556268566,238.0069829199846,261.98834382686925,0,0,15098.7,27.5
50000,1,10000,100,202,0,144.01248706798935,236.0160976812146,262.98579349083377,0,0,14938.1,27.6
50000,5,10000,100,202,0,140.99155733033865,238.0069829199846,266.00098548659696,0,0,14705.4,27.7
100000,0,20000,100,201,0,138.0063931729486,233.01434382937512,262.98579349083377,0,0,14823.3,27.9
100000,1,20000,100,202,0,134.00806856721388,231.99133298742527,262.98579349083377,0,0,14802.4,28.6
100000,5,20000,100,202,0,132.9934676099666,230.00476201617178,262.98579349083377,0,0,15060.9,28.7
200000,0,40000,100,202,0,136.00613722545975,238.0069829199846,276.027366209557,0,0,14835.4,29.0
200000,1,40000,100,202,0,138.0063931729486,240.01466203032882,270.02107558160185,0,0,14840.5,29.1
200000,5,40000,100,202,0,139.00362493341808,240.01466203032882,276.027366209557,0,0,14882.3,29.1

scripts/bench-loss.sh Executable file
@@ -0,0 +1,169 @@
#!/usr/bin/env bash
# scripts/bench-loss.sh — M6 benchmark harness
# Sweeps entity count {10k, 50k, 100k, 200k} x loss_rate {0, 1, 5}%
# Output: data/loopback/final_table.csv
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$ROOT"
TICK_RATE_HZ="${TICK_RATE_HZ:-100}"
WARMUP_S="${WARMUP_S:-20}"
WINDOW_S="${WINDOW_S:-50}"
RATE_HZ="${RATE_HZ:-100}"
BUILD="${BUILD:-release}"
IFACE="${IFACE:-eth0}"
OUT_CSV="${OUT_CSV:-data/loopback/final_table.csv}"
HAS_TC=1
# Check that tc is available; loss emulation needs it (plus sudo for the qdisc changes)
if ! command -v tc >/dev/null; then
echo "Warning: 'tc' command not found. Loss emulation will be skipped."
HAS_TC=0
fi
# --- pretty logging ---
if [[ -t 1 ]]; then
BOLD=$'\033[1m'; DIM=$'\033[2m'; GREEN=$'\033[32m'; RED=$'\033[31m'; RESET=$'\033[0m'
else BOLD=; DIM=; GREEN=; RED=; RESET=; fi
step() { printf '%s» %s%s\n' "$BOLD" "$1" "$RESET"; }
ok() { printf '%s ✓ %s%s\n' "$GREEN" "$1" "$RESET"; }
fail() { printf '%s ✗ %s%s\n' "$RED" "$1" "$RESET"; }
for cmd in cargo curl lsof awk; do
command -v "$cmd" >/dev/null || { fail "missing: $cmd"; exit 1; }
done
for port in 9000 9100; do
# UDP sockets never report LISTEN, so rely on lsof's exit status instead of grepping for it.
if lsof -nP -iUDP:"$port" -iTCP:"$port" -sTCP:LISTEN >/dev/null 2>&1; then
fail "port $port in use — kill the running substrate first"
exit 1
fi
done
[[ -f certs/server.crt ]] || make certs >/dev/null
step "Building ($BUILD)"
if [[ "$BUILD" == "release" ]]; then
cargo build --release -p substrate -p simulator >/dev/null
SUBSTRATE="$ROOT/target/release/substrate"
SIMULATOR="$ROOT/target/release/simulator"
else
cargo build -p substrate -p simulator >/dev/null
SUBSTRATE="$ROOT/target/debug/substrate"
SIMULATOR="$ROOT/target/debug/simulator"
fi
LOG_DIR="/tmp/quic_ecs_dt_bench"
mkdir -p "$LOG_DIR"
SUB_LOG="$LOG_DIR/substrate.log"
: > "$SUB_LOG"
step "Starting substrate (tick_rate_hz=$TICK_RATE_HZ, log: $SUB_LOG)"
APP_SIMULATION__TICK_RATE_HZ="$TICK_RATE_HZ" RUST_LOG=warn "$SUBSTRATE" >"$SUB_LOG" 2>&1 &
SUBSTRATE_PID=$!
for i in $(seq 1 40); do
if curl -sf http://localhost:9100/metrics >/dev/null 2>&1; then
ok "substrate /metrics ready"; break
fi
sleep 0.25
if [[ $i -eq 40 ]]; then fail "substrate didn't start"; tail -20 "$SUB_LOG"; kill -TERM "$SUBSTRATE_PID" 2>/dev/null || true; exit 1; fi
done
cleanup() {
[[ -n "${SIM_PID:-}" ]] && kill -TERM "$SIM_PID" 2>/dev/null || true
[[ -n "${SUBSTRATE_PID:-}" ]] && kill -TERM "$SUBSTRATE_PID" 2>/dev/null || true
if [[ "$HAS_TC" -eq 1 ]]; then
sudo tc qdisc del dev "$IFACE" root 2>/dev/null || true
fi
wait 2>/dev/null || true
}
trap cleanup EXIT INT TERM
snapshot_to() {
curl -s http://localhost:9100/metrics > "$1"
}
get_value() {
awk -v pat="$2" '$0 ~ "^" pat " " { print $NF; exit }' "$1"
}
mkdir -p "$(dirname "$OUT_CSV")"
echo "entities,loss_pct,devices,rate_hz,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t2_p99_us,t3_rtt_us,hz,rss_mb" > "$OUT_CSV"
step "Sweeping entity_count x loss_pct (warmup ${WARMUP_S}s, window ${WINDOW_S}s)"
printf '%s%-10s %-8s %-8s %-9s %-9s %-10s %-10s %-10s %-10s %-10s %-8s %-7s%s\n' \
"$BOLD" "entities" "loss_pct" "devices" "received" "dropped" "t1_p50" "t1_p99" "t1_p999" "t2_p99" "t3_rtt" "hz" "rss_mb" "$RESET"
BEFORE="$LOG_DIR/before.txt"
AFTER="$LOG_DIR/after.txt"
ENTITIES_LIST=(10000 50000 100000 200000)
LOSS_LIST=(0 1 5)
for entities in "${ENTITIES_LIST[@]}"; do
devices=$(( entities / 5 ))
for loss in "${LOSS_LIST[@]}"; do
# Apply tc netem loss
if [[ "$HAS_TC" -eq 1 ]]; then
sudo tc qdisc del dev "$IFACE" root 2>/dev/null || true
if [[ "$loss" -gt 0 ]]; then
sudo tc qdisc add dev "$IFACE" root netem loss "${loss}%" 2>/dev/null || {
echo "Warning: failed to apply tc netem loss on interface $IFACE; this row is recorded as loss_pct=$loss but ran without emulated loss." >&2
}
fi
fi
sim_args=(
--profile industrial
--rate-hz "$RATE_HZ"
--count 0
--devices "$devices"
)
RUST_LOG=warn "$SIMULATOR" "${sim_args[@]}" >"$LOG_DIR/sim_${entities}_${loss}.log" 2>&1 &
SIM_PID=$!
sleep "$WARMUP_S"
snapshot_to "$BEFORE"
rec_before=$(get_value "$BEFORE" 'substrate_received_total\{tier="t1"\}')
drop_before=$(get_value "$BEFORE" 'substrate_dropped_total\{tier="t1"\}')
sleep "$WINDOW_S"
snapshot_to "$AFTER"
kill -TERM "$SIM_PID" 2>/dev/null || true
wait "$SIM_PID" 2>/dev/null || true
SIM_PID=""
rec_after=$(get_value "$AFTER" 'substrate_received_total\{tier="t1"\}')
drop_after=$(get_value "$AFTER" 'substrate_dropped_total\{tier="t1"\}')
p50=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.5"\}')
p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.99"\}')
p999=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.999"\}')
t2_p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t2",quantile="0.99"\}')
t3_p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t3",quantile="0.99"\}')
tick_hz=$(get_value "$AFTER" 'substrate_tick_hz')
rss=$(get_value "$AFTER" 'substrate_rss_bytes')
received=$(awk -v a="$rec_after" -v b="$rec_before" 'BEGIN { printf "%d", a-b }')
dropped=$(awk -v a="$drop_after" -v b="$drop_before" 'BEGIN { printf "%d", a-b }')
rss_mb=$(awk -v r="$rss" 'BEGIN { printf "%.1f", r/1048576 }')
tick_hz_fmt=$(awk -v t="$tick_hz" 'BEGIN { printf "%.1f", t }')
printf '%-10s %-8s %-8s %-9s %-9s %-10.0f %-10.0f %-10.0f %-10.0f %-10.0f %-8s %-7s\n' \
"$entities" "$loss" "$devices" "$received" "$dropped" "${p50:-0}" "${p99:-0}" "${p999:-0}" "${t2_p99:-0}" "${t3_p99:-0}" \
"$tick_hz_fmt" "$rss_mb"
echo "$entities,$loss,$devices,$RATE_HZ,$received,$dropped,${p50:-0},${p99:-0},${p999:-0},${t2_p99:-0},${t3_p99:-0},$tick_hz_fmt,$rss_mb" >> "$OUT_CSV"
done
done
if [[ "$HAS_TC" -eq 1 ]]; then
sudo tc qdisc del dev "$IFACE" root 2>/dev/null || true
fi
printf '\n%sCSV written to:%s %s\n' "$DIM" "$RESET" "$OUT_CSV"
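The harness derives `t1_received` and `t1_dropped` by scraping `/metrics` before and after the measurement window and subtracting the counters. The same idea can be sketched in Rust for readers who want to post-process saved snapshots; `metric_value` and `counter_delta` are hypothetical helpers, and unlike the `awk` one-liner (which prints the last field) this version takes the first token after the series name.

```rust
/// Return the value of the first sample line whose series name
/// (including any label set) equals `series` exactly.
pub fn metric_value(exposition: &str, series: &str) -> Option<f64> {
    exposition
        .lines()
        // Skip `# HELP` / `# TYPE` comment lines.
        .filter(|line| !line.starts_with('#'))
        .find_map(|line| {
            let rest = line.strip_prefix(series)?;
            // Require a space separator so `foo` does not match `foo_total`.
            if !rest.starts_with(' ') {
                return None;
            }
            rest.split_whitespace().next()?.parse::<f64>().ok()
        })
}

/// Difference of a counter between two snapshots (after minus before).
pub fn counter_delta(before: &str, after: &str, series: &str) -> Option<f64> {
    Some(metric_value(after, series)? - metric_value(before, series)?)
}
```

Subtracting snapshots isolates the measurement window from warmup traffic. The quantile values, by contrast, are read from the final snapshot only, so they are not windowed in the same way.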

@@ -38,9 +38,17 @@ pub async fn run_t2_emitter(
) -> u64 {
let period = Duration::from_nanos((1.0e9 / rate_hz) as u64);
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut sent: u64 = 0;
let mut send = match conn.open_uni().await {
Ok(s) => s,
Err(e) => {
tracing::warn!(error = %e, "T2 open_uni failed; emitter exiting");
return 0;
}
};
loop {
ticker.tick().await;
if interrupted.load(Ordering::SeqCst) {
@@ -57,25 +65,19 @@ pub async fn run_t2_emitter(
};
slot.seq = slot.seq.wrapping_add(1);
match conn.open_uni().await {
Ok(mut send) => {
if let Err(e) = send.write_all(&msg.to_bytes()).await {
tracing::warn!(error = %e, "T2 write_all failed");
continue;
}
if let Err(e) = send.finish() {
tracing::warn!(error = %e, "T2 finish failed");
continue;
}
sent += 1;
counter.store(sent, Ordering::Relaxed);
}
Err(e) => {
tracing::warn!(error = %e, "T2 open_uni failed; emitter exiting");
break;
}
if let Err(e) = send.write_all(&msg.to_bytes()).await {
tracing::warn!(error = %e, "T2 write_all failed; stream closed?");
break;
}
sent += 1;
counter.store(sent, Ordering::Relaxed);
}
if let Err(e) = send.finish() {
tracing::warn!(error = %e, "T2 finish failed");
}
sent
}
@@ -92,7 +94,7 @@ pub async fn run_t3_emitter(
) -> (u64, u64) {
let period = Duration::from_nanos((1.0e9 / rate_hz) as u64);
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut sent: u64 = 0;
let mut timeouts: u64 = 0;
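The emitters in this diff switch from `MissedTickBehavior::Delay` to `MissedTickBehavior::Skip`. The difference shows up only when a tick fires late, and it can be modelled with plain integer arithmetic. The sketch below is a simplified model of the documented semantics, not tokio's implementation: `Missed` and `next_deadline` are hypothetical names, times are microseconds since the interval started, and it ignores details such as any lateness tolerance in the real timer.

```rust
/// Simplified model of the three policies for a tick that fired late.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Missed {
    Burst,
    Delay,
    Skip,
}

/// Compute the next deadline given the deadline that was due
/// (`scheduled_us`), the time the tick actually fired (`now_us`),
/// and the configured period.
pub fn next_deadline(policy: Missed, scheduled_us: u64, now_us: u64, period_us: u64) -> u64 {
    assert!(period_us > 0 && now_us >= scheduled_us);
    match policy {
        // Keep the original grid and fire overdue ticks back to back.
        Missed::Burst => scheduled_us + period_us,
        // Restart the cadence from the late firing time; the grid shifts.
        Missed::Delay => now_us + period_us,
        // Drop the overdue ticks and resume on the original grid.
        Missed::Skip => now_us + period_us - (now_us - scheduled_us) % period_us,
    }
}
```

For a 10 ms period with a tick due at 10 ms that fires at 37 ms, the model yields 20 ms (Burst, already overdue), 47 ms (Delay), and 40 ms (Skip). Under sustained stalls, Delay lets the effective send rate fall below the target, while Skip keeps later ticks aligned to the original schedule.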

@@ -221,7 +221,7 @@ async fn main() -> anyhow::Result<()> {
if cli.rate_hz > 0.0 {
let period = Duration::from_nanos((1.0e9 / cli.rate_hz) as u64);
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
let unlimited = cli.count == 0;
let mut last_progress = started;

@@ -28,6 +28,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
server_interface: "127.0.0.1".to_string(),
server_cert: cert.to_string_lossy().into_owned(),
server_key: key.to_string_lossy().into_owned(),
t1_capacity: 1024,
t2_capacity: 512,
t3_capacity: 256,
}
}

@@ -27,6 +27,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
server_interface: "127.0.0.1".to_string(),
server_cert: cert.to_string_lossy().into_owned(),
server_key: key.to_string_lossy().into_owned(),
t1_capacity: 1024,
t2_capacity: 512,
t3_capacity: 256,
}
}

@@ -28,6 +28,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
server_interface: "127.0.0.1".to_string(),
server_cert: cert.to_string_lossy().into_owned(),
server_key: key.to_string_lossy().into_owned(),
t1_capacity: 1024,
t2_capacity: 512,
t3_capacity: 256,
}
}

@@ -22,6 +22,9 @@ pub struct QuicConfig {
pub server_interface: String,
pub server_cert: String,
pub server_key: String,
pub t1_capacity: usize,
pub t2_capacity: usize,
pub t3_capacity: usize,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -41,6 +44,9 @@ impl Default for AppConfig {
server_interface: "0.0.0.0".to_string(),
server_cert: "certs/server.crt".to_string(),
server_key: "certs/server.key".to_string(),
t1_capacity: 1024,
t2_capacity: 512,
t3_capacity: 256,
},
simulation: SimulationConfig {
tick_rate_hz: 60,

@@ -10,10 +10,6 @@ use crate::transport::{QuicMessage, T1Sender, T2Sender, T3Inbound, T3Sender};
use crate::transport::server::{accept_loop, bind_endpoint};
use crate::transport::state::ServerState;
const T1_CAPACITY: usize = 1024;
const T2_CAPACITY: usize = 512;
const T3_CAPACITY: usize = 256;
pub struct EcsQuicTransportPlugin;
/// Receive halves of the three tier channels, wrapped so they can sit in a
@@ -63,11 +59,12 @@ fn start_quic_server(
impl Plugin for EcsQuicTransportPlugin {
fn build(&self, app: &mut App) {
let config = app.world().resource::<AppConfig>();
// Three-tier bridge between the tokio-side QUIC accept loop and the
// ECS PreUpdate ingest system (in the `world` module).
let (t1_tx, t1_rx) = mpsc::channel::<QuicMessage>(T1_CAPACITY);
let (t2_tx, t2_rx) = mpsc::channel::<QuicMessage>(T2_CAPACITY);
let (t3_tx, t3_rx) = mpsc::channel::<T3Inbound>(T3_CAPACITY);
let (t1_tx, t1_rx) = mpsc::channel::<QuicMessage>(config.network.t1_capacity);
let (t2_tx, t2_rx) = mpsc::channel::<QuicMessage>(config.network.t2_capacity);
let (t3_tx, t3_rx) = mpsc::channel::<T3Inbound>(config.network.t3_capacity);
// Spawn a tokio runtime on a dedicated OS thread, ship its Handle back
// to the ECS, and keep the runtime alive for the lifetime of the app

@@ -4,7 +4,7 @@
//! ```text
//! components.rs ── per-sensor components + per-type threshold table
//! resources.rs ── SensorRegistry, DiagnosticsState, ExportSampleState
//! systems.rs ── ingest / fault_injection / simulation / export / diagnostics
//! systems.rs ── ingest / simulation / export / diagnostics
//! tests.rs ── unit tests (#[cfg(test)] only)
//! ```
//!
@@ -39,9 +39,7 @@ impl Plugin for WorldPlugin {
.init_resource::<resources::ExportSampleState>()
.add_systems(
PreUpdate,
(systems::fault_injection_system, systems::ingest_system)
.chain()
.run_if(in_state(ServerState::Started)),
systems::ingest_system.run_if(in_state(ServerState::Started)),
)
.add_systems(Update, systems::simulation_system)
.add_systems(

@@ -4,7 +4,7 @@
//!
//! | Schedule | Systems |
//! |-----------|--------------------------------------|
//! | PreUpdate | fault_injection → ingest |
//! | PreUpdate | ingest |
//! | Update | simulation |
//! | PostUpdate| export → diagnostics |
@@ -25,6 +25,8 @@ use super::resources::{DiagnosticsState, ExportSampleState, SensorRegistry};
/// T1 batch limit per tick. Anything beyond this stays in the channel and
/// either drains next tick or gets dropped on full (T1's contract is lossy).
const T1_INGEST_BATCH: usize = 1024;
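/// T2 batch limit per tick. Anything beyond this stays in the channel for a
/// later tick.
const T2_INGEST_BATCH: usize = 512;
/// T3 batch limit per tick. Commands beyond this wait in the channel, so
/// their acks are deferred to a later tick.
const T3_INGEST_BATCH: usize = 256;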
/// Drain the three tier channels into ECS state.
///
@@ -56,10 +58,15 @@ pub(super) fn ingest_system(
// T2 — uni streams.
{
let mut t2 = bridge.t2.lock().unwrap();
while let Ok(msg) = t2.try_recv() {
histogram!("substrate_latency_us", "tier" => "t2")
.record(now.saturating_sub(msg.timestamp_us) as f64);
upsert_reading(&mut registry, &mut commands, &mut q, msg);
for _ in 0..T2_INGEST_BATCH {
match t2.try_recv() {
Ok(msg) => {
histogram!("substrate_latency_us", "tier" => "t2")
.record(now.saturating_sub(msg.timestamp_us) as f64);
upsert_reading(&mut registry, &mut commands, &mut q, msg);
}
Err(_) => break,
}
}
}
@@ -67,27 +74,32 @@ pub(super) fn ingest_system(
// sensor value (NaN if we've never seen this (device, sensor) before).
{
let mut t3 = bridge.t3.lock().unwrap();
while let Ok(inbound) = t3.try_recv() {
histogram!("substrate_latency_us", "tier" => "t3")
.record(now.saturating_sub(inbound.command.timestamp_us) as f64);
let key = (inbound.command.device_id, inbound.command.sensor_id);
let current_value = registry
.map
.get(&key)
.and_then(|&e| q.get(e).ok())
.map(|d| d.raw_value)
.unwrap_or(f64::NAN);
let ack = QuicMessage {
device_id: inbound.command.device_id,
sensor_id: inbound.command.sensor_id,
raw_value: current_value,
timestamp_us: now_us(),
sequence_number: inbound.command.sequence_number,
sensor_type: inbound.command.sensor_type,
};
// Ignore send errors: the demux task may have given up if the
// connection died while we were processing.
let _ = inbound.reply.send(ack);
for _ in 0..T3_INGEST_BATCH {
match t3.try_recv() {
Ok(inbound) => {
histogram!("substrate_latency_us", "tier" => "t3")
.record(now.saturating_sub(inbound.command.timestamp_us) as f64);
let key = (inbound.command.device_id, inbound.command.sensor_id);
let current_value = registry
.map
.get(&key)
.and_then(|&e| q.get(e).ok())
.map(|d| d.raw_value)
.unwrap_or(f64::NAN);
let ack = QuicMessage {
device_id: inbound.command.device_id,
sensor_id: inbound.command.sensor_id,
raw_value: current_value,
timestamp_us: now_us(),
sequence_number: inbound.command.sequence_number,
sensor_type: inbound.command.sensor_type,
};
// Ignore send errors: the demux task may have given up if the
// connection died while we were processing.
let _ = inbound.reply.send(ack);
}
Err(_) => break,
}
}
}
}
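The change above replaces unbounded `while let Ok(..) = try_recv()` loops with a fixed number of attempts per tick, so one busy tier cannot monopolise a frame. The pattern can be sketched with the standard library's channel as a stand-in for the tokio receiver; `drain_bounded` is a hypothetical helper, not a function from this repository.

```rust
use std::sync::mpsc::Receiver;

/// Handle at most `batch` queued messages without blocking and return
/// how many were handled. Anything left over stays queued for the next call.
pub fn drain_bounded<T>(rx: &Receiver<T>, batch: usize, mut handle: impl FnMut(T)) -> usize {
    let mut handled = 0;
    // `try_iter` yields until the queue is empty; `take` enforces the cap.
    for msg in rx.try_iter().take(batch) {
        handle(msg);
        handled += 1;
    }
    handled
}
```

Calling this once per tick with the per-tier batch constant bounds the work done in the ingest stage; the cost is that a backlog larger than the batch takes several ticks to clear.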
@@ -132,8 +144,6 @@ fn upsert_reading(
registry.map.insert(key, entity);
}
/// Stub — M6 inserts loss/delay here for benchmark scenarios.
pub(super) fn fault_injection_system() {}
/// Per-sensor digital-twin transform. Pulls each entity's latest
/// `RawSensorData` into a sliding-window mean (`SmoothedValue`), and emits