Monday, October 30, 2017

Maximum and Minimum values of a shared resource if shared within 5 threads

Since int is a primitive and since we want a sharable resource , so it must be a reference which needs to be shared within 5 threads.

If Integer being used, since Integer is Immutable hence any change on Integer reference will create a new Object , so it must be avoided.

Its desirable to create own NonMutableInteger reference and share the reference to multiple thread using Threadpool

/* To check the minimum and maximum value of count
 * Minimum - Nothing to be Guaranteed(Race condition can happen)
 * Maximum  - 25*/

public class ThreadMinMaxCount {
     public static void main(String[] args) {
         
          /* Integer is an Immutable class and hence not used */
          NonMutableInteger count = new NonMutableInteger(0);
          ExecutorService service = Executors.newFixedThreadPool(5);
          for (int i = 0; i < 5; i++)
              service.execute(new Task(count));
         
          System.out.println(count.i);
          service.shutdown();
     }
}

class Task implements Runnable {

     NonMutableInteger count;

     public Task(NonMutableInteger count) {
          this.count = count;
     }

     @Override
     public void run() {
          for (int i = 0; i < 5; i++)
              count.increment();
     }
}

class NonMutableInteger {
     int i;

     public NonMutableInteger(int i) {
          this.i = i;
     }

     public int getMutableInteger() {
          return i;
     }

     public void increment() {
          i++;
     }
}

Output (Run multiple times)
10
5
20

Thread Pool and Implementation of Thread Pool


Thread pool represents a group of worker threads which execute tasks, each thread can be reused many times. If a new task is submitted when all threads are active, they will wait in the queue until a thread is available. Thread pool implementation internally uses LinkedBlockingQueue for adding and removing tasks to the queue.
What we usually want is a work queue combined with a fixed group of worker threads, which uses wait()and notify() to signal waiting threads that new work has arrived. The following example shows a simple work queue which is queue of Runnable objects. This is a common convention for schedulers and work queues, although there is no particular need imposed by the Thread API to use the Runnable interface.


As threads are reused, performance of our application improves drastically.

Let’s look at what a threadpool is before the how:

Threadpool


Implementation of Thread Pool


package com.concurrent.jdk5;

import java.util.concurrent.LinkedBlockingQueue;

/* All tasks are added in a Blocking Queue
 * When worker threads find any Task in queue, they pick
 * it from Queue and execute ...
*/
class ThreadPool {
     private final int nThreads;
     private final LinkedBlockingQueue queue;
     private final WorkerThreads[] threads;

     public ThreadPool(int nThreads) {
          this.nThreads = nThreads;
          queue = new LinkedBlockingQueue<>();
          threads = new WorkerThreads[nThreads];

          for (int i = 0; i < nThreads; i++) {
              threads[i] = new WorkerThreads();
              threads[i].start();
          }
     }

     /* execute() accept Runnable task only */
     public void execute(Runnable task) {
          synchronized (queue) {
              queue.add(task);
              System.out.println("New Task Added !! Queue Size now is "+ queue.size());
              queue.notify();
          }
     }

     class WorkerThreads extends Thread {

          public void run() {
              Task task;
              while (true) {
                   synchronized (queue) {
                        while (queue.isEmpty()) {
                             try {
                                  queue.wait();
                                  System.out.println("Queue is waiting for task to add...");
                             } catch (InterruptedException e) {
                                  e.printStackTrace();
                             }
                        }
                        task = (Task) queue.poll();
                   }
                   task.run();
              }
          }
     }
}

public class ThreadPoolImpl {
     public static void main(String[] args) {
          ThreadPool pool = new ThreadPool(5);

          for (int i = 0; i < 10; i++) {
              Task task = new Task(i);
              pool.execute(task);
          }
     }
}

class Task implements Runnable {
     int i;

     public Task(int i) {
          this.i = i;
     }

     public void run() {
          System.out.println("Run Task " + i + " by Thread " + Thread.currentThread().getName());
     }
}

Output 

New Task Added !! Queue Size now is 1
New Task Added !! Queue Size now is 2
New Task Added !! Queue Size now is 3
New Task Added !! Queue Size now is 4
New Task Added !! Queue Size now is 5
New Task Added !! Queue Size now is 6
New Task Added !! Queue Size now is 7
New Task Added !! Queue Size now is 8
New Task Added !! Queue Size now is 9
New Task Added !! Queue Size now is 10
Queue is waiting for task to add...
Queue is waiting for task to add...
Run Task 0 by Thread Thread-0
Run Task 2 by Thread Thread-0
Run Task 3 by Thread Thread-0
Run Task 4 by Thread Thread-0
Queue is waiting for task to add...
Run Task 1 by Thread Thread-3
Queue is waiting for task to add...
Run Task 5 by Thread Thread-4
Queue is waiting for task to add...
Run Task 7 by Thread Thread-1
Run Task 8 by Thread Thread-1
Run Task 9 by Thread Thread-1
Run Task 6 by Thread Thread-2