Sunday, August 6, 2017

CyclicBarrier– It gives a way to wait for final computation once all threads are done with their computation.

CyclicBarrier also the does the same thing but there is a difference that you cannot reuse CountDownLatch once the count reaches zero while you can reuse CyclicBarrier by calling reset() method which resets Barrier to its initial State.

What it implies that CountDownLatch is a good for one-time events like application start-up time and CyclicBarrier can be used to in case of the recurrent event e.g. concurrently calculating a solution of the big problem etc. 

First a new instance of a CyclicBarriers is created specifying the number of threads that the barriers should wait upon.
  Below means that n thread that the barriers should wait upon
CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads);

  Ø  Each and every thread does some computation and after completing it’s execution, calls await() methods as shown:
public void run()
{
                    // thread does the computation
                    newBarrier.await();
}

    Ø  Once the number of threads that called await() equals numberOfThreads, the barrier then gives a way for the waiting threads. The CyclicBarrier can also be initialized with some action that is performed once all the threads have reached the barrier. This action can combine/utilize the result of computation of individual thread waiting in the barrier.

Runnable action = ...
//action to be performed when all threads reach the barrier;
CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads, action);

Task 1- Product of two numbers

import java.util.concurrent.BrokenBarrierException;

/* Task 1 which Thread 1 will pick and then call await()
 * on CyclicBarrier so as to wait other tasks to compelete */

public class Computation1 implements Runnable
{
    public static int product = 0;
    public void run()
    {
        /*Thread 1 doing its task computation
         * and once finish will call await() on barrier*/
          product = 2 * 3;
        try
        {
           System.out.println(Thread.currentThread().getName() + " is waiting on barrier");
           /* Waits until all parties have invoked
             await on this barrier */
           CyclicBarrierImpl.newBarrier.await();
           System.out.println(Thread.currentThread().getName() + " has crossed the barrier");
        }
        catch (InterruptedException
              |BrokenBarrierException e)
        {
            e.printStackTrace();
        }
    }
}

Task 2- Summation of two numbers

class Computation2 implements Runnable
{
    public static int sum = 0;
    public void run()
    {
        /* check if newBarrier is broken or not */
        System.out.println("Is the barrier broken?
        - " + CyclicBarrierImpl.newBarrier.isBroken());
        sum = 10 + 20;
        try
        {
          System.out.println(Thread.currentThread().getName() + " is waiting on barrier");
           /* Waits until all parties have invoked
           await on this barrier */
           CyclicBarrierImpl.newBarrier.await
          (3000, TimeUnit.MILLISECONDS);
           System.out.println(Thread.currentThread()
           .getName() + " has crossed the barrier");
          
            // number of parties waiting at the barrier
        System.out.println("Number of parties
         waiting  at the barrier "+ "at this point = " 
       + CyclicBarrierImpl.newBarrier.getNumberWaiting());
        }
        catch (InterruptedException
        | BrokenBarrierException e)
        {
            e.printStackTrace();
        }
        catch (TimeoutException e)
        {
            e.printStackTrace();
        }
    }
}

Actual Worker thread which is waiting for task1 and task2 to complete and perform final action.

public class CyclicBarrierImpl implements Runnable
{
    public static CyclicBarrier newBarrier = new CyclicBarrier(3);
    
    public static void main(String[] args)
    {
        /* parent thread */
    CyclicBarrierImpl test = new CyclicBarrierImpl();
        
        Thread t1 = new Thread(test);
        t1.start();
    }
    public void run()
    {
        System.out.println("Number of parties
        required to trip the barrier = "+
        newBarrier.getParties());
        System.out.println("Sum of product and sum = " 
        + (Computation1.product + Computation2.sum));
        
        // objects on which the child thread has to run
        Computation1 comp1 = new Computation1();
        Computation2 comp2 = new Computation2();
        
        // creation of child thread
        Thread t1 = new Thread(comp1);
        t1.setName("Product Thread");
       
        Thread t2 = new Thread(comp2);
        t2.setName("Sum Thread");
      
        /* moving child thread to runnable state*/
        t1.start();
        t2.start();

        try
        {
           System.out.println(Thread.currentThread().getName() + " is waiting on barrier");
           CyclicBarrierImpl.newBarrier.await();
           System.out.println(Thread.currentThread().getName() + " has crossed the barrier");
        }
        catch (InterruptedException | BrokenBarrierException e)
        {
            e.printStackTrace();
        }
        
        // barrier breaks as the number of thread waiting for the barrier
        // at this point = 3
        System.out.println("Sum of product and sum = " + (Computation1.product +
        Computation2.sum));
                
        // Resetting the newBarrier
        newBarrier.reset();
        System.out.println("Barrier reset successful");
    }
}

Output


Number of parties required to trip the barrier = 3
Sum of product and sum = 0
Thread-0 is waiting on barrier
Product Thread is waiting on barrier
Is the barrier broken? - false
Sum Thread is waiting on barrier
Sum Thread has crossed the barrier
Thread-0 has crossed the barrier
Sum of product and sum = 36
Barrier reset successful
Product Thread has crossed the barrier
Number of parties waiting at the barrier at this point = 0

1 comment:

  1. Difference between CountDownLatch and CyclicBarrier

    CountDownLatch CountDownLatch latch = new CountDownLatch(3);
    Once latch count becomes 0, main thread proceed.
    CountDownLatch.await() Here main thread awaits till all threads latch count goes down to 0
    CountDownLatch.countDown() Each thread decrements latch count once completed. Once final count bring to 0, main thread start running.

    CyclicBarrier CyclicBarrier newBarrier = new CyclicBarrier(numberOfThreads);
    X numberofThreads – number of threads

    newBarrier.await() Once a thread complete run execution , calling await() increases count to 1 which means that 1 thread has cleared the barrier. X-1 is left. Once all threads cleared all barriers, main thread proceeds.

    ReplyDelete