Air Quality Monitoring with RocksDB

2023-08-02T18:00:00.000Z

Why measure air quality?

Certain air conditions can be bad for your health and well-being, and if you can measure them you can feed those measurements into a closed-loop control system and control them for comfort.

Can These All Be Controlled?

Measuring the Air

Hardware

Software

The Database

Why not SQL?

All the following critiques apply to PostgreSQL. I haven't verified that they apply to every flavor of SQL.

What I think of as "type arithmetic" is handled in a way that is unfriendly for object-oriented code. For example, a LEFT JOIN on two tables can produce a field that is NULL for two distinct reasons if the corresponding column in the right-hand table is itself nullable. You can check which reason applies by also including the right-hand table's primary key in the query and checking whether that primary key is also NULL. For example, in PostgreSQL:

CREATE TABLE "user" (
    id BIGSERIAL PRIMARY KEY,
    handle TEXT NOT NULL
);

CREATE TABLE "email" (
    id BIGSERIAL PRIMARY KEY,
    user_id BIGSERIAL NOT NULL,
    email TEXT NOT NULL,
    display_name TEXT, -- Note that this is allowed to be NULL.
    CONSTRAINT fk_user_id
        FOREIGN KEY(user_id)
        REFERENCES "user"(id)
);

INSERT INTO "user"(handle) VALUES ('alpha');
INSERT INTO "user"(handle) VALUES ('beta');
INSERT INTO "user"(handle) VALUES ('gamma');

SELECT * FROM "user";

INSERT INTO "email"(user_id, email, display_name) VALUES (1, 'alpha@example.com', 'Dr. Alpha');
INSERT INTO "email"(user_id, email)               VALUES (2, 'beta@example.com');

SELECT
    "user".id,
    "user".handle,
    "email".id,
    "email".email,
    "email".display_name,
    "email".id IS NOT NULL as "has_email"
    FROM "user"
    LEFT JOIN "email"
    ON "user".id = "email".user_id
;

ultimately yields:

 id | handle | id |       email       | display_name | has_email
----+--------+----+-------------------+--------------+-----------
  1 | alpha  |  1 | alpha@example.com | Dr. Alpha    | t
  2 | beta   |  2 | beta@example.com  |              | t
  3 | gamma  |    |                   |              | f
(3 rows)

Want to build a tree structure? It's possible, for example with recursive queries, but it isn't a natural fit.

Want to build a graph outside the set of graphs which are trees? You can encode one with adjacencies and query it one node at a time, but I'm not aware of a much better way to deal with it than that.

Query return types are ad hoc: each query defines its own row shape that your application has to map back onto its own types.

SQL uses its own basic types, which can have an impedance mismatch with your language's basic types. And it can have more minor impedance mismatches with itself. For example, are you using PostgreSQL's BIGSERIAL and want a foreign key that only sometimes points to it? BIGSERIAL NULL doesn't work, BIGINT NULL does.

You always use prepared statements for custom queries, right? You're not passing unchecked input directly to any SQL interpreter, right? You're not going to have a problem with Robert'); DROP TABLE Students;--, right?

Why RocksDB?

I'm not claiming RocksDB is the best database ever or even the best in its niche. I'm presenting it as a design that I think has good usability.

It's a key-value store. Conceptually it's the equivalent of a C++ std::map or a Qt QMap or a Rust std::collections::BTreeMap. But writes are durable and can be wrapped in transactions.

RocksDB doesn't have its own types. All keys and values are simply byte arrays. This means that you don't have to worry about whether your program's types convert 1-to-1 with types in your database engine. All entries are conceptually stored in lexical (dictionary) order based on these byte arrays.

However, this does mean that the application is responsible for all of its own serialization and deserialization. Because keys are lexically sorted, your key representation should explicitly do things like store integers big-endian to ensure a consistent increasing order during range scans. Thus resolving the holy war, unless lexical order for you is right-to-left 🤔. I'll go over an example of how you would construct a key in Rust to specify a particular ordering so that your keys also act as the SQL equivalent of an index for fast lookup. This also means that if you need a collation order that isn't what a UTF-8 encoding gives you by default, you need to encode it into your keys yourself.
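As a quick aside (this is not part of the project code), here's a minimal Rust sketch of why big-endian is the right choice for keys: byte-wise comparison of big-endian integers matches numeric order, while little-endian does not.

// Big-endian integer encodings sort lexicographically in numeric order;
// little-endian encodings do not.
fn main() {
    let a: u64 = 255;
    let b: u64 = 256;

    // Byte-wise comparison agrees with numeric order: 255 < 256.
    assert!(a.to_be_bytes() < b.to_be_bytes());

    // Byte-wise comparison disagrees with numeric order:
    // 256 encodes as [0, 1, 0, ...], which sorts before 255's [255, 0, 0, ...].
    assert!(b.to_le_bytes() < a.to_le_bytes());
}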

But it does have some traditional database features: as I've already mentioned it has transactions and snapshot isolation, and it has column families, which mimic the separation of columns into tables. Column families aren't strictly necessary, but they let you set performance options specific to each family.
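For illustration, here's a minimal sketch using the Rust rocksdb crate of opening a database with two column families and giving one of them its own tuning. The family names, the path, and the option being tuned are hypothetical examples, not this project's actual configuration.

use rocksdb::{ColumnFamilyDescriptor, Options, DB};

fn main() {
    let path = "/tmp/rdb_cf_example"; // Hypothetical scratch path.

    // Per-family options: give the CO2 family its own write buffer size.
    let mut co2_opts = Options::default();
    co2_opts.set_write_buffer_size(8 * 1024 * 1024);

    let cf_co2 = ColumnFamilyDescriptor::new("carbon_dioxide", co2_opts);
    let cf_hygro = ColumnFamilyDescriptor::new("hygrometer", Options::default());

    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    db_opts.create_missing_column_families(true);

    let db = DB::open_cf_descriptors(&db_opts, path, vec![cf_co2, cf_hygro]).unwrap();

    // Reads and writes name the column family they target.
    let cf = db.cf_handle("carbon_dioxide").unwrap();
    db.put_cf(cf, b"key", b"value").unwrap();
    assert_eq!(db.get_cf(cf, b"key").unwrap().unwrap(), b"value".to_vec());

    drop(db);
    DB::destroy(&db_opts, path).unwrap();
}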

There are other things worth knowing about RocksDB in contrast to other database technologies, such as the fact that it's an embedded database and that it uses log-structured merge-trees, plus more that is relevant to performance optimization, but those are out of scope for this discussion.

Example Usage of RocksDB

This simple example is enough to explain all the important details of actually using RocksDB. If you understand this section then you should know enough to start building an application on top of it.

This example uses the following convention: each key is a tag (here name(id)), followed by a 0 byte, followed by the big-endian encoding of a user's id; the value is that user's name encoded as UTF-8.

This convention ensures that when a range scan is performed it can start at the first name(id)\0 key and read every key containing that tag in-order until it reaches the end of the key-value store or until it encounters a key that isn't using that same tag. Or put another way, we're searching over keys that would appear in a dictionary in the range [name(id)\0, name(id)\1).

use rocksdb::{Direction, ErrorKind, IteratorMode, OptimisticTransactionDB, Options, DB};
use std::str::{from_utf8, Utf8Error};

const DB_PATH: &str = "/tmp/rdb";
const TAG_NAME_FROM_ID: &str = "name(id)";

struct User {
    id: u64,
    name: String,
}

impl User {
    fn new(id: u64, name: &str) -> Self {
        Self {
            id,
            name: name.to_string(),
        }
    }

    // `name(id)\0` followed by a big-endian encoded id.
    fn tag_name_of_id(id: u64) -> Vec<u8> {
        [
            TAG_NAME_FROM_ID.as_bytes(),
            [0u8].as_slice(),
            id.to_be_bytes().as_slice(),
        ]
        .concat()
    }

    // `name(id)\1`
    fn terminator_name_of_id() -> Vec<u8> {
        [TAG_NAME_FROM_ID.as_bytes(), [1u8].as_slice()].concat()
    }

    // Either a string or a UTF-8 encoding error.
    fn from_value_name(bytes: Vec<u8>) -> Result<String, Utf8Error> {
        Ok(from_utf8(bytes.as_slice())?.to_string())
    }

    // `name(id)\0` followed by this object's big-endian encoded id.
    fn key_name_of_id(&self) -> Vec<u8> {
        Self::tag_name_of_id(self.id)
    }

    // A byte array of the name encoded in UTF-8.
    fn value_name(&self) -> Vec<u8> {
        self.name.as_bytes().to_vec()
    }
}

fn main() {
    {
        // Create the database directory and files.
        let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(DB_PATH).unwrap();

        // Write user names.
        'write_txn: loop {
            let txn = db.transaction();

            // Note that names are not in id order.
            let user_name_list = vec![
                User::new(8, "ζ"),
                User::new(5, "ε"),
                User::new(21, "θ"),
                User::new(13, "η"),
                User::new(1, "β"),
                User::new(1, "α"),
                User::new(3, "δ"),
                User::new(2, "γ"),
            ];

            for user_name in user_name_list {
                txn.put(user_name.key_name_of_id(), user_name.value_name())
                    .unwrap();
            }

            // Commit the transaction.
            if let Err(e) = txn.commit() {
                if ErrorKind::Busy != e.kind() {
                    panic!("Can't commit transaction: {:?}", e);
                } else {
                    // TODO: exponential backoff.
                }
            } else {
                break 'write_txn;
            }
        }

        // Read user names.
        'read_txn: loop {
            let txn = db.transaction();

            let iter = txn.iterator(IteratorMode::From(
                &User::tag_name_of_id(0),
                Direction::Forward,
            ));
            let iter_end = User::terminator_name_of_id();

            'range_scan: for i in iter {
                let (key, value) = i.unwrap();
                if !(*key < *iter_end) {
                    break 'range_scan;
                }

                println!(
                    "key={:?} value={}",
                    key,
                    User::from_value_name(value.to_vec()).unwrap()
                );
            }

            // Commit the transaction.
            if let Err(e) = txn.commit() {
                if ErrorKind::Busy != e.kind() {
                    panic!("Can't commit transaction: {:?}", e);
                } else {
                    // TODO: exponential backoff.
                }
            } else {
                break 'read_txn;
            }
        }

        // Drop the database handle.
    }
    DB::destroy(&Options::default(), DB_PATH).unwrap(); // Delete the database directory and files.
}

And this is the output. Note that name(id)\0 encodes to the bytes [ 110, 97, 109, 101, 40, 105, 100, 41, 0 ] in either UTF-8 or ASCII. Also note that there are 8 bytes following it encoding the id.

key=[110, 97, 109, 101, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 1] value=α
key=[110, 97, 109, 101, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 2] value=γ
key=[110, 97, 109, 101, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 3] value=δ
key=[110, 97, 109, 101, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 5] value=ε
key=[110, 97, 109, 101, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 8] value=ζ
key=[110, 97, 109, 101, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 13] value=η
key=[110, 97, 109, 101, 40, 105, 100, 41, 0, 0, 0, 0, 0, 0, 0, 0, 21] value=θ

Also note that the values are reported in order even though they were put into the database out of order, which is expected given that the database is conceptually a lexically sorted map. If you paid attention to the ids or know your Greek alphabet you might have noticed that the β user name is missing. That's because the α user name was put into the database after it with the same id, effectively replacing the value at that key. If you want to make sure the key didn't already exist you could check it with a get first.
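For instance, a minimal sketch of that check inside the write transaction might replace the put loop with something like the following (with concurrent writers you might prefer get_for_update so the read participates in conflict detection):

for user_name in user_name_list {
    let key = user_name.key_name_of_id();
    // Only write if no value already exists at this key.
    if txn.get(&key).unwrap().is_none() {
        txn.put(key, user_name.value_name()).unwrap();
    }
}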

You can get the complete code for this example here and run it on your machine with cargo run --release.

Logging and Reporting the Data

Given that we're running 3 different sensors, there's a fair bit of information to log in even one sample. And since RocksDB doesn't hold our hand when creating a database with a schema, or even require that we have a schema at all, I'll impose one for organizational reasons. In the image below you have a collection of tags which, when paired with an id, become a key that maps to a measured value from the CO2 sensor, the air quality sensor, or the humidity sensor, or to the system timestamp taken immediately after the acquisitions.

In the previous example there wasn't a need to fetch the id field; it simply ordered the keys. But for this application, since we want to make every series individually retrievable through an XMLHttpRequest in order to initialize a web page with the last full hour or day of samples, it's convenient to figure out the ids for the last hour or day and return those. So instead of range scanning via a key ordered by ids, the application constructs the id(timestamp) key that would have been generated exactly 24 hours ago (in the case of getting data over the last day).

With that id(timestamp) key constructed, the application scans through the database until it sees the last key matching that tag. Once it's done it can report those ids to the client as a JSON response body. The client can then use that list of ids to request all of the time series values it's interested in. actix_web handles each of these web requests and returns the series value for that id, also as a JSON response body.
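For a sense of what the web side looks like, here's a minimal actix_web sketch that returns an id list as JSON. The route path and the helper function are hypothetical, not this project's actual endpoints:

use actix_web::{get, web, App, HttpServer, Responder};

// Hypothetical helper: in the real application this would range scan RocksDB
// for the ids recorded over the last day.
fn ids_for_last_day() -> Vec<u64> {
    Vec::new()
}

// GET /api/ids/day -> e.g. [1, 2, 3] as a JSON response body.
#[get("/api/ids/day")]
async fn ids_day() -> impl Responder {
    web::Json(ids_for_last_day())
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(ids_day))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}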

If you're curious, the timestamp is a Unix timestamp with nanosecond precision stored in a u128.
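Putting those pieces together, here's a rough sketch of how the last day's ids could be collected, following the same tag conventions as the earlier example. The tag layout, the value layout (a big-endian u64 id), and the use of a non-transactional DB handle are assumptions for illustration, not this project's exact code:

use rocksdb::{Direction, IteratorMode, DB};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const TAG_ID_FROM_TIMESTAMP: &str = "id(timestamp)";

// `id(timestamp)\0` followed by a big-endian encoded u128 nanosecond timestamp.
fn tag_id_of_timestamp(nanos: u128) -> Vec<u8> {
    [
        TAG_ID_FROM_TIMESTAMP.as_bytes(),
        [0u8].as_slice(),
        nanos.to_be_bytes().as_slice(),
    ]
    .concat()
}

// Collect every id logged during the last 24 hours by range scanning forward
// from the key that would have been written exactly 24 hours ago.
fn ids_for_last_day(db: &DB) -> Vec<u64> {
    let day_ago = SystemTime::now() - Duration::from_secs(24 * 60 * 60);
    let start_nanos = day_ago.duration_since(UNIX_EPOCH).unwrap().as_nanos();

    let start = tag_id_of_timestamp(start_nanos);
    // `id(timestamp)\1` bounds the scan so we stop once we leave this tag.
    let end = [TAG_ID_FROM_TIMESTAMP.as_bytes(), [1u8].as_slice()].concat();

    let mut ids = Vec::new();
    for item in db.iterator(IteratorMode::From(&start, Direction::Forward)) {
        let (key, value) = item.unwrap();
        if *key >= *end {
            break;
        }
        // Assumes the value is a big-endian encoded u64 id.
        ids.push(u64::from_be_bytes(value.as_ref().try_into().unwrap()));
    }
    ids
}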

[Diagram: the key schema. id_last maps to an id; id(timestamp) maps to an id; the timestamp feeds back into the id(timestamp) key; and each of the following tags, when paired with an id, maps to its measured value: carbon_dioxide.co2_concentration_ppm(id), carbon_dioxide.temperature_celsius(id), carbon_dioxide.relative_humidity(id), particulate.standard_particle.pm1p0(id), particulate.standard_particle.pm2p5(id), particulate.standard_particle.pm10p(id), particulate.atmospheric_environment.pm1p0(id), particulate.atmospheric_environment.pm2p5(id), particulate.atmospheric_environment.pm10p(id), particulate.particle_count.pm0p3(id), particulate.particle_count.pm0p5(id), particulate.particle_count.pm1p0(id), particulate.particle_count.pm2p5(id), particulate.particle_count.pm5p0(id), particulate.particle_count.pm10p(id), hygrometer.temperature_fahrenheit(id), hygrometer.relative_humidity_percent(id), timestamp(id).]

Updating the Client on New Data

But there's one aspect I left out. What if I leave the page open over the whole period the page reported data for? If I opened the page that reports the last hour but left it up for an additional hour, that data would be stale.

To address this (and so that I don't have to constantly reload the page to see that the CO2 concentration has risen a little bit over the last 20 minutes), every time an acquisition is performed each sample is sent out to the web client via WebSocket. How Actix Web and base Actix with its actor model help us do that is a bit out of scope for this discussion, and how the web client handles it is well out of scope.