Streaming Columnar Data with Apache Arrow

** Fri 27 January 2017

Over the past couple weeks, Nong Li and I added a streaming binary format to Apache Arrow, accompanying the existing random access / IPC file format. We have implementations in Java and C++, plus Python bindings. In this post, I explain how the format works and show how you can achieve very high data throughput to pandas DataFrames.

Columnar streaming data

A common question I get about using Arrow is the high cost of transposing large tabular datasets from record- or row-oriented format to column-oriented format. For a multi-gigabyte dataset, transposing in memory or on disk may be prohibitive.

For streaming data, whether the source data is row-oriented or column-oriented memory layout, one option is to send small batches of rows, each internally having a columnar memory layout.

In Apache Arrow, an in-memory columnar array collection representing a chunk of a table is called a record batch. Multiple record batches can be collected to represent a single logical table data structure.

In the existing “random access” file format, we write metadata containing the table schema and block locations at the end of the file, enabling you to select any record batch or any column in the dataset very cheaply. In the streaming format, we send a series of messages: the schema followed by one or more record batches.

The different formats look roughly like this diagram:

Streaming data in PyArrow: Usage

To show you how this works, I generate an example dataset representing a single streaming chunk:

Now, suppose we want to write 1 gigabyte of data composed of chunks that are 1 megabyte each, so 1024 chunks. First, let’s create 1MB DataFrame with 16 columns:

Then, I convert this to a pyarrow.RecordBatch:

Now, I create an output stream that writes to RAM and create a StreamWriter:

Then, we write the 1024 chunks composing the 1 GB dataset:

Since we wrote to RAM, we can get the entire stream as a single buffer:

Since this data is in memory, reading back Arrow record batches is a zero-copy operation. I open a StreamReader, read back the data as a pyarrow.Table, and then convert to a pandas DataFrame:

This is all very nice, but you may have some questions. How fast is it? How does the stream chunk size affect the absolute performance to obtain the pandas DataFrame?

Streaming data performance

As the streaming chunksize grows smaller, the cost to reconstruct a contiguous columnar pandas DataFrame increases because of cache-inefficient memory access patterns. There is also some overhead from manipulating the C++ container data structures around the arrays and their memory buffers.

With a 1 MB as above, on my laptop (Quad-core Xeon E3-1505M) I have:

This is an effective throughput of 7.75 GB/s to reconstruct a 1GB DataFrame from 1024 1MB chunks. What happens when we use larger and smaller chunks? Here are the results

The performance degrades significantly from 256K to 64K chunks. I was surprised to see that 1MB chunks were faster than 16MB ones; it would be worth a more thorough investigation to understand whether that is normal variance or something else going on.

In the current iteration of the format, the data is not being compressed at all, so the in-memory and on-the-wire size are about the same. Compression may be added to the format as an option in the future.

Summary

Streaming columnar data can be an efficient way to transmit large datasets to columnar analytics tools like pandas using small chunks. Data services using row-oriented storage can transpose and stream small data chunks that are more friendly to your CPU’s L2 and L3 caches.

Full benchmarking code