Monday, April 18, 2016

Using The Java ThreadPoolExecutor framework

Hi folks, 

The following exercise helps you understand and implement java's Thread Pool Executor framework. A thread pool manages the pool of worker threads . It contains a queue that keeps the tasks waiting to be executed.

The most common fixed thread pool always has a specified number of threads running; if a thread is somehow terminated while it is still in use, it is automatically replaced with a new thread. Tasks are submitted to the pool via an internal queue, which holds extra tasks whenever there are more active tasks than threads.

Thread pools address two different problems:

1. They usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead.


2.They provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.

ThreadPoolExecutor ,  an executor service that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.

Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.

However, programmers are urged to use the more convenient Executors factory methods
a).newCachedThreadPool()[unbounded thread pool, with automatic thread reclamation] ,

b).newFixedThreadpool(int) [fixed size thread pool] and

c).newSingleThreadExecutor()[single background thread].


Using A ThreadPool Executor - 

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                  BlockingQueue<Runnable> workQueue)

-- Creates a new ThreadPoolExecutor with the given initial parameters and default thread factory and rejected execution handler.
 



-> CorePoolSize and MaxPoolSize

 A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize & maximumPoolSize. When a new task is submitted  and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks.New threads are created using a ThreadFactory.The defaultThreadFactory creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status.By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. 

-> Rejected Tasks - 

New tasks submitted  will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated.In either case, the execute method invokes RejectedExecutionHandler.rejectedExecution .

 ->Hook methods:

This class provides protected overridable beforeExecute(java.lang.Thread , java.lang.Runnable) and afterExecute(java.lang.Runnable , java.lang.Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. 

-> Queuing:

Any BlockingQueue may be used to transfer and hold submitted tasks.If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.There are three types of blocking queues : SynchronousQueue , LinkedBlockingQueue , ArrayBlockingQueue. Read more about BlockingQuere here. 

-> Keep-alive times:

If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for than the keepAliveTime. This provides a means of reducing resource consumption when the pool is not being actively used. Using a value of Long.MAX_VALUE, TimeUnit.NANOSECONDS disables idle threads from ever terminating prior to shutdown.

Exercise :   

1. Let's extend the ThreadPoolExecutor by our own class -

package sample.exercise;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SampleThreadPoolExecutor extends ThreadPoolExecutor {

public SampleThreadPoolExecutor(int corePoolSize, int maxPoolSize,
long keepAliveTime, TimeUnit timeUnit,
BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory) {
super(corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
blockingQueue, threadFactory);
}

@Override
protected void beforeExecute(Thread arg0, Runnable arg1) {
                // s.o.p before execute called
super.beforeExecute(arg0, arg1);
}

@Override
protected void afterExecute(Runnable arg0, Throwable arg1) {
                // s.o.p after execute called
super.afterExecute(arg0, arg1);
}
}

---------------------------------------------------------------------------------------------------------------------

2. Let's create a basic TaskClass class that implements run method of the Runnable interface , the object of this class is the actual task going to the blocking queue of the threadPoolExcecutor.

package sample.exercise;

public class TaskClass implements Runnable {

@Override
public void run() {
System.out.println("Write your task here");
}
}

---------------------------------------------------------------------------------------------------------------------

3.  Let's have a main method to run our executor .

package sample.exercise;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class InitThreadPool {

private BlockingQueue<Runnable> blockingQueue;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
private SampleThreadPoolExecutor executor;
private int corePoolSize;
private int maxPoolSize;
private long keepAliveTime;

public InitThreadPool(BlockingQueue<Runnable> blockingQueue)
throws ConfigurationException {
// set the core poolSize this.corePoolSize = ;
// set the maxPoolSize this.maxPoolSize = ;
// set the keep alive time this.keepAliveTime = ;
TimeUnit timeUnit = TimeUnit.NANOSECONDS;
this.blockingQueue = blockingQueue;
this.executor = new SrimsThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, timeUnit, blockingQueue, threadFactory);
}

public static void main(String[] args) throws Exception {
InitThreadPool sampleThreadPool = new InitThreadPool(
new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 100; i++) {
executor.execute(new TaskClass());
}
                executor.shutdown();
                while (!executor.isTerminated()) {
                }
                System.out.println("Finished all threads");
}
}
----------------------------------------------------------------------------------------------------------------------------------

100 tasks of class TaskClass will be sent to the linkedBlocking queue for execution , meaning 100 run methods will be executed by the executor.
 

I hope you learnt something new through this exercise and your understanding of the ThreadPoolExecutor is clear now.
 

Happy Coding.


No comments:

Post a Comment