Moving to Parquet Files as a System-of-Record

 Moving to Parquet Files as a System-of-Record
By

Enigma is home to the world's largest repository of public data. Organizing, updating, maintaining, and indexing all of that data is no small feat. To do so, we were using a combination of technologies to power various parts of the system:

  • CSV files on Amazon's S3 as the primary entry point and format for data
  • Elasticsearch with a heavily customized document structure and dynamic index creation algorithm to allow full text queries over our sparse, heterogeneous data sets (something Elasticsearch is notoriously bad at)
  • Postgres as a resilient backing store for all data

During the evolution of what would become Assembly, Enigma’s platform for searching, storing, and enriching data, the system queried Postgres directly less and less often (as Elasticsearch queries were typically much faster than the associated Postgres queries). By the time our community data platform Enigma Public went live, the system only queried Postgres when a user wanted to export an entire data set as a CSV—and for a few very client-specific use cases. In reality, we were barely using Postgres, but it was a large line-item in our monthly AWS bill. However, to trust Elasticsearch as the sole source of data was, ahem, risky at best. Postgres's reputation for resiliency and reliability let us sleep easier at night (while costing us a fortune, of course).

We knew Postgres could be replaced, but there are a thousand different ways to solve the "I need a single, canonical source for all my data" problem. Without any constraints to guide our decision, we may as well have thrown darts at a very boring dartboard. For more senior engineers, your Spidey-sense should be tingling: When you have too many possible solutions that seem equally good, you need to further constrain the problem. Novice engineers think of constraints as, well, constraining. But in truth, they act as pruning shears for the large branching tree of possible solutions.

Enter the Dragon... er, Product Managers

