Air Quality Monitoring with RocksDB
Why measure air quality?
If put into a closed loop control system you can control what you measure for comfort and certain air conditions can be bad for your health and well-being.
-
Low and high temperatures are uncomfortable for hopefully obvious reasons.
-
Low humidity can speed up dehydration, even in Winter.
-
High humidity slows evaporative cooling, effectively raising the wet-bulb temperature or the temperature humans feel.
-
High humidity can make surfaces feel wet with moisture condensing on slightly cold surfaces.
-
If someone is smoking either nearby outside or nearby in an apartment building smoke can move between rooms which is unpleasant to smell.
-
Exposure to fine particles in the air like 2.5μm diameter particles which can be found in smoke can increase the risk of heart attack.
Exposure can also affect heart and lung function, worsening medical conditions like heart disease and asthma, and increase the risk for heart attacks. Scientific studies have linked increases in daily PM2.5 exposure with higher cardiovascular and respiratory hospital admissions, emergency department visits, and deaths. Studies also suggest that long-term exposure to fine particles causes increased mortality from heart disease and may be associated with increased rates of chronic bronchitis, reduced lung function, and lung cancer. People with heart and breathing problems, pregnant women, children, and older adults may be particularly sensitive to particle pollution.
-
For some reason no one can figure out </ s> extreme weather conditions are becoming more common and more extreme. Take for example the 2023 Canadian wildfires which have burned more acreage than any one of the last 40 years of records. These fires sent up so much smoke you can see in the Air Quality Index values for Wake County North Carolina which days the wind blew it down the East Coast.
-
There are also a number of studies into the effects of CO2 concentration on cognitive impairment. Some studies have used it as a proxy for volatile organic compounds (VOCs) while others have directly varied CO2 concentration. The results of these studies have been mixed but there's suggestion that concentrations of both can affect some aspects of cognition.
Can These All Be Controlled?
- Low and high temperatures: heating and cooling with A/C keeps this bounded.
- Low humidity:
- Humidifiers can raise the humidity during Winter when atmospheric air is not sufficient to maintain humidity in heated indoor air.
- Continuing to blow air over A/C cooling fins that have condensed water on them can re-release that as water vapor.
- High humidity: keeping the fan in an A/C unit on as little as possible during cooling lets condensed water drain from the system.
- Fine particles: these particles settle out of the air on their own indoors over a long period of time but an air filter can be used to pull these out of the air even faster.
- High CO2 concentration: as long as this concentration is lower than outdoor air an open window with a fan pushing air through it will quickly drop the concentration. However this also means that fine particles from outdoors will be suspended in the indoor air.
Measuring the Air
Hardware
- The sensors all of which are I2C devices.
- Adafruit SCD-30 - NDIR CO2 Temperature and Humidity Sensor
- Since this is our slowest sensor and can only sample once every 2 seconds at its fastest this is our limiting factor on sampling rate. This application will use a sampling rate of once every 10 seconds.
- Adafruit PMSA003I Air Quality Breakout
- Adafruit Sensirion SHT40 Temperature & Humidity Sensor
- Why did I use this when the CO2 sensor already records temperature and humidity? Because I ordered it a while ago for another project.
- Adafruit SCD-30 - NDIR CO2 Temperature and Humidity Sensor
- Raspberry Pi 3B+
- Note that this data is being recorded on the same microSD card as the one the OS is booting off of. microSD cards have a reputation for not being reliable for write-heavy workloads over time.
- All the I2C devices have their headers soldered on, are plugged into a solderless breadboard and are daisy-chained together with a ribbon cable up to the I2C pins of the Raspbery Pi's I/O header.
Software
- Base image is built with Raspberry Pi Bake
- This builds a Raspberry Pi system image in an emulator so that a disk image is created automatically with all the necessary packages and configuration settings so that nothing has to be done on the physical Raspberry Pi apart from plugging in a microSD card with the image burned to it.
- The most important steps taken here for this discussion is uninstalling any pre-installed Docker tools, setting up the correct Docker tool set (which is not trivial) including Docker Compose and enabling I2C with
sudo raspi-config nonint do_i2c 0
.
- Program
- Written in Rust, adapted from an earlier Python program that sampled humidity and controller a humidifier
- Main dependencies
actix_web
a component of Actix: to set it up as a web serverhandlebars
a Rust port of Handlebars.js: to make HTML templatesi2c-linux
andudev
: to read from I2C devicesrocksdb
a Rust wrapper for RocksDB: to store and retrieve data
- Uses Docker Compose
- Cross-compiled on an x86-64 computer and
debian:bullseye
in Docker - Image contents are carefully copied to a
arm32v7/debian:bullseye
image- Since I'm running an arm32v7 image on an x86-64 machine at this point no executables will work.
- All moves from the x86-64 image to the arm32v7 image can only use the
COPY --from=build
syntax and directories can't and don't need to be made withmkdir -p
. - Any dependencies like
libatomic.so
have to be copied over as well.
- The image is uploaded to Docker Hub
docker-compose
with theDOCKER_HOST
variable set to the target Raspberry Pi is then used to configure and bring up the Docker image as a container
- Cross-compiled on an x86-64 computer and
The Database
Why not SQL?
All the following critiques apply to PostgreSQL. I haven't verified that they apply for every flavor of SQL.
What I think of as "type arithmetic" is handled in a way that is unfriendly for object-oriented code. For example, using a LEFT JOIN
on two tables can result in a field that is NULL
for two distinct reasons if one of the original columns in the right-hand table had a nullable column. You can check which one it is by also including the right-hand table's primary key in the query and checking whether that primary key was also NULL
. For example in PostgreSQL:
CREATE TABLE "user" (
id BIGSERIAL PRIMARY KEY,
handle TEXT NOT NULL
);
CREATE TABLE "email" (
id BIGSERIAL PRIMARY KEY,
user_id BIGSERIAL NOT NULL,
email TEXT NOT NULL,
display_name TEXT, -- Note that this is allowed to be NULL.
CONSTRAINT fk_user_id
FOREIGN KEY(user_id)
REFERENCES "user"(id)
);
INSERT INTO "user"(handle) VALUES ('alpha');
INSERT INTO "user"(handle) VALUES ('beta');
INSERT INTO "user"(handle) VALUES ('gamma');
SELECT * FROM "user";
INSERT INTO "email"(user_id, email, display_name) VALUES (1, 'alpha@example.com', 'Dr. Alpha');
INSERT INTO "email"(user_id, email) VALUES (2, 'beta@example.com');
SELECT
"user".id,
"user".handle,
"email".id,
"email".email,
"email".display_name,
"email".id IS NOT NULL as "has_email"
FROM "user"
LEFT JOIN "email"
ON "user".id = "email".user_id
;
ultimately yields:
id | handle | id | email | display_name | has_email
----+--------+----+-------------------+--------------+-----------
1 | alpha | 1 | alpha@example.com | Dr. Alpha | t
2 | beta | 2 | beta@example.com | | t
3 | gamma | | | | f
(3 rows)
Want to build a tree structure? It's possible with:
- Nested Sets: But these touch on average half the fields in a given tree on an insertion or deletion.
- Adjacency List: Using a foreign key reference to the id of the parent relation. But the obvious approach here has the client manually traversing the tree one query at a time.
WITH RECURSIVE
: But have fun writing, maintaining and generally understanding that.- PostgreSQL extension
ltree
: This is the closest to a good approach if unusual as it has every relation store what is essentially an absolute path from the root of a tree to that relation's place in the tree. So moving a subtree (if you ever wanted to do that) would require touching every relation in the subtree.- Note that in Rust
diesel
has an extensiondiesel_ltree
which allows handling ofLtree
-typed columns. And while table self-joins aren't trivial (which you want to do to avoid an additional query round trip if you're trying to for example find the children of a relation based on the parent's id) you can use aliases to do so.
- Note that in Rust
Want to build a graph outside the set of graphs which are trees? You can encode one with adjacencies and query it one node at a time but I'm not aware of a great way to deal with this better than that.
Query return types are ad-hoc:
-
Rust's
diesel
helps prevent type skew but migrations must be managed through its tools. -
JOIN
operations generate new type signatures (ever had to disambiguate theid
field in aJOIN
?) -
As an example for how it's integrated into other software Qt examples use a
QVariant
to fetch fields positionally. This can silently fail to convert to a type C++ can use and pick some default value.QVariant::typeId
can be used to optionally check the type beforehand.QSqlQuery query("SELECT country FROM artist"); while (query.next()) { QString country = query.value(0).toString(); doSomething(country); }
- Rust's
diesel
doesn't have this issue at the source-code level as it knows the resulting type of it's equivalent of aSELECT
statement.
- Rust's
SQL uses its own basic types which can have an impedance mismatch with language basic types. And it can have more minor impedance mismatches with itself. For example, using PostgreSQL's BIGSERIAL
and want to have a foreign key point to it only sometimes? BIGSERIAL NULL
doesn't work, BIGINT NULL
does.
You always use prepared statements for custom queries, right? You're not passing unchecked input directly to any SQL interpreter, right? You're not going to have a problem with Robert'); DROP TABLE Students;--
, right?
Why RocksDB?
I'm not claiming RocksDB is the best database ever or even the best in its niche. I'm presenting it as a design that I think has good usability.
It's a key-value store. Conceptually it's the equivalent of a C++ std::map
or a Qt QMap
or a Rust std::collections::BTreeMap
. But writes are durable and can be wrapped in transactions.
RocksDB doesn't have its own types. All keys and values are simply byte arrays and have no concept of types. This means that you don't have to worry about whether or not your program type converts 1-to-1 with a type in your database engine. All entries are conceptually stored in lexical or dictionary order based on these byte arrays.
However, this does mean that the application is responsible for all of its own serialization and deserialization. Because keys are lexically sorted your key representation should explicitly do things like store integers big-endian to ensure a consistent increasing order during range scans. Thus resolving the holy war, unless lexical order for you is right-to-left 🤔. I'll go over an example of how you would construct a key to specify a specific ordering in Rust so that your keys also act as the SQL equivalent of an index for fast lookup. This also means that if you need a collation order that isn't what a UTF-8 encoding gives you by default you need to do this for RocksDB.
But it does have some traditional database features: as I've already mentioned it has transactions, snapshot isolation and it has column families which mimic the separation of columns into tables but also aren't strictly necessary but they let you set performance options specific to each column family.
There are other things which are good to know about RocksDB in contrast to other database technologies such as it being an embedded database and that it uses log-structured merge-trees and more which are relevant to performance optimization but are out of scope for this discussion.
Example Usage of RocksDB
This simple example is enough to explain all the important details of actually using RocksDB. If you understand this section then you should know enough to start building an application on top of it.
This example uses the following convention:
- There is one string tag
name(id)
which we can refer to as "name of id"
- Keys which can be constructed from this string tag
name(id)\0
followed by some number of bytes. This is the tag followed by a byte with a value of 0 followed by some number of bytes that encodes theid
as an argument. For this example it will always be 8 bytes to encodeid
as au64
.- Note that strings in Rust don't generally end with
NULL
terminators as they do in C and C++, including the strings used here. The\0
used here should really be thought of as a byte that will never show up in the tag string and can be reliably used to mark the end of the tag part and the beginning of the byte-encoded arguments if used as a consistent convention for all keys.
- Note that strings in Rust don't generally end with
name(id)\1
. This is the tag followed by a byte with a value of 1 followed by no bytes.- Note that
\1
isn't a valid escape character in Rust and is written this way as shorthand. name(id)\1
will always lexically sort after every byte string starting withname(id)\0
so it can be used as an exclusive end of the range.
- Note that
This convention ensures that when a range scan is performed it can start at the first name(id)\0
key and read every key containing that tag in-order until it reaches the end of the key-value store or until it encounters a key that isn't using that same tag. Or put another way, we're searching over keys that would appear in a dictionary in the range [name(id)\0
, name(id)\1
).
use rocksdb::{Direction, ErrorKind, IteratorMode, OptimisticTransactionDB, Options, DB};
use std::str::{from_utf8, Utf8Error};
const DB_PATH: &str = "/tmp/rdb";
const TAG_NAME_FROM_ID: &str = "name(id)";
struct User {
id: u64,
name: String,
}
impl User {
fn new(id: u64, name: &str) -> Self {
Self {
id,
name: name.to_string(),
}
}
// `name(id)\0` followed by a big-endian encoded id.
fn tag_name_of_id(id: u64) -> Vec<u8> {
[
TAG_NAME_FROM_ID.as_bytes(),
[0 as u8].as_slice(),
id.to_be_bytes().as_slice(),
]
.concat()
}
// `name(id)\1`
fn terminator_name_of_id() -> Vec<u8> {
[TAG_NAME_FROM_ID.as_bytes(), [1 as u8].as_slice()].concat()
}
// Either a string or a UTf-8 encoding error.
fn from_value_name(bytes: Vec<u8>) -> Result<String, Utf8Error> {
Ok(from_utf8(bytes.as_slice())?.to_string())
}
// `name(id)\0` followed by this object's big-endian encoded id.
fn key_name_of_id(&self) -> Vec<u8> {
Self::tag_name_of_id(self.id)
}
// A byte array of the name encoded in UTF-8.
fn value_name(&self) -> Vec<u8> {
self.name.as_bytes().to_vec()
}
}
fn main() {
{
// Create the database directory and files.
let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(DB_PATH).unwrap();
// Write user names.
'write_txn: loop {
let txn = db.transaction();
// Note that names are not in id order.
let user_name_list = vec![
User::new(8, "ζ"),
User::new(5, "ε"),
User::new(21, "θ"),
User::new(13, "η"),
User::new(1, "β"),
User::new(1, "α"),
User::new(3, "δ"),
User::new(2, "γ"),
];
for user_name in user_name_list {
txn.put(user_name.key_name_of_id(), user_name.value_name())
.unwrap();
}
// Commit the transaction.
if let Err(e) = txn.commit() {
if ErrorKind::Busy != e.kind() {
panic!("Can't commit transaction: {:?}", e);
} else {
// TODO: exponential backoff.
}
} else {
break 'write_txn;
}
}
// Read user names.
'read_txn: loop {
let txn = db.transaction();
let iter = txn.iterator(IteratorMode::From(
&User::tag_name_of_id(0),
Direction::Forward,
));
let iter_end = User::terminator_name_of_id();
'range_scan: for i in iter {
let (key, value) = i.unwrap();
if !(*key < *iter_end) {
break 'range_scan;
}
println!(
"key={:?} value={}",
key,
User::from_value_name(value.to_vec()).unwrap()
);
}
// Commit the transaction.
if let Err(e) = txn.commit() {
if ErrorKind::Busy != e.kind() {
panic!("Can't commit transaction: {:?}", e);
} else {
// TODO: exponential backoff.
}
} else {
break 'read_txn;
}
}
// Drop the database handle.
}
DB::destroy(&Options::default(), DB_PATH).unwrap(); // Delete the database directory and files.
}
And this is the output. Note that name(id)\0
encodes to bytes as [ 98, 111, 100, 121, 40, 105, 100, 41, 0 ]
in either UTF-8 or ASCII. Also note that there are 8 bytes following it encoding the id.
key=[98, 111, 100, 121, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 1] value=α
key=[98, 111, 100, 121, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 2] value=γ
key=[98, 111, 100, 121, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 3] value=δ
key=[98, 111, 100, 121, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 5] value=ε
key=[98, 111, 100, 121, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 8] value=ζ
key=[98, 111, 100, 121, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 13] value=η
key=[98, 111, 100, 121, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 21] value=θ
Also note that the values are reported in order even though they were put
into the database out of order which is expected given that the database is conceptually a lexically sorted map. If you paid attention to the ids or know your Greek alphabet you might have noticed that the β
user name is missing. And that's because the α
user name was put
into the database after it with the same id effectively replacing the value at that key. If you want to make sure the key didn't exist already you could check it with a get
first.
You can get the complete code for this example here and run it on your machine with cargo run --release
.
Logging and Reporting the Data
Given that we're running 3 different sensors there's a fair bit of information to log in even one sample. And since RocksDB doesn't hold our hand when creating a database with a schema or even require that we have a schema at all I'll impose one for organizational reasons. In the image below you have a collection of tags which when paired with an id
become a key that maps to a measured value from either the CO2 sensor, the air quality sensor, the humidity sensor or the system timestamp from immediately after the acquisitions.
In the previous example there wasn't a need to fetch the id
field, it simply ordered the keys. But for this application since we want to make every series individually retrievable through an XMLHttpRequest
in order to initialize a web page with the last full hour or day of samples it's convenient to figure out the last hour or day of id
s and return that. So instead of range scanning via a key ordered by id
s the application constructs the id(timestamp)
key that would have been generated exactly 24 hours ago in the case of getting data over the last day.
With that id(timestamp)
constructed the application scans through the database until it sees the last key matching that tag. Once it's done it can report those id
s to the client as a JSON response body. The client can then use that list of id
s to request all of the time series' values it's interested in. actix_web
will handle each of these web requests and return the series value for that id
also as a JSON response body.
If you're curious the timestamp is a Unix timestamp with nanosecond precision converted stored in a u128
.
Updating the Client on New Data
But there's one aspect I left out. What if I left the page open over the whole period the page reported data for? So if I opened the page that reported the last hour but left it for an additional hour that data will be stale.
To address this (and so that I don't have to constantly reload the page to see that the CO2 concentration has risen a little bit over the last 20 minutes) every time an acquisition is performed each sample is sent out to the web client via WebSocket. How Actix Web and base Actix with its actor model help us do that is a bit out-of-scope for this discussion and how the web client does so is well out-of-scope.