When ABC Arbitrage first started, software engineering was not a big part of the company’s culture and everything was designed with a monolithic approach.
Nowadays, the whole industry has shifted towards service-oriented and microservices architectures, and we at ABC Arbitrage decided to follow the same path.
Our trading platform and infrastructure are architected around a peer-to-peer service bus that does not depend on a third-party broker to dispatch messages between services.
While switching from a monolithic approach to a services-oriented architecture, we also needed a monitoring tool to gather and report application metrics.
While looking at existing solutions, we quickly came to the conclusion that none of them would suit our particular needs. It is worth noting that in 2014, when the project first started, most of the monitoring solutions available today either did not exist or were in their early stages.
We thus decided to roll our own solution, Observer. This project was also a great use-case for a database we were already using, Cassandra. Cassandra is great at storing high volumes of data and is especially well suited to time-series data like metrics.
For many years, Observer has been our monitoring & alerting solution, as it also provided alerting through a dedicated service.
It proved to work fairly well and scaled nicely with the increasing number of services being deployed. Using Cassandra as a storage backend allowed us to ingest a high volume of data (20k writes/s) while keeping replicas across multiple data centers.
However, care must be taken when using Cassandra with a time-series oriented workload. At its core, Cassandra uses a Log-Structured Merge Tree engine, which allows high write throughput by buffering data in memory and periodically flushing immutable chunks of data to disk in the form of SSTables. To limit data fragmentation and read latency, SSTables are periodically merged in a process called compaction. By default, Cassandra uses a size-tiered compaction strategy that combines SSTables belonging to a particular size range. While very simple to reason about, this strategy is not particularly well suited to a time-series workload with a fixed TTL.
We did not realize that for a very long time, until issues started piling up, beginning with disk usage that kept growing even though we had configured short TTLs. Cassandra expires TTL-ed data through special markers called tombstones, and evicts tombstones during compaction once certain conditions are met (after gc_grace_seconds, non-overlapping ranges, …). Using a size-based compaction strategy thus made it very hard for Cassandra to evict tombstones, as old and expired data was located in big files that took a very long time to get compacted together.
As a quick workaround, and to avoid a disaster, we started deleting SSTable files manually, based on the maximum timestamp of their data, until we discovered a brand new compaction strategy: Time Window Compaction Strategy (TWCS).
Switching to TWCS definitely solved our issues, and we decided to move every time-series workload we had in Cassandra to this strategy. Cassandra helped us grow and scaled with our needs. Folks at Criteo took a similar path and have since announced BigGraphite.
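As an illustration, switching an existing table to TWCS boils down to a single ALTER TABLE statement in CQL; the keyspace, table name, window size and TTL below are hypothetical, not our actual production settings:

```sql
-- Switch an existing metrics table to Time Window Compaction Strategy,
-- grouping SSTables into 1-day windows so whole windows can be dropped
-- once their data has expired.
ALTER TABLE metrics.datapoints
  WITH compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'DAYS',
    'compaction_window_size': 1
  }
  AND default_time_to_live = 2592000;  -- 30-day TTL, in seconds
```

The key point is that the compaction window should be aligned with the TTL (a common rule of thumb is roughly 20-30 windows per TTL period), so that expired SSTables can be dropped wholesale instead of being slowly compacted away.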
Unlike BigGraphite, Observer provided its own frontend for viewing data and dashboarding, based on AngularJS.
It worked fairly well but had some limitations that were difficult to overcome:
- The web frontend was entirely handcrafted; only a couple of people on the team had the web skills needed to keep improving it
- The dashboarding UI only provided basic widgets to display and visualize data (graphs, single stats)
- It did not provide an API to query data; viewing data was restricted to the web frontend, which made it difficult, if not impossible, to develop external analytics tools
- Data points could not be inserted retroactively (no backfilling)
- Data transformation was not supported; aggregations and data manipulation had to be done client-side
ABC Arbitrage recently introduced the Lab, which gives people an opportunity to spend some time working on what they think would benefit the company. A quota of 2 to 3 months is given to employees to gather a team and work on a project, as long as it is relevant to the company’s business. As our first project with this new policy, two engineers chose to work on a brand new solution to entirely replace Observer, which was getting old and not maintained.
As such, among all the solutions that now exist, we decided to try the InfluxDB / Grafana combination. Multiple reasons motivated this choice:
- InfluxDB is a time-series database and is optimized as such, unlike Cassandra which is a generic column store NoSQL database.
- It is open-source
- InfluxDB comes with a collection of products, called the TICK stack, for collecting data and alerting
- Although only available in the paid version, InfluxDB provides a clustering feature
- It provides a rich and powerful query language to retrieve and manipulate data with a set of predefined operations.
- It is supported out of the box by Grafana (time series analytics software)
- Grafana supports rich visualisations for many time-series databases
- Grafana has built-in alerting
Folks at InfluxData provided us with a trial licence so that we could try and evaluate their product, especially the clustering feature, which we needed for high availability. We started experimenting from there.
We coupled InfluxDB with Grafana for data visualization and dashboarding.
To start playing with InfluxDB, we needed to ingest some data. In our current infrastructure, data points are processed by a central component before being persisted to Cassandra. We hooked some custom code into that component to perform dual writes to Cassandra and InfluxDB, so that we had real data to play with. It also gave us a first indicator of how many points InfluxDB was able to handle, and we confirmed that InfluxDB could sustain our current load (machines were sized according to InfluxData's hardware sizing guidelines).
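To give an idea of what the dual-write hook has to produce, here is a minimal Python sketch that formats a data point in InfluxDB's line protocol; the resulting lines can then be POSTed in batches to InfluxDB's /write endpoint. The measurement, tag and field names are made up for the example, and this simplified helper only handles numeric field values:

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Format one point as InfluxDB line protocol:
    measurement,tag1=v1,tag2=v2 field1=v1 timestamp

    Simplified sketch: assumes numeric fields and values that need
    no escaping; a real client must escape commas, spaces, etc.
    """
    tag_part = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{k}={v}" for k, v in sorted(fields.items()))
    return f"{measurement},{tag_part} {field_part} {timestamp_ns}"

# One point published by a hypothetical service instance:
line = to_line_protocol(
    "received_messages",
    {"datacenter": "dc1", "host": "node-42"},
    {"value": 17.0},
    1500000000000000000,  # nanosecond-precision timestamp
)
print(line)
# received_messages,datacenter=dc1,host=node-42 value=17.0 1500000000000000000
```

In practice we used an InfluxDB client library rather than hand-rolled formatting, but the wire format above is what ends up on the network either way.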
Compared with our in-house solution based on Cassandra, InfluxDB gave us a set of rich and powerful features, out of the box:
- Retention Policy and Downsampling through Continuous Queries
- Data aggregation
- Kapacitor: InfluxDB ETL
Let’s examine each one of them in detail.
Retention Policy and Downsampling through Continuous Queries
In Cassandra, data retention (the amount of time Cassandra will keep data) is managed with TTLs, on a per-cell basis. InfluxDB uses retention policies, on a per-database basis. With many instances of many services pushing thousands of points per second, keeping high-resolution data for a long period of time can create storage issues. To avoid such concerns, a natural solution is to keep high-resolution data only for a short period of time, and keep lower-precision data longer. The process of reducing high-resolution data to lower precision is called downsampling. Cassandra does not come with any built-in downsampling or rollup logic. Remember, Cassandra is a generic NoSQL column store, not a specialized time-series store. Downsampling must then be implemented in an external component, which is what Observer was doing.
Downsampling in InfluxDB is directly supported through Continuous Queries. A continuous query is a regular InfluxQL query that runs periodically. Thus, no custom logic is needed: data is automatically downsampled, and retention policies are automatically populated with downsampled data by InfluxDB.
On disk, data is organized in shards, similar to Cassandra's time windows. Each shard belongs to a shard group with a specific duration. When a shard group exceeds the expiration time configured in the retention policy, it is dropped from disk, so that the same amount of data is kept over time.
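To make this concrete, here is a hedged sketch of what such a setup could look like in InfluxQL. The database, policy, measurement names and durations are illustrative, not our production values:

```sql
-- Keep raw data for 2 weeks, replicated, in 1-day shard groups
CREATE RETENTION POLICY "raw_2w" ON "metrics"
  DURATION 2w REPLICATION 2 SHARD DURATION 1d

-- Keep downsampled data for a year
CREATE RETENTION POLICY "downsampled_1y" ON "metrics"
  DURATION 52w REPLICATION 2

-- Periodically roll up 10s points into 5m averages,
-- preserving all tags (GROUP BY *)
CREATE CONTINUOUS QUERY "cq_received_messages_5m" ON "metrics"
BEGIN
  SELECT mean("value") AS "value"
  INTO "downsampled_1y"."received_messages"
  FROM "raw_2w"."received_messages"
  GROUP BY time(5m), *
END
```

Once the continuous query is in place, InfluxDB runs it on its own schedule, and expired shard groups in "raw_2w" are dropped automatically.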
Data aggregation
Let’s say that you have multiple instances of the same service, spread across multiple datacenters. A metric representing the number of received messages per second is published for each instance. Besides having a data point per instance, it might also be useful to know the total number of messages received at a given point in time for a particular datacenter.
With our old system, this was fairly difficult to do. The same point had to be published twice: once for the node, and a second time for the datacenter. Every point published by every node for the datacenter would share the same metric id (UUID), and a central component receiving the data would sum it before persisting it to Cassandra. The level of aggregation was very limited, and it forced every client to publish its points multiple times whenever aggregation was needed.
While InfluxDB is “schema-less”, data is still structured a certain way, with a name and a set of key-value pairs called tags. Tags are especially useful and allow for easy and generic data aggregation. For example, the datacenter aggregation could be calculated with the following InfluxQL query:
```sql
SELECT sum("value") FROM "received_messages"
WHERE time >= now() - 1h
GROUP BY time(10s), "datacenter"
```
Provided that data is correctly tagged, it becomes easy to select, manipulate and transform data with InfluxQL.
Kapacitor: InfluxDB ETL
Kapacitor is part of the TICK stack and provides a rich stream-processing engine to manipulate data and write it back to InfluxDB. It is also useful for alerting and anomaly detection, which Observer also implemented through a dedicated alerting service.
We initially tried alerting through Grafana. However, many of our dashboards had template variables, which do not work with Grafana's alerting system. To handle that kind of use-case, it is recommended to create dashboards dedicated to alerting, and then use a group-by clause in the InfluxDB query. A Grafana alert can then be configured on a graph displaying multiple series at the same time, but it won't trigger multiple times. This is a limitation of Grafana's alerting system: alert status is handled on a per-panel basis, not on a per-series basis. Per-series alert state tracking is part of Grafana's roadmap.
We then decided to use Kapacitor for our alerts while waiting for Grafana's alerting to improve.
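As an illustration, here is a minimal TICKscript sketch of the kind of alert this covers; the measurement, threshold and log path are hypothetical, and a real task would likely route alerts to a notification handler rather than a log file:

```
// Alert when a datacenter's average message rate drops too low.
stream
    |from()
        .measurement('received_messages')
        .groupBy('datacenter')
    |window()
        .period(1m)
        .every(10s)
    |mean('value')
    |alert()
        .crit(lambda: "mean" < 1)
        .message('{{ .Level }}: low message rate in {{ index .Tags "datacenter" }}')
        .log('/var/log/kapacitor/alerts.log')
```

Because the stream is grouped by the datacenter tag, Kapacitor tracks alert state per series, which is exactly the per-series behaviour Grafana's alerting lacked.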
We spent two months trying InfluxDB and Grafana as a potential replacement for our current monitoring solution. These two months allowed us to become familiar with InfluxDB, from data ingestion to authentication and authorization, as well as with the other components of the TICK stack. We reinforced our belief that InfluxDB, coupled with Grafana, is a great replacement for our current monitoring solution and adds real value.
In the following months, work will be done to deploy an InfluxDB cluster to production and start migrating application metrics to this new cluster.