diff --git a/dashboards/sensors.json b/dashboards/sensors.json index 057cc20..0f0d7f8 100644 --- a/dashboards/sensors.json +++ b/dashboards/sensors.json @@ -267,6 +267,48 @@ "legendFormat": "{{type}} {{direction}}" } ] + }, + { + "id": 11, + "title": "Machine State (Relay)", + "type": "stat", + "gridPos": { "h": 8, "w": 6, "x": 18, "y": 8 }, + "datasource": { "type": "prometheus", "uid": "${datasource}" }, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "auto" + }, + "fieldConfig": { + "defaults": { + "mappings": [ + { + "options": { "0": { "color": "green", "index": 0, "text": "RUNNING" } }, + "type": "value" + }, + { + "options": { "1": { "color": "red", "index": 1, "text": "STOPPED" } }, + "type": "value" + } + ] + } + }, + "targets": [ + { + "expr": "sensor_aggregate{type=\"relay\", stat=\"max\"}", + "refId": "A" + } + ] + }, + { + "id": 12, + "title": "Manual Control", + "type": "text", + "gridPos": { "h": 8, "w": 6, "x": 18, "y": 16 }, + "options": { + "mode": "html", + "content": "
\n \n
" + } } ] } diff --git a/monitoring/docker-compose.yml b/monitoring/docker-compose.yml index 48d4850..2dff60b 100644 --- a/monitoring/docker-compose.yml +++ b/monitoring/docker-compose.yml @@ -43,6 +43,7 @@ services: - GF_SECURITY_ADMIN_USER=admin - GF_SECURITY_ADMIN_PASSWORD=admin - GF_USERS_DEFAULT_THEME=dark + - GF_PANELS_DISABLE_SANITIZE_HTML=true volumes: - ./grafana/provisioning:/etc/grafana/provisioning:ro - ../dashboards:/var/lib/grafana/dashboards:ro diff --git a/simulator/src/emitters.rs b/simulator/src/emitters.rs index 0288d34..415be3f 100644 --- a/simulator/src/emitters.rs +++ b/simulator/src/emitters.rs @@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::Context; -use substrate::transport::QuicMessage; +use substrate::transport::{QuicMessage, SensorType}; use tokio::time::MissedTickBehavior; use crate::profile::{SensorSlot, generate_value}; @@ -97,6 +97,7 @@ pub async fn run_t3_emitter( ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut sent: u64 = 0; let mut timeouts: u64 = 0; + let mut last_relay_state = 0.0; loop { ticker.tick().await; @@ -115,9 +116,20 @@ pub async fn run_t3_emitter( slot.seq = slot.seq.wrapping_add(1); match tokio::time::timeout(timeout, t3_one_request(&conn, &cmd)).await { - Ok(Ok(_ack)) => { + Ok(Ok(ack)) => { sent += 1; sent_counter.store(sent, Ordering::Relaxed); + + if ack.sensor_type == SensorType::Relay.as_u8() { + let is_on = ack.raw_value > 0.5; + let was_on = last_relay_state > 0.5; + if is_on && !was_on { + tracing::info!(device = %ack.device_id, "Relay triggered ON (machine stopped)!"); + } else if !is_on && was_on { + tracing::info!(device = %ack.device_id, "Relay turned OFF."); + } + last_relay_state = ack.raw_value; + } } Ok(Err(e)) => { tracing::warn!(error = %e, "T3 request failed"); diff --git a/simulator/src/main.rs b/simulator/src/main.rs index cb54c82..bdd75a2 100644 --- a/simulator/src/main.rs +++ b/simulator/src/main.rs @@ -214,6 +214,50 @@ async fn main() -> anyhow::Result<()> { None }; + let presence_slot_opt = slots.iter().find(|s| s.sensor_type == SensorType::Presence).cloned(); + let conn_clone = client.conn.clone(); + if let Some(presence_slot) = presence_slot_opt { + tokio::spawn(async move { + if let Ok(listener) = tokio::net::TcpListener::bind("0.0.0.0:9002").await { + tracing::info!("Simulator HTTP trigger API listening on 0.0.0.0:9002"); + while let Ok((mut socket, _)) = listener.accept().await { + let conn = conn_clone.clone(); + let slot = presence_slot.clone(); + tokio::spawn(async move { + let mut buf = [0; 1024]; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + if let Ok(n) = socket.read(&mut buf).await { + let req = String::from_utf8_lossy(&buf[..n]); + if req.starts_with("OPTIONS") { + let res = "HTTP/1.1 204 No Content\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Methods: POST, OPTIONS\r\n\r\n"; + let _ = socket.write_all(res.as_bytes()).await; + } else if req.starts_with("POST /trigger") { + if let Ok(mut send) = conn.open_uni().await { + let msg = QuicMessage { + device_id: slot.device_id, + sensor_id: slot.sensor_id, + raw_value: 0.0, + timestamp_us: now_us(), + sequence_number: 0, + sensor_type: slot.sensor_type.as_u8(), + }; + let _ = send.write_all(&msg.to_bytes()).await; + let _ = send.finish(); + tracing::info!("HTTP API triggered: pushed Presence=0.0 over T2"); + } + let res = "HTTP/1.1 200 OK\r\nAccess-Control-Allow-Origin: *\r\n\r\nTriggered"; + let _ = socket.write_all(res.as_bytes()).await; + } else { + let res = "HTTP/1.1 404 Not Found\r\nAccess-Control-Allow-Origin: *\r\n\r\n"; + let _ = socket.write_all(res.as_bytes()).await; + } + } + }); + } + } + }); + } + let started = Instant::now(); let mut t1_sent: u64 = 0; let mut send_errors: u64 = 0; diff --git a/simulator/src/profile.rs b/simulator/src/profile.rs index 5503ea3..217fa48 100644 --- a/simulator/src/profile.rs +++ b/simulator/src/profile.rs @@ -57,6 +57,8 @@ pub fn build_slots( (2, SensorType::Pressure), (3, SensorType::Voltage), (4, SensorType::Current), + (5, SensorType::Presence), + (6, SensorType::Relay), ] { slots.push(SensorSlot { device_id, @@ -83,6 +85,8 @@ pub fn generate_value(t: SensorType, seq: u32) -> f64 { SensorType::Pressure => 1013.0 + 5.0 * (t_phase / 20.0).cos(), SensorType::Voltage => 230.0 + 0.5 * (t_phase / 3.0).sin(), SensorType::Current => 10.0 + 2.0 * (t_phase / 5.0).cos(), + SensorType::Presence => 2.0 + 1.5 * (t_phase / 5.0).sin(), // Drops below 1.0 occasionally + SensorType::Relay => 0.0, // Relay always sends 0.0 as its command (a pure read request) SensorType::Generic => t_phase.sin(), } } diff --git a/substrate/src/transport/mod.rs b/substrate/src/transport/mod.rs index 5536015..c682f9e 100644 --- a/substrate/src/transport/mod.rs +++ b/substrate/src/transport/mod.rs @@ -19,6 +19,8 @@ pub enum SensorType { Pressure = 3, Voltage = 4, Current = 5, + Presence = 6, + Relay = 7, } impl SensorType { @@ -29,6 +31,8 @@ impl SensorType { 3 => Self::Pressure, 4 => Self::Voltage, 5 => Self::Current, + 6 => Self::Presence, + 7 => Self::Relay, _ => Self::Generic, } } @@ -46,6 +50,8 @@ impl SensorType { Self::Pressure => "pressure", Self::Voltage => "voltage", Self::Current => "current", + Self::Presence => "presence", + Self::Relay => "relay", } } @@ -58,6 +64,8 @@ impl SensorType { Self::Pressure => "hPa", Self::Voltage => "V", Self::Current => "A", + Self::Presence => "s", + Self::Relay => "state", } } } diff --git a/substrate/src/world/components.rs b/substrate/src/world/components.rs index 8bd7416..7e6951b 100644 --- a/substrate/src/world/components.rs +++ b/substrate/src/world/components.rs @@ -91,7 +91,9 @@ pub(super) fn threshold_for(t: SensorType) -> f64 { SensorType::Temperature => 22.0, // °C — simulator oscillates 15..25 SensorType::Humidity => 55.0, // % — 30..70 SensorType::Pressure => 1014.0, // hPa — 1008..1018 - SensorType::Voltage => 230.2, // V — 229.5..230.5 + SensorType::Voltage => 230.5, // V — 229..231 SensorType::Current => 10.5, // A — 8..12 + SensorType::Presence => 1.0, // s — Trigger threshold + SensorType::Relay => f64::INFINITY, // Actuator state, no threshold } } diff --git a/substrate/src/world/mod.rs b/substrate/src/world/mod.rs index 00a4531..29a57ba 100644 --- a/substrate/src/world/mod.rs +++ b/substrate/src/world/mod.rs @@ -41,7 +41,10 @@ impl Plugin for WorldPlugin { PreUpdate, systems::ingest_system.run_if(in_state(ServerState::Started)), ) - .add_systems(Update, systems::simulation_system) + .add_systems( + Update, + (systems::simulation_system, systems::automation_system).chain(), + ) .add_systems( PostUpdate, (systems::export_system, systems::diagnostics_system).chain(), diff --git a/substrate/src/world/systems.rs b/substrate/src/world/systems.rs index e4638c2..c373c85 100644 --- a/substrate/src/world/systems.rs +++ b/substrate/src/world/systems.rs @@ -144,6 +144,38 @@ fn upsert_reading( registry.map.insert(key, entity); } +/// Closed-loop automation triggered by T1/T2 sensor data, affecting a T3 actuator. +pub(super) fn automation_system( + mut registry: ResMut, + mut commands: Commands, + mut p: ParamSet<( + Query<(&DeviceId, &SensorTypeTag, &RawSensorData), Changed>, + Query<&mut RawSensorData>, + )>, +) { + let mut triggers = Vec::new(); + for (dev_id, tag, data) in p.p0().iter() { + if tag.0 == SensorType::Presence { + // Trigger threshold: 1.0 seconds + let relay_state = if data.raw_value < 1.0 { 1.0 } else { 0.0 }; + triggers.push((dev_id.0, relay_state)); + } + } + + let mut q = p.p1(); + for (device_id, relay_state) in triggers { + let msg = QuicMessage { + device_id, + sensor_id: 6, // Relay is always 6 in our industrial profile + raw_value: relay_state, + timestamp_us: now_us(), + sequence_number: 0, + sensor_type: SensorType::Relay.as_u8(), + }; + upsert_reading(&mut registry, &mut commands, &mut q, msg); + } +} + /// Per-sensor digital-twin transform. Pulls each entity's latest /// `RawSensorData` into a sliding-window mean (`SmoothedValue`), and emits