April 30, 2015

How RocksDB is used in osquery

Mike Arpaia and Ted Reed

Osquery and RocksDB are two open source projects whose development is supported by teams at Facebook. As members of the team that works on osquery, we love being able to take advantage of great open source software whenever we can. This article is going to walk through how we use RocksDB in osquery. Using RocksDB as osquery's embedded database allows osquery to store and access data in a fast, persistent way, enabling our team to solve some technical problems we'll detail in this blog.

What is osquery?

Osquery is an operating system instrumentation framework for OS X and Linux. The tools that osquery provides make low-level operating system analytics and monitoring both performant and intuitive. Osquery exposes an operating system as a high-performance relational database, which lets you write SQL-based queries to explore operating system data without specialized expertise. With osquery, SQL tables represent abstract concepts such as running processes, loaded kernel modules, open network connections, browser plugins, hardware events, and file hashes.

A high-performance, low-footprint host-monitoring daemon, called osqueryd, allows you to schedule queries to be executed across your entire infrastructure. The daemon takes care of aggregating the query results over time and generates logs that indicate state changes in your infrastructure. You can use this to maintain insight into the security, performance, configuration, and state of your entire infrastructure. Osqueryd's logging can integrate right into an organization's internal log aggregation pipeline, regardless of the organization's technology stack, via a robust plugin architecture.

(For more background on osquery, see the introductory talk from the Security @ Scale 2015 conference, as well as the launch blog post.)

What is RocksDB?

RocksDB is an embeddable persistent key-value store for local storage. RocksDB can be used as the foundation for a more traditional, client-server database, but its primary focus is its use as an embeddable storage mechanism. RocksDB is built to be scalable to run on servers with many CPU cores; to efficiently use fast storage; to support IO-bound, in-memory and write-once workloads; and to be flexible to allow for innovation.

(For more background on RocksDB, see the introductory talk from the Data @ Scale 2013 conference.)

How does osquery use RocksDB?

Osquery uses RocksDB extensively throughout its operations as an embedded datastore. In this article, we'll focus on what we think is the most technically interesting use case: our event-based pub/sub system. For brevity's sake, we'll refer to this whole subsystem of osquery as Events.

Scheduling queries

In order to understand why Events are necessary, let's take a 10,000-foot tour of how you schedule queries in osqueryd. Osqueryd, the osquery daemon, allows you to schedule queries for execution across your infrastructure. The daemon aggregates query results over time and generates logs indicating state changes in your infrastructure. You can use these logs to gain insight into changes you may be interested in.

For example, consider the query “select address, hostnames from etc_hosts”. This query returns a set of results indicating the current state of your /etc/hosts file. Consider the following illustration:

$ osqueryi "select address, hostnames from etc_hosts"
+-----------------+---------------+
| address         | hostnames     |
+-----------------+---------------+
| 127.0.0.1       | localhost     |
| 255.255.255.255 | broadcasthost |
| ::1             | localhost     |
+-----------------+---------------+

This dataset looks interesting; if it changes in any way, we'd like to know about it. Let's “schedule” this query. We just need to add it to our osqueryd config:

{
  // Define a schedule of queries
  "schedule": {
    "etc_hosts": {
      "query": "select address, hostnames from etc_hosts",
      "interval": 60 // interval in seconds
    }
  }
}

If we run osqueryd with this config, we'll see logs like the following. Keep in mind that, normally, osqueryd logs are each on one line. We've expanded them here for readability.

{
    "name": "etc_hosts",
    "hostIdentifier": "marpaia-mbp.local",
    "calendarTime": "Tue Apr  7 18:13:55 2015",
    "unixTime": "1428455635",
    "columns": {
        "address": "127.0.0.1",
        "hostnames": "localhost"
    },
    "action": "added"
}
{
    "name": "etc_hosts",
    "hostIdentifier": "marpaia-mbp.local",
    "calendarTime": "Tue Apr  7 18:13:55 2015",
    "unixTime": "1428455635",
    "columns": {
        "address": "255.255.255.255",
        "hostnames": "broadcasthost"
    },
    "action": "added"
}
{
    "name": "etc_hosts",
    "hostIdentifier": "marpaia-mbp.local",
    "calendarTime": "Tue Apr  7 18:13:55 2015",
    "unixTime": "1428455635",
    "columns": {
        "address": "::1",
        "hostnames": "localhost"
    },
    "action": "added"
}  

As you can see, the logs indicate that the three results we saw above were added. Let's say that somebody changed the value of 127.0.0.1 from “localhost” to “foobar”. We would see two log entries: one indicating that localhost was removed and one indicating that foobar was added. Take a look:

{
    "name": "etc_hosts",
    "hostIdentifier": "marpaia-mbp.local",
    "calendarTime": "Tue Apr  7 18:16:50 2015",
    "unixTime": "1428455810",
    "columns": {
        "address": "127.0.0.1",
        "hostnames": "localhost"
    },
    "action": "removed"
}
{
    "name": "etc_hosts",
    "hostIdentifier": "marpaia-mbp.local",
    "calendarTime": "Tue Apr  7 18:16:50 2015",
    "unixTime": "1428455810",
    "columns": {
        "address": "127.0.0.1",
        "hostnames": "foobar"
    },
    "action": "added"
}

Great! Everything is working as planned. Based on our above configuration file, it's clear that osqueryd executes the query every 60 seconds. When osqueryd executes the query, it checks to see whether previous results of that query were already stored in RocksDB. If there is no data, osqueryd will store the results and emit all the rows as having been “added”. If previous results exist in the database, osqueryd will compare the two datasets and emit differential results.
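The differential step can be sketched as a simple set comparison. The following is an illustrative sketch, not osquery's actual implementation; it treats each row as a column-to-value map and compares result sets by full row contents:

```cpp
#include <algorithm>
#include <iterator>
#include <map>
#include <string>
#include <vector>

// A row maps column names to values, like osquery's Row type.
using Row = std::map<std::string, std::string>;

// Rows present in `current` but not `previous` were "added"; rows present
// in `previous` but not `current` were "removed".
struct Diff {
  std::vector<Row> added;
  std::vector<Row> removed;
};

Diff diffResults(std::vector<Row> previous, std::vector<Row> current) {
  // set_difference requires sorted ranges; maps compare lexicographically.
  std::sort(previous.begin(), previous.end());
  std::sort(current.begin(), current.end());
  Diff d;
  std::set_difference(current.begin(), current.end(), previous.begin(),
                      previous.end(), std::back_inserter(d.added));
  std::set_difference(previous.begin(), previous.end(), current.begin(),
                      current.end(), std::back_inserter(d.removed));
  return d;
}
```

With the /etc/hosts example above, changing “localhost” to “foobar” produces exactly one added row and one removed row, matching the two log entries shown earlier.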

This works really well, but the astute reader will note that if the content of /etc/hosts is changed between queries and changed back before the next query executes, no results will be emitted. The Events system is our solution to this kind of problem.

Obviously, /etc/hosts is a contrived example. Let's explore a more interesting example of a real-life issue that we solved using Events.

Event-based monitoring

We need to be able to react to operating system events for two reasons. First, if data changes between scheduled query executions and then changes back, we can't monitor that change effectively. Second, repeatedly polling a resource that rarely changes is inefficient. We can do better.

To best understand what the Events API looks like and how it works, let’s walk through an actual example from the osquery codebase: the udev event publisher and the related “hardware events” event subscriber.

For those who aren't familiar with udev, here's how Wikipedia defines it: “udev is a device manager for the Linux kernel. As the successor of devfsd and hotplug, udev primarily manages device nodes in the /dev directory. At the same time, udev also handles all user space events raised while hardware devices are added into the system or removed from it, including firmware loading as required by certain devices.”

Actions

When a udev event occurs, we need to define the set of “actions” that the event can represent. We do this in code by declaring an enum of possible actions. When the Event Publisher wants to publish an event to its subscribers, it will denote what kind of event it is. The event may be a device being added, removed, or changed.

enum udev_event_action {
  UDEV_EVENT_ACTION_ADD = 1,
  UDEV_EVENT_ACTION_REMOVE = 2,
  UDEV_EVENT_ACTION_CHANGE = 3,

  UDEV_EVENT_ACTION_ALL = 10,
};

This enum gives us a way to define a simple type to reference these different kinds of events later on.

Subscription contexts

Subscription contexts allow a subscriber to respond to only certain kinds of events. For example, perhaps you're interested in only those device events that may occur on a specific driver. You can specify that driver in the subscription context which a subscriber uses to subscribe to a publisher.

We're going to talk about publishers first, but part of writing a publisher is reasoning about how to make it efficient by allowing your subscribers to proactively announce which events they don't care about.

The udev publisher allows you to filter on many values in addition to driver name. Consider the actual udev subscription context:

struct UdevSubscriptionContext : public SubscriptionContext {
  /// The hardware event action, add/remove/change.
  udev_event_action action;

  /// Restrict to a specific subsystem.
  std::string subsystem;
  /// Restrict to a specific devnode.
  std::string devnode;
  /// Restrict to a specific devtype.
  std::string devtype;
  /// Restrict to a specific driver name.
  std::string driver;
}; 

Event contexts

A publisher, quite intuitively, publishes events. Every event carries what's called an “event context” along with it. An event's EventContext object contains the relevant metadata about the event that occurred. The following code is the implementation of the udev event context.

struct UdevEventContext : public EventContext {
  /// A pointer to the device object, most subscribers will only use device.
  struct udev_device* device;
  
  /// The udev_event_action identifier.
  udev_event_action action;
  
  /// Action as a string (as given by udev).
  std::string action_string;

  // Additional metadata about the event
  std::string subsystem;
  std::string devnode;
  std::string devtype;
  std::string driver;
};

typedef std::shared_ptr<UdevEventContext> UdevEventContextRef;
typedef std::shared_ptr<UdevSubscriptionContext> UdevSubscriptionContextRef;

Event publisher

Now that we've defined the objects that we'll be passing around, let's start passing them around. To create a new event publisher, we must subclass the EventPublisher class. The EventPublisher class takes two template arguments. The first is the subscription context type which the publisher will use to manage subscriptions. The second is the event context type which is used to pass metadata about events to subscribers.

You must implement a few methods when you create an event publisher. First, the setUp() method defines whatever your publisher must do to initialize itself. The osquery platform will then repeatedly call the run() method in a run-loop, padded with a user-defined sleep interval.

Any resources you create in setUp() can be responsibly deallocated in tearDown(), which is called in the event that your publisher needs to stop running.
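The lifecycle above can be sketched as a small driver loop. The names here are illustrative, not osquery's actual scheduler:

```cpp
// Sketch of the publisher lifecycle contract: setUp() once, run() repeatedly
// until it signals a stop, tearDown() at the end.
struct DemoPublisher {
  int runs = 0;
  bool torn_down = false;
  bool setUp() { return true; }          // acquire resources (e.g., a udev monitor)
  bool run() { return ++runs < 3; }      // publish pending events; false means stop
  void tearDown() { torn_down = true; }  // release resources
};

void runLoop(DemoPublisher& p) {
  if (!p.setUp()) {
    return;  // initialization failed; never enter the loop
  }
  while (p.run()) {
    // a real run-loop would sleep here for the configured interval
  }
  p.tearDown();
}
```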

We won't go into the specific inner workings of the udev event publisher here, but the reader is encouraged to explore the osquery codebase at his or her leisure. Observe the following shortened header for the UdevEventPublisher instead:

/**
 * @brief A Linux `udev` EventPublisher.
 *
 */
class UdevEventPublisher
    : public EventPublisher<UdevSubscriptionContext, UdevEventContext> {
  DECLARE_PUBLISHER("udev");

 public:
  /// set up the publisher
  Status setUp();

  /// start a run-loop which listens for udev events and publishes them
  /// to subscribers
  Status run();

  /// clean up resources
  void tearDown();

  // ...
  // redacted for brevity
  
  /**
   * @brief Return a string representation of a udev property.
   *
   * @param device the udev device pointer.
   * @param property the udev property identifier string.
   * @return string representation of the property or empty if null.
   */
  static std::string getValue(struct udev_device* device,
                              const std::string& property);

  /**
   * @brief Return a string representation of a udev system attribute.
   *
   * @param device the udev device pointer.
   * @param property the udev system attribute identifier string.
   * @return string representation of the attribute or empty if null.
   */
  static std::string getAttr(struct udev_device* device,
                             const std::string& attr);

  // ...
}; 

Event subscriber

Event subscribers are much more straightforward than publishers. Since many subscribers may subscribe to a single publisher, publishers are designed to be flexible and subscribers are designed to be simple.

An event subscriber may choose to subscribe to an event publisher. You can specify which event publisher a subscriber would like to subscribe to by passing the publisher type as a template argument when you publicly inherit from EventSubscriber. Then you simply initialize your subscriber and define a callback. Consider the following class declaration of the “hardware events” subscriber:

/**
 * @brief Track udev events in Linux
 */
class HardwareEventSubscriber : public EventSubscriber<UdevEventPublisher> {
 public:
  Status init();

  Status Callback(const UdevEventContextRef& ec, const void* user_data);
};

The init() method is simple. We basically create a subscription context and use that subscription context to declare that we want to be notified by all actions. We then subscribe to the publisher which we inherited from and specify our Callback() method as the event callback.

Status HardwareEventSubscriber::init() {
  auto subscription = createSubscriptionContext();
  subscription->action = UDEV_EVENT_ACTION_ALL;

  subscribe(&HardwareEventSubscriber::Callback, subscription, nullptr);
  return Status(0, "OK");
}

The event callback is the method that gets called every time the event publisher you're using publishes an event that matches the conditions you defined in your subscription context. We're concerned with surfacing these details in an osquery SQL table, so we're going to create an SQL row for each event. We will use the osquery::Row type to create our row with the relevant details. In our case, this includes details such as the action, the driver name, the path of the device, etc.

Consider the following code, which implements the hardware event subscriber callback:

Status HardwareEventSubscriber::Callback(const UdevEventContextRef& ec,
                                         const void* user_data) {
  Row r;
  // ...
  // redacted for brevity

  struct udev_device *device = ec->device;
  r["action"] = ec->action_string;
  r["path"] = ec->devnode;
  r["type"] = ec->devtype;
  r["driver"] = ec->driver;

  // UDEV properties.
  r["model"] = UdevEventPublisher::getValue(device, "ID_MODEL_FROM_DATABASE");
  r["model_id"] = INTEGER(UdevEventPublisher::getValue(device, "ID_MODEL_ID"));
  r["vendor"] = UdevEventPublisher::getValue(device, "ID_VENDOR_FROM_DATABASE");
  r["vendor_id"] =
      INTEGER(UdevEventPublisher::getValue(device, "ID_VENDOR_ID"));
  r["serial"] =
      INTEGER(UdevEventPublisher::getValue(device, "ID_SERIAL_SHORT"));
  r["revision"] = INTEGER(UdevEventPublisher::getValue(device, "ID_REVISION"));

  r["time"] = INTEGER(ec->time);

  // add the row to our backing datastore, RocksDB
  // more on this soon!
  add(r, ec->time);
  
  return Status(0, "OK");
}

Once we create a relevant row, we need to add it to RocksDB so that we can query it later, via SQL. To do this, we use the inherited add() method. We'll talk more about how this works very soon, but let's wire this up first.

Registration

In order to register our event subscriber, we use the REGISTER macro, which accepts three parameters: the event subscriber class you'd like to register, the string “event_subscriber”, and the name of the table that this subscriber backs.

REGISTER(HardwareEventSubscriber, "event_subscriber", "hardware_events");

Now that our subscriber is adding events to RocksDB as they occur, we need a way to query the data using an easy SQL interface. To do this, we create a table schema in osquery's Python table schema IDL. The actual schema file is intuitive, so we've included it here:

table_name("hardware_events")
description("Hardware (PCI/USB/HID) events from UDEV or IOKit.")
schema([
    Column("action", TEXT, "Remove, insert, change properties, etc"),
    Column("path", TEXT, "Local device path assigned (optional)"),
    Column("type", TEXT, "Type of hardware and hardware event"),
    Column("driver", TEXT, "Driver claiming the device"),
    Column("model", TEXT, "Hardware device model"),
    Column("model_id", INTEGER),
    Column("vendor", TEXT, "Hardware device vendor"),
    Column("vendor_id", INTEGER),
    Column("serial", TEXT, "Device serial (optional)"),
    Column("revision", INTEGER, "Device revision (optional)"),
    Column("time", INTEGER, "Time of hardware event"),
])
attributes(event_subscriber=True)
implementation("events/hardware_events@hardware_events::genTable")

In the schema file, we specify the attribute event_subscriber to be True. This generates the C++ code that ties together osquery's SQL table generator with your event subscriber. By defining your table this way, osquery's build system will automatically generate the code that is required to read the data that you added to RocksDB. Remember, this is the SQL row that you added to RocksDB by using the add() method.

We'll talk more about what happens when you query this table shortly, but first we must understand what the add() method from above does.

Adding data to RocksDB

When we call the add() method, we store the osquery::Row that was just populated into RocksDB. We begin with a column family called “events”, stored as the kEvents variable. Since RocksDB is a key-value store at its core, we create a unique key to identify this event by concatenating a namespace and a unique ID. We then serialize the SQL row into a string and store that as the value. A simplified version of this logic is included here:

/**
 * @brief Store parsed event data from an EventCallback in a backing store.
 *
 * Within an EventCallback the EventSubscriber has an opportunity to create
 * an osquery Row element, add the relevant table data for the EventSubscriber
 * and store that element in the osquery backing store. At query-time
 * the added data will apply selection criteria and return these elements.
 * The backing store data retrieval is optimized by time-based indexes. It
 * is important to add EventTime as it relates to "when the event occurred".
 *
 * @param r An osquery Row element.
 * @param time The time the added event occurred.
 *
 * @return Was the element added to the backing store.
 */
Status EventSubscriberPlugin::add(const osquery::Row& r, EventTime time) {
  // ...
  // redacted for brevity

  // Get and increment the EID for this module.
  EventID eid = getEventID();

  std::string event_key = "data." + dbNamespace() + "." + eid;
  
  std::string data;
  status = serializeRowJSON(r, data);
  if (!status.ok()) {
    return status;
  }

  // Store the event data.
  status = db->Put(kEvents, event_key, data);
  // Record the event in the indexing bins.
  recordEvent(eid, time);
  return status;
} 
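The resulting key layout can be illustrated with a tiny helper. This is just a sketch of the shape of the keys; the namespace string used below is hypothetical, and in osquery it is derived from the publisher and subscriber names:

```cpp
#include <string>

// Mirrors the key construction in add(): "data.<namespace>.<eid>".
// The value stored under this key is the JSON-serialized SQL row.
std::string eventKey(const std::string& db_namespace, const std::string& eid) {
  return "data." + db_namespace + "." + eid;
}
```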

At the end of the add() method, after storing the row in RocksDB, we call the recordEvent() method. When we record the event, we create an index that allows us to specify when a certain event was added. Based on the event's time, we store its key in a presorted bin. This allows us to query the data in RocksDB more efficiently:

/**
 * @brief Add an EventID, EventTime pair to all matching list types.
 *
 * The list types are defined by time size. Based on the EventTime this pair
 * is added to the list bin for each list type. If there are two list types:
 * 60 seconds and 3600 seconds and `time` is 92, this pair will be added to
 * list type 1 bin 4 and list type 2 bin 1.
 *
 * @param eid A unique EventID.
 * @param time The time when this EventID%'s event occurred.
 *
 * @return Were the indexes recorded.
 */
Status EventSubscriberPlugin::recordEvent(EventID& eid, EventTime time) {
  // ...
  // redacted for brevity

  for (auto time_list : kEventTimeLists) {
    // ...
    {
      boost::lock_guard<boost::mutex> lock(event_record_lock_);
      // Append the record (eid, unix_time) to the list bin.
      std::string record_value;
      status = db->Get(
          kEvents, record_key + "." + list_key + "." + list_id, record_value);

      if (record_value.length() == 0) {
        // This is a new list_id for list_key, append the ID to the indirect
        // lookup for this list_key.
        std::string index_value;
        status = db->Get(kEvents, index_key + "." + list_key, index_value);
        if (index_value.length() == 0) {
          // A new index.
          index_value = list_id;
        } else {
          index_value += "," + list_id;
        }
        status = db->Put(kEvents, index_key + "." + list_key, index_value);
        record_value = eid + ":" + time_value;
      } else {
        // Tokenize a record using ',' and the EID/time using ':'.
        record_value += "," + eid + ":" + time_value;
      }
      status = db->Put(
          kEvents, record_key + "." + list_key + "." + list_id, record_value);
      if (!status.ok()) {
        LOG(ERROR) << "Could not put Event Record key: " << record_key << "."
                   << list_key << "." << list_id;
      }
    }
  }

  return Status(0, "OK");
}
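The record format written above — (eid, time) pairs separated by ':' and joined with ',' — can be decoded with a small parser. This is a sketch for illustration, not osquery's actual parsing code:

```cpp
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Decode a record value like "10:92,11:95" into (eid, time) pairs,
// matching the ','/':' format produced by recordEvent() above.
std::vector<std::pair<std::string, std::string>> parseRecords(
    const std::string& value) {
  std::vector<std::pair<std::string, std::string>> records;
  std::stringstream stream(value);
  std::string item;
  while (std::getline(stream, item, ',')) {
    auto colon = item.find(':');
    records.emplace_back(item.substr(0, colon), item.substr(colon + 1));
  }
  return records;
}
```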

Expiring data out of RocksDB

We've seen how we add data to RocksDB, but you may be concerned that RocksDB will grow in size infinitely as more events occur, destined to eventually fill up your disk. This doesn't happen, though, because we expire data out of RocksDB. Given a vector of indexes to expire, the following code will delete the records from RocksDB:

/**
 * @brief Expire indexes and eventually records.
 *
 * @param list_type the string representation of list binning type.
 * @param indexes complete set of 'index.step' indexes for the list_type.
 * @param expirations of the indexes, the set to expire.
 *
 * @return status if the indexes and records were removed.
 */
Status EventSubscriberPlugin::expireIndexes(
    const std::string& list_type,
    const std::vector<std::string>& indexes,
    const std::vector<std::string>& expirations) {
  // ...
  // redacted for brevity

  // Remove the records using the list of expired indexes.
  std::vector<std::string> persisting_indexes = indexes;
  for (const auto& bin : expirations) {
    db->Delete(kEvents, record_key + "." + list_type + "." + bin);
    persisting_indexes.erase(
        std::remove(persisting_indexes.begin(), persisting_indexes.end(), bin),
        persisting_indexes.end());
  }

  // Update the list of indexes with the non-expired indexes.
  auto new_indexes = boost::algorithm::join(persisting_indexes, ",");
  db->Put(kEvents, index_key + "." + list_type, new_indexes);

  // Delete record events.
  for (const auto& record : expired_records) {
    db->Delete(kEvents, data_key + "." + record.first);
  }

  return Status(0, "OK");
}

When does the data actually get expired?

You may be wondering how osquery knows which data to expire and when. Recall the table specification file we looked at earlier. That file was used during osquery's build process to generate some C++ code that ties the SQL engine that osquery uses to the hardware events subscriber.

/*
 *  Copyright (c) 2014, Facebook, Inc.
 *  All rights reserved.
 *
 *  This source code is licensed under the BSD-style license found in the
 *  LICENSE file in the root directory of this source tree. An additional grant
 *  of patent rights can be found in the PATENTS file in the same directory.
 *
 */

/*
** This file is generated. Do not modify it manually!
*/

#include <osquery/events.h>
#include <osquery/tables.h>

namespace osquery { namespace tables {

/// BEGIN[GENTABLE]

class hardware_events {
 public:
  osquery::QueryData genTable(QueryContext& request);
};

class hardwareEventsTablePlugin : public TablePlugin {
 private:
  TableColumns columns() const {
    return {
      {"action", "TEXT"},
      {"path", "TEXT"},
      {"type", "TEXT"},
      {"driver", "TEXT"},
      {"model", "TEXT"},
      {"model_id", "INTEGER"},
      {"vendor", "TEXT"},
      {"vendor_id", "INTEGER"},
      {"serial", "TEXT"},
      {"revision", "INTEGER"},
      {"time", "INTEGER"}
    };
  }

  QueryData generate(QueryContext& request) {
    auto subscriber = EventFactory::getEventSubscriber("hardware_events");
    return subscriber->genTable(request);
  }
};


REGISTER(hardwareEventsTablePlugin, "table", "hardware_events");

/// END[GENTABLE]

}}

As you can see, the generated code is actually quite clear and readable. We create a simple “table plugin”, populate some metadata, and register it. When we query the “hardware_events” table, the generate() method is called on this class. generate() uses the EventFactory class to get a reference to the hardware events subscriber and calls the genTable() function on the subscriber. The genTable() function is an inherited function that retrieves the indexes from RocksDB. The call chain here is quite long, so we won't dive into it too deeply, but we will explore the highlights.

The genTable() function calls its own get() method.

/**
 * @brief Suggested entrypoint for table generation.
 *
 * The EventSubscriber is a convention that removes a lot of boilerplate event
 * subscriptioning and acting. The `genTable` static entrypoint is the
 * suggested method for table specs.
 *
 * @return The query-time table data, retrieved from a backing store.
 */
__attribute__((used)) QueryData EventSubscriberPlugin::genTable(
    tables::QueryContext& context) {
  return get(0, 0);
}

The get() method eventually calls the getIndexes() function to retrieve the active keys from RocksDB. During the process of getting the keys from RocksDB, the getIndexes() method calls expireIndexes() which expires stale events from RocksDB. Whenever you query the “hardware_events” table, it cleans itself up! This maintains the database size and makes sure that you're getting only the information that you want and need.

class EventSubscriberPlugin : public Plugin {
// ...
// redacted for brevity
 private:
  /**
   * @brief Plan the best set of indexes for event record access.
   *
   * @param start an inclusive time to begin searching.
   * @param stop an inclusive time to end searching.
   * @param list_key optional key to bind to a specific index binning.
   *
   * @return List of 'index.step' index strings.
   */
  std::vector<std::string> getIndexes(EventTime start,
                                      EventTime stop,
                                      int list_key = 0);

  /**
   * @brief Expire indexes and eventually records.
   *
   * @param list_type the string representation of list binning type.
   * @param indexes complete set of 'index.step' indexes for the list_type.
   * @param expirations of the indexes, the set to expire.
   *
   * @return status if the indexes and records were removed.
   */
  Status expireIndexes(const std::string& list_type,
                       const std::vector<std::string>& indexes,
                       const std::vector<std::string>& expirations);
};

Scheduling the query

Just as we scheduled a query on the “etc_hosts” table earlier, we now need to use osqueryd to schedule a query on the “hardware_events” table. Once we do that, we can enjoy accurate logs indicating each time hardware is added, removed, or changed on a host!
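Mirroring the earlier etc_hosts configuration, a schedule entry for this table might look like the following (the column list and interval here are just for illustration):

```
{
  "schedule": {
    "hardware_events": {
      "query": "select action, path, vendor, model from hardware_events",
      "interval": 60
    }
  }
}
```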

That's all, folks

All the code examples in this article are current as of the most recent release of osquery, which is 1.4.4. This code is subject to changes and improvements over time. If this kind of work is something that interests you, we're hiring!

Thanks for following along. We love using RocksDB in osquery and we hope you love using RocksDB too.
