Thursday, June 29, 2017

All about the Concurrent API in Java 7

Executor Interface
An Executor executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.

public interface Executor {

   void execute(Runnable command);
}


Executor is the root interface, with a single execute method. Anything that implements the Runnable interface can be passed as a parameter. Silly Executor, however, has no support for Callable.
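To make the "new thread, pooled thread, or calling thread" part concrete, here is a minimal sketch of two hand-written Executors, in the spirit of the examples in the Executor Javadoc. The class ExecutorSketch and the helper runAndReportThread are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

/* Two hand-written Executors (illustrative names, not part of the JDK) */
class ExecutorSketch {

    /* Runs the command immediately, in the calling thread */
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    /* Starts a brand-new thread for every command */
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    /* Hands one task to the executor and returns the name of the thread that ran it */
    static String runAndReportThread(Executor executor) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);
        final String[] threadName = new String[1];
        executor.execute(new Runnable() {
            @Override
            public void run() {
                threadName[0] = Thread.currentThread().getName();
                done.countDown();
            }
        });
        done.await();   /* wait until the task has actually run */
        return threadName[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Caller thread:         " + Thread.currentThread().getName());
        System.out.println("DirectExecutor:        " + runAndReportThread(new DirectExecutor()));
        System.out.println("ThreadPerTaskExecutor: " + runAndReportThread(new ThreadPerTaskExecutor()));
    }
}
```

The caller's code is the same in both cases; only the Executor implementation decides which thread does the work.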

ExecutorService Interface

public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}


Good news: the ExecutorService interface, which extends Executor, adds support for Callable. Its most commonly used implementation class is ThreadPoolExecutor.
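As a rough sketch of what the Callable support buys you, the example below submits a Callable and reads its result from the returned Future, then runs a batch of Callables with invokeAll. The class CallableSketch and the task sumUpTo are hypothetical, invented only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableSketch {

    /* Hypothetical task: returns the sum 1 + 2 + ... + n */
    static Callable<Integer> sumUpTo(final int n) {
        return new Callable<Integer>() {
            @Override
            public Integer call() {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            }
        };
    }

    /* Runs one Callable per input with invokeAll and collects the results in input order */
    static List<Integer> sumAll(int... inputs) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            for (int n : inputs) {
                tasks.add(sumUpTo(n));
            }
            List<Integer> results = new ArrayList<Integer>();
            /* invokeAll blocks until every task has completed */
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> future = pool.submit(sumUpTo(10));
        /* get() blocks until the result is ready */
        System.out.println("submit:    " + future.get());
        pool.shutdown();
        System.out.println("invokeAll: " + sumAll(3, 4, 5));
    }
}
```

Running main should print 55 for the single submit and [6, 10, 15] for the invokeAll batch.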

ExecutorService Interface: shutdown() facts

executorService.shutdown();

You could imagine shutdown as a half-closed door of a mall. No new customers will be let in but the existing customers can leave the mall once they are done.

To reiterate,
  1. shutdown is a polite method. It does not stop the tasks in the pool immediately. It just says that no new tasks will be accepted.
  2. shutdown itself does not block. Unless you are executing your tasks using invokeAll (which already waits for them), you need to wait for the tasks in progress to complete. This is achieved by calling the awaitTermination method.
  3. Once all the previously submitted tasks are done, the executor service terminates.
If you need an impolite, intruding method that doesn't care whether the current threads are done with their tasks, then shutdownNow is your guy. It attempts to stop the running tasks (typically by interrupting their threads) and returns the list of tasks that were still waiting to run. There's no guarantee that the service stops on the dot, since a task that ignores interruption keeps running, but it is the closest you have to an immediate shutdown.
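Here is a small sketch of the impolite route. It queues a few long-running tasks on a single-thread pool, calls shutdownNow, and counts the tasks that never got to start. The class ShutdownNowSketch and its method name are hypothetical, and the 60-second sleep simply stands in for a long job.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowSketch {

    /* Submits taskCount long-running tasks to a single-thread pool, then calls shutdownNow.
     * Returns how many of the tasks never started. */
    static int interruptAndCountPending(int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        final CountDownLatch firstStarted = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    firstStarted.countDown();
                    try {
                        Thread.sleep(60000);   /* pretend to do a long job */
                    } catch (InterruptedException e) {
                        /* shutdownNow interrupts the worker; a cooperative task stops here */
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        firstStarted.await();                         /* make sure one task is really running */
        List<Runnable> neverStarted = pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);   /* the interrupted task still has to return */
        return neverStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks that never started: " + interruptAndCountPending(3));
    }
}
```

With one worker thread and three tasks, one task is running when shutdownNow is called and the other two come back in the returned list. Note that the running task only stops because it reacts to the interrupt.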

With awaitTermination, you specify a timeout: the maximum time the calling thread should wait for the pool threads to complete their tasks. It returns true if the pool terminated in time and false if the timeout elapsed first.

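ExecutorService executorService = Executors.newFixedThreadPool(10);
future = executorService.submit(getInstanceOfCallable(count, sum));
/* Stop accepting new tasks; the submitted task still runs */
executorService.shutdown();

/* Block for at most 10 seconds; true means the pool terminated in time */
if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
    System.out.println("All threads done with their jobs");
}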
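The snippet above says nothing about what to do when awaitTermination times out. One common answer, sketched here along the lines of the two-phase shutdown shown in the ExecutorService Javadoc, is to be polite first and impolite only if needed. The class GracefulShutdownSketch and the method shutdownAndAwait are hypothetical names for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulShutdownSketch {

    /* Returns true if the pool terminated, either politely or after being forced */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                              /* polite: refuse new tasks */
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;                          /* finished within the timeout */
            }
            pool.shutdownNow();                       /* impolite: interrupt running tasks */
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();       /* preserve the caller's interrupt status */
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Working in " + Thread.currentThread().getName());
            }
        });
        System.out.println("Terminated: " + shutdownAndAwait(pool, 10, TimeUnit.SECONDS));
    }
}
```

The second awaitTermination matters because shutdownNow only asks the tasks to stop; they still need a moment to respond to the interrupt.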

Executors Class

Executors is a class with just factory methods for creating various forms of executor service with some commonly used defaults. Note that, other than the awesome factory methods, it doesn't bring any new features to the table.

public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
/* Note: newWorkStealingPool was added in Java 8; it is not available in Java 7 */
public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
/* newSingleThreadExecutor(): tasks are guaranteed to execute sequentially, and no more than
 * one task will be active at any given time. Unlike the otherwise equivalent
 * newFixedThreadPool(1), the returned executor is guaranteed not to be reconfigurable
 * to use additional threads. */

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
}
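The "not reconfigurable" remark about newSingleThreadExecutor can be checked with a short sketch. It relies on an implementation detail visible in the listing above: newFixedThreadPool returns a plain ThreadPoolExecutor, while newSingleThreadExecutor wraps it in a delegating class. The class ReconfigureSketch and the method tryGrow are hypothetical names for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class ReconfigureSketch {

    /* Tries to grow the pool to `threads` workers (assumes threads is not below the current size).
     * Returns the new core pool size, or -1 if the executor cannot be reconfigured. */
    static int tryGrow(ExecutorService executor, int threads) {
        if (!(executor instanceof ThreadPoolExecutor)) {
            return -1;                       /* wrapped executor: the tuning methods are hidden */
        }
        ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
        pool.setMaximumPoolSize(threads);    /* raise the maximum first; the core size may not exceed it */
        pool.setCorePoolSize(threads);
        return pool.getCorePoolSize();
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            System.out.println("newFixedThreadPool(1) grown to: " + tryGrow(fixed, 4));
            System.out.println("newSingleThreadExecutor grown to: " + tryGrow(single, 4));
        } finally {
            fixed.shutdown();
            single.shutdown();
        }
    }
}
```

So if some code must never get more than one worker, newSingleThreadExecutor is the safer pick, because nobody downstream can cast it and bump the thread count.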

Difference between Thread and Executor

  1)  Thread:   java.lang.Thread is a class in Java.
      Executor: java.util.concurrent.Executor is an interface.
  2)  Thread:   The task and its execution are tightly coupled.
      Executor: Decouples a task (the code that needs to be executed in parallel) from its execution.
  3)  Thread:   The Thread itself executes your task.
      Executor: Your task is executed by a worker thread, typically one taken from a thread pool.
  4)  Thread:   A Thread can execute only one Runnable task.
      Executor: An Executor can execute any number of Runnable tasks.
  5)  Thread:   The task is executed by the Thread that accepts the Runnable instance.
      Executor: The command (a Runnable implementation) may be executed in a new thread, a pooled thread, or the calling thread itself, depending on the implementation of the Executor interface.
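Point 4 is easy to see in code. The sketch below first shows that a Thread cannot be started a second time, then shows a single-thread executor running several tasks on the same worker. The class ThreadVsExecutorSketch and its two helper methods are hypothetical names for this example.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadVsExecutorSketch {

    /* A Thread runs its Runnable once; starting it a second time is an error */
    static boolean secondStartFails() throws InterruptedException {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                /* nothing to do: only the thread's life cycle matters here */
            }
        });
        thread.start();
        thread.join();
        try {
            thread.start();
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    /* Runs taskCount tasks on a single-thread executor and returns the names of the worker threads used */
    static Set<String> workerNamesUsed(int taskCount) throws InterruptedException {
        final Set<String> names = Collections.synchronizedSet(new HashSet<String>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    names.add(Thread.currentThread().getName());
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Second Thread.start() fails: " + secondStartFails());
        System.out.println("Workers used for 5 tasks:    " + workerNamesUsed(5));
    }
}
```

Five tasks go in, but only one worker thread name comes out: the executor reuses its thread instead of creating one per task.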


The program below shows the decoupling of a task's creation from its execution.
The Executors class is responsible for providing the thread pool, which is then used to execute the tasks asynchronously.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/* Thread pool: a way of managing multiple threads at the same time,
 * separating task creation from task execution */

/* 1) Create a task to execute */

class ProcessorThread implements Runnable {
     private int id;

     public ProcessorThread(int id) {
          this.id = id;
     }

     @Override
     public void run() {
          System.out.println("Starting :" + id);
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          System.out.println("Completed: " + id);
     }
}

/*2) Execute tasks using Executors */

public class ThreadPoolApp {
     public static void main(String[] args) throws InterruptedException {

          /* Having 2 workers in a factory. Creates a thread pool that reuses a fixed number of threads (2 here) */
          ExecutorService executor = Executors.newFixedThreadPool(2);
         
          /* Submit 5 tasks; the 2 threads share them between themselves */
          for (int i = 0; i < 5; i++) {
              executor.submit(new ProcessorThread(i));
          }
         
          /* Stop accepting new tasks. Tasks already submitted still run to completion */
          executor.shutdown();
          System.out.println("All tasks submitted...");
         
          /* Wait up to 1 day for all tasks to complete */
          executor.awaitTermination(1, TimeUnit.DAYS);
         
          System.out.println("All tasks completed...");
         
     }
}

Output (the exact order of the lines may vary between runs)

All tasks submitted...
Starting :0
Starting :1

Completed: 0
Completed: 1
Starting :2
Starting :3

Completed: 2
Completed: 3
Starting :4

Completed: 4
All tasks completed...

