Concurrency in Java

Hey everyone. These past few days, I've been doing a lot of work making some of our processors work on things concurrently. Concurrency is the idea of having your program do many things at once, rather than one after another. The major upside is a sharp performance gain. For example, say you have 10 tasks that take 1 minute each. If one person does all of them, it'll take 10 minutes. If 10 equally capable people do one each, all the work gets done in 1 minute. Pretty basic concept.

What's not so basic is how to go about implementing this in your code. Let's say you have a function that does your work to complete this task:

public void doWork()
{
    ...
}

and you have some Manager Class that is continuously running, looking for something to do.
public void process()
{
    while (true)
    {
        Object work = acquireWork();
        if (work != null)
        {
            doWork();
        }
    }
}

where acquireWork() will return null if there's no work to do. Pretty standard operation. You give this thing 10 one-minute tasks, it'll take 10 minutes to do them.

Let's use Threading!

Let's split some of this stuff up and use threading. A thread runs alongside your main program, within the same process. When you start up a Thread, your main process goes on its merry way and lets the thread do all the work.

To make work run on a thread, you need your class to implement Runnable and put all the work in the run() method you must implement. So, let's do that with our doWork method.

public class WorkerThread implements Runnable
{
    private Object work;

    public WorkerThread(Object work)
    {
        this.work = work;
    }

    public void run()
    {
        doWork();
    }

    public void doWork()
    {
        ...
    }
}


Now, back in our while(true) loop, we can spin off threads to do all the work, rather than holding up processor time doing all the work ourselves.
public void process()
{
    while (true)
    {
        Object work = acquireWork();
        if (work != null)
        {
            new Thread(new WorkerThread(work)).start();
        }
    }
}

The start() method will spin off your thread, while your main process goes back to find more work for threads to do. You give this thing 10 one-minute tasks, it'll take around a minute for all those tasks to complete, and virtually no time at all for the main process.
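To see the speedup for yourself, here's a small self-contained sketch (the class name and the 100 ms sleeps standing in for one-minute tasks are my own stand-ins): ten concurrent "tasks" finish in roughly the time of one.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadDemo
{
    // runs ten 100 ms "tasks" on ten threads and reports the wall-clock time
    static long elapsedMillis()
    {
        long start = System.currentTimeMillis();
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < 10; i++)
        {
            Thread t = new Thread(new Runnable()
            {
                public void run()
                {
                    try { Thread.sleep(100); } // stand-in for one "minute" of work
                    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                }
            });
            t.start();
            threads.add(t);
        }
        for (Thread t : threads)
        {
            try { t.join(); } // wait for every worker to finish
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args)
    {
        System.out.println("elapsed ms: " + elapsedMillis());
    }
}
```

Done one after another, those sleeps would take a full second; threaded, the whole batch wraps up in a fraction of that.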

Note! Do not use threads when your tasks operate on one overall chunk of shared data. You want to give threads tasks that can be completed independently of each other.

Not bad, eh? You've got the thing running smokin' fast.. almost too fast, though. What happens when you get 100 tasks, or 1,000 tasks.. or 1,000,000 tasks?! You're going to run out of resources very fast, and eventually your program will probably die with an OutOfMemoryError.

Let's not use Threading

You need to manage this stuff so it doesn't spin out of control. That's where executors come into play. Executors manage all of your threads in a thread pool. The main class you want to use is ThreadPoolExecutor, which has several parameters in its constructor: you specify how big your pool is, which type of waiting queue to use, how long to hold onto idle threads, and, optionally, a saturation policy.

It's quite customizable, so it can be confusing at first. The Executors class provides static factory methods with commonly used, pre-built thread pools for ya.

There's the fixed pool executor, which holds onto a specific number of threads, and when all those threads are busy it queues up the extras. This is the perfect way to throttle the amount of work going on at once. Say you have 10 one-minute tasks, but want only 5 people to work on them (budget reasons, probably). When you get these 10 tasks at once, all 5 workers will take the first 5, and as they finish, they'll grab the next one in the queue. If you get 1,000 tasks, 995 of 'em will go straight to the queue.
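Here's a little sketch of that throttling (the class and method names are mine): ten tasks go into a 5-thread fixed pool, and no more than 5 are ever running at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolDemo
{
    // submits ten short tasks to a 5-thread fixed pool and reports the
    // highest number of tasks that were ever running at the same time
    static int peakConcurrency()
    {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService exec = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++)
        {
            exec.execute(new Runnable()
            {
                public void run()
                {
                    int now = running.incrementAndGet();
                    // remember the highest concurrency we ever saw
                    int seen;
                    do { seen = peak.get(); }
                    while (now > seen && !peak.compareAndSet(seen, now));
                    try { Thread.sleep(100); } // stand-in for real work
                    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                    running.decrementAndGet();
                }
            });
        }
        exec.shutdown();
        try { exec.awaitTermination(10, TimeUnit.SECONDS); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return peak.get();
    }

    public static void main(String[] args)
    {
        System.out.println("peak concurrency: " + peakConcurrency());
    }
}
```

All ten tasks get done, but the pool caps how many are in flight, which is exactly the throttle you want.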

There's also a cached pool executor, which will grow with the work. It'll create new threads when there's a ton of work, but once things die down, it'll give back the resources it was using to keep those threads alive. Meh.. there's not much difference between using a cached pool and starting up all the threads yourself.
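A quick sketch of that growth (the class and method names are mine): hold ten tasks open at the same time, and the cached pool spins up ten threads to match.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo
{
    // holds ten tasks open at once and reports how many threads
    // the cached pool grew to in response
    static int largestPoolSize()
    {
        ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        final CountDownLatch hold = new CountDownLatch(1);
        for (int i = 0; i < 10; i++)
        {
            exec.execute(new Runnable()
            {
                public void run()
                {
                    try { hold.await(); } // stay busy until released
                    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                }
            });
        }
        int size = exec.getLargestPoolSize(); // how big did the pool get?
        hold.countDown();                     // let everyone finish
        exec.shutdown();
        try { exec.awaitTermination(10, TimeUnit.SECONDS); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return size;
    }

    public static void main(String[] args)
    {
        System.out.println("largest pool size: " + largestPoolSize());
    }
}
```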

Even though the fixed pool bottlenecks a lot of the work, it'll save system resources and prevent the program from running out of memory.

Implementing this is pretty simple.

public void process()
{
    ExecutorService exec = Executors.newFixedThreadPool(5);
    while (true)
    {
        Object work = acquireWork();
        if (work != null)
        {
            exec.execute(new WorkerThread(work));
        }
    }
}

It looks pretty identical to the previous code. Now it'll take about 2 minutes to go through those 10 one-minute tasks, but the program won't die when it gets 1,000,000 tasks.

Customizing the Executor

We can do better than this.

The workers are happy with their tasks, but the main process is going crazy. Every little task that comes along gets taken right away and thrown into the queue for the workers. That's not too bad, unless you're doing some sort of transaction service. If you open up a transaction every time you acquireWork(), then you'll have 1,000 open transactions sitting in the queue waiting to be processed.

You could do away with the waiting queue completely, and never have an open task sitting around. This isn't such a bad idea, but what do you do when all your threads are busy and you give a new task to the executor? You need a saturation policy that tells the executor what to do in this case.

Saturation Policies

By default, the executor uses the abort policy, which just throws a RejectedExecutionException. Well, we probably don't want to do that. There are two others available that aren't so great either: DiscardPolicy, which silently discards the new task, and DiscardOldestPolicy, which discards the oldest task still waiting in the queue. You definitely don't want to lose work, so discarding is out. Discarding the oldest waiting task? Under sustained load you'd get into a cycle of constantly bumping older work in favor of newer work. Nothing would get done except the most recent pieces of work.
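Here's the default abort policy in action, in a sketch of my own: a pool with one thread and no waiting queue rejects a second simultaneous task with a RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AbortPolicyDemo
{
    // one thread, no waiting queue, default abort policy:
    // a second simultaneous task gets rejected with an exception
    static boolean secondTaskRejected()
    {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        final CountDownLatch hold = new CountDownLatch(1);
        exec.execute(new Runnable()
        {
            public void run()
            {
                try { hold.await(); } // keep the only thread busy
                catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
        });
        boolean rejected = false;
        try
        {
            exec.execute(new Runnable() { public void run() { } });
        }
        catch (RejectedExecutionException e)
        {
            rejected = true; // the abort policy said no
        }
        hold.countDown();
        exec.shutdown();
        try { exec.awaitTermination(10, TimeUnit.SECONDS); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return rejected;
    }

    public static void main(String[] args)
    {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```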

If we're going to use a saturation policy, then we should use the caller-runs policy, which has the main process execute the newest task itself when all the threads are busy. By the time the main process finishes that task, the other threads should be ready to receive more work.. hell, if not, the main process can do the next one too.

This works out pretty well, since when times are busy, everyone jumps in to do the work. The manager actually lends a helping hand ^_^.

There's one big problem with this, though. What happens when you have variable task lengths, and the manager ends up grabbing one that takes far longer to complete than normal? Everything is held up by that one task, and your performance dips very quickly. You should really only use this when your tasks have predictable, uniform completion times.

All of our tasks take 1 minute to complete, so using a Caller Runs policy should be fine.

I should note that the static Executors methods return an ExecutorService, not a ThreadPoolExecutor, and it's ThreadPoolExecutor that actually has the setRejectedExecutionHandler method you use to set the saturation policy. So you'll need to cast a little bit.

public void process()
{
    ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    while (true)
    {
        Object work = acquireWork();
        if (work != null)
        {
            exec.execute(new WorkerThread(work));
        }
    }
}

This is pretty good, except for one problem. When you get six pieces of work, the 5 worker threads will get their work and the manager will work on the 6th. Before the manager finishes task #6, all 5 workers will have finished their tasks, and now must wait for the manager to finish before they can get anything new. Now you're wasting time again.

You can solve this by going back to using the waiting queue. If you make your waiting queue the same size as your thread pool, then when those workers finish their first piece of work, they'll have something new to work on right away. By the time the manager finishes his piece of work, the queue should be empty and ready to receive more. Very streamlined.

Now you'll need to actually make your own ThreadPoolExecutor, rather than using the pre-built one. I start off by just copying the code from the Executors class and tweaking it a little bit.

public void process()
{
    ThreadPoolExecutor exec = new ThreadPoolExecutor(5, 5,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(5));
    exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    while (true)
    {
        Object work = acquireWork();
        if (work != null)
        {
            exec.execute(new WorkerThread(work));
        }
    }
}

The LinkedBlockingQueue is your waiting queue, and you set its size in the constructor. I suppose one problem with this setup is: what if you get 6 tasks, and then another 5 tasks a few seconds later? Then, by the time the manager finishes up with task #6, the 5 workers will already be done and waiting for work. Maybe this isn't the best implementation after all..

But if you're working with a steady workflow and uniformly timed tasks, you'll be fine.

Semaphores

Let's do something else. Let's not use the caller-runs policy, and let the manager just manage the workflow. Instead of having the manager grab an extra task that can't be worked on because everyone is busy, we can halt the process until something is available by using a Semaphore.

A semaphore has a certain number of tickets (the docs call them permits) that are handed out with semaphore.acquire(). If you only give the semaphore 10 tickets, you can only call semaphore.acquire() 10 times. On the 11th call, semaphore.acquire() will block until a ticket is returned. You return tickets by calling semaphore.release().
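A tiny sketch of the ticket counter (the class name is mine; I use tryAcquire(), the non-blocking cousin of acquire(), so the demo can't hang):

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo
{
    // hands out two tickets, shows the third request failing,
    // then returns one ticket and tries again
    static boolean[] ticketRun()
    {
        Semaphore tickets = new Semaphore(2);   // two tickets to hand out
        boolean first  = tickets.tryAcquire();  // takes ticket 1
        boolean second = tickets.tryAcquire();  // takes ticket 2
        boolean third  = tickets.tryAcquire();  // none left -> false
        tickets.release();                      // give one back
        boolean fourth = tickets.tryAcquire();  // works again
        return new boolean[] { first, second, third, fourth };
    }

    public static void main(String[] args)
    {
        for (boolean b : ticketRun())
        {
            System.out.println(b);
        }
    }
}
```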

Implementing this is pretty simple. We want to take our ticket before doing any processing, and the threads must return the tickets when they're done with their task.

private Semaphore semaphore = new Semaphore(10);

public void process()
{
    ExecutorService exec = Executors.newFixedThreadPool(5);
    while (true)
    {
        try
        {
            semaphore.acquire(); // blocks until a ticket is free
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return; // someone asked us to stop
        }
        Object work = acquireWork();
        if (work != null)
        {
            exec.execute(new WorkerThread(work, this));
        }
        else
        {
            semaphore.release(); // no work found, so give the ticket back
        }
    }
}

public void releaseSemaphore()
{
    semaphore.release();
}

public class WorkerThread implements Runnable
{
    private Object work;
    private Manager manager;

    public WorkerThread(Object work, Manager manager)
    {
        this.work = work;
        this.manager = manager;
    }

    public void run()
    {
        try
        {
            doWork();
        }
        finally
        {
            manager.releaseSemaphore(); // always return the ticket, even if doWork() blows up
        }
    }

    public void doWork()
    {
        ...
    }
}


Now the manager isn't constantly grabbing tasks when everyone is busy. The semaphore lets him know when there's room for a new task. We don't need to set up a saturation policy or a waiting queue size, because with 5 worker threads and 10 tickets, the waiting queue will never hold more than 5 tasks.

This seems like it would work pretty well in most situations. The main process isn't held up by anything, and the worker threads will always have some work to do.

Not too shabby..

Fine-Tuning the Thread Pool

You can still squeeze a little more performance out of your processing. When a thread pool gets a task, it doesn't simply call Thread.start() on it. The task goes into your blocking queue, and threads take tasks out of there to work on. You have 3 basic types of blocking queues to use.

The LinkedBlockingQueue I've found to be the slowest. It uses your typical linked-list setup. The ArrayBlockingQueue runs faster, by using an array.

If you don't plan on having any sort of waiting queue, you should definitely use the SynchronousQueue, which doesn't hold onto your tasks at all. It hands them to a worker thread right away, without delay.
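A quick way to see the difference between the three (the class name is mine): offer() tells you whether a queue accepted an item without blocking, and a SynchronousQueue with nobody waiting to take the item says no.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueDemo
{
    // offer() reports whether each queue accepted the task without blocking
    static boolean[] offerResults()
    {
        boolean linked = new LinkedBlockingQueue<String>().offer("task"); // true: unbounded linked list
        boolean array  = new ArrayBlockingQueue<String>(5).offer("task"); // true: room in the fixed array
        boolean sync   = new SynchronousQueue<String>().offer("task");    // false: no thread waiting to take it
        return new boolean[] { linked, array, sync };
    }

    public static void main(String[] args)
    {
        for (boolean b : offerResults())
        {
            System.out.println(b);
        }
    }
}
```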

There you have it

There you have it.. running concurrent processes will give you a decent performance gain, but make sure you manage them properly, or else you may be in trouble.
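One housekeeping detail my while(true) loops gloss over: if your program ever does stop handing out work, the pool's threads can keep the JVM alive until you shut the executor down. A minimal sketch (the class and method names are mine):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo
{
    // runs ten trivial tasks, then drains and stops the pool
    static boolean runAndShutDown()
    {
        ExecutorService exec = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++)
        {
            exec.execute(new Runnable()
            {
                public void run()
                {
                    // doWork() would go here
                }
            });
        }
        exec.shutdown(); // stop accepting new tasks; queued work still runs
        try
        {
            exec.awaitTermination(10, TimeUnit.SECONDS); // wait for the queue to drain
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        return exec.isTerminated();
    }

    public static void main(String[] args)
    {
        System.out.println("terminated: " + runAndShutDown());
    }
}
```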

Never use new Thread(...).start() directly.. use Executors instead.

This information was learned from reading the book Java Concurrency in Practice, written by Brian Goetz with Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea. Definitely check it out for more information on concurrency ^_^