Files
quic_ecs_dt/substrate/src/transport/ecs.rs
2026-05-13 15:03:23 -04:00

154 lines
5.9 KiB
Rust

use std::sync::Mutex;
use bevy::prelude::*;
use bevy::state::app::StatesPlugin;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use crate::config::AppConfig;
use crate::transport::{OutboundT3, QuicMessage, T1Sender, T2Sender, T3OutboundSender};
use crate::transport::server::{ConnectionRegistry, accept_loop, bind_endpoint, new_connection_registry};
use crate::transport::state::ServerState;
/// Bevy plugin that wires the QUIC transport into the ECS.
///
/// Its `Plugin::build` creates the T1/T2 inbound and T3 outbound bridge
/// channels, starts a tokio runtime on a dedicated thread, registers the
/// `ServerState` state machine, and schedules `start_quic_server` to run on
/// `OnEnter(ServerState::Starting)`.
pub struct EcsQuicTransportPlugin;
/// Receive halves of the inbound tier channels (T1 datagrams, T2 uni
/// streams). The `world` module's ingest system is the sole reader.
/// T3 is substrate-initiated and lives on the tokio side via the outbound
/// drain task — no inbound T3 receiver exists here.
///
/// Inserted as a resource by `EcsQuicTransportPlugin::build`, which creates
/// both channels with capacities read from `AppConfig`.
#[derive(Resource)]
pub(crate) struct BridgeReceivers {
/// Receiver for the T1 channel (capacity `config.network.t1_capacity`).
// NOTE(review): the `Mutex` presumably exists so the receiver can be drained
// through a shared `Res` borrow — confirm against the ingest system.
pub(crate) t1: Mutex<mpsc::Receiver<QuicMessage>>,
/// Receiver for the T2 channel (capacity `config.network.t2_capacity`).
pub(crate) t2: Mutex<mpsc::Receiver<QuicMessage>>,
}
/// Send halves of the bridge channels, stored as a cloneable resource.
///
/// `start_quic_server` clones this resource and moves the `t1` and `t2`
/// senders into `accept_loop` when the listener starts.
#[derive(Resource, Clone)]
pub(crate) struct BridgeSenders {
/// Sender for the inbound T1 channel; its receive half is
/// `BridgeReceivers::t1`.
pub(crate) t1: T1Sender,
/// Sender for the inbound T2 channel; its receive half is
/// `BridgeReceivers::t2`.
pub(crate) t2: T2Sender,
/// Outbound actuator-command sender — `automation_system` enqueues
/// `OutboundT3` items here; the tokio drain task routes them to the
/// originating device's connection.
pub(crate) t3_out: T3OutboundSender,
}
/// Holds the receiver half of the outbound-T3 channel until the listener
/// starts, plus the connection registry and a sender clone for the optional
/// synthetic T3 driver. All pass into `accept_loop` once at the
/// `Starting → Started` transition.
#[derive(Resource)]
pub(crate) struct OutboundT3Plumbing {
/// Outbound-T3 receiver. It is wrapped in `Option` so `start_quic_server`
/// can `take()` it exactly once; a second take panics with
/// "OutboundT3 receiver consumed twice".
pub(crate) rx: Mutex<Option<mpsc::Receiver<OutboundT3>>>,
/// Raw sender for the same outbound-T3 channel; `start_quic_server` passes
/// a clone of it to `accept_loop`.
pub(crate) tx: mpsc::Sender<OutboundT3>,
/// Connection registry created by `new_connection_registry()`. A clone is
/// handed to `accept_loop` while this resource keeps its own handle.
pub(crate) registry: ConnectionRegistry,
}
/// Handle to the tokio runtime that `EcsQuicTransportPlugin::build` starts on
/// the dedicated "quic-runtime" thread.
///
/// `start_quic_server` uses it to enter the runtime context while binding the
/// endpoint and to spawn `accept_loop`.
#[derive(Resource, Clone)]
pub(crate) struct TokioHandle(pub(crate) Handle);
/// Bring up the QUIC listener using the loaded `AppConfig` and transition to
/// `ServerState::Started`. Runs once via `OnEnter(ServerState::Starting)`.
fn start_quic_server(
config: Res<AppConfig>,
senders: Res<BridgeSenders>,
runtime: Res<TokioHandle>,
outbound: Res<OutboundT3Plumbing>,
mut next: ResMut<NextState<ServerState>>,
) {
tracing::info!("entering ServerState::Starting — bringing up QUIC listener");
// `Endpoint::server` is sync but needs a tokio runtime context for
// `Handle::current()`; entering the runtime is enough — no async block
// required.
let _guard = runtime.0.enter();
let endpoint = bind_endpoint(&config.network).expect("failed to bind QUIC endpoint");
drop(_guard);
tracing::info!(local = ?endpoint.local_addr().ok(), "QUIC listener bound");
// Move the outbound receiver into the tokio side; accept_loop owns it for
// the rest of the listener's life. The registry is cloned (it's already an
// `Arc`) so the ECS-side resource can still observe the routes if needed.
let outbound_rx = outbound
.rx
.lock()
.unwrap()
.take()
.expect("OutboundT3 receiver consumed twice");
let outbound_tx = outbound.tx.clone();
let registry = outbound.registry.clone();
let synthetic_rate = config.network.synthetic_t3_rate_hz;
let s = senders.clone();
runtime.0.spawn(accept_loop(
endpoint,
s.t1,
s.t2,
registry,
outbound_rx,
outbound_tx,
synthetic_rate,
));
next.set(ServerState::Started);
tracing::info!("ServerState::Started");
}
impl Plugin for EcsQuicTransportPlugin {
    /// Installs the QUIC transport bridge into `app`.
    ///
    /// Creates the T1/T2 inbound and T3 outbound channels sized from
    /// `AppConfig`, starts a tokio runtime on a dedicated OS thread, inserts
    /// the bridge resources, and registers `start_quic_server` to run on
    /// `OnEnter(ServerState::Starting)`.
    ///
    /// # Panics
    ///
    /// Panics if `AppConfig` has not been inserted before this plugin is
    /// built, if the runtime thread cannot be spawned, or if the runtime
    /// fails to build (the handle is then never delivered).
    fn build(&self, app: &mut App) {
        // Read-only lookup: shared world access suffices. The capacities are
        // copied out immediately so no borrow of `app` outlives this statement.
        let network = &app.world().resource::<AppConfig>().network;
        let (t1_capacity, t2_capacity, t3_capacity) = (
            network.t1_capacity,
            network.t2_capacity,
            network.t3_capacity,
        );
        // NOTE(review): tokio's bounded `mpsc::channel` panics on a capacity
        // of 0 — confirm config validation rejects that upstream.

        // Inbound bridge: T1 datagrams + T2 uni streams from devices into the
        // ECS PreUpdate ingest system (in the `world` module).
        let (t1_tx, t1_rx) = mpsc::channel::<QuicMessage>(t1_capacity);
        let (t2_tx, t2_rx) = mpsc::channel::<QuicMessage>(t2_capacity);

        // Outbound-T3: substrate → device actuator-command path. Capacity
        // budget tracks automation cadence, not per-sample throughput.
        let (t3_out_tx, t3_out_rx) = mpsc::channel::<OutboundT3>(t3_capacity);
        let registry = new_connection_registry();

        // Spawn a tokio runtime on a dedicated OS thread, ship its Handle back
        // to the ECS, and keep the runtime alive for the lifetime of the app
        // by parking on `pending()`.
        let (handle_tx, handle_rx) = std::sync::mpsc::sync_channel::<Handle>(1);
        std::thread::Builder::new()
            .name("quic-runtime".to_string())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(2)
                    .enable_all()
                    .thread_name("quic-worker")
                    .build()
                    .expect("build tokio runtime");
                handle_tx
                    .send(rt.handle().clone())
                    .expect("send tokio Handle to ECS");
                // Never completes: the thread (and with it `rt`) stays alive.
                rt.block_on(std::future::pending::<()>());
            })
            .expect("spawn quic-runtime thread");
        // Blocks until the runtime thread has sent its handle. If that thread
        // panicked first, the sender is dropped and this `expect` fires.
        let handle = handle_rx.recv().expect("receive tokio Handle");

        // Bevy 0.18 split state machinery into its own plugin; under
        // MinimalPlugins it isn't installed by default.
        app.add_plugins(StatesPlugin)
            .init_state::<ServerState>()
            .insert_resource(TokioHandle(handle))
            .insert_resource(BridgeSenders {
                t1: T1Sender::new(t1_tx),
                t2: T2Sender::new(t2_tx),
                t3_out: T3OutboundSender::new(t3_out_tx.clone()),
            })
            .insert_resource(BridgeReceivers {
                t1: Mutex::new(t1_rx),
                t2: Mutex::new(t2_rx),
            })
            .insert_resource(OutboundT3Plumbing {
                rx: Mutex::new(Some(t3_out_rx)),
                tx: t3_out_tx,
                registry,
            })
            .add_systems(OnEnter(ServerState::Starting), start_quic_server);
    }
}