Computing Magazine

My Custom Thread Pool Executor in Java

Posted on the 27 February 2013 by Abhishek Somani @somaniabhi

ThreadPoolExecutor is a feature added by java concurrent api to maintain and reuse threads efficiently , so that our programs don't have to worry about creating and destroying threads and focus on the core functionality. I have created a custom thread pool executor to get better understanding of how thread pool executor would work .
Functionality :
  • It maintains a fixed thread pool ,and creates threads and start the threads even if no task is submitted whereas ThreadPoolExecutor creates threads on demand , i.e. whenever a runnable is submitted to pool and the number of threads are less than core pool size .
  • In ThreadPoolExecutor, we provide a waiting queue ,where new runnable task waits when all threads are busy running existing task. Once the queue is filled , new threads will be created up to maximum pool size. In MyThreadPool , i am storing the runnable in a linked list , so every task will wait in the list and it is unbounded , so no usage of maxPoolSize in this .
  • In ThreadPoolExecutor , we use Future Objects to get the result from task , future.get() method will block if the result is not available , or we use CompletionService . In MyThreadPoolExecutor , i have created a simple interface called ResultListener , user has to provide a implementation of this as to how he wants the output to be processed . After every task is completed , the ResultListener will get callback with the output of task or error method will be called in case of any exception.
  • When shutdown method is called , MyThreadPoolExecutor will stop accepting new tasks and complete the remaining tasks .
  • I have provided very basic functionality as compared to ThreadPoolExecutor ,  i have used simple thread mechanism like wait() , notify() , notifyAll(), and join().
  • Performance wise it is similar to ThreadPoolExecutor , some times better in some cases. Do let me know if you find any interesting results or ways to improve it .   

package com.util;

import java.util.concurrent.Callable;

/**
 * Run submitted task of {@link MyThreadPool} After running the task , It calls
 * on {@link ResultListener}object with {@link Output}which contains returned
 * result of {@link Callable}task. Waits if the pool is empty.
 * 
 * @author abhishek
 * 
 * @param 
 */

