Monday, August 21, 2017

Implementation of Own BlockingQueue using LinkedList

In previous article BlockingQueue , we've seen famous Producer-Consumer problem being solved by using by default BlockingQueue provided by JDK.

What if we need to implement our own BlockingQueue ?
Below code snippet is self-explanatory and gives clear picture what needs to be done for own BlockingQueue implementation.

package com.concurrent.jdk5;

import java.util.LinkedList;
import java.util.List;

/* Own Implementation of BlockingQueue using LinkedList */
public class OwnBlockingQueue {

    public List<Message> queue;
    public int limit;

    public OwnBlockingQueue(int limit) {
         this.limit = limit;
         queue = new LinkedList<>();
    }

    public synchronized void put(Message message) throws InterruptedException {
         while (this.queue.size() == this.limit) {
             System.out.println(Thread.currentThread().getName() +"Waiting");
             wait();
         }
         if (this.queue.size() == 0) {
             System.out.println(Thread.currentThread()
          .getName()+"Notified to all waiting threads");
             notifyAll();
         }
         this.queue.add(message);
    }

    public synchronized Message take() throws InterruptedException {
         while (this.queue.size() == 0) {
             System.out.println(Thread.currentThread()
             .getName() +"Waiting");
             wait();
         }
         if (this.queue.size() == this.limit) {
             System.out.println(Thread.currentThread()
          .getName() +"Notified to all waiting threads");
             notifyAll();
         }
         return this.queue.remove(0);
    }

    public static void main(String[] args) throws InterruptedException {
         OwnBlockingQueue blockingQueue = new OwnBlockingQueue(10);

         /* PRODUCER THREAD */
         Thread producer = new Thread(new Runnable() {
             @Override
             public void run() {
                 for (int i = 0; i < 100; i++) {
                      Message message = new Message("" + i);
                      try {
                          blockingQueue.put(message);
                          Thread.sleep(10);
                         sysout("Produced message"+mes);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                 }
                 /* Putting last Message */
                 Message message = new Message("EXIT");
                 try {
                      blockingQueue.put(message);
                 } catch (InterruptedException e) {
                      e.printStackTrace();
                 }
                 System.out.println("Produced LAST Message "+message.getMessage());
             }
         });

         /* CONSUMER THREAD */
         Thread consumer = new Thread(new Runnable() {
             public void run() {
                 Message message;
                 try {
                      while (!((message = blockingQueue.take()).getMessage().equals("EXIT"))){
                          Thread.sleep(10);
                          System.out.println("Message Consumed " + message.getMessage());
                      }
                 } catch (InterruptedException e) {
                      e.printStackTrace();
                 }
                 System.out.println("All Messages Produced , Consumed and Finished");
             }
         });
        
         producer.setName("producer");
         consumer.setName("consumer");
         producer.start();
         consumer.start();
         producer.join();
         consumer.join();
    }
}

Output

consumerWaiting
producerNotified to all waiting threads
Message Consumed 0
Produced Message 0
consumerWaiting
producerNotified to all waiting threads
Message Consumed 1
consumerWaiting
Produced Message 1
producerNotified to all waiting threads
Message Consumed 2
---
---
---
consumerWaiting
Produced Message 97
producerNotified to all waiting threads
Message Consumed 97
Produced Message 98
producerNotified to all waiting threads
Message Consumed 98
Produced Message 99
producerNotified to all waiting threads
Produced LAST Message EXIT
Message Consumed 99
All Messages Produced , Consumed and Finished

No comments:

Post a Comment