Mesos, Cassandra, Spark, etc.

A hobo stove. (Vagrant. Hobo. Get it?) Photo Credit: Flickr / Creative Commons

Over the weekend I developed a Vagrant setup that installed a working one-node Mesos installation, with Zookeeper, with Cassandra and Spark frameworks, based on Ubuntu 14.04 LTS (Trusty Tahr) 64-bit. Docker container support is also enabled. This was not super-easy to get working, and I went down some blind alleys. There are still a few outstanding issues.

  • I couldn't get Mesos to build at all until I raised the VM's memory allocation to 2GB. This seems obvious in hindsight, but it would have saved me some time if the docs had indicated a RAM requirement for the build step.
  • I couldn't get Mesos to work under OpenJDK 7. The build completes successfully, but the tests fail. The docs, which seem a little out of date, ask you to use OpenJDK 6 -- maybe there's a good reason for that? I had always heard that OpenJDK was not very performant, but these days it seems to be essentially the same as Oracle's JDK apart from a few proprietary pieces, mostly related to web browser integration.
  • It wasn't super clear to me which version of Spark to download; Spark has to be built for particular Hadoop versions.  I guessed that the version "pre-built for Hadoop 1.x" would be what I needed, because I'd seen some documents indicating that Mesos and Hadoop 2.x do not play well together. It seemed to work.
  • I had some difficulty getting Spark Shell to recognize my environment variables; in the end I got it working by making a symbolic link from where it expected libmesos.so to be to where it actually was. (A sketch of how I end up pointing PySpark at Mesos appears after this list.)
  • Mesos and Spark both seem to assume that you're going to use HDFS alongside Mesos, but neither of them installs HDFS for you. I didn't get around to that because frankly I don't (yet) understand how all the various components of HDFS work.
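
For reference, this is roughly what the Spark-on-Mesos wiring boils down to in Python once the library path issue is sorted out. The library path, Spark tarball location, and Mesos master address are all specific to my one-node Vagrant VM, so treat them as assumptions rather than recommendations:

```python
import os
from pyspark import SparkConf, SparkContext

# Spark's Mesos backend loads libmesos via this environment variable;
# the path is where my Vagrant build installed it (assumption).
os.environ["MESOS_NATIVE_LIBRARY"] = "/usr/local/lib/libmesos.so"

conf = (SparkConf()
        .setMaster("mesos://127.0.0.1:5050")   # single-node Mesos master in the VM
        .setAppName("ancho-smoke-test")
        # Where Mesos executors fetch the Spark distribution from. Normally an
        # HDFS or HTTP URL; a local path works on a one-node setup (assumed path).
        .set("spark.executor.uri", "/opt/spark-1.0.2-bin-hadoop1.tgz"))

sc = SparkContext(conf=conf)
print(sc.parallelize(range(1000)).sum())  # 499500 if the cluster is wired up correctly
sc.stop()
```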

I also got most of the same setup working on Ubuntu 12.04 LTS (Precise Pangolin) 64-bit, but I would prefer to standardize on 14.04 if possible; Precise is over two years old now. I don't have any religious attachment to Ubuntu; it's just what I'm familiar with. I've used LTS releases of Ubuntu in production for over 8 years now.

I realize that a one-node VM with all this stuff is not very realistic compared to the multi-node, many-core, many-GB-of-RAM, SSD machines you would want to deploy on for production. But certain elements of the Ancho infrastructure will run on this, and it is important to me -- and to hypothetical future project contributors -- to be able to quickly replicate a development environment on a typical developer's laptop.

Once I get the kinks worked out, I'll probably publish this as a ready-to-use Vagrant box, because it takes a non-trivial amount of time for all this stuff to build.

Ancho Architecture Plan

In trying to get Ancho's wheels turning again, I've spent some time in the last week trying to figure out how to translate my concepts into running software. As you may recall, one of Ancho's design goals is to allow fairly large models to run faster by running them in parallel on a cluster.

These are all just my perceptions, so if you're reading this and I got something completely wrong, please let me know.

Resource Manager

Mesos is a project of the Berkeley AMPLab, part of their so-called BDAS (pronounced "bad-ass") stack. Its purpose is to allow multiple compute scheduling frameworks to share the same computing cluster without having to statically partition nodes to specific frameworks. The main alternative here was YARN, which is the resource manager used in most Hadoop distributions. I read some of the academic papers on Mesos, as well as the paper on Google's Omega scheduler.

The Omega paper helped me understand the architectural differences between Mesos and YARN. Mesos listens for announcements that computing slots (units of CPU and RAM) are available, offers them to running frameworks based on policies you can set, and the frameworks can accept or reject the offered resources. This has some drawbacks, as identified in the Omega paper, but it generally works better in multi-framework environments than YARN's philosophy of centralizing control over what runs where; in practice it's easier for each framework to figure out what it wants or doesn't want.
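
To make the offer model concrete, here's a rough sketch of what a framework's scheduler looks like using the Mesos Python bindings as I currently understand them. The framework name, the accept/decline policy, and the empty task builder are all placeholders, not anything Ancho actually does yet:

```python
# Sketch of the Mesos resource-offer callback (mesos.interface / mesos.native).
# The resource accounting and task construction are deliberately simplified.
from mesos.interface import Scheduler, mesos_pb2
from mesos.native import MesosSchedulerDriver


class AnchoScheduler(Scheduler):
    def resourceOffers(self, driver, offers):
        # Mesos announces free slots; the framework decides what to do with them.
        for offer in offers:
            cpus = sum(r.scalar.value for r in offer.resources if r.name == "cpus")
            if cpus >= 1.0:
                # Accept: launch tasks against this offer (task construction omitted).
                driver.launchTasks(offer.id, self.build_tasks(offer))
            else:
                # Decline: the resources go back into the pool for other frameworks.
                driver.declineOffer(offer.id)

    def build_tasks(self, offer):
        return []  # placeholder


framework = mesos_pb2.FrameworkInfo(user="", name="ancho-sketch")
driver = MesosSchedulerDriver(AnchoScheduler(), framework, "127.0.0.1:5050")
# driver.run() would block and start receiving offers.
```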

So, I've chosen Mesos as my preferred underlying resource manager.

Hadoop Ecosystem vs. BDAS

The choices here were basically the Hadoop ecosystem (MapReduce, HDFS, Hive, Pig, Mahout, etc.) or something else. In all honesty, I have always had trouble understanding how the various parts of Hadoop fit together. There are so many moving parts that I would want to use a curated distribution (Cloudera, Hortonworks, or MapR), and Hortonworks is the only one of those that's fully open.

The Hadoop distributions are all YARN-based. Mesos makes more sense to me conceptually, and since Ancho will need to be a framework running on top of the resource manager, I need to program against something I understand. Also, Mesos explicitly supports tasks that run in Docker-based executors. I like Docker, I feel like I understand what it's doing, and I want to use it for sandboxing user-written model code, so this has a lot of appeal.
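
As far as I can tell, pointing a Mesos task at a Docker image in recent releases is just a matter of filling in the container fields on the TaskInfo. The image name and command below are hypothetical:

```python
from mesos.interface import mesos_pb2

# Sketch: a task that Mesos will run inside a Docker container.
# "ancho/model-runner" and the command are hypothetical placeholders.
task = mesos_pb2.TaskInfo()
task.container.type = mesos_pb2.ContainerInfo.DOCKER
task.container.docker.image = "ancho/model-runner"
task.command.value = "python run_sequences.py"
```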

BDAS isn't quite as neatly packaged, but it also seems like it's easier to put together just the components that I need without needing a Master's degree in compiler theory.

Distributed Storage

A long time ago I decided that of all the NoSQL databases, I prefer Cassandra. As above, I feel like I understand what it's doing, and its data model fits well with what Ancho models will generate and then need to analyze. Cassandra is essentially a very large multi-level hashtable. It makes more sense to me than trying to use something with file semantics like HDFS, even if database semantics are added on top of it with a layer like Hive. Also, it seems like it should have better performance for my use cases.
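
To illustrate what I mean by a "multi-level hashtable", here's roughly the kind of schema I have in mind for storing sequence results, using the DataStax Python driver. The keyspace, table, and column names are hypothetical at this stage:

```python
import uuid
from cassandra.cluster import Cluster

# Hypothetical schema: one partition per Run, one row per Sequence within it.
session = Cluster(["127.0.0.1"]).connect()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS ancho
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")
session.execute("""
    CREATE TABLE IF NOT EXISTS ancho.sequence_results (
        run_id      uuid,
        sequence_id int,
        result      double,
        PRIMARY KEY (run_id, sequence_id)
    )
""")
session.execute(
    "INSERT INTO ancho.sequence_results (run_id, sequence_id, result) VALUES (%s, %s, %s)",
    (uuid.uuid4(), 0, 3.14),
)
```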

Distributed Computing

Apache Spark seems like the way to go on this, for several reasons. It's fast, it's directly integrated with Mesos, it can read and write directly to Cassandra, and its Python API is a first-class Spark citizen. I feel like I understand the "Resilient Distributed Dataset" (RDD) abstraction that forms the core of Spark.
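
For anyone unfamiliar with RDDs, here's the shape of the abstraction in PySpark: a partitioned collection that you transform lazily and then reduce with an action. This is a toy example, not Ancho code:

```python
from pyspark import SparkContext

sc = SparkContext("local[2]", "rdd-demo")  # local mode; on the cluster this would be mesos://...

# An RDD is a partitioned, immutable collection; transformations are lazy,
# and only the final action (reduce) triggers computation across partitions.
values = sc.parallelize(range(1, 1001), numSlices=4)
sum_of_squares = values.map(lambda x: x * x).reduce(lambda a, b: a + b)
print(sum_of_squares)  # 333833500
sc.stop()
```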

On top of Spark, its machine learning library MLlib seems to offer a lot of the functions I will need for computing summary statistics after an Ancho model has run. (The documentation also says it interoperates with NumPy, although I haven't seen many details about this.) It would be nice to have access to Pandas for this, but Pandas is not designed to run across distributed data as far as I know, and SparklingPandas (Pandas on top of Spark) doesn't seem to be very actively developed.
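
The MLlib piece I have my eye on is its basic statistics module. Based on the docs for the current 1.x releases, I expect the NumPy interop mostly amounts to MLlib accepting RDDs of NumPy arrays directly, along these lines:

```python
import numpy as np
from pyspark import SparkContext
from pyspark.mllib.stat import Statistics

sc = SparkContext("local[2]", "mllib-demo")

# colStats computes per-column summary statistics in a distributed fashion;
# the rows here are plain NumPy arrays.
rows = sc.parallelize([np.array([1.0, 10.0]),
                       np.array([2.0, 20.0]),
                       np.array([3.0, 30.0])])
summary = Statistics.colStats(rows)
print(summary.mean())      # roughly [2.0, 20.0]
print(summary.variance())  # roughly [1.0, 100.0]
sc.stop()
```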

Of the things I can imagine needing a distributed computing framework for, Spark can do it all.

Architecture

So, my architecture diagram ends up looking something like this:

Diagram generated with Draw.io

User Story

The usage scenario would run something like this:

  • Users author their models in Python against an Ancho API. Development, testing and small scale runs of the model can be done locally without any cluster infrastructure.
  • To run the model in a cluster, the user would submit the model to an Ancho Cluster (running atop Mesos as a framework) by one of two means:
    • Submitting the needed parameters by POSTing a JSON file (or something similar) to a REST-like interface; see the sketch after this list. There would probably be a command-line tool for this, but the API would allow jobs to be submitted by an automated process.
    • Logging into a web site that allows users to submit jobs, monitor their execution, and get the results.
  • Parameters submitted to the Ancho Cluster for a model run would include:
    • Location of your model code -- Ancho can accept a tarball, or check it out of a source control repository, or pull it from S3, or whatever.
    • Any parameters you need to submit to your Run: starting values, number of sequences to run, target values, etc.
  • Once the Run is submitted, the Ancho Cluster framework would start one or more Docker executors across the Mesos cluster to run your model. Those executors would download the model, install any Python packages required (as specified in pip-requirements.txt), and begin running Sequences. As each Sequence completes, it reports its results back to the Ancho framework (which stores them in Cassandra); the framework eventually declares the Run complete and decommissions most of the executors.
  • A few remaining executors will be told to generate the summary statistics about the Run using Spark/MLlib. I originally considered having the framework do the summary work, but this would make it impossible for users to define their own summary functions. I think that Cassandra's security model will allow me to restrict user code to accessing just the user's own data (only certain keyspaces/tables), but I may need to revisit this.
  • The final results will be exposed as a data structure to users of the RESTy interface. Web users could be presented something more interactive, such as an IPython session with the result data preloaded, or something prettier and easier to digest, such as a PDF generated via ReportLab.
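
To be concrete about the submission interface mentioned above, I'm imagining something along the lines of the following, using the requests library. The URL, endpoint, and JSON field names are all hypothetical at this point:

```python
import requests

# Hypothetical job-submission payload; none of these field names are final.
job = {
    "model_source": "https://github.com/example/my-model/archive/master.tar.gz",
    "parameters": {
        "starting_values": {"x": 0.0},
        "num_sequences": 10000,
    },
}

# Hypothetical endpoint on an Ancho Cluster.
resp = requests.post("http://ancho.example.com/api/v1/runs", json=job)
resp.raise_for_status()
run_id = resp.json()["run_id"]  # hypothetical response field
print("Submitted run", run_id)
```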

MapReduce and NumPy?

This is a question, but it's a long question and I'm hoping someone out there might know the answer. I'm looking for something equivalent to NumPy/SciPy that is implemented in a Map/Reduce fashion -- or a better way to conceive this problem.

Say, for instance, that you have a very large array of floating point numbers, and you want to compute summary statistics like standard deviation, skewness and kurtosis. It might fit in memory on modern machines, and it would definitely fit on disk, where you could work with it using numpy.memmap, which is basically an array-like object backed by a file on disk. However, the process that generated this very large array was distributed across several nodes, so the data isn't all in one place. So basically we've mapped our function, and we need a reduce step that produces the needed result -- preferably one that doesn't require copying all the data around between nodes.

Sure, you could copy all the data into one place and do the reduction, but it seems like it would be drastically more efficient to use the distributed infrastructure to solve this problem: do some of the work at each node, and then do a final step to aggregate the partial results. The problem is that I don't want to re-implement all these functions myself if someone else has already done it, because I will probably do it badly compared to someone who really understands the math.

So, does a fork or subproject of numpy/scipy exist that can do functions on parts of arrays and then combine them? Does a map/reduce-aware implementation of numpy exist?
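
In case it helps anyone answer, here's the kind of thing I mean, sketched with Spark's aggregate: each partition computes small partial moment sums locally, and only those tuples are combined across nodes rather than the raw data. The formulas are the standard (naive) moment-based ones, which is exactly why I'd rather use a library that gets the numerically tricky parts right:

```python
import numpy as np
from pyspark import SparkContext

sc = SparkContext("local[4]", "moments-demo")
data = sc.parallelize(np.random.randn(100000), numSlices=4)

# Per-partition partial sums: (count, sum x, sum x^2, sum x^3, sum x^4).
zero = (0, 0.0, 0.0, 0.0, 0.0)

def seq_op(acc, x):
    n, s1, s2, s3, s4 = acc
    return (n + 1, s1 + x, s2 + x**2, s3 + x**3, s4 + x**4)

def comb_op(a, b):
    return tuple(x + y for x, y in zip(a, b))

n, s1, s2, s3, s4 = data.aggregate(zero, seq_op, comb_op)

# Combine the partial sums into central moments (naive formulas; a real
# implementation would use numerically stabler updates).
mean = s1 / n
m2 = s2 / n - mean**2
m3 = s3 / n - 3 * mean * s2 / n + 2 * mean**3
m4 = s4 / n - 4 * mean * s3 / n + 6 * mean**2 * s2 / n - 3 * mean**4
std = np.sqrt(m2)
skewness = m3 / m2**1.5
kurtosis = m4 / m2**2 - 3.0  # excess kurtosis
print(std, skewness, kurtosis)
sc.stop()
```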

UPDATE

Brian Hicks suggested using mrjob or Sparkling Pandas. The Pandas-on-Spark project might do it, but I think I'm going to sink some time into learning Spark's MLlib, which seems like it would do what I need.