import java.util.concurrent.Callable;
/**
* Run submitted task of {@link MyThreadPool} After running the task , It calls
* on {@link ResultListener}object with {@link Output}which contains returned
* result of {@link Callable}task. Waits if the pool is empty.
*
* @author abhishek
*
* @param <V>
*/
public class MyThread<V> extends Thread {
    /**
    * MyThreadPool object, from which the task to be run
    */
    private MyThreadPool<V> pool;
    private boolean active = true;
    public boolean isActive() {
        return active;
    }
    public void setPool(MyThreadPool<V> p) {
        pool = p;
    }
    /**
    * Checks if there are any unfinished tasks left. if there are , then runs
    * the task and call back with output on resultListner Waits if there are no
    * tasks available to run If shutDown is called on MyThreadPool, all waiting
    * threads will exit and all running threads will exit after finishing the
    * task
    */
    public void run() {
        ResultListener<V> result = pool.getResultListener();
        Callable<V> task;
        while (true)
        {
            task = pool.removeFromQueue();
            if (task != null)
            {
                try
                {
                    V output = task.call();
                    result.finish(output);
                } catch (Exception e)
                {
                    result.error(e);
                }
            } else
            {
                if (!isActive())
                break;
                else
                {
                    synchronized (pool.getWaitLock())
                    {
                        try
                        {
                            pool.getWaitLock().wait();
                        } catch (InterruptedException e)
                        {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    void shutdown() {
        active = false;
    }
}

package com.util;
import java.util.LinkedList;
import java.util.concurrent.Callable;
/**
* This class is used to execute submitted {@link Callable} tasks. this class
* creates and manages fixed number of threads User will provide a
* {@link ResultListener}object in order to get the Result of submitted task
*
* @author abhishek
*
*
*/
public class MyThreadPool<V> {
    private Object waitLock = new Object();
    public Object getWaitLock() {
        return waitLock;
    }
    /**
    * list of threads for completing submitted tasks
    */
    private final LinkedList<MyThread<V>> threads;
    /**
    * submitted task will be kept in this list untill they run by one of
    * threads in pool
    */
    private final LinkedList<Callable<V>> tasks;
    /**
    * shutDown flag to shut Down service
    */
    private volatile boolean shutDown;
    /**
    * ResultListener to get back the result of submitted tasks
    */
    private ResultListener<V> resultListener;
    /**
    * initializes the threadPool by starting the threads threads will wait till
    * tasks are not submitted
    *
    * @param size
    * Number of threads to be created and maintained in pool
    * @param myResultListener
    * ResultListener to get back result
    */
    public MyThreadPool(int size, ResultListener<V> myResultListener) {
        tasks = new LinkedList<Callable<V>>();
        threads = new LinkedList<MyThread<V>>();
        shutDown = false;
        resultListener = myResultListener;
        for (int i = 0; i < size; i++) {
            MyThread<V> myThread = new MyThread<V>();
            myThread.setPool(this);
            threads.add(myThread);
            myThread.start();
        }
    }
    public ResultListener<V> getResultListener() {
        return resultListener;
    }
    public void setResultListener(ResultListener<V> resultListener) {
        this.resultListener = resultListener;
    }
    public boolean isShutDown() {
        return shutDown;
    }
    public int getThreadPoolSize() {
        return threads.size();
    }
    public synchronized Callable<V> removeFromQueue() {
        return tasks.poll();
    }
    public synchronized void addToTasks(Callable<V> callable) {
        tasks.add(callable);
    }
    /**
    * submits the task to threadPool. will not accept any new task if shutDown
    * is called Adds the task to the list and notify any waiting threads
    *
    * @param callable
    */
    public void submit(Callable<V> callable) {
        if (!shutDown) {
            addToTasks(callable);
            synchronized (this.waitLock) {
                waitLock.notify();
            }
            } else {
            System.out.println("task is rejected.. Pool shutDown executed");
        }
    }
    /**
    * Initiates a shutdown in which previously submitted tasks are executed,
    * but no new tasks will be accepted. Waits if there are unfinished tasks
    * remaining
    *
    */
    public void stop() {
        for (MyThread<V> mythread : threads) {
            mythread.shutdown();
        }
        synchronized (this.waitLock) {
            waitLock.notifyAll();
        }
        for (MyThread<V> mythread : threads) {
            try {
                mythread.join();
                } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

package com.util;

/**
 * This interface imposes finish method 
 * which is used to get the {@link Output} object 
 * of finished task
 * @author abhishek
 *
 * @param 
 */

public interface ResultListener  {
 
 public void finish(T obj);
 public void error(Exception ex);
 
}
you can implement this class as you want to get back and process the result returned by tasks.
package com.util;

public class DefaultResultListener implements ResultListener{

 @Override
 public void finish(Object obj) {
  
 }

 @Override
 public void error(Exception ex) {
  ex.printStackTrace();
 }

}


For example this class will add the number returned by tasks .
package com.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * ResultListener class to keep track of total matched count
 * @author abhishek
 * 
 * @param 
 */
public class MatchedCountResultListener implements ResultListener {

	/**
	 * matchedCount to keep track of the number of matches returned by submitted
	 * task
	 */
	AtomicInteger matchedCount = new AtomicInteger();

	/**
	 * this method is called by ThreadPool to give back the result of callable
	 * task. if the task completed successfully then increment the matchedCount by
	 * result count
	 */
	@Override
	public void finish(V obj) {
		//System.out.println("count is "+obj);
		matchedCount.addAndGet((Integer)obj);
	}

	/**
	 * print exception thrown in running the task
	 */
	@Override
	public void error(Exception ex) {
		ex.printStackTrace();
	}
	
	
	/**
	 * returns the final matched count of all the finished tasks
	 * 
	 * @return
	 */
	public int getFinalCount() {
		return matchedCount.get();
	}
}

This is a test class which runs simple for loop using CompletionService and MyThreadPoolExecutor
package test;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.util.DefaultResultListener;
import com.util.MyThreadPool;

public class TestClass {
	
	public static void main(String[] args) throws InterruptedException {
		CompletionService threadService;
		ExecutorService service = Executors.newFixedThreadPool(2);
		threadService = new ExecutorCompletionService(service);
		
		long b = System.currentTimeMillis();
		for(int i =0;i<50000;i++){
			threadService.submit(new MyRunable (i));
		}
		
		service.shutdown();
		System.out.println("time taken by Completion Service " + (System.currentTimeMillis()-b));
		
		
		DefaultResultListener result = new DefaultResultListener();
		MyThreadPool newPool = new MyThreadPool(2,result);
		long a = System.currentTimeMillis();
		
		int cc =0;
		for(int i =0;i<50000;i++)
		{
			cc = cc+i;
		}
		System.out.println("time taken without any pool " + (System.currentTimeMillis()-a));
		a= System.currentTimeMillis();
		
		for(int i =0;i<5000;i++){
			newPool.submit(new MyRunable (i));
		}
		
		newPool.stop();
		System.out.println("time taken by myThreadPool " + (System.currentTimeMillis()-a));
	}
	
	
	

}

class MyRunable implements Callable
{
	int index = -1;
	public MyRunable(int index)
	{
		this.index = index;
	}
	@Override
	public Integer call() throws Exception {
		return index;
	}
	
}

You Might Also Like :

Back to Featured Articles on Logo Paperblog

These articles might interest you :

  • Big Food Giants Manipulate Public Health Policy in China

    Food Giants Manipulate Public Health Policy China

    Coca-Cola is at it again. As soda sales decline in the United States and Europe, beverage companies look to emerging economies like China for growth. And, it... Read more

    The 15 January 2019 by   Dietdoctor
    DIET & WEIGHT, HEALTH, HEALTHY LIVING, MEDICINE
  • Jewellery for a Precious You

    Jewellery Precious

    Jewellery is always close to a woman’s heart. It completes her look and boost confidence. Considering the changing trends in jewellery fashion, it becomes... Read more

    The 15 January 2019 by   Dr.jenifer Sayyed
    LIFESTYLE
  • Rajshri Productions’ Next Is A Film On Friendship | Hum Chaar | Trailer

    Abhishek Dixit’s debut feature film Hum Chaar is a Bollywood film made under the banner of Rajshri’s film. Hum Chaar is written and directed by Abhishek Dixit. Read more

    The 15 January 2019 by   Themoviean
    ENTERTAINMENT, MOVIES
  • Saint Paul the First Hermit

    Saint Paul First Hermit

    Today is the feast day of Saint Paul the hermit. This is a sweet and delicate Oatmeal Bread topped with rolled oats and naturally sweetened with agave. Saint... Read more

    The 15 January 2019 by   Veronica46
    FOOD & DRINK, RECIPES
  • Irupathiyonnaam Noottaandu | Teaser | Pranav Mohanlal | Arun Gopy

    Arun Gopy’s Irupathiyonnaam Noottaandu is an upcoming Malayalam action-drama feature film starring Pranav Mohanlal and Zaya David in the lead roles. Read more

    The 15 January 2019 by   Themoviean
    ENTERTAINMENT, MOVIES
  • A Year Of Body Positivity

    Year Body Positivity

    Last January, as I sat there on New Years eve all set to make the same old resolutions I've made year after year for as long as I can remember, I realised how... Read more

    The 15 January 2019 by   Sparklesandstretchmarks
    DIARIES, SELF EXPRESSION
  • Garden Bloggers Bloom Day – Jan 2019

    Garden Bloggers Bloom 2019

    Euphorbia rigidaWhen I went out to take the photos for this blog post I was surprised at how much was in flower dotted around the garden. Read more

    The 15 January 2019 by   Patientgardener
    GARDENING, HOME

Magazines