Monday, October 30, 2017

Thread Pool and Implementation of Thread Pool


A thread pool is a group of worker threads that execute tasks; each thread can be reused many times. If a new task is submitted while all threads are busy, the task waits in a queue until a thread becomes available. The implementation below uses a LinkedBlockingQueue to hold the pending tasks.
What we usually want is a work queue combined with a fixed group of worker threads, which uses wait() and notify() to signal waiting threads that new work has arrived. The following example shows a simple work queue, which is a queue of Runnable objects. This is a common convention for schedulers and work queues, although nothing in the Thread API requires the use of the Runnable interface.


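Because threads are reused, the application avoids the cost of creating and destroying a thread for every task, which can improve performance noticeably when tasks are short and numerous.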
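
The queueing and reuse described above can be observed with the JDK's own fixed-size pool, before turning to the hand-written version. The following is a minimal sketch, assuming Java 8 or later; the class name FixedPoolReuseDemo and the helper runTasks are made up for illustration and are not part of the original listing. It submits ten tasks to a pool of five threads and records which threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that a fixed pool reuses a bounded set of threads.
class FixedPoolReuseDemo {

    // Runs taskCount trivial tasks on a pool of poolSize threads and
    // returns the names of the threads that actually executed them.
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Tasks submitted while all threads are busy wait in the pool's queue
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return workerNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(5, 10);
        System.out.println("10 tasks ran on " + names.size() + " distinct thread(s): " + names);
    }
}
```

However the tasks are distributed, the number of distinct thread names should never exceed the pool size, because the pool does not create more than poolSize workers.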

Let’s look at what a thread pool is before the how:

[Figure: Threadpool]


Implementation of Thread Pool


package com.concurrent.jdk5;

import java.util.concurrent.LinkedBlockingQueue;

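/* All tasks are added to a blocking queue.
 * When a worker thread finds a task in the queue, it removes
 * the task from the queue and executes it.
 * Note: the workers are never stopped, so the JVM keeps running
 * after main() returns.
 */
class ThreadPool {
     private final int nThreads;
     private final LinkedBlockingQueue<Runnable> queue;
     private final WorkerThreads[] threads;

     public ThreadPool(int nThreads) {
          this.nThreads = nThreads;
          queue = new LinkedBlockingQueue<>();
          threads = new WorkerThreads[nThreads];

          // Create and start all workers up front
          for (int i = 0; i < nThreads; i++) {
              threads[i] = new WorkerThreads();
              threads[i].start();
          }
     }

     /* execute() accepts Runnable tasks only */
     public void execute(Runnable task) {
          synchronized (queue) {
              queue.add(task);
              System.out.println("New Task Added !! Queue Size now is "+ queue.size());
              // Wake up one worker that is waiting on the queue's monitor
              queue.notify();
          }
     }

     class WorkerThreads extends Thread {

          @Override
          public void run() {
              Runnable task;
              while (true) {
                   synchronized (queue) {
                        // Loop guards against spurious wake-ups and lost races
                        while (queue.isEmpty()) {
                             try {
                                  queue.wait();
                                  // Printed after wait() returns, i.e. once this worker is woken up
                                  System.out.println("Queue is waiting for task to add...");
                             } catch (InterruptedException e) {
                                  // Restore the interrupt flag and let this worker exit
                                  Thread.currentThread().interrupt();
                                  return;
                             }
                        }
                        task = queue.poll();
                   }
                   // Run outside the lock so other workers can take tasks meanwhile
                   try {
                        task.run();
                   } catch (RuntimeException e) {
                        // A failing task must not kill the worker thread
                        e.printStackTrace();
                   }
              }
          }
     }
}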

public class ThreadPoolImpl {
     public static void main(String[] args) {
          ThreadPool pool = new ThreadPool(5);

          for (int i = 0; i < 10; i++) {
              Task task = new Task(i);
              pool.execute(task);
          }
     }
}

class Task implements Runnable {
     int i;

     public Task(int i) {
          this.i = i;
     }

     public void run() {
          System.out.println("Run Task " + i + " by Thread " + Thread.currentThread().getName());
     }
}

Output (the order of the lines varies from run to run)

New Task Added !! Queue Size now is 1
New Task Added !! Queue Size now is 2
New Task Added !! Queue Size now is 3
New Task Added !! Queue Size now is 4
New Task Added !! Queue Size now is 5
New Task Added !! Queue Size now is 6
New Task Added !! Queue Size now is 7
New Task Added !! Queue Size now is 8
New Task Added !! Queue Size now is 9
New Task Added !! Queue Size now is 10
Queue is waiting for task to add...
Queue is waiting for task to add...
Run Task 0 by Thread Thread-0
Run Task 2 by Thread Thread-0
Run Task 3 by Thread Thread-0
Run Task 4 by Thread Thread-0
Queue is waiting for task to add...
Run Task 1 by Thread Thread-3
Queue is waiting for task to add...
Run Task 5 by Thread Thread-4
Queue is waiting for task to add...
Run Task 7 by Thread Thread-1
Run Task 8 by Thread Thread-1
Run Task 9 by Thread Thread-1
Run Task 6 by Thread Thread-2
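
Since LinkedBlockingQueue already blocks a consumer on an empty queue, the explicit synchronized/wait()/notify() handshake can be replaced by take(). The sketch below is one possible variant, not the implementation shown above: the names TakeBasedPool, STOP, shutdownAndWait and runDemo are hypothetical, and the stop-marker ("poison pill") shutdown is a technique swapped in here so that the worker threads can end. It assumes Java 8 or later.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the pool: the queue itself does the blocking.
class TakeBasedPool {
    // Stop marker: a worker that takes this object leaves its loop
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    TakeBasedPool(int nThreads) {
        workers = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            workers[i] = new Thread(this::workLoop, "worker-" + i);
            workers[i].start();
        }
    }

    void execute(Runnable task) {
        queue.add(task); // unbounded queue, so add() does not block
    }

    private void workLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // blocks until a task is available
                if (task == STOP) {
                    return;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    e.printStackTrace(); // keep the worker alive after a failing task
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted while waiting: exit
        }
    }

    // Queues one stop marker per worker, then waits for every worker to end.
    // Tasks queued earlier are processed first because the queue is FIFO.
    void shutdownAndWait() {
        for (int i = 0; i < workers.length; i++) {
            queue.add(STOP);
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Submits nTasks counting tasks and returns how many of them completed.
    static int runDemo(int nThreads, int nTasks) {
        AtomicInteger completed = new AtomicInteger();
        TakeBasedPool pool = new TakeBasedPool(nThreads);
        for (int i = 0; i < nTasks; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdownAndWait();
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runDemo(5, 10));
    }
}
```

The trade-off in this design is that all locking is left to the queue, and the program can exit once the stop markers have been consumed, whereas the wait()/notify() version above keeps its workers alive indefinitely.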
