From 20d59ed0ba4ca24f170145edc6a96d8717d5de6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Val=C3=A8re=20Plantevin?= Date: Tue, 12 May 2026 13:24:03 -0400 Subject: [PATCH] Getting ready for the final test --- CLAUDE.md | 8 +- data/local/cross_tier.csv | 10 +- data/loopback/final_table.csv | 13 +++ scripts/bench-loss.sh | 169 +++++++++++++++++++++++++++++++++ substrate/src/world/mod.rs | 6 +- substrate/src/world/systems.rs | 4 +- 6 files changed, 198 insertions(+), 12 deletions(-) create mode 100644 data/loopback/final_table.csv create mode 100755 scripts/bench-loss.sh diff --git a/CLAUDE.md b/CLAUDE.md index ed4a4af..ac87396 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,7 +58,7 @@ quic_ecs_dt/ | TLS / self-signed cert | Done (M1) — `certs/server.{crt,key}` via `make certs`, gitignored. PEM loader in [substrate/src/transport/server.rs:15](substrate/src/transport/server.rs#L15); rustls `aws-lc-rs` default provider installed in [substrate/src/main.rs](substrate/src/main.rs) | | Wire codec for `QuicMessage` (39 B fixed LE, incl. `sensor_type: u8`) | Done — [substrate/src/transport/mod.rs](substrate/src/transport/mod.rs); 5 unit tests passing. `SensorType` enum: `Generic / Temperature / Humidity / Pressure / Voltage / Current` | | `tracing-subscriber` init w/ `RUST_LOG` | Done (M1) — [substrate/src/main.rs:8-12](substrate/src/main.rs#L8-L12) | -| ECS components (`RawSensorData`, `SmoothedValue`) + 5 systems (Ingest/Sim/Export/FaultInjection/Diagnostics) | Done — entities = `(DeviceId, SensorId, SensorTypeTag, RawSensorData, SmoothedValue, Asset)` per (device, sensor); `SensorRegistry` upserts via `HashMap<(Uuid, u16), Entity>` in [substrate/src/world.rs](substrate/src/world.rs). `IngestSystem` drains all three tiers; T3 ack preserves command's `sensor_type` and returns the device's most recent `raw_value`. 
`SimulationSystem` maintains a 16-sample rolling mean per entity and emits `substrate_threshold_crossings_total{type, direction}` when the smoothed mean crosses a per-type threshold (`Changed` query so cost scales with ingress, not fleet size). `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `FaultInjection` is still a stub awaiting M6. `Diagnostics` logs `tick_hz` once per second | +| ECS components (`RawSensorData`, `SmoothedValue`) + 4 systems (Ingest/Sim/Export/Diagnostics) | Done — entities = `(DeviceId, SensorId, SensorTypeTag, RawSensorData, SmoothedValue, Asset)` per (device, sensor); `SensorRegistry` upserts via `HashMap<(Uuid, u16), Entity>` in [substrate/src/world/mod.rs](substrate/src/world/mod.rs). `IngestSystem` drains all three tiers; T3 ack preserves command's `sensor_type` and returns the device's most recent `raw_value`. `SimulationSystem` maintains a 16-sample rolling mean per entity and emits `substrate_threshold_crossings_total{type, direction}` when the smoothed mean crosses a per-type threshold (`Changed` query so cost scales with ingress, not fleet size). `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `Diagnostics` logs `tick_hz` once per second | | Schedule rate-gating | Done (M4) — `MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(1/tick_rate_hz))` in [substrate/src/main.rs](substrate/src/main.rs); replaces the default busy-loop with the configured period | | Prometheus exporter + Grafana dashboards | Done (M5) — `ObservabilityPlugin` in [substrate/src/observability.rs](substrate/src/observability.rs) installs `metrics-exporter-prometheus` on the existing tokio runtime.
**Runtime surface** (paper §Evaluation): counters `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`; latency histograms `substrate_latency_us{tier}`; gauges `substrate_tick_hz`, `substrate_entities`, `substrate_channel_depth{tier}`, `substrate_channel_capacity{tier}`, `substrate_rss_bytes`. **Sensor data surface** (operator dashboard): per-type aggregates `sensor_aggregate{type, stat=count|mean|min|max}` computed once per second over the live world, cardinality bounded by `\|SensorType\| × 4` so it scales to thousands of sensors. Two dashboards: [dashboards/runtime.json](dashboards/runtime.json) and [dashboards/sensors.json](dashboards/sensors.json) (thermometer/gauge/stat panels per type) | | Simulator (Quinn client + sensor generators) | `SimulatorClient` lib in [simulator/src/client.rs](simulator/src/client.rs) — connects, trusts the substrate's PEM cert via custom `ServerCertVerifier` (sidesteps `CaUsedAsEndEntity`); `send_datagram(QuicMessage)` for T1, `send_uni_stream(&[QuicMessage])` for T2, `request(&QuicMessage) -> QuicMessage` for T3. CLI driver in [simulator/src/main.rs](simulator/src/main.rs) with clap flags (`--addr`, `--rate-hz`, `--t2-rate-hz`, `--t3-rate-hz`, `--t3-timeout-ms`, `--count`, `--devices`, `--sensor-id`, `--sensor-type`, `--profile`, `--cert`, `--server-name`); parallel T1+T2+T3 emitters, per-(device,sensor) sequence counters, type-appropriate waveform generators (sin/cos curves centred on realistic sensor ranges), 1-Hz combined progress logs, Ctrl-C drain. `--profile industrial` fans out to 5 sensors per device (Temperature/Humidity/Pressure/Voltage/Current). Bevy-driven sensor generator still pending | @@ -76,13 +76,13 @@ Each milestone has one verification gate. Update Status here as we go. - **M1 — Wire codec & root config.** ✅ Done 2026-05-04. 
Hand-rolled little-endian codec on `QuicMessage` (38 B fixed: 16 UUID + 2 stream_id + 8 f64 + 8 ts_us + 4 seq) with roundtrip + layout + length-error tests; `config.toml` at repo root; dev TLS via `make certs`; structured `tracing-subscriber` init reads `RUST_LOG` (default `info`). - **M2 — Quinn server + self-signed TLS.** ✅ Done 2026-05-06. Listener up under `ServerState::Starting/Started`; type-system tier semantics + T3 oneshot ack protocol; per-connection `handle_incoming` orchestrator joining T1 datagram, T2 uni-stream, and T3 bi-stream readers. T1 has dropped/decoded counters; T2 resets a stream on decode failure without killing the connection; T3 ships `T3Inbound { command, reply }` to the ECS and resets the stream when no handler answers. End-to-end coverage: 6 integration tests in [simulator/tests/](simulator/tests/) plus 4 codec unit tests, all green. - **M3 — Simulator client.** Replace [simulator/src/main.rs](simulator/src/main.rs) with a Bevy app: Quinn client, N synthetic devices, configurable per-tier rates. *Verify:* end-to-end loopback drains messages on all three tiers. **Status (2026-05-05):** simulator made into a lib + bin; `SimulatorClient::{connect,send_datagram,close}` plus a manual smoke runner in `simulator/src/main.rs`. Two integration tests in `simulator/tests/end_to_end_t1.rs` exercise the full T1 path against an in-process substrate. Bevy-driven generator + T2/T3 helpers + load profiles still pending. -- **M4 — ECS world.** ✅ Done. `Asset` + `DeviceId` + `SensorId` + `SensorTypeTag` + `RawSensorData` + `SmoothedValue` components in [substrate/src/world.rs](substrate/src/world.rs); `SensorRegistry` resource for O(1) `(Uuid, u16) → Entity`. `IngestSystem` drains all three tiers (T1 batched, T2/T3 fully); T3 handler returns the latest sensor value as ack. 
`SimulationSystem` runs a per-entity 16-sample rolling mean and emits `substrate_threshold_crossings_total{type, direction}` on per-type threshold crossings — gives the ECS observable digital-twin work, not just write-through ingest. `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `FaultInjection` still a stub (M6). `DiagnosticsSystem` logs tick rate once per second. Schedule rate-gated via `ScheduleRunnerPlugin::run_loop(1/tick_rate_hz)`. 8 unit tests passing (entity create, in-place update, T3 ack, SmoothedValue push/window/non-finite/full-roll, threshold-crossing transition). +- **M4 — ECS world.** ✅ Done. `Asset` + `DeviceId` + `SensorId` + `SensorTypeTag` + `RawSensorData` + `SmoothedValue` components in [substrate/src/world/mod.rs](substrate/src/world/mod.rs); `SensorRegistry` resource for O(1) `(Uuid, u16) → Entity`. `IngestSystem` drains all three tiers (T1 batched, T2/T3 fully); T3 handler returns the latest sensor value as ack. `SimulationSystem` runs a per-entity 16-sample rolling mean and emits `substrate_threshold_crossings_total{type, direction}` on per-type threshold crossings — gives the ECS observable digital-twin work, not just write-through ingest. `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `DiagnosticsSystem` logs tick rate once per second. Schedule rate-gated via `ScheduleRunnerPlugin::run_loop(1/tick_rate_hz)`. 8 unit tests passing (entity create, in-place update, T3 ack, SmoothedValue push/window/non-finite/full-roll, threshold-crossing transition). - **M5 — Observability (VictoriaMetrics + Grafana).** ✅ Done. Wire format extended to carry `sensor_type: u8` (38 → 39 B, decoded into `SensorType` enum).
Two metric surfaces over `metrics-exporter-prometheus`: - **Runtime** (paper §Evaluation): `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`, `latency_us{tier}` histograms, `tick_hz` / `entities` / `channel_depth{tier}` / `rss_bytes` gauges. - **Sensor data** (operator surface): `sensor_aggregate{type, stat=count|mean|min|max}` aggregated per second across the live ECS world. Cardinality bounded to `\|SensorType\| × 4` series independent of physical sensor count. - Dashboards: [dashboards/runtime.json](dashboards/runtime.json) + [dashboards/sensors.json](dashboards/sensors.json). - Verified: `--profile industrial --devices 2 --count 200` yields 10 entities and all 5 type aggregates with realistic values (T=20.5°C, RH=51%, P=1018 hPa, V=230.2 V, I=12 A). -- **M6 — Benchmark harness.** Sweep `entity_count ∈ {10k, 50k, 100k, 200k}` × `loss_rate ∈ {0%, 1%, 5%}` with 2k warmup + 5k measurement ticks. Loss via `tc netem` or in-app injection. Writes `data/loopback/final_table.csv`. *Verify:* one full sweep on M4 Max produces a CSV the Quarto figures consume. +- **M6 — Benchmark harness.** Sweep `entity_count ∈ {10k, 50k, 100k, 200k}` × `loss_rate ∈ {0%, 1%, 5%}` with 2k warmup + 5k measurement ticks. Loss via `tc netem`. Writes `data/loopback/final_table.csv`. *Verify:* one full sweep on M4 Max produces a CSV the Quarto figures consume. - **M7 — CM5 cross-compile & deploy.** Exercise [Makefile:30](Makefile#L30) (`build-cm5`, `deploy-cm5`); set real `CM5_HOST`. *Verify:* binary runs on CM5 with a feed from M4 Max over 1 Gbps Ethernet. - **M8 — Two-machine run + paper render.** Sweep with simulator on M4 Max → substrate on CM5; populate `data/two_machine/final_table.csv`; `make render` produces a PDF. 
**Update §Evaluation prose to reflect actual numbers.** Current paper figures (241 Hz, 64 µs / 15.8 ms P99, 2.6 µs jitter, 1.02 MB/1k, R²=0.9999) are **aspirational placeholders** — they may move and the conclusions may shift; that's expected. @@ -105,7 +105,7 @@ Each milestone has one verification gate. Update Status here as we go. - **No graceful shutdown.** The `quic-runtime` thread is parked on `pending()`; spawned tasks (accept loop, per-conn demux) are orphaned at process exit. Fine for research runs; we'll need an `OnExit(Started)` (or a `Stopping` state) when M5 observability needs clean drain or M8 wants finalised CSV writes. - **Bind failure is fatal.** `OnEnter(Starting)` panics if `bind_endpoint` fails. A `ServerState::Failed` variant joins when we wire proper error surfacing. - **T3 ack semantics are minimal.** The current handler echoes the device's most recent `raw_value` with a server timestamp — adequate for "read sensor" commands, not for actuator-write semantics. A future iteration may introduce an `ActuatorState` component and a setpoint-apply path; for now T3 is best framed as "reliable read/query RPC" in the paper. -- **`FaultInjectionSystem` is still empty.** Runs on schedule but does nothing. M6 fills it with rate-controlled in-app drop so loss sweeps don't depend on external `tc netem`. + - **Schedule rate-gating is approximate.** `ScheduleRunnerPlugin::run_loop(period)` honours `period` as a minimum; observed `tick_hz` runs ~85% of target on macOS dev (target 60 → ~50). Should be tighter on the CM5; revisit if M6 sweeps depend on a steady tick. 
## Run / verify diff --git a/data/local/cross_tier.csv b/data/local/cross_tier.csv index a731270..1d28f34 100644 --- a/data/local/cross_tier.csv +++ b/data/local/cross_tier.csv @@ -1,2 +1,10 @@ rate_hz,t3_rate_hz,devices,tick_rate_hz,window_s,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t3_received,t3_no_handler,t3_p50_us,t3_p99_us,t3_p999_us,tick_hz,rss_mb,channel_depth_max -100,100,100,0,25,2646,0,118.99720565324648,202.0065277946852,245.99224556720532,2646,0,120.98904580793433,199.99652925270829,238.0069829199846,15833.3,28.0,0 +100,100,100,0,25,2641,0,114.99630654141735,183.99342299596765,233.99506214353187,2641,0,115.98953956135134,181.0005395731182,227.98959982186395,15726.4,27.7,1 +500,100,100,0,25,13172,0,98.99803587754256,164.00550789726537,216.00466678084967,2634,0,112.99007813673754,176.99119972210946,226.98864928535784,15775.8,28.9,1 +1000,100,100,0,25,26259,0,94.00049142147152,146.01363556268566,193.00189597134016,2626,0,102.99701533183928,155.01160914305248,209.99842124823599,15550.5,29.5,1 +5000,100,100,0,25,131395,0,91.99185219896138,143.00791974278306,198.00653053045428,2628,0,99.99298268244951,150.00970795504614,195.99712928020054,15635.5,30.3,5 +10000,100,100,0,25,263310,0,91.99185219896138,155.01160914305248,243.0093726834088,2633,0,104.99366452846704,169.9832685850933,241.99087365988592,15516.1,30.9,0 +25000,100,100,0,25,657000,0,94.00049142147152,166.9843360750187,245.99224556720532,2628,0,107.00761611528327,178.9846436133428,260.00477949916575,15672.3,32.1,25 +50000,100,100,0,25,1316100,0,96.99893608515958,155.01160914305248,197.01896883616524,2632,0,106.00645733856791,164.00550789726537,198.00653053045428,15376.8,33.2,50 +100000,100,100,0,25,2625900,0,98.99803587754256,173.00145626474986,219.0061940968233,2626,0,110.00216095757669,185.99132120176222,231.01901268757703,15085.7,35.5,100 
+250000,100,100,0,25,6580250,0,103.99054800886718,200.99901603074525,251.01181592403498,2632,0,115.98953956135134,220.0159432355299,314.977124065739,14190.8,42.0,96 diff --git a/data/loopback/final_table.csv b/data/loopback/final_table.csv new file mode 100644 index 0000000..b2dcdac --- /dev/null +++ b/data/loopback/final_table.csv @@ -0,0 +1,13 @@ +entities,loss_pct,devices,rate_hz,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t2_p99_us,t3_rtt_us,hz,rss_mb +10000,0,2000,100,201,0,152.00296264568095,231.99133298742527,245.01024189833885,0,0,15254.7,26.5 +10000,1,2000,100,202,0,153.00950012244246,251.01181592403498,261.98834382686925,0,0,14916.7,26.8 +10000,5,2000,100,202,0,148.01298577790973,245.01024189833885,262.98579349083377,0,0,15108.5,27.1 +50000,0,10000,100,202,0,146.01363556268566,238.0069829199846,261.98834382686925,0,0,15098.7,27.5 +50000,1,10000,100,202,0,144.01248706798935,236.0160976812146,262.98579349083377,0,0,14938.1,27.6 +50000,5,10000,100,202,0,140.99155733033865,238.0069829199846,266.00098548659696,0,0,14705.4,27.7 +100000,0,20000,100,201,0,138.0063931729486,233.01434382937512,262.98579349083377,0,0,14823.3,27.9 +100000,1,20000,100,202,0,134.00806856721388,231.99133298742527,262.98579349083377,0,0,14802.4,28.6 +100000,5,20000,100,202,0,132.9934676099666,230.00476201617178,262.98579349083377,0,0,15060.9,28.7 +200000,0,40000,100,202,0,136.00613722545975,238.0069829199846,276.027366209557,0,0,14835.4,29.0 +200000,1,40000,100,202,0,138.0063931729486,240.01466203032882,270.02107558160185,0,0,14840.5,29.1 +200000,5,40000,100,202,0,139.00362493341808,240.01466203032882,276.027366209557,0,0,14882.3,29.1 diff --git a/scripts/bench-loss.sh b/scripts/bench-loss.sh new file mode 100755 index 0000000..ea993a1 --- /dev/null +++ b/scripts/bench-loss.sh @@ -0,0 +1,169 @@ +#!/usr/bin/env bash +# scripts/bench-loss.sh — M6 benchmark harness +# Sweeps entity count {10k, 50k, 100k, 200k} x loss_rate {0, 1, 5}% +# Output: data/loopback/final_table.csv + +set 
-euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +cd "$ROOT" + +# Tunables; each can be overridden from the environment. +TICK_RATE_HZ="${TICK_RATE_HZ:-100}" +WARMUP_S="${WARMUP_S:-20}" +WINDOW_S="${WINDOW_S:-50}" +RATE_HZ="${RATE_HZ:-100}" +BUILD="${BUILD:-release}" +# netem only affects traffic that crosses IFACE. The substrate is started on this +# host, so benchmark traffic stays on loopback; override IFACE for a remote run. +IFACE="${IFACE:-lo}" + +OUT_CSV="${OUT_CSV:-data/loopback/final_table.csv}" + +HAS_TC=1 +# Loss emulation shells out to tc; record whether it is installed. +if ! command -v tc >/dev/null; then + echo "Warning: 'tc' command not found. Loss emulation will be skipped." + HAS_TC=0 +fi + +# --- pretty logging --- +# Colour escapes are used only when stdout is a terminal. +if [[ -t 1 ]]; then + BOLD=$'\033[1m'; DIM=$'\033[2m'; GREEN=$'\033[32m'; RED=$'\033[31m'; RESET=$'\033[0m' +else BOLD=; DIM=; GREEN=; RED=; RESET=; fi +step() { printf '%s» %s%s\n' "$BOLD" "$1" "$RESET"; } +ok() { printf '%s ✓ %s%s\n' "$GREEN" "$1" "$RESET"; } +fail() { printf '%s ✗ %s%s\n' "$RED" "$1" "$RESET"; } + +# Preflight: required tools, free ports, dev certificate. +for cmd in cargo curl lsof awk; do + command -v "$cmd" >/dev/null || { fail "missing: $cmd"; exit 1; } +done +for port in 9000 9100; do + # Test for any lsof output instead of grepping for LISTEN: UDP sockets carry no + # LISTEN state, and under pipefail a non-zero lsof exit would mask a grep match. + if [[ -n "$(lsof -nP -iUDP:$port -iTCP:$port -sTCP:LISTEN 2>/dev/null || true)" ]]; then + fail "port $port in use — kill the running substrate first" + exit 1 + fi +done +[[ -f certs/server.crt ]] || make certs >/dev/null + +step "Building ($BUILD)" +if [[ "$BUILD" == "release" ]]; then + cargo build --release -p substrate -p simulator >/dev/null + SUBSTRATE="$ROOT/target/release/substrate" + SIMULATOR="$ROOT/target/release/simulator" +else + cargo build -p substrate -p simulator >/dev/null + SUBSTRATE="$ROOT/target/debug/substrate" + SIMULATOR="$ROOT/target/debug/simulator" +fi + +LOG_DIR="/tmp/quic_ecs_dt_bench" +mkdir -p "$LOG_DIR" +SUB_LOG="$LOG_DIR/substrate.log" +: > "$SUB_LOG" + +step "Starting substrate (tick_rate_hz=$TICK_RATE_HZ, log: $SUB_LOG)" +APP_SIMULATION__TICK_RATE_HZ="$TICK_RATE_HZ" RUST_LOG=warn "$SUBSTRATE" >"$SUB_LOG" 2>&1 & +SUBSTRATE_PID=$!
+ +for i in $(seq 1 40); do + if curl -sf http://localhost:9100/metrics >/dev/null 2>&1; then + ok "substrate /metrics ready"; break + fi + sleep 0.25 + if [[ $i -eq 40 ]]; then fail "substrate didn't start"; tail -20 "$SUB_LOG"; exit 1; fi +done + +cleanup() { + [[ -n "${SIM_PID:-}" ]] && kill -TERM "$SIM_PID" 2>/dev/null || true + [[ -n "${SUBSTRATE_PID:-}" ]] && kill -TERM "$SUBSTRATE_PID" 2>/dev/null || true + if [[ "$HAS_TC" -eq 1 ]]; then + sudo tc qdisc del dev $IFACE root 2>/dev/null || true + fi + wait 2>/dev/null || true +} +trap cleanup EXIT INT TERM + +snapshot_to() { + curl -s http://localhost:9100/metrics > "$1" +} +get_value() { + awk -v pat="$2" '$0 ~ "^" pat " " { print $NF; exit }' "$1" +} + +mkdir -p "$(dirname "$OUT_CSV")" +echo "entities,loss_pct,devices,rate_hz,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t2_p99_us,t3_rtt_us,hz,rss_mb" > "$OUT_CSV" + +step "Sweeping entity_count x loss_pct (warmup ${WARMUP_S}s, window ${WINDOW_S}s)" +printf '%s%-10s %-8s %-8s %-9s %-9s %-10s %-10s %-10s %-10s %-10s %-8s %-7s%s\n' \ + "$BOLD" "entities" "loss_pct" "devices" "received" "dropped" "t1_p50" "t1_p99" "t1_p999" "t2_p99" "t3_rtt" "hz" "rss_mb" "$RESET" + +BEFORE="$LOG_DIR/before.txt" +AFTER="$LOG_DIR/after.txt" + +ENTITIES_LIST=(10000 50000 100000 200000) +LOSS_LIST=(0 1 5) + +for entities in "${ENTITIES_LIST[@]}"; do + devices=$(( entities / 5 )) + + for loss in "${LOSS_LIST[@]}"; do + # Apply tc netem loss + if [[ "$HAS_TC" -eq 1 ]]; then + sudo tc qdisc del dev $IFACE root 2>/dev/null || true + if [[ "$loss" -gt 0 ]]; then + sudo tc qdisc add dev $IFACE root netem loss ${loss}% 2>/dev/null || { + echo "Warning: failed to apply tc netem loss on interface $IFACE." + } + fi + fi + + sim_args=( + --profile industrial + --rate-hz "$RATE_HZ" + --count 0 + --devices "$devices" + ) + RUST_LOG=warn "$SIMULATOR" "${sim_args[@]}" >"$LOG_DIR/sim_${entities}_${loss}.log" 2>&1 & + SIM_PID=$! 
+ +# Warm up, then take the baseline snapshot of the cumulative T1 counters. +sleep "$WARMUP_S" + snapshot_to "$BEFORE" + rec_before=$(get_value "$BEFORE" 'substrate_received_total\{tier="t1"\}') + drop_before=$(get_value "$BEFORE" 'substrate_dropped_total\{tier="t1"\}') + + sleep "$WINDOW_S" + + snapshot_to "$AFTER" + kill -TERM "$SIM_PID" 2>/dev/null || true + wait "$SIM_PID" 2>/dev/null || true + SIM_PID="" + + # Counters are differenced across the window; the quantiles and gauges below + # are read from the end-of-window snapshot only. + rec_after=$(get_value "$AFTER" 'substrate_received_total\{tier="t1"\}') + drop_after=$(get_value "$AFTER" 'substrate_dropped_total\{tier="t1"\}') + p50=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.5"\}') + p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.99"\}') + p999=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.999"\}') + t2_p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t2",quantile="0.99"\}') + # NOTE(review): this t3 p99 is written to the CSV column named t3_rtt_us — confirm the intended metric. + t3_p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t3",quantile="0.99"\}') + + tick_hz=$(get_value "$AFTER" 'substrate_tick_hz') + rss=$(get_value "$AFTER" 'substrate_rss_bytes') + + received=$(awk -v a="$rec_after" -v b="$rec_before" 'BEGIN { printf "%d", a-b }') + dropped=$(awk -v a="$drop_after" -v b="$drop_before" 'BEGIN { printf "%d", a-b }') + # Bytes to MiB (divide by 1048576). + rss_mb=$(awk -v r="$rss" 'BEGIN { printf "%.1f", r/1048576 }') + tick_hz_fmt=$(awk -v t="$tick_hz" 'BEGIN { printf "%.1f", t }') + + # Quantiles missing from the snapshot are reported as 0 via ${var:-0}. + printf '%-10s %-8s %-8s %-9s %-9s %-10.0f %-10.0f %-10.0f %-10.0f %-10.0f %-8s %-7s\n' \ + "$entities" "$loss" "$devices" "$received" "$dropped" "${p50:-0}" "${p99:-0}" "${p999:-0}" "${t2_p99:-0}" "${t3_p99:-0}" \ + "$tick_hz_fmt" "$rss_mb" + + echo "$entities,$loss,$devices,$RATE_HZ,$received,$dropped,${p50:-0},${p99:-0},${p999:-0},${t2_p99:-0},${t3_p99:-0},$tick_hz_fmt,$rss_mb" >> "$OUT_CSV" + + done +done + +# Clear any netem qdisc left by the last sweep point. +if [[ "$HAS_TC" -eq 1 ]]; then + sudo tc qdisc del dev $IFACE root 2>/dev/null || true +fi + +printf '\n%sCSV written to:%s %s\n' "$DIM" "$RESET" "$OUT_CSV" diff --git a/substrate/src/world/mod.rs b/substrate/src/world/mod.rs index db6c59f..00a4531 100644 ---
a/substrate/src/world/mod.rs +++ b/substrate/src/world/mod.rs @@ -4,7 +4,7 @@ //! ```text //! components.rs ── per-sensor components + per-type threshold table //! resources.rs ── SensorRegistry, DiagnosticsState, ExportSampleState -//! systems.rs ── ingest / fault_injection / simulation / export / diagnostics +//! systems.rs ── ingest / simulation / export / diagnostics //! tests.rs ── unit tests (#[cfg(test)] only) //! ``` //! @@ -39,9 +39,7 @@ impl Plugin for WorldPlugin { .init_resource::() .add_systems( PreUpdate, - (systems::fault_injection_system, systems::ingest_system) - .chain() - .run_if(in_state(ServerState::Started)), + systems::ingest_system.run_if(in_state(ServerState::Started)), ) .add_systems(Update, systems::simulation_system) .add_systems( diff --git a/substrate/src/world/systems.rs b/substrate/src/world/systems.rs index d6f00d7..e4638c2 100644 --- a/substrate/src/world/systems.rs +++ b/substrate/src/world/systems.rs @@ -4,7 +4,7 @@ //! //! | Schedule | Systems | //! |-----------|--------------------------------------| -//! | PreUpdate | fault_injection → ingest | +//! | PreUpdate | ingest | //! | Update | simulation | //! | PostUpdate| export → diagnostics | @@ -144,8 +144,6 @@ fn upsert_reading( registry.map.insert(key, entity); } -/// Stub — M6 inserts loss/delay here for benchmark scenarios. -pub(super) fn fault_injection_system() {} /// Per-sensor digital-twin transform. Pulls each entity's latest /// `RawSensorData` into a sliding-window mean (`SmoothedValue`), and emits