Compare commits
2 Commits
d3f09ee062
...
20d59ed0ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
20d59ed0ba | ||
|
|
5d2552efb5 |
@@ -58,7 +58,7 @@ quic_ecs_dt/
|
|||||||
| TLS / self-signed cert | Done (M1) — `certs/server.{crt,key}` via `make certs`, gitignored. PEM loader in [substrate/src/transport/server.rs:15](substrate/src/transport/server.rs#L15); rustls `aws-lc-rs` default provider installed in [substrate/src/main.rs](substrate/src/main.rs) |
|
| TLS / self-signed cert | Done (M1) — `certs/server.{crt,key}` via `make certs`, gitignored. PEM loader in [substrate/src/transport/server.rs:15](substrate/src/transport/server.rs#L15); rustls `aws-lc-rs` default provider installed in [substrate/src/main.rs](substrate/src/main.rs) |
|
||||||
| Wire codec for `QuicMessage` (39 B fixed LE, incl. `sensor_type: u8`) | Done — [substrate/src/transport/mod.rs](substrate/src/transport/mod.rs); 5 unit tests passing. `SensorType` enum: `Generic / Temperature / Humidity / Pressure / Voltage / Current` |
|
| Wire codec for `QuicMessage` (39 B fixed LE, incl. `sensor_type: u8`) | Done — [substrate/src/transport/mod.rs](substrate/src/transport/mod.rs); 5 unit tests passing. `SensorType` enum: `Generic / Temperature / Humidity / Pressure / Voltage / Current` |
|
||||||
| `tracing-subscriber` init w/ `RUST_LOG` | Done (M1) — [substrate/src/main.rs:8-12](substrate/src/main.rs#L8-L12) |
|
| `tracing-subscriber` init w/ `RUST_LOG` | Done (M1) — [substrate/src/main.rs:8-12](substrate/src/main.rs#L8-L12) |
|
||||||
| ECS components (`RawSensorData`, `SmoothedValue`) + 5 systems (Ingest/Sim/Export/FaultInjection/Diagnostics) | Done — entities = `(DeviceId, SensorId, SensorTypeTag, RawSensorData, SmoothedValue, Asset)` per (device, sensor); `SensorRegistry` upserts via `HashMap<(Uuid, u16), Entity>` in [substrate/src/world.rs](substrate/src/world.rs). `IngestSystem` drains all three tiers; T3 ack preserves command's `sensor_type` and returns the device's most recent `raw_value`. `SimulationSystem` maintains a 16-sample rolling mean per entity and emits `substrate_threshold_crossings_total{type, direction}` when the smoothed mean crosses a per-type threshold (`Changed<RawSensorData>` query so cost scales with ingress, not fleet size). `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `FaultInjection` is still a stub awaiting M6. `Diagnostics` logs `tick_hz` once per second |
|
| ECS components (`RawSensorData`, `SmoothedValue`) + 4 systems (Ingest/Sim/Export/Diagnostics) | Done — entities = `(DeviceId, SensorId, SensorTypeTag, RawSensorData, SmoothedValue, Asset)` per (device, sensor); `SensorRegistry` upserts via `HashMap<(Uuid, u16), Entity>` in [substrate/src/world.rs](substrate/src/world.rs). `IngestSystem` drains all three tiers; T3 ack preserves command's `sensor_type` and returns the device's most recent `raw_value`. `SimulationSystem` maintains a 16-sample rolling mean per entity and emits `substrate_threshold_crossings_total{type, direction}` when the smoothed mean crosses a per-type threshold (`Changed<RawSensorData>` query so cost scales with ingress, not fleet size). `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `Diagnostics` logs `tick_hz` once per second |
|
||||||
| Schedule rate-gating | Done (M4) — `MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(1/tick_rate_hz))` in [substrate/src/main.rs](substrate/src/main.rs); replaces the default busy-loop with the configured period |
|
| Schedule rate-gating | Done (M4) — `MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(1/tick_rate_hz))` in [substrate/src/main.rs](substrate/src/main.rs); replaces the default busy-loop with the configured period |
|
||||||
| Prometheus exporter + Grafana dashboards | Done (M5) — `ObservabilityPlugin` in [substrate/src/observability.rs](substrate/src/observability.rs) installs `metrics-exporter-prometheus` on the existing tokio runtime. **Runtime surface** (paper §Evaluation): counters `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`; latency histograms `substrate_latency_us{tier}`; gauges `substrate_tick_hz`, `substrate_entities`, `substrate_channel_depth{tier}`, `substrate_channel_capacity{tier}`, `substrate_rss_bytes`. **Sensor data surface** (operator dashboard): per-type aggregates `sensor_aggregate{type, stat=count|mean|min|max}` computed once per second over the live world, cardinality bounded by `\|SensorType\| × 4` so it scales to thousands of sensors. Two dashboards: [dashboards/runtime.json](dashboards/runtime.json) and [dashboards/sensors.json](dashboards/sensors.json) (thermometer/gauge/stat panels per type) |
|
| Prometheus exporter + Grafana dashboards | Done (M5) — `ObservabilityPlugin` in [substrate/src/observability.rs](substrate/src/observability.rs) installs `metrics-exporter-prometheus` on the existing tokio runtime. **Runtime surface** (paper §Evaluation): counters `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`; latency histograms `substrate_latency_us{tier}`; gauges `substrate_tick_hz`, `substrate_entities`, `substrate_channel_depth{tier}`, `substrate_channel_capacity{tier}`, `substrate_rss_bytes`. **Sensor data surface** (operator dashboard): per-type aggregates `sensor_aggregate{type, stat=count|mean|min|max}` computed once per second over the live world, cardinality bounded by `\|SensorType\| × 4` so it scales to thousands of sensors. Two dashboards: [dashboards/runtime.json](dashboards/runtime.json) and [dashboards/sensors.json](dashboards/sensors.json) (thermometer/gauge/stat panels per type) |
|
||||||
| Simulator (Quinn client + sensor generators) | `SimulatorClient` lib in [simulator/src/client.rs](simulator/src/client.rs) — connects, trusts the substrate's PEM cert via custom `ServerCertVerifier` (sidesteps `CaUsedAsEndEntity`); `send_datagram(QuicMessage)` for T1, `send_uni_stream(&[QuicMessage])` for T2, `request(&QuicMessage) -> QuicMessage` for T3. CLI driver in [simulator/src/main.rs](simulator/src/main.rs) with clap flags (`--addr`, `--rate-hz`, `--t2-rate-hz`, `--t3-rate-hz`, `--t3-timeout-ms`, `--count`, `--devices`, `--sensor-id`, `--sensor-type`, `--profile`, `--cert`, `--server-name`); parallel T1+T2+T3 emitters, per-(device,sensor) sequence counters, type-appropriate waveform generators (sin/cos curves centred on realistic sensor ranges), 1-Hz combined progress logs, Ctrl-C drain. `--profile industrial` fans out to 5 sensors per device (Temperature/Humidity/Pressure/Voltage/Current). Bevy-driven sensor generator still pending |
|
| Simulator (Quinn client + sensor generators) | `SimulatorClient` lib in [simulator/src/client.rs](simulator/src/client.rs) — connects, trusts the substrate's PEM cert via custom `ServerCertVerifier` (sidesteps `CaUsedAsEndEntity`); `send_datagram(QuicMessage)` for T1, `send_uni_stream(&[QuicMessage])` for T2, `request(&QuicMessage) -> QuicMessage` for T3. CLI driver in [simulator/src/main.rs](simulator/src/main.rs) with clap flags (`--addr`, `--rate-hz`, `--t2-rate-hz`, `--t3-rate-hz`, `--t3-timeout-ms`, `--count`, `--devices`, `--sensor-id`, `--sensor-type`, `--profile`, `--cert`, `--server-name`); parallel T1+T2+T3 emitters, per-(device,sensor) sequence counters, type-appropriate waveform generators (sin/cos curves centred on realistic sensor ranges), 1-Hz combined progress logs, Ctrl-C drain. `--profile industrial` fans out to 5 sensors per device (Temperature/Humidity/Pressure/Voltage/Current). Bevy-driven sensor generator still pending |
|
||||||
@@ -76,13 +76,13 @@ Each milestone has one verification gate. Update Status here as we go.
|
|||||||
- **M1 — Wire codec & root config.** ✅ Done 2026-05-04. Hand-rolled little-endian codec on `QuicMessage` (38 B fixed: 16 UUID + 2 stream_id + 8 f64 + 8 ts_us + 4 seq) with roundtrip + layout + length-error tests; `config.toml` at repo root; dev TLS via `make certs`; structured `tracing-subscriber` init reads `RUST_LOG` (default `info`).
|
- **M1 — Wire codec & root config.** ✅ Done 2026-05-04. Hand-rolled little-endian codec on `QuicMessage` (38 B fixed: 16 UUID + 2 stream_id + 8 f64 + 8 ts_us + 4 seq) with roundtrip + layout + length-error tests; `config.toml` at repo root; dev TLS via `make certs`; structured `tracing-subscriber` init reads `RUST_LOG` (default `info`).
|
||||||
- **M2 — Quinn server + self-signed TLS.** ✅ Done 2026-05-06. Listener up under `ServerState::Starting/Started`; type-system tier semantics + T3 oneshot ack protocol; per-connection `handle_incoming` orchestrator joining T1 datagram, T2 uni-stream, and T3 bi-stream readers. T1 has dropped/decoded counters; T2 resets a stream on decode failure without killing the connection; T3 ships `T3Inbound { command, reply }` to the ECS and resets the stream when no handler answers. End-to-end coverage: 6 integration tests in [simulator/tests/](simulator/tests/) plus 4 codec unit tests, all green.
|
- **M2 — Quinn server + self-signed TLS.** ✅ Done 2026-05-06. Listener up under `ServerState::Starting/Started`; type-system tier semantics + T3 oneshot ack protocol; per-connection `handle_incoming` orchestrator joining T1 datagram, T2 uni-stream, and T3 bi-stream readers. T1 has dropped/decoded counters; T2 resets a stream on decode failure without killing the connection; T3 ships `T3Inbound { command, reply }` to the ECS and resets the stream when no handler answers. End-to-end coverage: 6 integration tests in [simulator/tests/](simulator/tests/) plus 4 codec unit tests, all green.
|
||||||
- **M3 — Simulator client.** Replace [simulator/src/main.rs](simulator/src/main.rs) with a Bevy app: Quinn client, N synthetic devices, configurable per-tier rates. *Verify:* end-to-end loopback drains messages on all three tiers. **Status (2026-05-05):** simulator made into a lib + bin; `SimulatorClient::{connect,send_datagram,close}` plus a manual smoke runner in `simulator/src/main.rs`. Two integration tests in `simulator/tests/end_to_end_t1.rs` exercise the full T1 path against an in-process substrate. Bevy-driven generator + T2/T3 helpers + load profiles still pending.
|
- **M3 — Simulator client.** Replace [simulator/src/main.rs](simulator/src/main.rs) with a Bevy app: Quinn client, N synthetic devices, configurable per-tier rates. *Verify:* end-to-end loopback drains messages on all three tiers. **Status (2026-05-05):** simulator made into a lib + bin; `SimulatorClient::{connect,send_datagram,close}` plus a manual smoke runner in `simulator/src/main.rs`. Two integration tests in `simulator/tests/end_to_end_t1.rs` exercise the full T1 path against an in-process substrate. Bevy-driven generator + T2/T3 helpers + load profiles still pending.
|
||||||
- **M4 — ECS world.** ✅ Done. `Asset` + `DeviceId` + `SensorId` + `SensorTypeTag` + `RawSensorData` + `SmoothedValue` components in [substrate/src/world.rs](substrate/src/world.rs); `SensorRegistry` resource for O(1) `(Uuid, u16) → Entity`. `IngestSystem` drains all three tiers (T1 batched, T2/T3 fully); T3 handler returns the latest sensor value as ack. `SimulationSystem` runs a per-entity 16-sample rolling mean and emits `substrate_threshold_crossings_total{type, direction}` on per-type threshold crossings — gives the ECS observable digital-twin work, not just write-through ingest. `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `FaultInjection` still a stub (M6). `DiagnosticsSystem` logs tick rate once per second. Schedule rate-gated via `ScheduleRunnerPlugin::run_loop(1/tick_rate_hz)`. 8 unit tests passing (entity create, in-place update, T3 ack, SmoothedValue push/window/non-finite/full-roll, threshold-crossing transition).
|
- **M4 — ECS world.** ✅ Done. `Asset` + `DeviceId` + `SensorId` + `SensorTypeTag` + `RawSensorData` + `SmoothedValue` components in [substrate/src/world.rs](substrate/src/world.rs); `SensorRegistry` resource for O(1) `(Uuid, u16) → Entity`. `IngestSystem` drains all three tiers (T1 batched, T2/T3 fully); T3 handler returns the latest sensor value as ack. `SimulationSystem` runs a per-entity 16-sample rolling mean and emits `substrate_threshold_crossings_total{type, direction}` on per-type threshold crossings — gives the ECS observable digital-twin work, not just write-through ingest. `ExportSystem` samples `substrate_{entities,channel_depth,channel_capacity,rss_bytes}` + `sensor_aggregate{type, stat}` once per second. `DiagnosticsSystem` logs tick rate once per second. Schedule rate-gated via `ScheduleRunnerPlugin::run_loop(1/tick_rate_hz)`. 8 unit tests passing (entity create, in-place update, T3 ack, SmoothedValue push/window/non-finite/full-roll, threshold-crossing transition).
|
||||||
- **M5 — Observability (VictoriaMetrics + Grafana).** ✅ Done. Wire format extended to carry `sensor_type: u8` (38 → 39 B, decoded into `SensorType` enum). Two metric surfaces over `metrics-exporter-prometheus`:
|
- **M5 — Observability (VictoriaMetrics + Grafana).** ✅ Done. Wire format extended to carry `sensor_type: u8` (38 → 39 B, decoded into `SensorType` enum). Two metric surfaces over `metrics-exporter-prometheus`:
|
||||||
- **Runtime** (paper §Evaluation): `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`, `latency_us{tier}` histograms, `tick_hz` / `entities` / `channel_depth{tier}` / `rss_bytes` gauges.
|
- **Runtime** (paper §Evaluation): `substrate_received_total{tier}`, `dropped_total{tier=t1}`, `decode_errors_total{tier}`, `t3_no_handler_total`, `latency_us{tier}` histograms, `tick_hz` / `entities` / `channel_depth{tier}` / `rss_bytes` gauges.
|
||||||
- **Sensor data** (operator surface): `sensor_aggregate{type, stat=count|mean|min|max}` aggregated per second across the live ECS world. Cardinality bounded to `\|SensorType\| × 4` series independent of physical sensor count.
|
- **Sensor data** (operator surface): `sensor_aggregate{type, stat=count|mean|min|max}` aggregated per second across the live ECS world. Cardinality bounded to `\|SensorType\| × 4` series independent of physical sensor count.
|
||||||
- Dashboards: [dashboards/runtime.json](dashboards/runtime.json) + [dashboards/sensors.json](dashboards/sensors.json).
|
- Dashboards: [dashboards/runtime.json](dashboards/runtime.json) + [dashboards/sensors.json](dashboards/sensors.json).
|
||||||
- Verified: `--profile industrial --devices 2 --count 200` yields 10 entities and all 5 type aggregates with realistic values (T=20.5°C, RH=51%, P=1018 hPa, V=230.2 V, I=12 A).
|
- Verified: `--profile industrial --devices 2 --count 200` yields 10 entities and all 5 type aggregates with realistic values (T=20.5°C, RH=51%, P=1018 hPa, V=230.2 V, I=12 A).
|
||||||
- **M6 — Benchmark harness.** Sweep `entity_count ∈ {10k, 50k, 100k, 200k}` × `loss_rate ∈ {0%, 1%, 5%}` with 2k warmup + 5k measurement ticks. Loss via `tc netem` or in-app injection. Writes `data/loopback/final_table.csv`. *Verify:* one full sweep on M4 Max produces a CSV the Quarto figures consume.
|
- **M6 — Benchmark harness.** Sweep `entity_count ∈ {10k, 50k, 100k, 200k}` × `loss_rate ∈ {0%, 1%, 5%}` with 2k warmup + 5k measurement ticks. Loss via `tc netem`. Writes `data/loopback/final_table.csv`. *Verify:* one full sweep on M4 Max produces a CSV the Quarto figures consume.
|
||||||
- **M7 — CM5 cross-compile & deploy.** Exercise [Makefile:30](Makefile#L30) (`build-cm5`, `deploy-cm5`); set real `CM5_HOST`. *Verify:* binary runs on CM5 with a feed from M4 Max over 1 Gbps Ethernet.
|
- **M7 — CM5 cross-compile & deploy.** Exercise [Makefile:30](Makefile#L30) (`build-cm5`, `deploy-cm5`); set real `CM5_HOST`. *Verify:* binary runs on CM5 with a feed from M4 Max over 1 Gbps Ethernet.
|
||||||
- **M8 — Two-machine run + paper render.** Sweep with simulator on M4 Max → substrate on CM5; populate `data/two_machine/final_table.csv`; `make render` produces a PDF. **Update §Evaluation prose to reflect actual numbers.** Current paper figures (241 Hz, 64 µs / 15.8 ms P99, 2.6 µs jitter, 1.02 MB/1k, R²=0.9999) are **aspirational placeholders** — they may move and the conclusions may shift; that's expected.
|
- **M8 — Two-machine run + paper render.** Sweep with simulator on M4 Max → substrate on CM5; populate `data/two_machine/final_table.csv`; `make render` produces a PDF. **Update §Evaluation prose to reflect actual numbers.** Current paper figures (241 Hz, 64 µs / 15.8 ms P99, 2.6 µs jitter, 1.02 MB/1k, R²=0.9999) are **aspirational placeholders** — they may move and the conclusions may shift; that's expected.
|
||||||
|
|
||||||
@@ -105,7 +105,7 @@ Each milestone has one verification gate. Update Status here as we go.
|
|||||||
- **No graceful shutdown.** The `quic-runtime` thread is parked on `pending()`; spawned tasks (accept loop, per-conn demux) are orphaned at process exit. Fine for research runs; we'll need an `OnExit(Started)` (or a `Stopping` state) when M5 observability needs clean drain or M8 wants finalised CSV writes.
|
- **No graceful shutdown.** The `quic-runtime` thread is parked on `pending()`; spawned tasks (accept loop, per-conn demux) are orphaned at process exit. Fine for research runs; we'll need an `OnExit(Started)` (or a `Stopping` state) when M5 observability needs clean drain or M8 wants finalised CSV writes.
|
||||||
- **Bind failure is fatal.** `OnEnter(Starting)` panics if `bind_endpoint` fails. A `ServerState::Failed` variant joins when we wire proper error surfacing.
|
- **Bind failure is fatal.** `OnEnter(Starting)` panics if `bind_endpoint` fails. A `ServerState::Failed` variant joins when we wire proper error surfacing.
|
||||||
- **T3 ack semantics are minimal.** The current handler echoes the device's most recent `raw_value` with a server timestamp — adequate for "read sensor" commands, not for actuator-write semantics. A future iteration may introduce an `ActuatorState` component and a setpoint-apply path; for now T3 is best framed as "reliable read/query RPC" in the paper.
|
- **T3 ack semantics are minimal.** The current handler echoes the device's most recent `raw_value` with a server timestamp — adequate for "read sensor" commands, not for actuator-write semantics. A future iteration may introduce an `ActuatorState` component and a setpoint-apply path; for now T3 is best framed as "reliable read/query RPC" in the paper.
|
||||||
- **`FaultInjectionSystem` is still empty.** Runs on schedule but does nothing. M6 fills it with rate-controlled in-app drop so loss sweeps don't depend on external `tc netem`.
|
|
||||||
- **Schedule rate-gating is approximate.** `ScheduleRunnerPlugin::run_loop(period)` honours `period` as a minimum; observed `tick_hz` runs ~85% of target on macOS dev (target 60 → ~50). Should be tighter on the CM5; revisit if M6 sweeps depend on a steady tick.
|
- **Schedule rate-gating is approximate.** `ScheduleRunnerPlugin::run_loop(period)` honours `period` as a minimum; observed `tick_hz` runs ~85% of target on macOS dev (target 60 → ~50). Should be tighter on the CM5; revisit if M6 sweeps depend on a steady tick.
|
||||||
|
|
||||||
## Run / verify
|
## Run / verify
|
||||||
|
|||||||
@@ -12,6 +12,9 @@ server_port = 9000
|
|||||||
server_interface = "0.0.0.0"
|
server_interface = "0.0.0.0"
|
||||||
server_cert = "certs/server.crt"
|
server_cert = "certs/server.crt"
|
||||||
server_key = "certs/server.key"
|
server_key = "certs/server.key"
|
||||||
|
t1_capacity = 1024
|
||||||
|
t2_capacity = 512
|
||||||
|
t3_capacity = 256
|
||||||
|
|
||||||
[simulation]
|
[simulation]
|
||||||
tick_rate_hz = 60
|
tick_rate_hz = 60
|
||||||
|
|||||||
@@ -1,2 +1,10 @@
|
|||||||
rate_hz,t3_rate_hz,devices,tick_rate_hz,window_s,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t3_received,t3_no_handler,t3_p50_us,t3_p99_us,t3_p999_us,tick_hz,rss_mb,channel_depth_max
|
rate_hz,t3_rate_hz,devices,tick_rate_hz,window_s,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t3_received,t3_no_handler,t3_p50_us,t3_p99_us,t3_p999_us,tick_hz,rss_mb,channel_depth_max
|
||||||
100,100,100,0,25,2646,0,118.99720565324648,202.0065277946852,245.99224556720532,2646,0,120.98904580793433,199.99652925270829,238.0069829199846,15833.3,28.0,0
|
100,100,100,0,25,2641,0,114.99630654141735,183.99342299596765,233.99506214353187,2641,0,115.98953956135134,181.0005395731182,227.98959982186395,15726.4,27.7,1
|
||||||
|
500,100,100,0,25,13172,0,98.99803587754256,164.00550789726537,216.00466678084967,2634,0,112.99007813673754,176.99119972210946,226.98864928535784,15775.8,28.9,1
|
||||||
|
1000,100,100,0,25,26259,0,94.00049142147152,146.01363556268566,193.00189597134016,2626,0,102.99701533183928,155.01160914305248,209.99842124823599,15550.5,29.5,1
|
||||||
|
5000,100,100,0,25,131395,0,91.99185219896138,143.00791974278306,198.00653053045428,2628,0,99.99298268244951,150.00970795504614,195.99712928020054,15635.5,30.3,5
|
||||||
|
10000,100,100,0,25,263310,0,91.99185219896138,155.01160914305248,243.0093726834088,2633,0,104.99366452846704,169.9832685850933,241.99087365988592,15516.1,30.9,0
|
||||||
|
25000,100,100,0,25,657000,0,94.00049142147152,166.9843360750187,245.99224556720532,2628,0,107.00761611528327,178.9846436133428,260.00477949916575,15672.3,32.1,25
|
||||||
|
50000,100,100,0,25,1316100,0,96.99893608515958,155.01160914305248,197.01896883616524,2632,0,106.00645733856791,164.00550789726537,198.00653053045428,15376.8,33.2,50
|
||||||
|
100000,100,100,0,25,2625900,0,98.99803587754256,173.00145626474986,219.0061940968233,2626,0,110.00216095757669,185.99132120176222,231.01901268757703,15085.7,35.5,100
|
||||||
|
250000,100,100,0,25,6580250,0,103.99054800886718,200.99901603074525,251.01181592403498,2632,0,115.98953956135134,220.0159432355299,314.977124065739,14190.8,42.0,96
|
||||||
|
|||||||
|
13
data/loopback/final_table.csv
Normal file
13
data/loopback/final_table.csv
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
entities,loss_pct,devices,rate_hz,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t2_p99_us,t3_rtt_us,hz,rss_mb
|
||||||
|
10000,0,2000,100,201,0,152.00296264568095,231.99133298742527,245.01024189833885,0,0,15254.7,26.5
|
||||||
|
10000,1,2000,100,202,0,153.00950012244246,251.01181592403498,261.98834382686925,0,0,14916.7,26.8
|
||||||
|
10000,5,2000,100,202,0,148.01298577790973,245.01024189833885,262.98579349083377,0,0,15108.5,27.1
|
||||||
|
50000,0,10000,100,202,0,146.01363556268566,238.0069829199846,261.98834382686925,0,0,15098.7,27.5
|
||||||
|
50000,1,10000,100,202,0,144.01248706798935,236.0160976812146,262.98579349083377,0,0,14938.1,27.6
|
||||||
|
50000,5,10000,100,202,0,140.99155733033865,238.0069829199846,266.00098548659696,0,0,14705.4,27.7
|
||||||
|
100000,0,20000,100,201,0,138.0063931729486,233.01434382937512,262.98579349083377,0,0,14823.3,27.9
|
||||||
|
100000,1,20000,100,202,0,134.00806856721388,231.99133298742527,262.98579349083377,0,0,14802.4,28.6
|
||||||
|
100000,5,20000,100,202,0,132.9934676099666,230.00476201617178,262.98579349083377,0,0,15060.9,28.7
|
||||||
|
200000,0,40000,100,202,0,136.00613722545975,238.0069829199846,276.027366209557,0,0,14835.4,29.0
|
||||||
|
200000,1,40000,100,202,0,138.0063931729486,240.01466203032882,270.02107558160185,0,0,14840.5,29.1
|
||||||
|
200000,5,40000,100,202,0,139.00362493341808,240.01466203032882,276.027366209557,0,0,14882.3,29.1
|
||||||
|
169
scripts/bench-loss.sh
Executable file
169
scripts/bench-loss.sh
Executable file
@@ -0,0 +1,169 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# scripts/bench-loss.sh — M6 benchmark harness
|
||||||
|
# Sweeps entity count {10k, 50k, 100k, 200k} x loss_rate {0, 1, 5}%
|
||||||
|
# Output: data/loopback/final_table.csv
|
||||||
|
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||||
|
ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
|
||||||
|
cd "$ROOT"
|
||||||
|
|
||||||
|
TICK_RATE_HZ="${TICK_RATE_HZ:-100}"
|
||||||
|
WARMUP_S="${WARMUP_S:-20}"
|
||||||
|
WINDOW_S="${WINDOW_S:-50}"
|
||||||
|
RATE_HZ="${RATE_HZ:-100}"
|
||||||
|
BUILD="${BUILD:-release}"
|
||||||
|
IFACE="${IFACE:-eth0}"
|
||||||
|
|
||||||
|
OUT_CSV="${OUT_CSV:-data/loopback/final_table.csv}"
|
||||||
|
|
||||||
|
HAS_TC=1
|
||||||
|
# Check for root/sudo since we need to run tc
|
||||||
|
if ! command -v tc >/dev/null; then
|
||||||
|
echo "Warning: 'tc' command not found. Loss emulation will be skipped."
|
||||||
|
HAS_TC=0
|
||||||
|
fi
|
||||||
|
|
||||||
|
# --- pretty logging ---
|
||||||
|
if [[ -t 1 ]]; then
|
||||||
|
BOLD=$'\033[1m'; DIM=$'\033[2m'; GREEN=$'\033[32m'; RED=$'\033[31m'; RESET=$'\033[0m'
|
||||||
|
else BOLD=; DIM=; GREEN=; RED=; RESET=; fi
|
||||||
|
step() { printf '%s» %s%s\n' "$BOLD" "$1" "$RESET"; }
|
||||||
|
ok() { printf '%s ✓ %s%s\n' "$GREEN" "$1" "$RESET"; }
|
||||||
|
fail() { printf '%s ✗ %s%s\n' "$RED" "$1" "$RESET"; }
|
||||||
|
|
||||||
|
for cmd in cargo curl lsof awk; do
|
||||||
|
command -v "$cmd" >/dev/null || { fail "missing: $cmd"; exit 1; }
|
||||||
|
done
|
||||||
|
for port in 9000 9100; do
|
||||||
|
if lsof -nP -iUDP:$port -iTCP:$port -sTCP:LISTEN 2>/dev/null | grep -q LISTEN; then
|
||||||
|
fail "port $port in use — kill the running substrate first"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
[[ -f certs/server.crt ]] || make certs >/dev/null
|
||||||
|
|
||||||
|
step "Building ($BUILD)"
|
||||||
|
if [[ "$BUILD" == "release" ]]; then
|
||||||
|
cargo build --release -p substrate -p simulator >/dev/null
|
||||||
|
SUBSTRATE="$ROOT/target/release/substrate"
|
||||||
|
SIMULATOR="$ROOT/target/release/simulator"
|
||||||
|
else
|
||||||
|
cargo build -p substrate -p simulator >/dev/null
|
||||||
|
SUBSTRATE="$ROOT/target/debug/substrate"
|
||||||
|
SIMULATOR="$ROOT/target/debug/simulator"
|
||||||
|
fi
|
||||||
|
|
||||||
|
LOG_DIR="/tmp/quic_ecs_dt_bench"
|
||||||
|
mkdir -p "$LOG_DIR"
|
||||||
|
SUB_LOG="$LOG_DIR/substrate.log"
|
||||||
|
: > "$SUB_LOG"
|
||||||
|
|
||||||
|
step "Starting substrate (tick_rate_hz=$TICK_RATE_HZ, log: $SUB_LOG)"
|
||||||
|
APP_SIMULATION__TICK_RATE_HZ="$TICK_RATE_HZ" RUST_LOG=warn "$SUBSTRATE" >"$SUB_LOG" 2>&1 &
|
||||||
|
SUBSTRATE_PID=$!
|
||||||
|
|
||||||
|
for i in $(seq 1 40); do
|
||||||
|
if curl -sf http://localhost:9100/metrics >/dev/null 2>&1; then
|
||||||
|
ok "substrate /metrics ready"; break
|
||||||
|
fi
|
||||||
|
sleep 0.25
|
||||||
|
if [[ $i -eq 40 ]]; then fail "substrate didn't start"; tail -20 "$SUB_LOG"; exit 1; fi
|
||||||
|
done
|
||||||
|
|
||||||
|
cleanup() {
|
||||||
|
[[ -n "${SIM_PID:-}" ]] && kill -TERM "$SIM_PID" 2>/dev/null || true
|
||||||
|
[[ -n "${SUBSTRATE_PID:-}" ]] && kill -TERM "$SUBSTRATE_PID" 2>/dev/null || true
|
||||||
|
if [[ "$HAS_TC" -eq 1 ]]; then
|
||||||
|
sudo tc qdisc del dev $IFACE root 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
wait 2>/dev/null || true
|
||||||
|
}
|
||||||
|
trap cleanup EXIT INT TERM
|
||||||
|
|
||||||
|
snapshot_to() {
|
||||||
|
curl -s http://localhost:9100/metrics > "$1"
|
||||||
|
}
|
||||||
|
get_value() {
|
||||||
|
awk -v pat="$2" '$0 ~ "^" pat " " { print $NF; exit }' "$1"
|
||||||
|
}
|
||||||
|
|
||||||
|
mkdir -p "$(dirname "$OUT_CSV")"
|
||||||
|
echo "entities,loss_pct,devices,rate_hz,t1_received,t1_dropped,t1_p50_us,t1_p99_us,t1_p999_us,t2_p99_us,t3_rtt_us,hz,rss_mb" > "$OUT_CSV"
|
||||||
|
|
||||||
|
step "Sweeping entity_count x loss_pct (warmup ${WARMUP_S}s, window ${WINDOW_S}s)"
|
||||||
|
printf '%s%-10s %-8s %-8s %-9s %-9s %-10s %-10s %-10s %-10s %-10s %-8s %-7s%s\n' \
|
||||||
|
"$BOLD" "entities" "loss_pct" "devices" "received" "dropped" "t1_p50" "t1_p99" "t1_p999" "t2_p99" "t3_rtt" "hz" "rss_mb" "$RESET"
|
||||||
|
|
||||||
|
BEFORE="$LOG_DIR/before.txt"
|
||||||
|
AFTER="$LOG_DIR/after.txt"
|
||||||
|
|
||||||
|
ENTITIES_LIST=(10000 50000 100000 200000)
|
||||||
|
LOSS_LIST=(0 1 5)
|
||||||
|
|
||||||
|
for entities in "${ENTITIES_LIST[@]}"; do
|
||||||
|
devices=$(( entities / 5 ))
|
||||||
|
|
||||||
|
for loss in "${LOSS_LIST[@]}"; do
|
||||||
|
# Apply tc netem loss
|
||||||
|
if [[ "$HAS_TC" -eq 1 ]]; then
|
||||||
|
sudo tc qdisc del dev $IFACE root 2>/dev/null || true
|
||||||
|
if [[ "$loss" -gt 0 ]]; then
|
||||||
|
sudo tc qdisc add dev $IFACE root netem loss ${loss}% 2>/dev/null || {
|
||||||
|
echo "Warning: failed to apply tc netem loss on interface $IFACE."
|
||||||
|
}
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
sim_args=(
|
||||||
|
--profile industrial
|
||||||
|
--rate-hz "$RATE_HZ"
|
||||||
|
--count 0
|
||||||
|
--devices "$devices"
|
||||||
|
)
|
||||||
|
RUST_LOG=warn "$SIMULATOR" "${sim_args[@]}" >"$LOG_DIR/sim_${entities}_${loss}.log" 2>&1 &
|
||||||
|
SIM_PID=$!
|
||||||
|
|
||||||
|
sleep "$WARMUP_S"
|
||||||
|
snapshot_to "$BEFORE"
|
||||||
|
rec_before=$(get_value "$BEFORE" 'substrate_received_total\{tier="t1"\}')
|
||||||
|
drop_before=$(get_value "$BEFORE" 'substrate_dropped_total\{tier="t1"\}')
|
||||||
|
|
||||||
|
sleep "$WINDOW_S"
|
||||||
|
|
||||||
|
snapshot_to "$AFTER"
|
||||||
|
kill -TERM "$SIM_PID" 2>/dev/null || true
|
||||||
|
wait "$SIM_PID" 2>/dev/null || true
|
||||||
|
SIM_PID=""
|
||||||
|
|
||||||
|
rec_after=$(get_value "$AFTER" 'substrate_received_total\{tier="t1"\}')
|
||||||
|
drop_after=$(get_value "$AFTER" 'substrate_dropped_total\{tier="t1"\}')
|
||||||
|
p50=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.5"\}')
|
||||||
|
p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.99"\}')
|
||||||
|
p999=$(get_value "$AFTER" 'substrate_latency_us\{tier="t1",quantile="0.999"\}')
|
||||||
|
t2_p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t2",quantile="0.99"\}')
|
||||||
|
t3_p99=$(get_value "$AFTER" 'substrate_latency_us\{tier="t3",quantile="0.99"\}')
|
||||||
|
|
||||||
|
tick_hz=$(get_value "$AFTER" 'substrate_tick_hz')
|
||||||
|
rss=$(get_value "$AFTER" 'substrate_rss_bytes')
|
||||||
|
|
||||||
|
received=$(awk -v a="$rec_after" -v b="$rec_before" 'BEGIN { printf "%d", a-b }')
|
||||||
|
dropped=$(awk -v a="$drop_after" -v b="$drop_before" 'BEGIN { printf "%d", a-b }')
|
||||||
|
rss_mb=$(awk -v r="$rss" 'BEGIN { printf "%.1f", r/1048576 }')
|
||||||
|
tick_hz_fmt=$(awk -v t="$tick_hz" 'BEGIN { printf "%.1f", t }')
|
||||||
|
|
||||||
|
printf '%-10s %-8s %-8s %-9s %-9s %-10.0f %-10.0f %-10.0f %-10.0f %-10.0f %-8s %-7s\n' \
|
||||||
|
"$entities" "$loss" "$devices" "$received" "$dropped" "${p50:-0}" "${p99:-0}" "${p999:-0}" "${t2_p99:-0}" "${t3_p99:-0}" \
|
||||||
|
"$tick_hz_fmt" "$rss_mb"
|
||||||
|
|
||||||
|
echo "$entities,$loss,$devices,$RATE_HZ,$received,$dropped,${p50:-0},${p99:-0},${p999:-0},${t2_p99:-0},${t3_p99:-0},$tick_hz_fmt,$rss_mb" >> "$OUT_CSV"
|
||||||
|
|
||||||
|
done
|
||||||
|
done
|
||||||
|
|
||||||
|
if [[ "$HAS_TC" -eq 1 ]]; then
|
||||||
|
sudo tc qdisc del dev $IFACE root 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
|
||||||
|
printf '\n%sCSV written to:%s %s\n' "$DIM" "$RESET" "$OUT_CSV"
|
||||||
@@ -38,9 +38,17 @@ pub async fn run_t2_emitter(
|
|||||||
) -> u64 {
|
) -> u64 {
|
||||||
let period = Duration::from_nanos((1.0e9 / rate_hz) as u64);
|
let period = Duration::from_nanos((1.0e9 / rate_hz) as u64);
|
||||||
let mut ticker = tokio::time::interval(period);
|
let mut ticker = tokio::time::interval(period);
|
||||||
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
let mut sent: u64 = 0;
|
let mut sent: u64 = 0;
|
||||||
|
|
||||||
|
let mut send = match conn.open_uni().await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "T2 open_uni failed; emitter exiting");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
ticker.tick().await;
|
ticker.tick().await;
|
||||||
if interrupted.load(Ordering::SeqCst) {
|
if interrupted.load(Ordering::SeqCst) {
|
||||||
@@ -57,25 +65,19 @@ pub async fn run_t2_emitter(
|
|||||||
};
|
};
|
||||||
slot.seq = slot.seq.wrapping_add(1);
|
slot.seq = slot.seq.wrapping_add(1);
|
||||||
|
|
||||||
match conn.open_uni().await {
|
|
||||||
Ok(mut send) => {
|
|
||||||
if let Err(e) = send.write_all(&msg.to_bytes()).await {
|
if let Err(e) = send.write_all(&msg.to_bytes()).await {
|
||||||
tracing::warn!(error = %e, "T2 write_all failed");
|
tracing::warn!(error = %e, "T2 write_all failed; stream closed?");
|
||||||
continue;
|
break;
|
||||||
}
|
|
||||||
if let Err(e) = send.finish() {
|
|
||||||
tracing::warn!(error = %e, "T2 finish failed");
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sent += 1;
|
sent += 1;
|
||||||
counter.store(sent, Ordering::Relaxed);
|
counter.store(sent, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "T2 open_uni failed; emitter exiting");
|
if let Err(e) = send.finish() {
|
||||||
break;
|
tracing::warn!(error = %e, "T2 finish failed");
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,7 +94,7 @@ pub async fn run_t3_emitter(
|
|||||||
) -> (u64, u64) {
|
) -> (u64, u64) {
|
||||||
let period = Duration::from_nanos((1.0e9 / rate_hz) as u64);
|
let period = Duration::from_nanos((1.0e9 / rate_hz) as u64);
|
||||||
let mut ticker = tokio::time::interval(period);
|
let mut ticker = tokio::time::interval(period);
|
||||||
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
let mut sent: u64 = 0;
|
let mut sent: u64 = 0;
|
||||||
let mut timeouts: u64 = 0;
|
let mut timeouts: u64 = 0;
|
||||||
|
|
||||||
|
|||||||
@@ -221,7 +221,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
if cli.rate_hz > 0.0 {
|
if cli.rate_hz > 0.0 {
|
||||||
let period = Duration::from_nanos((1.0e9 / cli.rate_hz) as u64);
|
let period = Duration::from_nanos((1.0e9 / cli.rate_hz) as u64);
|
||||||
let mut ticker = tokio::time::interval(period);
|
let mut ticker = tokio::time::interval(period);
|
||||||
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
|
|
||||||
let unlimited = cli.count == 0;
|
let unlimited = cli.count == 0;
|
||||||
let mut last_progress = started;
|
let mut last_progress = started;
|
||||||
|
|||||||
@@ -28,6 +28,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
|
|||||||
server_interface: "127.0.0.1".to_string(),
|
server_interface: "127.0.0.1".to_string(),
|
||||||
server_cert: cert.to_string_lossy().into_owned(),
|
server_cert: cert.to_string_lossy().into_owned(),
|
||||||
server_key: key.to_string_lossy().into_owned(),
|
server_key: key.to_string_lossy().into_owned(),
|
||||||
|
t1_capacity: 1024,
|
||||||
|
t2_capacity: 512,
|
||||||
|
t3_capacity: 256,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,6 +27,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
|
|||||||
server_interface: "127.0.0.1".to_string(),
|
server_interface: "127.0.0.1".to_string(),
|
||||||
server_cert: cert.to_string_lossy().into_owned(),
|
server_cert: cert.to_string_lossy().into_owned(),
|
||||||
server_key: key.to_string_lossy().into_owned(),
|
server_key: key.to_string_lossy().into_owned(),
|
||||||
|
t1_capacity: 1024,
|
||||||
|
t2_capacity: 512,
|
||||||
|
t3_capacity: 256,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,9 @@ fn loopback_config(cert: PathBuf, key: PathBuf) -> QuicConfig {
|
|||||||
server_interface: "127.0.0.1".to_string(),
|
server_interface: "127.0.0.1".to_string(),
|
||||||
server_cert: cert.to_string_lossy().into_owned(),
|
server_cert: cert.to_string_lossy().into_owned(),
|
||||||
server_key: key.to_string_lossy().into_owned(),
|
server_key: key.to_string_lossy().into_owned(),
|
||||||
|
t1_capacity: 1024,
|
||||||
|
t2_capacity: 512,
|
||||||
|
t3_capacity: 256,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,9 @@ pub struct QuicConfig {
|
|||||||
pub server_interface: String,
|
pub server_interface: String,
|
||||||
pub server_cert: String,
|
pub server_cert: String,
|
||||||
pub server_key: String,
|
pub server_key: String,
|
||||||
|
pub t1_capacity: usize,
|
||||||
|
pub t2_capacity: usize,
|
||||||
|
pub t3_capacity: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
@@ -41,6 +44,9 @@ impl Default for AppConfig {
|
|||||||
server_interface: "0.0.0.0".to_string(),
|
server_interface: "0.0.0.0".to_string(),
|
||||||
server_cert: "certs/server.crt".to_string(),
|
server_cert: "certs/server.crt".to_string(),
|
||||||
server_key: "certs/server.key".to_string(),
|
server_key: "certs/server.key".to_string(),
|
||||||
|
t1_capacity: 1024,
|
||||||
|
t2_capacity: 512,
|
||||||
|
t3_capacity: 256,
|
||||||
},
|
},
|
||||||
simulation: SimulationConfig {
|
simulation: SimulationConfig {
|
||||||
tick_rate_hz: 60,
|
tick_rate_hz: 60,
|
||||||
|
|||||||
@@ -10,10 +10,6 @@ use crate::transport::{QuicMessage, T1Sender, T2Sender, T3Inbound, T3Sender};
|
|||||||
use crate::transport::server::{accept_loop, bind_endpoint};
|
use crate::transport::server::{accept_loop, bind_endpoint};
|
||||||
use crate::transport::state::ServerState;
|
use crate::transport::state::ServerState;
|
||||||
|
|
||||||
const T1_CAPACITY: usize = 1024;
|
|
||||||
const T2_CAPACITY: usize = 512;
|
|
||||||
const T3_CAPACITY: usize = 256;
|
|
||||||
|
|
||||||
pub struct EcsQuicTransportPlugin;
|
pub struct EcsQuicTransportPlugin;
|
||||||
|
|
||||||
/// Receive halves of the three tier channels, wrapped so they can sit in a
|
/// Receive halves of the three tier channels, wrapped so they can sit in a
|
||||||
@@ -63,11 +59,12 @@ fn start_quic_server(
|
|||||||
|
|
||||||
impl Plugin for EcsQuicTransportPlugin {
|
impl Plugin for EcsQuicTransportPlugin {
|
||||||
fn build(&self, app: &mut App) {
|
fn build(&self, app: &mut App) {
|
||||||
|
let config = app.world_mut().resource::<AppConfig>();
|
||||||
// Three-tier bridge between the tokio-side QUIC accept loop and the
|
// Three-tier bridge between the tokio-side QUIC accept loop and the
|
||||||
// ECS PreUpdate ingest system (in the `world` module).
|
// ECS PreUpdate ingest system (in the `world` module).
|
||||||
let (t1_tx, t1_rx) = mpsc::channel::<QuicMessage>(T1_CAPACITY);
|
let (t1_tx, t1_rx) = mpsc::channel::<QuicMessage>(config.network.t1_capacity);
|
||||||
let (t2_tx, t2_rx) = mpsc::channel::<QuicMessage>(T2_CAPACITY);
|
let (t2_tx, t2_rx) = mpsc::channel::<QuicMessage>(config.network.t2_capacity);
|
||||||
let (t3_tx, t3_rx) = mpsc::channel::<T3Inbound>(T3_CAPACITY);
|
let (t3_tx, t3_rx) = mpsc::channel::<T3Inbound>(config.network.t3_capacity);
|
||||||
|
|
||||||
// Spawn a tokio runtime on a dedicated OS thread, ship its Handle back
|
// Spawn a tokio runtime on a dedicated OS thread, ship its Handle back
|
||||||
// to the ECS, and keep the runtime alive for the lifetime of the app
|
// to the ECS, and keep the runtime alive for the lifetime of the app
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
//! ```text
|
//! ```text
|
||||||
//! components.rs ── per-sensor components + per-type threshold table
|
//! components.rs ── per-sensor components + per-type threshold table
|
||||||
//! resources.rs ── SensorRegistry, DiagnosticsState, ExportSampleState
|
//! resources.rs ── SensorRegistry, DiagnosticsState, ExportSampleState
|
||||||
//! systems.rs ── ingest / fault_injection / simulation / export / diagnostics
|
//! systems.rs ── ingest / simulation / export / diagnostics
|
||||||
//! tests.rs ── unit tests (#[cfg(test)] only)
|
//! tests.rs ── unit tests (#[cfg(test)] only)
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
@@ -39,9 +39,7 @@ impl Plugin for WorldPlugin {
|
|||||||
.init_resource::<resources::ExportSampleState>()
|
.init_resource::<resources::ExportSampleState>()
|
||||||
.add_systems(
|
.add_systems(
|
||||||
PreUpdate,
|
PreUpdate,
|
||||||
(systems::fault_injection_system, systems::ingest_system)
|
systems::ingest_system.run_if(in_state(ServerState::Started)),
|
||||||
.chain()
|
|
||||||
.run_if(in_state(ServerState::Started)),
|
|
||||||
)
|
)
|
||||||
.add_systems(Update, systems::simulation_system)
|
.add_systems(Update, systems::simulation_system)
|
||||||
.add_systems(
|
.add_systems(
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
//!
|
//!
|
||||||
//! | Schedule | Systems |
|
//! | Schedule | Systems |
|
||||||
//! |-----------|--------------------------------------|
|
//! |-----------|--------------------------------------|
|
||||||
//! | PreUpdate | fault_injection → ingest |
|
//! | PreUpdate | ingest |
|
||||||
//! | Update | simulation |
|
//! | Update | simulation |
|
||||||
//! | PostUpdate| export → diagnostics |
|
//! | PostUpdate| export → diagnostics |
|
||||||
|
|
||||||
@@ -25,6 +25,8 @@ use super::resources::{DiagnosticsState, ExportSampleState, SensorRegistry};
|
|||||||
/// T1 batch limit per tick. Anything beyond this stays in the channel and
|
/// T1 batch limit per tick. Anything beyond this stays in the channel and
|
||||||
/// either drains next tick or gets dropped on full (T1's contract is lossy).
|
/// either drains next tick or gets dropped on full (T1's contract is lossy).
|
||||||
const T1_INGEST_BATCH: usize = 1024;
|
const T1_INGEST_BATCH: usize = 1024;
|
||||||
|
const T2_INGEST_BATCH: usize = 512;
|
||||||
|
const T3_INGEST_BATCH: usize = 256;
|
||||||
|
|
||||||
/// Drain the three tier channels into ECS state.
|
/// Drain the three tier channels into ECS state.
|
||||||
///
|
///
|
||||||
@@ -56,18 +58,25 @@ pub(super) fn ingest_system(
|
|||||||
// T2 — uni streams.
|
// T2 — uni streams.
|
||||||
{
|
{
|
||||||
let mut t2 = bridge.t2.lock().unwrap();
|
let mut t2 = bridge.t2.lock().unwrap();
|
||||||
while let Ok(msg) = t2.try_recv() {
|
for _ in 0..T2_INGEST_BATCH {
|
||||||
|
match t2.try_recv() {
|
||||||
|
Ok(msg) => {
|
||||||
histogram!("substrate_latency_us", "tier" => "t2")
|
histogram!("substrate_latency_us", "tier" => "t2")
|
||||||
.record(now.saturating_sub(msg.timestamp_us) as f64);
|
.record(now.saturating_sub(msg.timestamp_us) as f64);
|
||||||
upsert_reading(&mut registry, &mut commands, &mut q, msg);
|
upsert_reading(&mut registry, &mut commands, &mut q, msg);
|
||||||
}
|
}
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// T3 — bidirectional commands. Reply with the device's most recent
|
// T3 — bidirectional commands. Reply with the device's most recent
|
||||||
// sensor value (NaN if we've never seen this (device, sensor) before).
|
// sensor value (NaN if we've never seen this (device, sensor) before).
|
||||||
{
|
{
|
||||||
let mut t3 = bridge.t3.lock().unwrap();
|
let mut t3 = bridge.t3.lock().unwrap();
|
||||||
while let Ok(inbound) = t3.try_recv() {
|
for _ in 0..T3_INGEST_BATCH {
|
||||||
|
match t3.try_recv() {
|
||||||
|
Ok(inbound) => {
|
||||||
histogram!("substrate_latency_us", "tier" => "t3")
|
histogram!("substrate_latency_us", "tier" => "t3")
|
||||||
.record(now.saturating_sub(inbound.command.timestamp_us) as f64);
|
.record(now.saturating_sub(inbound.command.timestamp_us) as f64);
|
||||||
let key = (inbound.command.device_id, inbound.command.sensor_id);
|
let key = (inbound.command.device_id, inbound.command.sensor_id);
|
||||||
@@ -89,6 +98,9 @@ pub(super) fn ingest_system(
|
|||||||
// connection died while we were processing.
|
// connection died while we were processing.
|
||||||
let _ = inbound.reply.send(ack);
|
let _ = inbound.reply.send(ack);
|
||||||
}
|
}
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,8 +144,6 @@ fn upsert_reading(
|
|||||||
registry.map.insert(key, entity);
|
registry.map.insert(key, entity);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stub — M6 inserts loss/delay here for benchmark scenarios.
|
|
||||||
pub(super) fn fault_injection_system() {}
|
|
||||||
|
|
||||||
/// Per-sensor digital-twin transform. Pulls each entity's latest
|
/// Per-sensor digital-twin transform. Pulls each entity's latest
|
||||||
/// `RawSensorData` into a sliding-window mean (`SmoothedValue`), and emits
|
/// `RawSensorData` into a sliding-window mean (`SmoothedValue`), and emits
|
||||||
|
|||||||
Reference in New Issue
Block a user