A blocking queue is a queue that blocks the calling thread when it tries to dequeue from an empty queue, or when it tries to enqueue an item into a queue that is already full.
A thread trying to dequeue from an empty queue is blocked until some other thread inserts an item into the queue. A thread trying to enqueue an item in a full queue is blocked until some other thread makes space in the queue, either by dequeuing one or more items or clearing the queue completely.
The take() method of BlockingQueue blocks while the queue is empty: the calling thread waits until some other thread inserts an item into the queue.
The put() method of BlockingQueue blocks while the queue is full: the calling thread waits until some other thread makes space in the queue.
Figure: A BlockingQueue with one thread putting into it and another thread taking from it.
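To see the blocking behaviour of take() in isolation, here is a minimal sketch. The class name TakeBlocksDemo and the helper handOff() are made up for this illustration, and the lambda syntax assumes Java 8 or later. A consumer thread calls take() on an empty queue, the main thread waits until that consumer is parked, and only then puts an item.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the program shown later in this post.
class TakeBlocksDemo {
    // Starts a consumer on an empty queue, waits until it is parked inside
    // take(), then puts one item and returns what the consumer received.
    static String handOff(String item) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks: the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Poll until the consumer thread is waiting inside take().
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            queue.put(item);  // wakes the waiting consumer
            consumer.join();  // after join(), received[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("Consumer received: " + handOff("hello"));
        // prints "Consumer received: hello"
    }
}
```

The polling loop is only there to make the demo show the waiting state explicitly. The hand-off itself would work without it, because take() simply waits for however long the put() takes to arrive.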
Java BlockingQueue does not accept null values and throws a NullPointerException if you try to store a null value in the queue.
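The null check is easy to verify. The sketch below uses a made-up class NullRejectionDemo with a helper rejectsNull() that tries to offer null and reports whether the queue refused it. One reason for the rule is that poll() returns null to signal an empty queue, so a stored null would be ambiguous.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class used only for this illustration.
class NullRejectionDemo {
    // Returns true if the queue refused the null element
    // by throwing NullPointerException.
    static boolean rejectsNull(BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println("Rejects null: " + rejectsNull(queue));
        // prints "Rejects null: true"
        System.out.println("Size afterwards: " + queue.size());
        // prints "Size afterwards: 0"
    }
}
```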
Java provides several BlockingQueue implementations, such as ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, and SynchronousQueue.
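These implementations differ mainly in capacity and ordering. The following sketch (the class ImplementationsDemo and the helper acceptedCount() are invented for this example) offers the same three values to each one without blocking and counts how many were accepted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class used only for this illustration.
class ImplementationsDemo {
    // Offers the values 3, 1, 2 without blocking and
    // returns how many of them the queue accepted.
    static int acceptedCount(BlockingQueue<Integer> queue) {
        int accepted = 0;
        for (int value : new int[] {3, 1, 2}) {
            if (queue.offer(value)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Bounded and array-backed: capacity 2, so the third offer is refused.
        System.out.println(acceptedCount(new ArrayBlockingQueue<>(2)));  // prints 2

        // Linked nodes, optionally bounded: no capacity given, so all three fit.
        System.out.println(acceptedCount(new LinkedBlockingQueue<>()));  // prints 3

        // Unbounded and ordered: poll() returns the smallest element first.
        PriorityBlockingQueue<Integer> byPriority = new PriorityBlockingQueue<>();
        System.out.println(acceptedCount(byPriority));                   // prints 3
        System.out.println(byPriority.poll());                           // prints 1

        // No internal capacity: offer() fails unless a consumer is already waiting.
        System.out.println(acceptedCount(new SynchronousQueue<>()));     // prints 0
    }
}
```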
Important Methods to Remember
/*
 * offer(): adds the message without waiting if the queue has space and
 *          returns false if the queue is full. No exception is thrown.
 *          queue.offer(msg);
 * add():   adds the message if the queue has space and throws
 *          IllegalStateException if the queue is full.
 *          queue.add(msg);
 * put():   adds the message, waiting if the queue is full until space
 *          becomes available.
 *          queue.put(msg);
 */
package com.concurrent.jdk5;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simple wrapper around the text passed from producer to consumer.
class Message {
    private final String message;

    public Message(String str) {
        this.message = str;
    }

    public String getMessage() {
        return message;
    }
}

class Producer implements Runnable {
    private final BlockingQueue<Message> queue;

    public Producer(BlockingQueue<Message> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            // Sleep for i milliseconds, then add the message to the queue.
            // put() blocks while the queue is full.
            try {
                Message msg = new Message("" + i);
                Thread.sleep(i);
                queue.put(msg);
                System.out.println("Produced " + msg.getMessage());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Add the exit message that tells the consumer to stop.
        Message msg = new Message("EXIT");
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Message> queue;

    public Consumer(BlockingQueue<Message> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Message msg;
            // take() blocks while the queue is empty; stop on the exit message.
            while (!((msg = queue.take()).getMessage().equals("EXIT"))) {
                Thread.sleep(10);
                System.out.println("Message Consumed " + msg.getMessage());
            }
            System.out.println("All Messages Produced , Consumed and Finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class BlockingQueueImpl {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue holding at most 10 messages.
        BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));
        producer.start();
        consumer.start();
        // Wait for both threads to finish.
        producer.join();
        consumer.join();
    }
}
Output
Produced 0
Produced 1
Produced 2
Produced 3
Message Consumed 0
Produced 4
Produced 5
Message Consumed 1
--
--
Produced 96
Message Consumed 96
Produced 97
Message Consumed 97
Produced 98
Message Consumed 98
Produced 99
Message Consumed 99
All Messages Produced , Consumed and Finished
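The three insertion methods listed in the comment at the top of the program differ only when the queue is full. Here is a small sketch of that case; the class InsertMethodsDemo and the helper describeFullQueue() are made up for this example. It fills a queue of capacity 1 and then tries offer(), add(), and the timed variant of offer(). It does not call put(), because with no consumer running put() would wait forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class used only for this illustration.
class InsertMethodsDemo {
    // Tries offer(), add(), and a timed offer() on a queue that is already
    // full, and returns a short description of what each call did.
    static String describeFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // succeeds: the queue has room for one element

        StringBuilder report = new StringBuilder();

        // offer() returns false immediately and throws nothing.
        report.append("offer=").append(queue.offer("second"));

        // add() throws IllegalStateException when there is no space.
        try {
            queue.add("second");
            report.append(" add=ok");
        } catch (IllegalStateException e) {
            report.append(" add=IllegalStateException");
        }

        // Timed offer(): waits up to 50 ms for space, then gives up.
        try {
            boolean added = queue.offer("second", 50, TimeUnit.MILLISECONDS);
            report.append(" timedOffer=").append(added);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            report.append(" timedOffer=interrupted");
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(describeFullQueue());
        // prints "offer=false add=IllegalStateException timedOffer=false"
    }
}
```

The timed offer() sits between offer() and put(): it waits like put(), but only for a bounded time, which is useful when the producer should not hang if the consumer has stopped.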