A Map-reduce Example in Ray

Introduction

Last fall, we had several blog posts on Ray, a new distributed infrastructure, focused on machine learning and data engineering. In this blog post, I explore a map-reduce example to use Ray with large scale applications. Although Ray’s documentation already has a simple map-reduce example, I want to look at a more complex problem to better understand what Ray’s capabilities are.

The map-reduce algorithm is one of the most famous tools for processing large data sets. It is in the heart of Hadoop, Spark, and many big data databases. The basic problem that a distributed map-reduce solves is aggregation over a collection of records where the keys are arbitrarily distributed across a collection of files. In the map phase, the files are read in parallel and partial sums/counts are kept for each key encountered by a mapper. The mappers then use hashing to redistribute the partial sums/counts to reducers in such a way that a given reducer will get all the partial aggregations for that key. The reducers simply perform aggregations of the keys they are sent. The all-to-all “shuffle” between the mapper and reducer stages is what makes map-reduce challenging to implement in a performant manner.

The code for this article is available on GitHub here: https://github.com/BenedatLLC/ray-tutorial/tree/main/map-reduce.

Problem Statement

You can download the English edition of Wikipedia as a single large XML file that is about 80 gigabytes uncompressed. I want to scan this file, extract all the outgoing inter-article links, and build a list of articles with incoming reference counts, sorted in descending order by references.

Implementation

The implementations do not parse the full XML syntax of the files. Instead, they search for article headings (inside of <title> tags) and references to other articles (inside of double brackets, like [[this]]). The “map” part of map-reduce reads from the dump file and yields a stream of articles and the references they contain. It can drop the article and consider only the references, which each represent one incoming reference for each article targeted.

The “reduce” part of the algorithm maintains a Counter of articles and their reference counts. Each result yielded from the mapper is used to increment the counts of the referenced articles. When the entire dump file has been mapped and reduced, the implementation can sort the (article, count) pairs by count decreasing (highest count first) and article increasing (alphabetical order within a count value) and write the result to a csv file.

Sequential Implementation

For comparison purposes, I include a sequential non-Ray implementation. The sequential implementation is straightforward: the mapper reads one article at a time and the reducer updates the counts for the references from that article. The sort is done at the end.

Parallel Ray-based Implementation

I use Ray actor classes for the map and reduce stages. A Ray actor is a Python class that is defined with a special @ray.remote() decorator. Instances of this class can be started on the Ray cluster. Then, other parts of a Ray application can make remote invocations of the actor’s methods. The internal state of the class is preserved across calls. Ray also has tasks, which are stateless remotely-callable functions. See my previous blog post for a little more discussion about actors and tasks.

The parallel map-reduce implementation instantiates multiple copies of the map and reduce actors, allowing the program to take advantage of all the cores available across a Ray cluster. I use a “pull” design, where the sort stage requests values from the reducers, which in turn request values from the mappers. This is depicted in the above diagram, where the arrows represent the direction of requests. The data then flows in the opposite direction.

A copy of the Wikipedia dump is available on each of the worker nodes. I divide the input file into “blocks”, so that the mappers can parallelize the reading and processing of the file. Since a given block might start at an arbitrary point in the file, the mapper reads until it encounters the first <title> tag. Then, it processes lines until it reaches the end of the block. This will likely include multiple articles in the dataset. At the end of the block, it keeps reading until it encounters the first <title> of the next block. This ensures that it reads the full content of the last article in its block.

In this pull design, a mapper begins reading its block when it receives its first call from a reducer. The mapper maintains a Counter object for each reducer, mapping article titles to counts. The reducer for a given article title is determined via a simple hash function on the article name. After completing its reading of the input block, the mapper returns the counter object for the requesting reducer. Subsequent calls to the mapper return the associated counters, which are cached in the mapper actor’s state.

Each reducer returns to the sorter a pre-sorted Pandas data frame with its associated articles and reference counts. The sorter combines these data frames, sorts the combined data and returns it to the driver program. The driver then writes the dataframe to the local filesystem as a CSV file.

Push Implementation

The source repository also contains a second implementation of the parallel map-reduce that uses a “push” design — mappers initiate the process and send batches of data to the reducers. When the reducers are complete, they send data to sorters. This design has a lower maximum memory footprint than the pull design and is slightly faster. However, it is more complex.

Worker Allocation and Placement Groups

I ran a map actor and a reduce actor for each worker CPU in the cluster. Ideally, these are evenly distributed across the nodes in the cluster, to better parallelize the storage access and compute, and to minimize the remote messages that must be sent by Ray. I found the default distribution of actors to Ray workers was imbalanced, causing significant differences in the total run time. To address this, I assigned a half a CPU to each actor in its @ray.remote() decorator and used placement groups. These allowed me to perform a round-robin distribution of my actors across the available CPUs. This is done automatically by the driver program, so the user does not have to worry about sizing and actor placement.

Evaluation

I ran the map reduce implementations on a small Ray cluster. It has a Ray head node with 8 CPU cores and two worker nodes with 16 CPU cores each. To keep things simple, I do not run any Ray workers on the head node, which has slower CPUs and storage. For a fair comparison, the sequential (non-Ray) implementation was run on one of the faster worker nodes. The parallel implementations used 32 mapper actors and 32 reducer actors.

The Wikipedia dump contained 21,704,402 articles with a total of 229,395,013 inter-article references. The most popular article is “United States”, with 334,886 inbound references.

To evaluate performance, I ran each scenario three times and took the mean and standard deviation of the elapsed run time. Here are the results:

ImplementationRuntime (s)StdDev (s)
Sequential1065.74.9
Pull design with placement groups179.70.7
Pull design without placement groups208.726.8
Push design with placement groups168.70.6

As you can see, the Ray pull implementation is significantly faster, at about one sixth the execution time of the sequential version. Perfect scaling of the parallel implementation is not to be expected, due to the coordination overhead, the communication overhead, and the storage I/O bandwidth limitations. However, in cases like this, it makes sense to just throw all our cluster resources at the problem, and see what we get — the details are handled by Ray!

Leave a Reply

Your email address will not be published.