Easy cluster parallelization with ZeroMQ

TL;DR: I write short wrapper scripts to send batch jobs to a cluster with ZeroMQ. The first architecture (ventilator-workers-sink) turns out to be suboptimal, so I switch to another pattern that dispatches the jobs efficiently.

Recently I had to run a big batch of computations for a publication related to my PhD. I had spent quite some time designing a machine learning algorithm and I had to test it under various scenarios. Specifically, I have a single data set to test the algorithm on, but I need to run the algorithm over a range of values for many parameters, just to get some insight into which parameters matter most and how they interact with one another.

This approach is called "grid search" because we have several parameters, each of which needs to be tested over its own range of values. For example, an SVM typically has two parameters to tune (called C and sigma) and you can visualize the grid search as a 2D grid: the first axis is C, the second is sigma, and every point of the grid corresponds to a combination (C, sigma) on which the algorithm yields some number representing how well it did, like an accuracy rate. You then select the best point on the grid, because it corresponds to the best set of parameters for your algorithm. (There are other issues at hand like overfitting, making good use of cross-validation etc., but they're out of the scope of this blog post.)
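
To make the grid concrete, here is a minimal sketch (not the code we'll build in this post) that enumerates every (C, sigma) combination and keeps the best one; evaluate(C, sigma) is a hypothetical stand-in for training the model and measuring its accuracy:

import itertools

def grid_search(evaluate, C_range, sigma_range):
    # Try every (C, sigma) combination and remember the best score.
    best_params, best_score = None, None
    for C, sigma in itertools.product(C_range, sigma_range):
        score = evaluate(C, sigma)  # e.g. a cross-validated accuracy
        if best_score is None or score > best_score:
            best_params, best_score = (C, sigma), score
    return best_params, best_score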

In the lab where I work, we have this nice cluster of 20 machines with 12 cores each. Soon enough I started to wonder how I could harness the cluster to get my results faster; after all, running all these computations sequentially on my laptop felt quite laborious.

It turns out that grid search is an obvious instance of an embarrassingly parallel problem, because each run of the algorithm on a given set of parameters is completely independent of all the others. You could choose to assign one thread per parameter set, run all the computations in parallel, and simply gather the results at the end.
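
On a single machine you could already exploit this with, say, Python's multiprocessing module. A rough sketch, where expensive() is just a stand-in for the real work, could look like this:

from multiprocessing import Pool

def expensive(params):
    # Stand-in for the real number crunching.
    p, q = params
    return p + 2 * q

if __name__ == "__main__":
    param_sets = [(p, q) for p in range(10) for q in range(10)]
    pool = Pool()  # one process per core by default
    results = pool.map(expensive, param_sets)
    pool.close()
    pool.join()

A laptop only has a handful of cores, though, hence the cluster.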

In this article we're not going to focus on how to properly conduct grid search, but rather on how you can leverage a cluster in practice to run batch jobs. We're going to use the popular ZeroMQ library to make our different components communicate. As you'll see, this is really child's play! And if you need a cluster for your work, you're probably doing harder things already, so this will be a piece of cake.

Why ZMQ

The choice of ZMQ was largely motivated by these factors:

  • flexible and powerful building blocks
  • easy to learn
  • easy to set up and get rolling
  • has an API in your programming language of choice

The ZMQ guide is really an awesome read and I deeply recommend it to anyone even remotely interested in communication systems. The philosophy is quite enlightening and I believe it changes the way you think about distributed systems even if you end up using something else. I'm talking about the same type of enlightenment that you get when learning your first functional language, for example.

In the guide, we learn that the basic building blocks are sockets, but not in the traditional sense of Unix sockets. They're higher-level and enable specific communication patterns. Quoting the guide:

The built-in core ZeroMQ patterns are:

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
  • Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with "normal" pairs of sockets.

There are more building blocks and more patterns available, but for our simple workflow, which doesn't need any kind of reliability, distribution or fallback mechanism, this will be more than enough.
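
Just to show how little code a pattern takes with pyzmq, here is a minimal REQ/REP echo sketch running both ends in the same process (the port number and messages are made up for the example):

import zmq

context = zmq.Context()

# The "server" side: answer whatever request comes in.
rep = context.socket(zmq.REP)
rep.bind("tcp://127.0.0.1:5599")

# The "client" side: send a request, wait for the answer.
req = context.socket(zmq.REQ)
req.connect("tcp://127.0.0.1:5599")

req.send("ping")
print rep.recv()  # prints "ping"
rep.send("pong")
print req.recv()  # prints "pong"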

Formalize the problem

Succinctly, I would describe my grid search problem as the following:

I have this expensive function \(f(p)\) (it can be a shell command) and I want the result of \(f\) applied to a lot of parameters (\(p_1\), \(p_2\), ..., \(p_n\)). My function \(f\) is free of side effects, and thus I don't care in which order the results arrive; I just want them all.

Here each \(p_i\) is to be understood as a "parameter tuple" that can actually contain multiple parameters. With the SVM example described earlier, we could have \(p_1 = (C=0.1, \sigma=0.1)\) for example.

Figure out the architecture

We could design the communication system as a set of several workers per machine. The work would be distributed in a pipeline fashion (see "pipeline" in the ZMQ guide). Each piece of work is received by exactly one worker, which processes it and then sends the result to a fixed machine that gathers all the results. In this case the "ventilator" sending the work and the "sink" retrieving the results will be the same machine.

Spoiler alert: this architecture might seem good, but we'll see in a minute that it is actually flawed. We'll still implement it first, though, and try to understand what's wrong later.

Implement it

The C++ number crunching binary

My core algorithm is a C++ executable run from the command line with different options, indicating both the dataset path and the specific parameter values for this run. The result is structured as a JSON document written to the standard output.

For the sake of the example I'll use a simple skeleton. It doesn't perform any meaningful computation, but you get the idea:

/*
 * think.cpp
 */

#include <cassert>
#include <chrono>
#include <cmath>
#include <iostream>
#include <string>
#include <thread>

void hardcore_computation(double p, double q) {
    // Your big number crunching work should go here.
    // Instead we calculate some fake result.
    std::this_thread::sleep_for(std::chrono::seconds(2));
    const double result = std::sin(p + 2*q);

    // Output the result.
    std::cout <<
        "{ \"params\":"                    "\n"
        "  { \"p\": " << p << ","          "\n"
        "    \"q\": " << q << " },"        "\n"
        "  \"result\": " << result << " }" "\n";
}

int main(int argc, const char **argv) {
    // Retrieve parameters for this run.
    assert(argc > 2);
    const double p = std::stod(argv[1]);
    const double q = std::stod(argv[2]);

    // Start the computation.
    hardcore_computation(p, q);

    return 0;
}

(think.cpp)

Just to get some intuition, let's compile it, launch it, and see how it behaves:

$ g++ -std=c++11 -o think think.cpp
$ time ./think 0.1 20
{ "params":
  { "p": 0.1,
    "q": 20 },
  "result": 0.674808 }

real    0m2.002s
user    0m0.000s
sys     0m0.000s

So the computation takes two parameters, p and q. Now we wish to retrieve the results over a (2D) grid of those parameters.

The dispatching ZMQ scripts

At this point I had two choices for where to put the ZMQ logic. A natural way would have been to embed ZMQ into my C++ source, setting up the socket logic on top of my actual computation. However, there was a bad smell here; the Unix philosophy of "do one thing and do it well" told me that my executable already did its job, and it would be unwise to bloat it with unrelated communication code.

So I decided instead to write a small Python "worker" script, which waits for a "master" to ask for a computation. Upon receiving a request, it launches the C++ binary with the appropriate parameters, reads the result off stdout and sends it back to the master via the sink socket.

(Figure: Dispatching jobs with PUSH/PULL.)

The first part of the master is the "ventilator". It sends the jobs to the workers; in our case a job only consists of the parameters, p and q, for one computation.

import sys
import time

import zmq

def ventilator(p_range, q_range):
    # Setup ZMQ socket
    context = zmq.Context()
    sock = context.socket(zmq.PUSH)
    sock.bind("tcp://0.0.0.0:5557")
    time.sleep(0.5) # Give time for all workers to join

    # Iterate over the grid, send each piece of computation to a worker.
    for p in p_range:
        for q in q_range:
            work = { 'p': p, 'q': q }
            print "sending work p=%f, q=%f..." % (p, q)
            sock.send_json(work)

(ventilatorsink.py)

The second part is the "sink", which retrieves the results from the workers as they finish.

def sink(p_range, q_range):
    # Total number of computations.
    n_total = len(p_range) * len(q_range)

    # Setup ZMQ socket.
    context = zmq.Context()
    sock = context.socket(zmq.PULL)
    sock.bind("tcp://0.0.0.0:5558")

    # Accumulate the results until we know all computations are done.
    results = []
    n_processed = 0
    while n_processed < n_total:
        r = sock.recv()
        results.append(r)
        n_processed += 1
    print "=== Results ==="
    for r in results:
        print r

(ventilatorsink.py)

We'll run those two pieces of code on a single master machine. I merged them into a single script so that I don't have to repeat p_range and q_range across two scripts. Here is the main block which glues them together:

if __name__ == "__main__":
    p_range = [pow(10, n) for n in xrange(-6, 6)] # All values for the first parameter
    q_range = [pow(10, n) for n in xrange(-6, 6)] # All values for the second parameter

    if len(sys.argv) < 2:
        print "usage: %s (ventilator | sink)" % sys.argv[0]
    elif sys.argv[1] == 'ventilator':
        ventilator(p_range, q_range)
    elif sys.argv[1] == 'sink':
        sink(p_range, q_range)
    else:
        print "usage: %s (ventilator | sink)" % sys.argv[0]

(ventilatorsink.py)

Finally, the worker script will be deployed on all worker machines:

import subprocess

import zmq

def worker():
    # Setup ZMQ sockets.
    context = zmq.Context()
    sock_in = context.socket(zmq.PULL)
    sock_in.connect("tcp://192.168.196.1:5557") # IP of master
    sock_out = context.socket(zmq.PUSH)
    sock_out.connect("tcp://192.168.196.1:5558") # IP of master

    while True:
        work = sock_in.recv_json()
        p = work['p']
        q = work['q']
        print "running computation p=%d, q=%d" % (p, q)
        result = run_computation(p, q)
        sock_out.send(result)

# Delegate the hard work to the C++ binary, and retrieve its results.
def run_computation(p, q):
    return subprocess.check_output(["./think", str(p), str(q)])

if __name__ == "__main__":
    worker()

(worker.py)

Chop chop, get to work

The order of initialization is quite flexible because ZMQ sockets are not plain TCP sockets: the "server" in the traditional sense of the term can bind to its endpoint after the clients have connected, and ZMQ will establish the connection transparently (a tiny sketch of this inverted startup order follows the list below). Thus I propose the following order:

  1. Log onto all worker machines and start N_CORES worker scripts
  2. Start the sink on the master
  3. Start the ventilator
  4. Profit!
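
Here is the tiny sketch promised above: the PULL side connects before anything is bound on the other end, and the message still gets through once the PUSH side shows up (port number made up for the example):

import zmq

context = zmq.Context()

# The "client" connects first, even though nothing is listening yet.
pull = context.socket(zmq.PULL)
pull.connect("tcp://127.0.0.1:5600")

# The "server" binds afterwards; ZMQ establishes the connection behind the scenes.
push = context.socket(zmq.PUSH)
push.bind("tcp://127.0.0.1:5600")

push.send("hello")
print pull.recv()  # prints "hello", despite the inverted startup order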

For step 1, I use clusterssh to start all my workers in a tmux session. It is also possible to run background commands through ssh to start the workers from the master machine (see the sketch below). It's okay to start the workers only once; if you need to repeat the scenario, just redo steps 2-3 (and 4, for that matter).
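
For reference, here is a rough sketch of the ssh variant, assuming hypothetical hostnames node01 to node20, 12 workers per node, and worker.py plus the think binary already deployed in ~/cluster_zmq on every node:

import subprocess

N_CORES = 12
hosts = ["node%02d" % i for i in range(1, 21)]  # hypothetical hostnames

for host in hosts:
    # Launch N_CORES detached workers on each machine.
    start_one = "nohup python worker.py >/dev/null 2>&1 &"
    cmd = "cd ~/cluster_zmq && " + " ".join([start_one] * N_CORES)
    subprocess.check_call(["ssh", "-n", host, cmd])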

Let's have a look:

mdup@master:~/cluster_zmq$ time python ventilatorsink.py sink & python ventilatorsink.py ventilator
[1] 7685
sending work p=0.000001, q=0.000001...
sending work p=0.000001, q=0.000010...
[...]
sending work p=100000.000000, q=100000.000000...
=== Results ===
{ "params":
  { "p": 1e-05,
    "q": 0.01 },
  "result": 0.0200087 }

{ "params":
  { "p": 1e-06,
    "q": 10 },
  "result": 0.912946 }

[...]

{ "params":
  { "p": 0.001,
    "q": 0.0001 },
  "result": 0.0012 }

real    0m2.587s
user    0m0.028s
sys     0m0.056s

The output can be read in two parts. First the work is sent by the ventilator; at this point the sink does nothing. Then the workers send back their results until they are all done. The sink knows how many calculations must be done, so once it has received exactly that number of results, it prints them all at once; this is the second part of the output. Notice the results arrive out of order, which is not a problem.
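
Since every result embeds its own parameters, the order really doesn't matter: once everything is in, we can parse the JSON documents and pick the best point of the grid. A small sketch of that post-processing (taking "best" to mean the largest result, which is of course problem-specific):

import json

def best_point(results):
    # 'results' is the list of JSON strings accumulated by the sink.
    parsed = [json.loads(r) for r in results]
    best = max(parsed, key=lambda doc: doc["result"])
    return best["params"], best["result"]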

Regarding the time, we ran 288 seconds' worth of number crunching (144 computations of 2 seconds each) in only 2.587 seconds! That's not bad. The extra ~0.5 s comes from the sleep() which gives the workers time to join. Every computation ran in parallel, and we even had spare workers available.

Change to a correct pattern

I told you before that this pattern was not the right choice here. Why? Well, so far we assumed that all computations take the same amount of time. Now consider calculations with very different durations, some taking 1 second and others 120 seconds. You might say that's no problem, because the pipeline will dispatch the jobs equally, right?

Wrong.

The PUSH/PULL sockets do dispatch the jobs equally, but not in the sense that you mean. The dispatching logic is very simple: a PUSH socket just round-robins its messages to the connected PULL sockets. It doesn't care whether a worker is still busy with a previous job; every job is assigned to some worker's queue up front. The consequence is that a slow worker piles up a backlog of queued jobs, while a lot of other workers, already done with their share, sit idle, ready and willing to accept them.
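
You can convince yourself with a back-of-the-envelope simulation comparing the makespan of round-robin pre-assignment against giving each job to whichever worker frees up first, which is roughly what the demand-driven pattern of the next section does:

def makespan_round_robin(durations, n_workers):
    # PUSH/PULL style: job i is pre-assigned to worker i % n_workers.
    loads = [0.0] * n_workers
    for i, d in enumerate(durations):
        loads[i % n_workers] += d
    return max(loads)

def makespan_on_demand(durations, n_workers):
    # REQ/REP style: each job goes to the worker that becomes free first.
    loads = [0.0] * n_workers
    for d in durations:
        loads[loads.index(min(loads))] += d
    return max(loads)

# Every 12th job is long, so round-robin sends all the long jobs to one worker.
durations = ([120.0] + [1.0] * 11) * 10
print makespan_round_robin(durations, 12)  # prints 1200.0
print makespan_on_demand(durations, 12)    # prints 137.0

Same jobs, same twelve workers, an order of magnitude of difference in total running time.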

(Figure: The problem with ventilator-workers-sink.)

The answer to this problem is to use another pattern: REQ/REP. This one is awesomely simple: a REQ socket makes a request and gets a response from a REP socket. That's it. A REQ can't switch roles with a REP, nor make two requests before reading the reply.

We'll use the REQ/REP pattern as follows. A worker will notify the master (REQ) when it's available to make a calculation. The master will answer with a chunk of work to be done (REP). When it's finished, the worker will send back its results (REQ) and the master will answer, well, nothing meaningful but it has to reply something, so it'll just send an empty string as a thank you note (REP).

(Figure: A better architecture, where workers explicitly ask for work with REQ/REP and also send their results back with REQ/REP.)

Here is the master script:

import zmq

def master(p_range, q_range):
    # Setup ZMQ.
    context = zmq.Context()
    sock = context.socket(zmq.REP)
    sock.bind("tcp://0.0.0.0:5557")

    # Generate the json messages for all computations.
    works = generate_works(p_range, q_range)

    # How many calculations are expected?
    n_total = len(p_range) * len(q_range)

    # Loop until all results arrived.
    results = []
    while len(results) < n_total:
        # Receive a message from a worker.
        j = sock.recv_json()

        # First case: worker says "I'm available". Send him some work.
        if j['msg'] == "available":
            send_next_work(sock, works)

        # Second case: worker says "Here's your result". Store it, say thanks.
        elif j['msg'] == "result":
            r = j['result']
            results.append(r)
            send_thanks(sock)

    # Results are all in.
    print "=== Results ==="
    for r in results:
        print r

def generate_works(p_range, q_range):
    # We want to span all (p, q) combinations.
    for p in p_range:
        for q in q_range:
            work = { 'p': p, 'q': q }
            print "sending work p=%f, q=%f..." % (p, q)
            yield work

def send_next_work(sock, works):
    try:
        work = works.next()
        sock.send_json(work)
    except StopIteration:
        # If no more work is available, we still have to reply something.
        sock.send_json({})

def send_thanks(sock):
    sock.send("") # Nothing more to say actually

if __name__ == "__main__":
    p_range = [pow(10, n) for n in xrange(-6, 6)] # All values for the first parameter
    q_range = [pow(10, n) for n in xrange(-6, 6)] # All values for the second parameter

    master(p_range, q_range)

(master.py)

You'll notice that we ditched the sleep() call: since the workers now explicitly ask for work, a worker that joins late simply starts asking a bit later instead of missing its share of the round-robin distribution. Here is the corresponding slave script:

import subprocess

import zmq

def slave():
    # Setup ZMQ.
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.connect("tcp://192.168.196.1:5557") # IP of master

    while True:
        # Say we're available.
        sock.send_json({ "msg": "available" })

        # Retrieve work and run the computation.
        work = sock.recv_json()
        if work == {}:
            continue
        p = work['p']
        q = work['q']
        print "running computation p=%d, q=%d" % (p, q)
        result = run_computation(p, q)

        # We have a result, let's inform the master about that, and receive the
        # "thanks".
        sock.send_json({ "msg": "result", "result": result})
        sock.recv()

# Delegate the hard work to the C++ binary and retrieve its results from the
# standard output.
def run_computation(p, q):
    return subprocess.check_output(["./think", str(p), str(q)])

if __name__ == "__main__":
    slave()

(slave.py)

Once again, we fire up the slaves on the cluster, one per core. Then we start the master:

mdup@master:~/cluster_zmq$ time python master.py
sending work p=0.000001, q=0.000001...
sending work p=0.000001, q=0.000010...
[...]
sending work p=100000.000000, q=100000.000000...
=== Results ===
{ "params":
  { "p": 1e-06,
    "q": 1e-06 },
  "result": 3e-06 }

{ "params":
  { "p": 1e-06,
    "q": 1e-05 },
  "result": 2.1e-05 }

[...]

{ "params":
  { "p": 100000,
    "q": 100000 },
  "result": 0.107064 }


real    0m2.146s
user    0m2.164s
sys     0m0.960s

Simple and beautiful.

The task is a bit too easy here because we don't even have enough jobs for all the workers. Remember, my cluster has 20 nodes with 12 cores each, so I need at least 240 jobs to keep them all busy. I'll fire off 10,000 just to see how things go:

mdup@master:~/cluster_zmq$ time python master.py
[sending jobs...]
=== Results ===
[printing results...]

real    1m24.446s
user    0m3.444s
sys     0m1.960s

Indeed we had 20,000 seconds' worth of computations (10,000 jobs of 2 seconds each), and spreading that across our 240 available cores gives a theoretical elapsed time of 20,000/240 ≈ 83.3 seconds. We achieved 84.4 seconds, so I'd say this is close enough. Well done!

Conclusion

I hope you enjoyed this post and that it gives you some ideas if you ever need to speed up your number crunching applications!

To be clear, the goal here is to make something simple enough that you don't need to care too much about the communication layer. In a lab setting I don't really need a highly flexible scheme; I just need to get the work done, and ZMQ does that very well. We didn't even scratch the surface of what ZMQ can do, though!

If you want to copy the skeleton, head to the GitHub repo.
