//! Substrate → simulator T3 receiver. //! //! The substrate is the brain: when its `automation_system` decides to //! actuate, it opens a QUIC bidirectional stream to one of its connected //! devices. The simulator side accepts those streams here, decodes the //! 39-byte command, applies it to local actuator state, and writes a 39-byte //! ack back. This closes the loop the paper's three-tier model describes. use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use substrate::transport::{QuicMessage, SensorType}; /// Convenience constructor used by `main.rs` and integration tests. /// `true` means the simulated engine is running normally. pub fn new_engine_state() -> Arc { Arc::new(AtomicBool::new(true)) } /// Loop accepting substrate-initiated bidirectional streams until the /// connection drops. Each stream is one (command, ack) round-trip: /// the simulator reads a 39-byte `QuicMessage`, mutates `engine_running` if /// the command targets the Relay actuator, then writes a 39-byte ack back /// (echoes the command with the simulator's local timestamp). pub async fn run_command_receiver(conn: quinn::Connection, engine_running: Arc) { let remote = conn.remote_address(); let mut streams_seen: u64 = 0; loop { let (send, recv) = match conn.accept_bi().await { Ok(s) => s, Err(e) => { tracing::debug!( ?remote, streams_seen, error = %e, "command receiver: accept_bi loop ended" ); return; } }; streams_seen += 1; let engine_running = engine_running.clone(); tokio::spawn(handle_one_command(remote, send, recv, engine_running)); } } async fn handle_one_command( remote: std::net::SocketAddr, mut send: quinn::SendStream, mut recv: quinn::RecvStream, engine_running: Arc, ) { let mut buf = [0u8; QuicMessage::WIRE_SIZE]; if let Err(e) = recv.read_exact(&mut buf).await { tracing::trace!(?remote, error = %e, "command receiver: short read; closing stream"); return; } let cmd = match QuicMessage::decode(&buf) { Ok(m) => m, Err(e) => { tracing::warn!(?remote, error = %e, "command receiver: decode failed"); let _ = send.reset(0u32.into()); return; } }; if cmd.typ() == SensorType::Relay { // raw_value == 1.0 ⇒ stop the engine; 0.0 ⇒ resume. let now_running = cmd.raw_value < 0.5; let was_running = engine_running.swap(now_running, Ordering::SeqCst); if now_running != was_running { if now_running { tracing::info!(device = %cmd.device_id, "Relay=0 received — engine resuming"); } else { tracing::info!(device = %cmd.device_id, "Relay=1 received — engine stopping"); } } } else { tracing::debug!( ?remote, sensor_type = cmd.sensor_type, "command receiver: ignoring non-Relay command" ); } // Ack by echoing the command — the substrate's outbound drain measures // latency from open_bi() to ack receipt. if let Err(e) = send.write_all(&cmd.to_bytes()).await { tracing::warn!(?remote, error = %e, "command receiver: ack write failed"); return; } if let Err(e) = send.finish() { tracing::warn!(?remote, error = %e, "command receiver: ack finish failed"); } }