Every engineer's worst nightmare is to hear "so Product wants us to ship feature X by Y", where X may or may not have anything to do with your product, and Y is negative. Of course, as an engineer I think I know exactly what the customer wants; as a senior engineer, I know that's BS. Product Managers are the stewards of the product and meant to represent the voice of the actual customers (i.e. not the imaginary ones for whom you've already decided what they want).

On the flip-side, once every thousand years or so the engineering gods (on whose deaf ears every engineer’s prayer falls) take a break from writing the next volume of The Art of Computer Programming and perform a single miracle: Product's ideal direction for the product and Engineering's ideal direction for the product just happen to align perfectly. When you see it happen in the wild (and I've been privy to an actual honest-to-god bike-shedding argument, so my Software Safari creds are solid) everyone just kind of looks at each other a bit frightened. Surely this didn't just happen by chance?

Such was the case for Enigma Public and the foundation on which it runs, Enigma Assembly. Product wanted to take the next logical step in surfacing and organizing useful data sets: create derivative data sets by joining, filtering, and/or enriching existing data sets. The implementation, of course, is left to engineering, but this step represents a non-trivial shift in the way we think about and work with data at Enigma.

Meanwhile, Engineering (and SysOps) were looking for a less expensive and, more importantly, horizontally scalable solution to the System-of-Record issue. CSV files on S3, while overly simple, was actually much closer to the kind of data storage solution we wanted to use than Postgres. One big drawback of CSVs (among many, many other drawbacks) is their lack of schema information—indeed metadata in general less column names. Column type-inference libraries for CSVs are actually pretty good. But as we already knew the schema for new data sets before creating the CSVs and wanted to be sure the resulting schema always matched our definition.

Mean-meanwhile, the Enigma Data team was ramping up their use of Spark for various super-secret (and super-cool) machine learning projects. Hitting the Assembly API to download the CSVs of thousands of datasets, load them onto HDFS, and deserialize them with Spark proved to be an enormous bottleneck. They wanted the data in a format supported by Spark that took less time to deserialize than CSVs. They also wanted a simple SQL layer to be able to query the raw data from.

What's the simplest thing that could possibly work?

Before you go and call every vendor of systems even tangentially related to the problem you're working on just for the free dinners and helicopter rides (as I am wont to do), a useful exercise is to take your list of requirements and, to shamelessly steal a phrase from the Test Driven Development folks, ask "What's the simplest thing that could possibly work?" Put another way, what might an ideal solution look like if one removes all non-Essential Complexity?

To recap, here are the goals of our storage system: 

  • Distributed, and accessible simultaneously from other distributed systems
  • Compact wire-format, as these data sets will likely be transferred quite often
  • Support for, at a minimum, Spark and Python
  • Efficient serialization and deserialization of data sets across supported systems (serialization and deserialization is a common bottleneck in many "Big Data" applications) 
  • Capacity to make common, SQL-like operations (join, filter, add new columns) on existing data sets without requiring heroic data manipulation/transformations
  • Ability to power a SQL interface, either directly (if it's a DBMS) or indirectly (e.g. Amazon Athena on S3 files)

In which I make you do work

Let's actually work through this thought exercise. When it comes to data storage, the simplest and most fundamental building blocks are files. Our goal is to use a simple, file-based design for our system. And because we're going to be accessing these files using multiple distributed systems, we'll need some kind of distributed storage service to hold the data. Note, it needn't be a fully distributed file system like HDFS; all we require is a system that can map a file name to its contents—essentially an Object store.

For the files themselves, choosing the right format will be the key. The CSV format is a decent start (it is certainly simple), but we know that it is not able to encode schema information in the file itself (nor is there any standard way to encode it elsewhere). The wire-format is also about as un-compact as it gets. While we could compress CSVs before sending, that's true of any file, and thus not a "real" solution to the "compact wire-format" requirement. While we're at it, the CSV format is about the worst format one could create for efficient serialization and deserialization.

Ideally, our file format would be self-describing, giving us the freedom to use a "schema-on-read" approach where we simply dump the files somewhere (without first specifying their schema, as would be required in a system like Postgres) and decode the schema only when accessing them. That would allow us to tick the last requirement, powering an SQL interface, as many systems support creating SQL interfaces over file formats of this type. It's also much simpler than requiring a separate metadata store (a la Hive) with "table" definitions.

Perhaps the most restrictive (and thus most useful) requirement is the ability to make joining data sets and adding new columns to existing data sets "easy". Since most file formats store data row-by-row, this seems like a non-starter. After all, how would we add a new column to an existing data set? Short of essentially reading the data, jamming the new column's value in row-by-row, and then writing it out to a new file, there is no obvious simple solution.

So we know the kind of system we'd want, but are a bit stuck on the file format. Luckily, through the use of two new-ish Apache projects and Amazon S3, we can build our "simplest possible system" rather easily.

The perfect storm (but not "Storm" the streaming processing system from Twitter)

In describing the needs of the various teams earlier, I left out one small detail. The Enigma Data team didn't just ask for any old Spark-compatible data format that could be efficiently deserialized, they asked for a specific format. What they actually said was "and could you store the files in Parquet format on S3?"

If you're like me, you probably would have responded to that request in a manner similar to "you want me to put what, where?" After the Data team showed me “Google” and how to use it to search the entire Internet (what a time to be alive!), I came across Apache Parquet. Parquet is, wait for it... a file format. But not just any file format! It's a columnar format. In a columnar storage format, rather than storing data essentially as a list of independent rows, each file contains the values in one or more columns of data (the previous link has a nice, straightforward example). Parquet, in particular, also includes the schema of the data alongside the data itself at the end of the file (why the end rather than the beginning is left as an exercise for the reader). Columnar formats and systems based on them are rather new, so don’t worry if this is your first exposure to them.

By now, you’re probably sick of saying the word "columnar" in your head and are wary of the benefits over "row-ular" ™️ data. Let's discuss a few of those benefits:

  • Data for a single column is stored contiguously and all values share the same datatype, allowing you to compress the bejeezus out of the data using simple and well-known compression tricks. In addition to these tricks, Parquet supports using actual compression algorithms on the data—and even different algorithms for different columns of the same table.
  • When doing analysis on large data sets, it turns out "apply the following function to every value in this row" is not the most common data access pattern. Rather, it is much more likely that some subset of columns are needed at a given time (Pandas users, back me up). Arranging data by column means that columns unused in a given query never need to be read from disk—a huge performance boost for common operations on large-ish datasets.
  • In the brave new world ushered in by Big Data, a full data set rarely fits in memory. Therefore, diskaccess patterns have become an extremely important differentiator of storage systems. Serialization and deserialization of data written in a columnar format is usually much faster due to the fact that a given column's data is stored contiguously. That has locality (e.g. referential, temporal) wins written all over it.

So "Parquet files on S3" actually seems to satisfy most of our requirements:

  • Its columnar format makes adding new columns to existing data not excruciatingly painful
  • Files are compressed by the encoding scheme resulting in hilariously small Parquet files compared to the same data as a CSV file
  • All major systems provide "a SQL interface over HDFS files" support Parquet as a file format (and in some it is the default)
  • Spark natively supports Parquet
  • S3 handles all the distributed system-y requirements

"This should be simple!" Or, "Why I've been writing C and become a contributor to three open source projects"

In fact, there is only one hard requirement missing from "Parquet on S3" (I tried to shorten that, but could only come up with "PoS" and "PS3"): Python compatibility. At the time, Parquet existed as both a spec and a reference implementation in Java. Only Java. This is to be expected, though, as Parquet is based on the Google paper describing Dremel and, as we all know, every technology described in a Google paper is quickly followed by an Apache project implementing the technology in Java. Alas, even my witty observations could not help us. With an entire backend written in Python, adding Java to the mix for such a small task was unpalatable.

I put my new-found Google skills to work and came across two tightly coupled projects: parquet-cpp and Arrow. The former is a C++ implementation of the Parquet format and the latter is interesting enough to deserve its own sentence. Arrow is a close analogue to Parquet, only the storage medium is memory (RAM) rather than disk. That is, Arrow is a columnar in-memory data format and series of libraries. At the risk of oversimplification, "Arrow : Memory :: Parquet : Disk". It also provides libraries for a growing number of programming languages.

One might ask why we are even discussing Arrow. After all, we should be able to generate Python bindings using parquet-cpp, right? Well, Arrow takes care of that, as well as the part we haven't given much thought to yet: if we want to use Parquet as the output format, what intermediate formats does it support? For Python, the answer is "Arrow", in the form of the pyarrow package.

pyarrow is a first class citizen in the Arrow project: a good deal of time and effort has been spent implementing the features on the Arrow roadmap. And since Arrow is so closely related to parquet-cpp, support for Parquet output (again, from Python) is baked-in. Of course, this is starting to sound like turtles-all-the-way-down. We've now shifted the question "what intermediate formats does Parquet support" to "what intermediate formats does Arrow support?" or, "How does one construct an Arrow Table?". The answer, interestingly enough (you’ll see why I say that in a bit), is to use Pandas.

Now, given that we already know we have, or can create, CSV representations of data sets, the sequence of steps to get to "Parquet on S3" should be clear:

  1. Download and read a CSV file into a Pandas DataFrame
  2. Convert the DataFrame into an pyarrow.Table via Table.from_pandas()
  3. Output the Table as a Parquet file using pyarrow.parquet.write_table(our_table, some_filename)

This should be a piece of cake!

Spoiler alert: there is no cake

While both Arrow and parquet-cpp were still pre-1.0, there were/are a number of companies using both successfully in production. Few, however, it seemed, were working with CSV files of the magnitude we were used to (up to tens of GB). In addition, some Parquet implementations (cough Spark cough) had made some rather odd implementation choices.

The one that affected PoS (I've given up, let's get the giggles out now) directly was Spark's use of the int96 type to represent DATETIMEs. Now, in their defense, when they were implementing Parquet support there was only one other system that could actually output Parquet, and that was Impala. And Impala used int96 because <insert plausible explanation here>, so no one is actually to blame. Of course, once other systems started supporting Parquet output, Spark faced pressure to adopt the more "conventional" int64 type to represent DATETIMEs. Cue lots of Jira tickets, GitHub issues, Slack discussions, and email threads.

I wouldn't become aware of this fact until a bit later, as when I started work on PoS parquet-cpp didn't support DATETIMEs full-stop. Once support was added, I was happily generating Parquet versions of every data set in Enigma's public data repository—and the Data team was happily loading some percentage of those successfully into Spark. The rest were flat-out rejected due to a type mismatch, which is how I became aware of the int96 issue.

No matter! I would simply coerce DATE and DATETIME fields into Python/Pandas/numpy strings. This was fine (for a while, anyway) with our Data team as they didn't need to do any analysis on date data at the moment (though of course they needed to be able to load datasets with date data). And so I happily re-generated Parquet versions of every data set in Enigma's public data repository.

May you live in interesting times… and debug interesting bugs

During said regeneration, I noticed something curious. About 90% of the CSV to Parquet transformations worked just fine. For the remaining 10%, Pandas complained that the CSV had columns of mixed type. Knowing that this data already existed in Postgres with a set schema, that error message was a bit surprising.

A little digging revealed that the default behavior of the Pandas CSV parser is to operate over large files in chunks rather than reading the entire file into memory all at once. This can, in some cases (see the low_memory parameter), cause the column type inference code to be unable to determine a column's type. If all of the data is read at once, there is no such issue. This makes intuitive sense; if you can see all the data at once, you can definitively say if it's all one type or not. When you're operating over chunks of data, however, if any of the types inferred for each chunk doesn't seem to match the others, you can't make the same assertion.

No problem! As I said, I already had the schema of each of the CSVs and Pandas supports explicitly specifying the dtype of each column. And if for some reason that doesn't work, I could always read the entire CSV into memory (the file-generation process was running on a machine with 64 GB of RAM) and all the column types should be inferred properly. Both are parameters of pandas.read_csv(): dtypes=<dictionary mapping column name to numpy type>for the former solution and low_memory=False for the latter.

Surely at least one of those methods worked...

Spoiler alert: neither method worked. In fact, both methods uncovered bugs, though the bugs were distributed across three open source projects. When specifying dtypes, the interpreter core dumped within Arrow with the following stack trace:


#0  __memmove_avx_unaligned () at ../sysdeps/x86_64/multiarch/memcpy-avx-unaligned.S:181
#1  0x00007fbaa5c779f1 in parquet::InMemoryOutputStream::Write(unsigned char const*, long) () from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#2  0x00007fbaa5c0ce97 in parquet::PlainEncoder >::Put(parquet::ByteArray const*, int) ()
   from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#3  0x00007fbaa5c18855 in parquet::TypedColumnWriter >::WriteMiniBatch(long, short const*, short const*, parquet::ByteArray const*) ()
   from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#4  0x00007fbaa5c189d5 in parquet::TypedColumnWriter >::WriteBatch(long, short const*, short const*, parquet::ByteArray const*) ()
   from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#5  0x00007fbaa5be0900 in arrow::Status parquet::arrow::FileWriter::Impl::TypedWriteBatch, arrow::BinaryType>(parquet::ColumnWriter*, std::shared_ptr const&, long, short const*, short const*) () from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#6  0x00007fbaa5be171d in parquet::arrow::FileWriter::Impl::WriteColumnChunk(arrow::Array const&) () from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#7  0x00007fbaa5be1dad in parquet::arrow::FileWriter::WriteColumnChunk(arrow::Array const&) () from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#8  0x00007fbaa5be2047 in parquet::arrow::FileWriter::WriteTable(arrow::Table const&, long) () from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/libparquet.so.1
#9  0x00007fbaa51e1f53 in __pyx_pw_7pyarrow_8_parquet_13ParquetWriter_5write_table(_object*, _object*, _object*) ()
   from /home/ubuntu/.local/lib/python3.5/site-packages/pyarrow/_parquet.cpython-35m-x86_64-linux-gnu.so
#10 0x00000000004e9bc7 in PyCFunction_Call () at ../Objects/methodobject.c:98
...
#34 0x000000000063e7d6 in run_file (p_cf=0x7ffe6510afb0, filename=0x2161260 L"scripts/parquet_export.py", fp=0x226fde0) at ../Modules/main.c:318
#35 Py_Main () at ../Modules/main.c:768
#36 0x00000000004cfe41 in main () at ../Programs/python.c:65
#37 0x00007fbadf0db830 in __libc_start_main (main=0x4cfd60 , argc=2, argv=0x7ffe6510b1c8, init=, fini=, rtld_fini=, stack_end=0x7ffe6510b1b8)
    at ../csu/libc-start.c:291
#38 0x00000000005d5f29 in _start ()


This actually turned out to be the manifestation of three issues. The first was that the pandas_type in the pyarrow.Table's schema was mixed rather than string in some cases, which isn't a valid type for pyarrow. The second issue, and cause of the crash, was an integer overflow in one of the various offsets stored in the BinaryArray type, the type used for strings. The last issue was the absence of bounds checks for overflow that would have otherwise prevented this.

Door number two, please

OK, so that's unfortunate. But remember, we still have another option! We can let Pandas read the entire CSV into memory in one go and infer the column types. Since none of the individual data sets are larger than the 64 GB of RAM the machine has, this shouldn't be a problem.

It was. And it looked eerily similar to the Arrow issue. Pandas crashed while trying to allocate memory due to an integer overflow. The overflow occurred in an offset value tracking the current capacity of the buffer the CSV is being read into. But before I could even get to that bug, another bug was causing the Python interpreter to core dump while trying to raise an Exception to tell me, incorrectly, it was “out of memory” .

All told, the situation was... not ideal.

Patches on patches on patches

Clearly, I had to get at least one approach working (because: job). I submitted issues to Arrow and Pandas and created a reproducible example for each. For those who don't know, Wes McKinney just happens to be both the PMC of Arrow and Parquet as well as the creator of Pandas (this is why Arrow is so tightly integrated with Pandas). I discussed the issues with him on Slack (I had already contributed to Arrow before and was already on their Slack). Once it became clear the Arrow issue would require multiple changes from multiple people, I set to work on the Pandas issue.

The Pandas issue was like an onion (with, uh, two layers): one had to peel back and fix the first issue before the second would reveal itself. Also, debugging was a lot more time consuming due to the majority of code being written in Cython. Cython code looks like Python got bit by a radioactive K&R book and mutated into some weird hybrid. It's eventually compiled into highly optimized C code, so you have to have a pretty good handle on C to do anything non-trivial in it. That said, Python can make use of the resulting compiled library as if it were any other C library (i.e. seamlessly) and it can provide massive performance gains for some types of workloads.

Anyway, the first issue was that Pandas was raising an Exception with the message "out of memory" and then immediately core dumping. This was due to a pointer (meant to point to the address of the actual error message in memory) being dereferenced before memory for the error message was allocated. That's a complicated way of saying it was something like the following (in psuedo-C):


struct buffer {
    ...
    char* error_msg; 
};
void parse_buffer(...) {
    ...
    if (make_stream_space(self, ex_fields - fields) < 0) {
        self->error_msg = "out of memory";
        return -1;
    }
}


Every other time error_msg was set, it was preceded by self->error_msg = (char *)malloc(bufsize);, so this is just a case of someone forgetting to allocate memory for the error message before using it. Of course, it would be easy for someone to forget (or not know it was required at all) to allocate the memory for the error message before setting it. I have another GitHub issue open to pre-allocate the error_msg buffer, but for the moment just added in the missing allocation so I could continue.

/ragequit

Now I could successfully get Pandas to raise the "out of memory" exception without crashing. Of course, it should never have been raised in the first place, so it was time to fix the "real" integer overflow issue. The CSV tokenizer for parsing CSVs had an in-memory buffer to hold the data being parsed and the implementation was pretty straightforward. In C, you typically create a dynamically-sized array-like container using a structure that stores a pointer to the start of the memory buffer allocated for the array contents.

Since the length of the container is dynamic, it is initialized with a default size and grows the underlying buffer as necessary. To know when to resize (and what new size to request), you keep track of the current size (how much data has been added) and current total capacity. When data is appended and would cause size > capacity, it’s time to grow (resize) the buffer!

The buffer is resized using realloc(2), which takes two arguments: a void* to point to the currently allocated buffer and a size_t value to represent the desired capacity. Like malloc(2), it returns a void* to the newly allocated region or a null pointer in the case of failure. All of this is rather straightforward. For a buffer that doesn't ever grow past a certain size, everything works fine. However, the offsets mentioned earlier were stored as plain old ints. This proved to be problematic.

Two’s complement, not “two complements”

To understand why, recall that a 32-bit (signed) integer has a maximum value of 2^31, or about 2.1 billion. When talking about a byte array, that equates to 2GB. When the CSV tokenizer’s buffer needed to grow, the current int capacity would be doubled and passed as the desired buffer size to realloc(2). But as we saw, realloc(2) expects the second argument (the desired size) to be of type size_t, which is guaranteed to be unsigned (and, on most modern platforms, at least 64-bits).

Most modern systems represent signed integers using a method called “Two’s complement”. Adding 1 to a signed integer whose current value is 2^31 causes the new value to be negative 2^31 (not 2^32, as one might expect) and is said to have "overflowed". Unsigned integers, like size_t, have a maximum value of 2^32 rather than the 2^31 maximum for signed integers. While signed integers designate the first bit as the "sign bit" to indicate if the following 31-bits should be interpreted as positive or negative, unsigned integers are free to make use of all 32 glorious bits. When the signed integer overflows, the leading bit changes from 0 (positive) to 1 (negative). When that binary value is interpreted as an unsigned type (which is stored using 64 bits rather than 32), the value is very, very large.

So now we know why the error only occurred on large CSV files. Allocations that would grow the buffer > 2 GB would effectively be asking for an enormous amount of memory (which triggered the "out of memory" message). I fixed both issues and a few other minor things, submitted the PR, and was good to go. At the same time, Wes and company were finishing up the fixes on the Arrow side. The fixes were released as part of Arrow 0.5.0. The Pandas fixes would be part of the next release. And they lived happily ever after...

No, they didn't.

Arrow 0.5.0 was a "curious" release, especially from a Python perspective. After Wes published pyarrow 0.5.0 to PyPI, I immediately downloaded it and tested the new Arrow implementation with a patched version of Pandas that included my fixes. There were no longer any error messages about running out of memory. In fact, there were no messages at all! Nothing seemed to be happening with the process despite it reporting 100% CPU utilization.

I used gdb on the running Python interpreter and discovered it was stuck in the jemalloc library. In Arrow 0.5.0, jemalloc became the default memory allocator due to much better performance than the ol' libc allocator. But it looked like this change was causing issues.

Specifically, the code was stuck in a spinlock. Spinlocks are a low-level concept programs use when "I have to acquire a mutex but I expect it to be almost always available or held for a very short amount of time". Rather than trying to acquire the mutex and sleep()ing as is normally done, a spinlock just "spins" through attempts to acquire the mutex without pausing, hence the 100% CPU utilization.

It seems that using jemalloc for pyarrow while Python used the regular libc allocator caused issues between the two. Specifically, it looks like some of jemalloc's internal data structures were being corrupted. pyarrow 0.5.0 was immediately removed from PyPI and we worked on a short and longer-term fix. For the next release, jemalloc would not be used as the default allocator (it could still be requested during compilation, but for pyarrow the library would be compiled without it). In the longer term, I'm working on a fix that makes use of jemalloc prefixes so that the two allocators running in the single Python interpreter process will play nicely (which apparently one is supposed to do when using jemalloc with another allocator, though a lot of other projects have run into this as the documentation is a bit lacking. While investigating I discovered a huge Redis GitHub issue thread that described behavior identical to what we were seeing).

What’s the simplest thing that could possibly take two months of work?

And so, when pyarrow 0.6.0 was officially released a week ago, Enigma finally had the simple, straightforward System-of-Record comprised entirely of Parquet files stored on S3. In a final ironic twist, version 0.6.0 is also the first to support writing dates in the deprecated int96 format, so that issue is solved as well (and Spark changed to use 64-bit integers as of their latest release). Anyway, we'll be decommissioning our Postgres instances soon and are well positioned to support the direction the business is headed. If only every major architectural change were so simple...

____________________________________________________________________

Interested in solving some uniquely complex challenges like this? We're hiring.