Data and Persistence¶
OpenCrate uses SQLite for all persistence. Each domain store owns its own database file, runs SQL on a dedicated thread, and communicates with the rest of the system through channels. This document covers the store pattern, every store and its schema, the event bus, and the node model.
Store Pattern¶
Every store in OpenCrate follows the same architecture:
Caller --mpsc::Sender<Command>--> Dedicated Thread --SQLite--> data/*.db
  ^                                      |
  |                                      |
  +------oneshot::Sender<Response>-------+
How it works:
- The store spawns a dedicated OS thread at startup.
- That thread opens a SQLite connection (WAL mode) and runs a receive loop.
- Callers send commands through an `mpsc::Sender<Command>`. Each command includes a `oneshot::Sender` for the response.
- The store thread executes SQL and sends the result back through the oneshot channel.
- The caller awaits the oneshot receiver (async-compatible).
Why this pattern:
- SQLite performs best with a single writer. One thread per database eliminates contention.
- WAL mode allows concurrent readers without blocking the writer.
- The mpsc channel serializes writes naturally, no mutexes needed.
- The oneshot response channel integrates cleanly with async code.
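The loop can be sketched with std channels standing in for tokio's `mpsc` and `oneshot`, and a `HashMap` standing in for the WAL-mode SQLite connection; `Command` and `spawn_store` are illustrative names, not the OpenCrate API:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Commands carry their own reply channel; a std mpsc sender used once
// stands in for tokio's oneshot::Sender here.
enum Command {
    Set { key: String, value: f64 },
    Get { key: String, reply: mpsc::Sender<Option<f64>> },
}

// Spawn the dedicated store thread. In OpenCrate this thread would own
// a SQLite connection; a HashMap stands in for it in this sketch.
fn spawn_store() -> mpsc::Sender<Command> {
    let (tx, rx) = mpsc::channel::<Command>();
    thread::spawn(move || {
        let mut db: HashMap<String, f64> = HashMap::new();
        // The receive loop runs until every command sender is dropped.
        for cmd in rx {
            match cmd {
                Command::Set { key, value } => {
                    db.insert(key, value);
                }
                Command::Get { key, reply } => {
                    let _ = reply.send(db.get(&key).copied());
                }
            }
        }
    });
    tx
}

fn main() {
    let store = spawn_store();
    store
        .send(Command::Set { key: "zone-temp".into(), value: 21.5 })
        .unwrap();

    // The caller blocks on the reply channel; with tokio's oneshot this
    // would be an .await instead.
    let (reply_tx, reply_rx) = mpsc::channel();
    store
        .send(Command::Get { key: "zone-temp".into(), reply: reply_tx })
        .unwrap();
    assert_eq!(reply_rx.recv().unwrap(), Some(21.5));
}
```

A real store would match on many more command variants and run SQL inside the loop, but the channel discipline is the same: one owner of the connection, all access serialized through the queue.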
Watch Channels¶
Many stores expose a tokio::sync::watch channel that broadcasts the latest configuration version. Subscribers (like the MqttPublisher or WebhookDispatcher) use this for hot reload -- when config changes, they pick up the new state without restart.
Subscribe Channels¶
Some stores expose a tokio::sync::broadcast channel for fine-grained change notifications. The ProgramStore uses this so the ExecutionEngine knows when a program was updated and needs recompilation.
All Stores¶
PointStore¶
- Storage: In-memory (`HashMap` behind `Arc<RwLock>`)
- No database file -- points are transient runtime values
- EventBus integration: Publishes `ValueChanged` and `StatusChanged` events
- COV filtering: `set_if_changed()` skips events when the value is unchanged (used by the Modbus poll loop)
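A minimal sketch of what COV filtering implies, assuming `set_if_changed()` compares against the last cached value (the real signature and store layout may differ):

```rust
use std::collections::HashMap;

// Simplified stand-in for the in-memory PointStore.
struct PointStore {
    values: HashMap<String, f64>,
}

impl PointStore {
    // Store the value and report a change only if it differs from the
    // cached one, so identical poll results publish no ValueChanged event.
    fn set_if_changed(&mut self, id: &str, value: f64) -> bool {
        if self.values.get(id) == Some(&value) {
            return false; // unchanged: skip event publication
        }
        self.values.insert(id.to_string(), value);
        true // changed: the caller would publish ValueChanged
    }
}

fn main() {
    let mut store = PointStore { values: HashMap::new() };
    assert!(store.set_if_changed("1001/analog-input-1", 21.5));  // first write
    assert!(!store.set_if_changed("1001/analog-input-1", 21.5)); // same value, filtered
    assert!(store.set_if_changed("1001/analog-input-1", 21.6));  // new value
}
```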
NodeStore¶
- Database: `data/nodes.db`
- Tables: `node`, `node_tag`, `node_ref`, `node_property`, `node_capability`, `protocol_binding`
- Hot cache: `Arc<RwLock<HashMap<NodeId, NodeSnapshot>>>` for fast reads
- Hierarchy queries: Recursive CTEs for subtree traversal
- EventBus: Publishes `EntityCreated`, `EntityUpdated`, `EntityDeleted`
HistoryStore¶
- Database: `data/history.db`
- Tables: Trend data indexed by point ID and timestamp
- Rollup support: Pre-aggregated intervals (min, max, avg, sum)
AlarmStore¶
- Database: `data/alarms.db`
- Tables: Active alarms, alarm history
- EventBus: Publishes `AlarmRaised`, `AlarmCleared`, `AlarmAcknowledged`
- Flag sync: EventBus-driven (replaced the old 3-second poll loop); the stale check remains periodic (30 s)
ScheduleStore¶
- Database: `data/schedules.db`
- Tables: Schedule definitions, schedule entries
- EventBus: Publishes `ScheduleWritten`
EntityStore¶
- Database: `data/entities.db`
- Tables: Legacy entity records
- EventBus: Publishes `EntityCreated`, `EntityUpdated`, `EntityDeleted`
DiscoveryStore¶
- Database: `data/discovery.db`
- Tables: `discovered_device`, `discovered_point`
- Migrations: v3 added the `network_id` column, v4 added the `object_list_stale` column
- Device states: Discovered, Accepted, Ignored
ProgramStore¶
- Database: `data/programs.db`
- Tables: Program definitions, execution log (auto-pruned to the last 100 entries per program)
- Subscribe channel: Notifies the ExecutionEngine of program changes
NotificationStore¶
- Database: `data/notifications.db`
- Tables: `recipient`, `routing_rule`, `alarm_shelving`, `notification_log`
- Watch channel: AlarmRouter hot-reloads routing rules on change
MqttStore¶
- Database: `data/mqtt.db`
- Tables: `mqtt_broker`, `mqtt_topic_pattern`
- Watch channel: MqttPublisher hot-reloads broker configs on change
CommissioningStore¶
- Database: `data/commissioning.db`
- Tables: `commission_session`, `commission_item`
- Session lifecycle: not_started -> in_progress -> completed -> signed_off
- Item lifecycle: not_started -> verified / failed / deferred
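The linear session lifecycle can be written as a small state machine; the enum and `advance` function are illustrative, not the store's actual types:

```rust
// Hedged sketch: the commissioning session lifecycle as a linear
// state machine. Names are illustrative, not the OpenCrate API.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SessionState {
    NotStarted,
    InProgress,
    Completed,
    SignedOff,
}

// The lifecycle only moves forward; SignedOff is terminal.
fn advance(state: SessionState) -> Option<SessionState> {
    use SessionState::*;
    match state {
        NotStarted => Some(InProgress),
        InProgress => Some(Completed),
        Completed => Some(SignedOff),
        SignedOff => None,
    }
}

fn main() {
    let mut s = SessionState::NotStarted;
    // Drive the session through its whole lifecycle.
    while let Some(next) = advance(s) {
        s = next;
    }
    assert_eq!(s, SessionState::SignedOff);
}
```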
ReportStore¶
- Database: `data/reports.db`
- Tables: `report_definition`, `report_schedule`, `report_execution`
- Cascade deletes: Deleting a definition removes its schedules and execution history
EnergyStore¶
- Database: `data/energy.db`
- Tables: `utility_rate`, `energy_meter`, `energy_baseline`, `energy_rollup`
- Upsert on conflict: Partial rollups are updated in place when final data arrives
WebhookStore¶
- Database: `data/webhooks.db`
- Tables: `webhook_endpoint`, `webhook_delivery`, `webhook_config`
- Watch channel: WebhookDispatcher hot-reloads endpoint configs on change
FddStore¶
- Database: `data/fdd.db`
- Tables: Fault detection rules, fault instances
- EventBus: FddEngine publishes `FddFaultRaised`, `FddFaultCleared`
ExportStore¶
- Database: `data/export.db`
- Tables: Export target configurations, export state tracking
- Watch channel: ExportPublisher hot-reloads target configs on change
UserStore¶
- Database: `data/users.db`
- Tables: Users, roles, password hashes
- Password storage: Argon2 hashing
AuditStore¶
- Database: `data/audit.db`
- Tables: Audit log entries (user, action, target, timestamp, detail)
- Append-only from the application's perspective: no updates or deletes
OverrideStore¶
- Database: `data/overrides.db`
- Tables: Active overrides with optional expiry timestamps
Event Bus¶
The EventBus is a tokio::sync::broadcast channel with capacity 4096. Events are wrapped in Arc<Event> for zero-copy distribution to all subscribers.
All 18 Event Variants¶
| Event | Published By | Payload |
|---|---|---|
| ValueChanged | PointStore | node_id, value, timestamp |
| StatusChanged | PointStore | node_id, old_status, new_status |
| AlarmRaised | AlarmStore | alarm_id, node_id, severity, message |
| AlarmCleared | AlarmStore | alarm_id, node_id |
| AlarmAcknowledged | AlarmStore | alarm_id, user |
| ScheduleWritten | ScheduleStore | schedule_id, node_id, value |
| EntityCreated | EntityStore / NodeStore | entity_id, entity_type |
| EntityUpdated | EntityStore / NodeStore | entity_id, changed_fields |
| EntityDeleted | EntityStore / NodeStore | entity_id |
| DeviceDiscovered | DiscoveryService / Bridges | device_key, protocol, network_id |
| DeviceDown | Bridges (after backoff exhaustion) | device_key, protocol, error |
| DeviceRecovered | Bridges (on reconnect) | device_key, protocol |
| DeviceAccepted | DiscoveryService | device_key, created_node_ids |
| DiscoveryScanComplete | DiscoveryService | protocol, device_count, point_count |
| DeviceMonitorCycle | BACnet monitor loop | checked_count, online_count, offline_count |
| ObjectListChanged | BACnet monitor loop | device_key, old_count, new_count |
| FddFaultRaised | FddEngine | fault_id, rule_id, node_id, severity, message |
| FddFaultCleared | FddEngine | fault_id, rule_id, node_id |
Subscriber Wiring¶
Each subscriber calls event_bus.subscribe() to get a broadcast::Receiver<Arc<Event>>, then runs an async loop that matches on event variants it cares about:
EventBus (broadcast, capacity 4096)
|
+---> AlarmRouter [AlarmRaised, AlarmCleared, AlarmAcknowledged]
+---> MqttPublisher [ValueChanged, AlarmRaised, AlarmCleared, DeviceDown, DeviceDiscovered]
+---> WebhookDispatcher [AlarmRaised, AlarmCleared, AlarmAcknowledged, DeviceDown, DeviceRecovered, FddFaultRaised, FddFaultCleared]
+---> ExportPublisher [ValueChanged, AlarmRaised, AlarmCleared]
+---> FddEngine [ValueChanged]
+---> ExecutionEngine [ValueChanged (for OnChange-triggered programs)]
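The `Arc<Event>` wrapping and per-subscriber matching can be sketched without the broadcast machinery; the `Event` variants here carry abbreviated payloads (the real enum has 18 variants):

```rust
use std::sync::Arc;

// Abbreviated event type for illustration.
#[derive(Debug)]
enum Event {
    ValueChanged { node_id: String, value: f64 },
    AlarmRaised { alarm_id: u64, severity: u8 },
}

fn main() {
    // Publishing clones the Arc, not the payload: every subscriber
    // sees the same allocation (zero-copy distribution).
    let ev = Arc::new(Event::ValueChanged {
        node_id: "1001/analog-input-1".into(),
        value: 21.5,
    });
    let for_mqtt = Arc::clone(&ev);
    let for_fdd = Arc::clone(&ev);
    assert!(Arc::ptr_eq(&for_mqtt, &for_fdd));

    // A subscriber's loop body matches only the variants it handles.
    match &*for_fdd {
        Event::ValueChanged { node_id, value } => {
            println!("FddEngine evaluating {node_id} = {value}");
        }
        Event::AlarmRaised { .. } => {} // ignored by this subscriber
    }
}
```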
Backpressure¶
If a subscriber falls 4096 messages behind, the broadcast channel returns a Lagged error. Subscribers handle this by logging the lag count and continuing from the latest message. No events are queued indefinitely -- this is a deliberate design choice to prevent memory growth from slow consumers.
Node Model¶
The node model is a unified hierarchy that represents every physical and logical object in a building.
Node Structure¶
pub struct Node {
    pub id: NodeId,                    // e.g., "1001/analog-input-1" or "1001"
    pub node_type: NodeType,           // Site, Space, Equip, Point, VirtualPoint
    pub dis: String,                   // Display name
    pub parent_id: Option<NodeId>,
    pub value: Option<PointValue>,
    pub status: PointStatus,
    pub tags: HashSet<String>,
    pub refs: HashMap<String, NodeId>,
    pub properties: HashMap<String, serde_json::Value>,
    pub capabilities: NodeCapabilities,
    pub protocol_binding: Option<ProtocolBinding>,
}
Node Types¶
| Type | Description | Example |
|---|---|---|
| Site | A building or campus | "Main Office" |
| Space | A zone, floor, or room | "Floor 2", "Room 201" |
| Equip | A piece of equipment | "AHU-1", "Boiler-3" |
| Point | A physical I/O point bound to a protocol | "Supply Air Temp" |
| VirtualPoint | A calculated or software-only point | "Avg Zone Temp" |
NodeCapabilities¶
Boolean flags indicating what a node supports:
pub struct NodeCapabilities {
    pub readable: bool,
    pub writable: bool,
    pub historizable: bool,
    pub alarmable: bool,
    pub schedulable: bool,
}
ProtocolBinding¶
How a node maps to a field device:
pub struct ProtocolBinding {
    pub protocol: String,           // "bacnet", "modbus", etc.
    pub config: serde_json::Value,  // Protocol-specific config
}
The config field is intentionally untyped JSON. This keeps the node model protocol-agnostic while allowing protocol-specific details:
BACnet binding config:
{
  "device_instance": 1001,
  "object_type": "analog-input",
  "object_instance": 1,
  "network_id": "ip-main"
}
Modbus binding config:
{
  "device_id": "meter-1",
  "register_type": "holding",
  "address": 100,
  "data_type": "float32",
  "byte_order": "big-endian"
}
Convenience constructors ProtocolBinding::bacnet(), ::modbus(), and ::virtual_binding() build these for common cases. A custom Deserialize implementation handles the legacy tagged-enum format for backward compatibility.
Node ID Convention¶
- Points: `"{device_instance_id}/{point_id}"` (e.g., `"1001/analog-input-1"`)
- Equipment: `"{device_instance_id}"` (e.g., `"1001"`)
- Sites and Spaces: user-defined string IDs
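A hypothetical helper illustrates the convention; `split_node_id` is not part of the OpenCrate API:

```rust
// Hypothetical helper: split a node ID into its device instance and
// optional point part, per the convention above.
fn split_node_id(id: &str) -> (&str, Option<&str>) {
    match id.split_once('/') {
        // Point IDs: "{device_instance_id}/{point_id}"
        Some((device, point)) => (device, Some(point)),
        // Equipment IDs are the bare device instance.
        None => (id, None),
    }
}

fn main() {
    assert_eq!(
        split_node_id("1001/analog-input-1"),
        ("1001", Some("analog-input-1"))
    );
    assert_eq!(split_node_id("1001"), ("1001", None));
}
```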
Hierarchy¶
Nodes form a tree through parent_id. The NodeStore supports recursive CTE queries to fetch entire subtrees efficiently. A typical hierarchy:
Site: "Main Office"
  Space: "Floor 1"
    Equip: "AHU-1" (device 1001)
      Point: "1001/analog-input-1" (Supply Air Temp)
      Point: "1001/analog-input-2" (Return Air Temp)
      Point: "1001/analog-output-1" (Damper Position)
    Equip: "VAV-101" (device 1002)
      Point: "1002/analog-input-1" (Zone Temp)
      Point: "1002/analog-output-1" (Airflow Setpoint)
  Space: "Floor 2"
    ...
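The recursive-CTE subtree query is equivalent to this in-memory traversal over `parent_id` (illustrative code, not the NodeStore implementation; the assumed SQL shape is sketched in the comment):

```rust
use std::collections::HashMap;

// In SQL the subtree fetch is roughly (assumed column names):
//   WITH RECURSIVE sub(id) AS (
//     SELECT id FROM node WHERE id = ?
//     UNION ALL
//     SELECT n.id FROM node n JOIN sub ON n.parent_id = sub.id
//   ) SELECT id FROM sub;
// The same traversal over an in-memory id -> parent_id map:
fn subtree<'a>(nodes: &HashMap<&'a str, Option<&'a str>>, root: &'a str) -> Vec<&'a str> {
    let mut out = vec![root];
    let mut i = 0;
    // Breadth-first expansion: each collected node pulls in its children.
    while i < out.len() {
        let parent = out[i];
        for (&id, &parent_id) in nodes {
            if parent_id == Some(parent) {
                out.push(id);
            }
        }
        i += 1;
    }
    out
}

fn main() {
    let mut nodes = HashMap::new();
    nodes.insert("Main Office", None);
    nodes.insert("Floor 1", Some("Main Office"));
    nodes.insert("AHU-1", Some("Floor 1"));
    nodes.insert("1001/analog-input-1", Some("AHU-1"));
    nodes.insert("Floor 2", Some("Main Office"));

    let sub = subtree(&nodes, "Floor 1");
    assert_eq!(sub.len(), 3); // Floor 1, AHU-1, and its point
    assert!(sub.contains(&"1001/analog-input-1"));
}
```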
Auto-Tagging¶
When nodes are created from scenario configs or accepted from discovery, suggest_equip_tags() and suggest_point_tags() apply heuristic tags based on names and object types (e.g., an object named "Supply Air Temp" gets tags air, temp, sensor, supply). These tags drive alarm routing rules, report filters, and energy meter associations.
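A hedged sketch of what such a heuristic might look like; the keyword table below is invented for illustration and the real `suggest_point_tags()` rules may differ:

```rust
use std::collections::BTreeSet;

// Invented keyword table; the actual heuristics in suggest_point_tags()
// are not shown in this document.
fn suggest_point_tags(name: &str) -> BTreeSet<&'static str> {
    let lower = name.to_lowercase();
    let mut tags = BTreeSet::new();
    for (keyword, tag) in [
        ("temp", "temp"),
        ("air", "air"),
        ("supply", "supply"),
        ("return", "return"),
        ("pressure", "pressure"),
    ] {
        if lower.contains(keyword) {
            tags.insert(tag);
        }
    }
    // A name carrying a measured quantity reads as a sensor here.
    if tags.contains("temp") || tags.contains("pressure") {
        tags.insert("sensor");
    }
    tags
}

fn main() {
    // Matches the document's example: "Supply Air Temp" gets
    // air, temp, sensor, supply.
    let tags = suggest_point_tags("Supply Air Temp");
    for expected in ["air", "supply", "temp", "sensor"] {
        assert!(tags.contains(expected));
    }
}
```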