Just another weblog

Multithreading Spring-Batch using Concurrent API

Posted by damuchinni on March 12, 2009

One of the recurrent task in my job is to create batch programs in Java.

Some are extraction batches (we get data from a DB and write it to an XML file), some are import (from XML to DB), and some are DB treatments only. All these batches are using Spring-Batch Library to do their tasks.

Spring Batch is a well thought-out library, that, among other things, allows you to:

Handle large data by splitting it in small chunks
Restart a batch exactly where it has stopped
Define how many handled chunks per commit steps
Automatic Rollback / Retry in case of error

The diagram below shows a typical execution of a batch:

To simplify things, let’s say a batch is composed of steps, and a Step is a Reader and a Writer communicating together.

At the beginning of the step, the Reader and Writer are opened
The Reader opens a cursor on the DB, using a query.
The Writer creates the XML file to write
Then for each element read by the reader (For each row of the result set, we convert it into a Java Object)
The element read is sent to the Writer
The Writer convert it to XML then write it to the file.
When the Reader returns null, that means no more data to process
The Step tells the Writer to flush all it’s data
The Step closes the Writer and Reader.

Recently, I was asked to optimize a slow extraction batch.

Profiling the code, I see that the Reader must execute 15 SQL requests to get all the data to send to the Writer. As all elements are handled sequentially, that meant 150 000 requests for 10 000 elements ! A quick look at the database status with a batch running showed me that the database was not fully utilized.

I decided to Multi-thread the queries done in the Reader. By default, Spring-Batch does not easily support Multi-threading, and I didn’t want to do much change in the existing code, so I used the famous Java Concurrent API features of Java.

In my multithreaded batch, the Reader don’t execute the queries to get data for the Writer, but instead creates a FutureTask, and adds it to a Multithreaded ExecutorService. This FutureTask is then send to the Writer which ony stores it.

When writer.flush is called, we get the data one by one from each of the FutureTasks. All the threads of the ExecutorService are busy querying the DB to fill the data for the FutureTask. Thus all the reading will be multi-threaded.

Next, let’s deep a little bit into the code:

We create a Callable class that will execute the queries, we suppose we want to read contracts from the DB:
protected class ContractCreator implements Callable
protected long elementId;

public ContractCreator (long elementId)
public Contract call() throws Exception {
// Called by the threads
// Execute all the queries here
// And return the real data object

The reader creates the ExecutorService with a pool of 10 threads:

protected ExecutorService executor=Executors.newFixedThreadPool(10);

Then the reader creates the Callable class for each element to read and submits it to the threads of the ExecutorService.

public Object read (ResultSet rs, int rowNum) throws SQLException {
long elementId=rs.getLong(“ELEMENT_ID”);
// Multithreaded execution
Future result=executor.submit(new ContractCreator (elementId));
return result;

The Future created is sent to the Writer which only stores it.

* We store all the contracts running in parallel here
protected List<Future> contracts=new ArrayList<Future> (10);

public void write(Object output) {
contracts.add((Future) output);

It’s only when flush is called that the Writer tries to get each contract and write them to the XML file:

public void flush() throws FlushFailedException {
for (FuturecontractCreator:contracts)
// Get the data read by the threads
// Will block until the data is avaiable
Contract contract = (Contract) contractCreator.get();
writeContractToFile (contract):

We then have now 10 threads querying the DB in parallel with little efforts ! All the basic thread / synchronization handling stuff is done by the Concurrent API, and the resulting file is exactly the same as with the single threaded example.

I hope this Spring-Batch example allows you to understand better the power of the Concurrent API of Java !


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )


Connecting to %s

%d bloggers like this: