My Custom Thread Pool Executor in Java

Posted on the 27 February 2013 by Abhishek Somani @somaniabhi

ThreadPoolExecutor is a class in the Java concurrency API (java.util.concurrent) that maintains and reuses threads efficiently, so that our programs don't have to worry about creating and destroying threads and can focus on their core functionality. I have created a custom thread pool executor to get a better understanding of how a thread pool executor works.
Functionality:
  • It maintains a fixed thread pool: it creates and starts all of its threads up front, even if no task has been submitted, whereas ThreadPoolExecutor creates threads on demand, i.e. whenever a runnable is submitted to the pool and the number of threads is less than the core pool size.
  • In ThreadPoolExecutor, we provide a waiting queue where new runnable tasks wait when all core threads are busy running existing tasks. Once the queue is full, new threads are created up to the maximum pool size. In MyThreadPool, I store the tasks in a linked list, which is unbounded, so every task waits in the list and there is no use for a maxPoolSize.
  • In ThreadPoolExecutor, we use Future objects to get the result of a task (future.get() blocks if the result is not yet available), or we use a CompletionService. In MyThreadPool, I have created a simple interface called ResultListener; the user has to provide an implementation of it that says how the output should be processed. After every task completes, the ResultListener gets a callback with the output of the task, or its error method is called if the task threw an exception.
  • When the stop() method is called, MyThreadPool stops accepting new tasks and completes the remaining ones.
  • I have provided very basic functionality compared to ThreadPoolExecutor, and I have used only simple thread mechanisms such as wait(), notify(), notifyAll() and join().
  • Performance-wise, in my informal runs of the test class at the end of this post it was comparable to ThreadPoolExecutor, and sometimes better. Do let me know if you find any interesting results or ways to improve it.
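For comparison, here is a minimal sketch of the ThreadPoolExecutor behaviour described in the list above: threads are created on demand, and Future.get() blocks until the result is ready. The class name ExecutorBaseline and the pool parameters (2 core threads, 4 maximum, a queue of 10) are only assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorBaseline {

    /** Returns {pool size before submit, pool size after submit, task result}. */
    static int[] demo() {
        // corePoolSize 2, maximumPoolSize 4, bounded queue of 10 waiting tasks
        // (illustrative values, not taken from the post).
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        try {
            int before = executor.getPoolSize(); // no worker threads exist yet
            Future<Integer> future = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return 42;
                }
            });
            int result = future.get(); // blocks until the task has finished
            int after = executor.getPoolSize(); // one worker was created on demand
            return new int[] { before, after, result };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("pool size before=" + r[0] + ", after=" + r[1] + ", result=" + r[2]);
    }
}
```

MyThreadPool differs on both points: all threads are started in the constructor, and results arrive through a callback instead of a Future.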

package com.util;

import java.util.concurrent.Callable;

/**
 * Worker thread of {@link MyThreadPool}. Runs submitted tasks and, after each
 * one, calls the pool's {@link ResultListener} with the value returned by the
 * {@link Callable}, or with the exception it threw. Waits while the task
 * queue is empty.
 * 
 * @author abhishek
 * 
 * @param <V> result type of the tasks
 */

public class MyThread<V> extends Thread {
    /**
    * MyThreadPool object, from which the task to be run
    */
    private MyThreadPool<V> pool;
    // volatile: written by the thread that calls shutdown(), read by this worker
    private volatile boolean active = true;
    public boolean isActive() {
        return active;
    }
    public void setPool(MyThreadPool<V> p) {
        pool = p;
    }
    /**
    * Takes tasks from the pool and runs them, calling back the ResultListener
    * with the output or the exception. Waits while no task is available. Once
    * shutdown has been requested, the thread drains the remaining tasks and
    * then exits.
    */
    @Override
    public void run() {
        ResultListener<V> result = pool.getResultListener();
        while (true) {
            Callable<V> task;
            // Poll and wait under the same lock, so that a notify() sent by
            // submit() or stop() cannot slip in between the check and the wait.
            synchronized (pool.getWaitLock()) {
                while ((task = pool.removeFromQueue()) == null && isActive()) {
                    try {
                        pool.getWaitLock().wait();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and let the thread exit.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            if (task == null) {
                break; // shut down and nothing left to run
            }
            try {
                V output = task.call();
                result.finish(output);
            } catch (Exception e) {
                result.error(e);
            }
        }
    }
    void shutdown() {
        active = false;
    }
}
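The wait()/notify() handshake used by the worker thread is easiest to reason about when the condition is checked in a loop while holding the same lock that the notifier uses. The following is a minimal, self-contained sketch of that guarded-wait idiom; GuardedQueue and its methods are hypothetical names and are not part of the classes in this post.

```java
import java.util.LinkedList;

public class GuardedQueue<T> {
    private final Object lock = new Object();
    private final LinkedList<T> items = new LinkedList<T>();
    private boolean closed = false;

    public void put(T item) {
        synchronized (lock) {
            items.add(item);
            lock.notify(); // wake one waiting consumer
        }
    }

    public void close() {
        synchronized (lock) {
            closed = true;
            lock.notifyAll(); // wake every consumer so it can exit
        }
    }

    /** Blocks until an item is available; returns null once closed and drained. */
    public T take() throws InterruptedException {
        synchronized (lock) {
            // Re-check the condition after every wake-up: wait() may return
            // spuriously, or another consumer may have taken the item first.
            while (items.isEmpty() && !closed) {
                lock.wait();
            }
            return items.poll();
        }
    }

    /** One consumer thread adds up the numbers 1..100 and returns the sum. */
    static int demo() {
        final GuardedQueue<Integer> queue = new GuardedQueue<Integer>();
        final int[] sum = new int[1];
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Integer item;
                    while ((item = queue.take()) != null) {
                        sum[0] += item;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        for (int i = 1; i <= 100; i++) {
            queue.put(i);
        }
        queue.close();
        try {
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }
}
```

Because the check and the wait happen under one lock, a put() or close() can never fall between them and go unnoticed.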

package com.util;
import java.util.LinkedList;
import java.util.concurrent.Callable;
/**
* This class is used to execute submitted {@link Callable} tasks. It creates
* and manages a fixed number of threads. The user provides a
* {@link ResultListener} object in order to get the result of each submitted
* task.
*
* @author abhishek
*
* @param <V> result type of the tasks
*/
public class MyThreadPool<V> {
    private Object waitLock = new Object();
    public Object getWaitLock() {
        return waitLock;
    }
    /**
    * list of threads for completing submitted tasks
    */
    private final LinkedList<MyThread<V>> threads;
    /**
    * submitted tasks are kept in this list until they are run by one of the
    * threads in the pool
    */
    private final LinkedList<Callable<V>> tasks;
    /**
    * shutDown flag to shut Down service
    */
    private volatile boolean shutDown;
    /**
    * ResultListener to get back the result of submitted tasks
    */
    private ResultListener<V> resultListener;
    /**
    * Initializes the thread pool by starting the threads. The threads wait
    * until tasks are submitted.
    *
    * @param size
    * Number of threads to be created and maintained in pool
    * @param myResultListener
    * ResultListener to get back result
    */
    public MyThreadPool(int size, ResultListener<V> myResultListener) {
        tasks = new LinkedList<Callable<V>>();
        threads = new LinkedList<MyThread<V>>();
        shutDown = false;
        resultListener = myResultListener;
        for (int i = 0; i < size; i++) {
            MyThread<V> myThread = new MyThread<V>();
            myThread.setPool(this);
            threads.add(myThread);
            myThread.start();
        }
    }
    public ResultListener<V> getResultListener() {
        return resultListener;
    }
    public void setResultListener(ResultListener<V> resultListener) {
        this.resultListener = resultListener;
    }
    public boolean isShutDown() {
        return shutDown;
    }
    public int getThreadPoolSize() {
        return threads.size();
    }
    public synchronized Callable<V> removeFromQueue() {
        return tasks.poll();
    }
    public synchronized void addToTasks(Callable<V> callable) {
        tasks.add(callable);
    }
    /**
    * Submits a task to the pool: adds it to the list and notifies one waiting
    * thread. Tasks submitted after shutdown are rejected.
    *
    * @param callable the task to run
    */
    public void submit(Callable<V> callable) {
        if (!shutDown) {
            addToTasks(callable);
            synchronized (this.waitLock) {
                waitLock.notify();
            }
        } else {
            System.out.println("task is rejected.. Pool shutDown executed");
        }
    }
    /**
    * Initiates a shutdown in which previously submitted tasks are executed,
    * but no new tasks will be accepted. Blocks until the worker threads have
    * finished the remaining tasks.
    *
    */
    public void stop() {
        // Set the flag first, otherwise submit() would keep accepting tasks.
        shutDown = true;
        for (MyThread<V> mythread : threads) {
            mythread.shutdown();
        }
        synchronized (this.waitLock) {
            waitLock.notifyAll();
        }
        for (MyThread<V> mythread : threads) {
            try {
                mythread.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting for the workers.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
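One possible way to simplify the pool, shown here only as a sketch of a different technique and not as the design used in this post, is to replace the linked list and the explicit wait()/notify() calls with a java.util.concurrent.LinkedBlockingQueue and to stop the workers with a sentinel ("poison pill") task. QueuePoolSketch, POISON and demo() are hypothetical names, and the sketch takes Runnable tasks instead of Callable to stay short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class QueuePoolSketch {
    // Sentinel task that tells a worker to exit (hypothetical name).
    private static final Runnable POISON = new Runnable() {
        @Override
        public void run() {
        }
    };

    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
    private final List<Thread> workers = new ArrayList<Thread>();

    public QueuePoolSketch(int size) {
        for (int i = 0; i < size; i++) {
            Thread worker = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Runnable task;
                        // take() blocks while the queue is empty.
                        while ((task = tasks.take()) != POISON) {
                            task.run();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
    }

    public void submit(Runnable task) {
        tasks.add(task);
    }

    /** Queues one sentinel per worker behind the pending tasks, then waits. */
    public void stop() {
        for (int i = 0; i < workers.size(); i++) {
            tasks.add(POISON);
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Runs 1000 counting tasks on two workers and returns how many ran. */
    static int demo() {
        QueuePoolSketch pool = new QueuePoolSketch(2);
        final AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 1000; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    done.incrementAndGet();
                }
            });
        }
        pool.stop();
        return done.get();
    }
}
```

Since the queue is first-in, first-out, the sentinels are only reached after every earlier task has been taken, so pending work still completes before the threads exit. A task that throws an unchecked exception would end its worker in this sketch; a fuller version would catch it and report it to a listener.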

package com.util;

/**
 * Callback interface through which the pool reports the outcome of each
 * finished task: finish is called with the result, error with the exception
 * thrown by the task.
 * @author abhishek
 *
 * @param <T> result type of the tasks
 */

public interface ResultListener<T> {
 
 public void finish(T obj);
 public void error(Exception ex);
 
}
You can implement this interface however you like to receive and process the results returned by the tasks. A default implementation that ignores results and prints errors:
package com.util;

public class DefaultResultListener implements ResultListener{

 @Override
 public void finish(Object obj) {
  
 }

 @Override
 public void error(Exception ex) {
  ex.printStackTrace();
 }

}


For example, this class adds up the numbers returned by the tasks.
package com.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * ResultListener class to keep track of total matched count
 * @author abhishek
 * 
 * @param <V> result type of the tasks; this listener expects Integer values
 */
public class MatchedCountResultListener<V> implements ResultListener<V> {

	/**
	 * matchedCount to keep track of the number of matches returned by submitted
	 * task
	 */
	AtomicInteger matchedCount = new AtomicInteger();

	/**
	 * this method is called by ThreadPool to give back the result of callable
	 * task. if the task completed successfully then increment the matchedCount by
	 * result count
	 */
	@Override
	public void finish(V obj) {
		//System.out.println("count is "+obj);
		matchedCount.addAndGet((Integer)obj);
	}

	/**
	 * print exception thrown in running the task
	 */
	@Override
	public void error(Exception ex) {
		ex.printStackTrace();
	}
	
	
	/**
	 * returns the final matched count of all the finished tasks
	 * 
	 * @return the sum of the results reported so far
	 */
	public int getFinalCount() {
		return matchedCount.get();
	}
}
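The listener is called from several worker threads at once, which is why the count is kept in an AtomicInteger and not in a plain int. A minimal sketch of that point, with the hypothetical names CounterDemo and sumFromThreads:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CounterDemo {

    /** Starts the given number of threads; each adds 1 to a shared total perThread times. */
    static int sumFromThreads(int threads, final int perThread) {
        final AtomicInteger total = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < perThread; i++) {
                        // Atomic read-modify-write: no update is lost even
                        // when several threads call it at the same time.
                        total.addAndGet(1);
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sumFromThreads(4, 10000));
    }
}
```

With a plain int and `total++`, two threads could read the same old value and one increment would be lost.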

This is a test class which runs the same simple tasks through a CompletionService and through MyThreadPool and prints the time taken by each.
package test;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.util.DefaultResultListener;
import com.util.MyThreadPool;

public class TestClass {
	
	// same number of tasks for every measurement, so the timings are comparable
	private static final int TASKS = 50000;
	
	public static void main(String[] args) throws InterruptedException {
		ExecutorService service = Executors.newFixedThreadPool(2);
		CompletionService<Integer> threadService = new ExecutorCompletionService<Integer>(service);
		
		long b = System.currentTimeMillis();
		for(int i =0;i<TASKS;i++){
			threadService.submit(new MyRunable (i));
		}
		
		service.shutdown();
		// shutdown() returns immediately; wait for the queued tasks before reading the clock
		service.awaitTermination(1, TimeUnit.MINUTES);
		System.out.println("time taken by Completion Service " + (System.currentTimeMillis()-b));
		
		
		DefaultResultListener result = new DefaultResultListener();
		MyThreadPool<Integer> newPool = new MyThreadPool<Integer>(2,result);
		long a = System.currentTimeMillis();
		
		int cc =0;
		for(int i =0;i<TASKS;i++)
		{
			cc = cc+i;
		}
		System.out.println("time taken without any pool " + (System.currentTimeMillis()-a));
		a= System.currentTimeMillis();
		
		for(int i =0;i<TASKS;i++){
			newPool.submit(new MyRunable (i));
		}
		
		// stop() joins the worker threads, so all tasks are done when it returns
		newPool.stop();
		System.out.println("time taken by myThreadPool " + (System.currentTimeMillis()-a));
	}

}

class MyRunable implements Callable<Integer>
{
	int index = -1;
	public MyRunable(int index)
	{
		this.index = index;
	}
	@Override
	public Integer call() throws Exception {
		return index;
	}
	
}
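The test above only measures time and never reads the results. To collect them with a CompletionService, each finished Future can be taken from the service as it completes. A minimal sketch, assuming the hypothetical names CompletionDemo and sumResults:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionDemo {

    /** Submits taskCount tasks that return their index and adds up the results. */
    static long sumResults(int taskCount) {
        ExecutorService service = Executors.newFixedThreadPool(2);
        CompletionService<Integer> completion =
                new ExecutorCompletionService<Integer>(service);
        try {
            for (int i = 0; i < taskCount; i++) {
                final int index = i;
                completion.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return index;
                    }
                });
            }
            long sum = 0;
            for (int i = 0; i < taskCount; i++) {
                // take() blocks until some task has finished, in completion order
                sum += completion.take().get();
            }
            return sum;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumResults(1000));
    }
}
```

With MyThreadPool the same sum would be gathered by a listener such as MatchedCountResultListener, read after stop() has returned.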
