Migrating from MongoDB to InfluxDB

One of my first tasks in my job was to migrate from MongoDB to InfluxDB. In this post, I'm going to summarize my experience with that effort and how I dealt with InfluxDB with no previous experience.

The Scenario

  • We are parsing counters from monitoring systems that arrive as binary and XML files. These files contain lots of metrics, but not all of them are useful for later analysis. The filter rules are stored in configuration files along with the solution.
  • The parsers write single documents to MongoDB using simple insert operations. Parsing 250 GB of the sample data takes around a week running in a virtual machine with 16 GB of memory and 8 processors.
  • The parsers are written in Java using multithreading and other APIs like SAX for XML reads, the native MongoDB client, logging, etc.

Data modeling

While the objective of the re-architecting is to improve the performance of data acquisition, it is absolutely imperative to change the data model as well, not just for performance but also for time series analysis.

Consider the documents written in Mongo like:

{
    _id: UUID(),
    name: counter_name,
    date: {"$date":"2017-08-11T17:54:14.692Z"},
    prop1,...,propn
}

The properties prop1,...,propn are numeric values, or numeric arrays in some cases. The hardest work was reviewing all those counters and sorting them into the ones useful to the algorithms and the ones not required.

In MongoDB, you can easily model such a structure as an embedded document, and you can improve the data load using bulk operations. In InfluxDB, however, I would not recommend that design; data points have a structure like:

<measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]

Tags are indexed and fields are not. Considering this, you need to avoid writing lots of tags just for querying, and lots of fields as properties, because both carry an elevated cost. It is better to write consistent metrics within the tagging domain and to use different measurements rather than one huge structure. There are many options for downsampling specific measurements with continuous queries for future analysis. You can learn more about InfluxDB's concepts here.
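To make the point concrete, a single data point could look like this in line protocol (the measurement and values here are made up for illustration): `station` is an indexed tag, `temperature` and `pressure` are unindexed fields, and the trailing number is the timestamp.

```
weather,station=beijing temperature=-0.7,pressure=1023.0 1488297600
```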

Let's jump into an example to make this clearer. Looking into the UCI Machine Learning Repository, I found this data set, which includes hourly air pollutant data from Beijing. It looks interesting because it contains multiple files with well-formed rows.

Columns 2 to 5 represent the timestamp in every case. Let's suppose that we want to plot the wind; for this requirement it would be useful to build a measurement taking the wind speed and direction, columns 17 and 16 respectively. Our measurement's schema will look like: "Wind,station=%s direction=\"%s\",speed=%s %d"

It will allow queries like:

select * from Wind where station='Aotizhongxin' limit 10

Depending on the amount of data expected and the retention policies, it may be better to have measurements like:

Aotizhongxin_PM2_SO2 Aotizhongxin_TEMP
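For downsampling one of those measurements into a coarser series, InfluxDB 1.x offers continuous queries. A sketch of what one could look like (the database, retention policy, and destination measurement names here are my assumptions):

```sql
CREATE CONTINUOUS QUERY "so2_daily" ON "beijing"
BEGIN
  SELECT mean("SO2") AS "SO2"
  INTO "one_year"."Aotizhongxin_PM2_SO2_daily"
  FROM "Aotizhongxin_PM2_SO2"
  GROUP BY time(1d)
END
```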

Whatever the analysis goals are, you can decompose the data to extract the required features. In the given example, I suppose that PM2.5 needs to be correlated with SO2 concentrations and with other measurements for enrichment. I'm clearly no expert in air pollution, but the fact is that you need to develop ideas about what kinds of measurements would be useful for labeling or predicting. This may be addressed in collaboration with the Data Science team.

The Code

There is nothing special in this code: we read the files using multithreading and store the points using the latest version of the Java client API, although I'm still using InfluxDB 1.8. I must say the API has good improvements in reads and writes.

As I mentioned previously when describing data points, a good practice is to keep separate measurements. In the next code, we format the Wind measurement:

public static String lineProtocol(String line) {
    // Strip quotes and split the CSV row; 0-based indices:
    // cols[15] = wd (wind direction), cols[16] = WSPM (wind speed), cols[17] = station
    String[] cols = line.replace("\"","").split(",");

    // Columns 2-5 (year, month, day, hour) are combined into the timestamp
    ZonedDateTime timestamp = getZonedDateTime(cols);

    String format;
    try {
        // Epoch seconds, so the write must use second precision (the default is nanoseconds)
        format = String.format("Wind,station=%s direction=\"%s\",speed=%s %d",
                cols[17],
                cols[15],
                cols[16],
                timestamp.toEpochSecond()
        );
    } catch (Exception e) {
        // Record the failing row in a "fails" measurement so it can be inspected later
        format = String.format("fails,m=%s value=%s %d", cols[17], cols[0], timestamp.toEpochSecond());
        logger.error(e.getMessage());
    }
    return format;
}
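The getZonedDateTime helper is not shown above; here is a minimal sketch of what it could look like, assuming columns 2 to 5 hold year, month, day, and hour, and that the readings are in Beijing local time (the class name and the time zone are my assumptions):

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class Timestamps {

    // Builds the point timestamp from columns 2-5 of a CSV row
    // (0-based indices 1-4 after splitting).
    public static ZonedDateTime getZonedDateTime(String[] cols) {
        return ZonedDateTime.of(
                Integer.parseInt(cols[1]),  // year
                Integer.parseInt(cols[2]),  // month
                Integer.parseInt(cols[3]),  // day
                Integer.parseInt(cols[4]),  // hour
                0, 0, 0,                    // minute, second, nanosecond
                ZoneId.of("Asia/Shanghai")  // Beijing local time: an assumption
        );
    }

    public static void main(String[] args) {
        String row = "1,2017,3,1,0,4,4,4,7,300,77,-0.7,1023.0,-18.8,0,NNW,4.4,Aotizhongxin";
        // 2017-03-01T00:00 in Asia/Shanghai
        System.out.println(getZonedDateTime(row.split(",")).toEpochSecond()); // prints 1488297600
    }
}
```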

Notice that the lineProtocol method is a good candidate for re-engineering: we could create a new class using the factory pattern to get a custom measurement format depending on the feature we are looking for.
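A minimal sketch of that factory idea, assuming the same 0-based column layout as above (the class name and the Temp measurement are mine; timestamps are omitted for brevity):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LineProtocolFactory {

    // One formatter per measurement; each maps a split CSV row to a line protocol string.
    private static final Map<String, Function<String[], String>> FORMATTERS = new HashMap<>();

    static {
        FORMATTERS.put("Wind", cols -> String.format(
                "Wind,station=%s direction=\"%s\",speed=%s", cols[17], cols[15], cols[16]));
        FORMATTERS.put("Temp", cols -> String.format(
                "Temp,station=%s value=%s", cols[17], cols[11]));
    }

    // Picks the formatter for the requested measurement and applies it to the row.
    public static String format(String measurement, String line) {
        String[] cols = line.replace("\"", "").split(",");
        return FORMATTERS.get(measurement).apply(cols);
    }

    public static void main(String[] args) {
        String row = "1,2017,3,1,0,4,4,4,7,300,77,-0.7,1023.0,-18.8,0,NNW,4.4,Aotizhongxin";
        System.out.println(format("Wind", row));
        // Wind,station=Aotizhongxin direction="NNW",speed=4.4
    }
}
```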

After loading the data, we have the following series:

show series
Key
---
Wind,station=Aotizhongxin
Wind,station=Changping
Wind,station=Dingling
Wind,station=Dongsi
Wind,station=Guanyuan
Wind,station=Gucheng
Wind,station=Huairou
Wind,station=Nongzhanguan
Wind,station=Shunyi
Wind,station=Tiantan
Wind,station=Wanliu
Wind,station=Wanshouxigong

To compile and run the code you need to install Java 8 and Maven, and to have InfluxDB running properly; I recommend the InfluxDB sandbox for testing purposes.

Post migrated from a previous blog, updated with source code.