How will you implement a Blocking Queue in Java

Carvia Tech | May 28, 2019 | 3 min read | 60 views


A blocking queue allows multiple threads to communicate with each other and pass data around. For example, a producer can put items into the queue while a consumer takes items out of it.

BlockingQueue implementations are designed to be used primarily for producer-consumer queues.

A blocking queue has the following characteristics:

  1. It is always thread-safe

  2. It can hold arbitrary data

  3. A producer has to wait if the queue is already full; similarly, a consumer has to wait if no item is present in the queue.
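
Before writing our own queue, these characteristics can be observed with the JDK's ArrayBlockingQueue. The following is only a minimal sketch: the class name HandoffDemo, the helper transfer() and the capacity of 2 are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class HandoffDemo {
    // Hypothetical helper: moves the numbers 1..n through a queue of
    // capacity 2 and returns the sum seen by the consumer.
    static int transfer(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue already holds 2 items
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10)); // 1 + 2 + ... + 10 = 55
    }
}
```

The producer can never run more than two items ahead of the consumer, and the consumer never sees an empty result: each side simply waits until the other catches up.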

Java implementation

A trivial implementation of a blocking queue that uses intrinsic locking (the synchronized keyword) looks like the following:

BlockingQueue Implementation using synchronization
class SimpleBlockingQueue {
    final Object[] items = new Object[100]; // (1)
    int putptr, takeptr, count;

    public synchronized void put(Object x) throws InterruptedException {
        while (count == items.length)
            wait(); // (2)
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0; // wrap around the circular buffer
        ++count;
        notifyAll(); // (3)
    }

    public synchronized Object take() throws InterruptedException {
        while (count == 0)
            wait(); // (4)
        Object x = items[takeptr];
        if (++takeptr == items.length) takeptr = 0; // wrap around the circular buffer
        --count;
        notifyAll(); // (5)
        return x;
    }
}
1 The maximum capacity of the blocking queue is 100.
2 The producer waits inside a while loop for as long as the queue is full. The loop re-checks the condition after every wakeup, which guards against spurious wakeups.
3 All waiting threads are notified as soon as a new item is added to the queue.
4 The consumer thread waits inside a while loop for as long as the queue is empty. Again, the while loop guards against spurious wakeups.
5 The consumer thread notifies all waiting threads, including any waiting producers, as soon as an item is removed from the queue.

A realistic implementation would have many more methods (peek(), remove(), offer(), isFull(), etc.), but for brevity we have implemented only two.
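
As a rough idea of what some of those extra methods might look like with intrinsic locking, here is a sketch of non-blocking offer(), poll(), peek() and isFull(). The class name BoundedQueueSketch and the capacity constructor are assumptions made for illustration; the method names follow the java.util.concurrent conventions, and null is used to mean "empty", so null elements are not supported.

```java
class BoundedQueueSketch {
    private final Object[] items;
    private int putptr, takeptr, count;

    // Assumption: capacity is passed in instead of being fixed at 100.
    BoundedQueueSketch(int capacity) {
        items = new Object[capacity];
    }

    // Non-blocking insert: returns false instead of waiting when the queue is full.
    public synchronized boolean offer(Object x) {
        if (count == items.length) return false;
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notifyAll(); // would wake consumers blocked in a take() like the one above
        return true;
    }

    // Non-blocking removal: returns null instead of waiting when the queue is empty.
    public synchronized Object poll() {
        if (count == 0) return null;
        Object x = items[takeptr];
        items[takeptr] = null; // clear the slot so the item can be garbage collected
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notifyAll(); // would wake producers blocked in a put() like the one above
        return x;
    }

    // Returns the head of the queue without removing it, or null if empty.
    public synchronized Object peek() {
        return count == 0 ? null : items[takeptr];
    }

    public synchronized boolean isFull() {
        return count == items.length;
    }
}
```

Because these methods never call wait(), they do not throw InterruptedException; the caller decides what to do when offer() returns false or poll() returns null.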

A slightly improved version

The improved version of the queue uses explicit locking. The Lock and Condition interfaces offer more flexibility than the intrinsic locking mechanism, but this flexibility brings more responsibility: we have to call lock() and unlock() ourselves, with unlock() placed in a finally block. Because one lock can be associated with multiple conditions (notFull and notEmpty in this case), a producer can signal only waiting consumers and a consumer can signal only waiting producers, instead of waking every waiting thread with notifyAll(). Fewer needless wakeups mean less thread contention and usually better throughput.

Lock is analogous to the synchronized keyword, and Condition is similar to wait/notify.
SimpleBlockingQueue using Lock and Condition
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SimpleBlockingQueue {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

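Here we use a ReentrantLock together with two Condition objects to manage locking explicitly. Under heavy load this reduces thread contention, because signal() wakes a single thread waiting on the relevant condition rather than every waiting thread.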
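
One example of the extra flexibility of Condition is a timed wait. Object.wait(timeout) does not report how much time is left after a wakeup, whereas Condition.awaitNanos() returns an estimate of the remaining nanoseconds, which makes a timed offer() straightforward. The following is a sketch only: the class name TimedBlockingQueue and the capacity constructor are assumptions, and the rest mirrors the version above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TimedBlockingQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items;
    private int putptr, takeptr, count;

    // Assumption: capacity is passed in instead of being fixed at 100.
    TimedBlockingQueue(int capacity) {
        items = new Object[capacity];
    }

    // Waits up to the given timeout for free space; returns false on timeout.
    public boolean offer(Object x, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (count == items.length) {
                if (nanos <= 0L) return false;     // timed out, give up
                nanos = notFull.awaitNanos(nanos); // returns the time still left
            }
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Same blocking take() as in the version above.
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

The loop keeps the same shape as in put(): the condition is re-checked after every wakeup, and the remaining time shrinks on each pass so the total wait never exceeds the requested timeout by more than scheduling delay.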

We can make a few further improvements to the above version: for example, we can use a LinkedList instead of an array and make the class generic so that it supports any item type.

Generic BlockingQueue using LinkedList under the hood
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<T> {
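    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BlockingQueue(int capacity) {
        // A capacity of zero would make put() block forever,
        // and a negative one would make the queue unbounded.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }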

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await();
            }
            queue.add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T item = queue.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

Please note that all of the above implementations are for learning purposes only, not for production use. In production, prefer LinkedBlockingQueue or an equivalent implementation from the java.util.concurrent package.
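
For comparison, here is a sketch of the same put()/take() style of code on top of the JDK's LinkedBlockingQueue, this time with several producers and consumers. The class name LinkedBlockingQueueDemo, the helper run() and the capacity of 10 are assumptions chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class LinkedBlockingQueueDemo {
    // Hypothetical helper: starts the same number of producers and consumers.
    // Each producer puts 1..itemsPerProducer, each consumer takes
    // itemsPerProducer items; the sum of everything consumed is returned.
    static long run(int producers, int itemsPerProducer) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); // bounded at 10
        AtomicLong total = new AtomicLong();
        Thread[] threads = new Thread[producers * 2];

        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsPerProducer; i++) {
                        queue.put(i); // waits while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[producers + p] = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        total.addAndGet(queue.take()); // waits while the queue is empty
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 100)); // 3 * (1 + ... + 100) = 15150
    }
}
```

The calling code looks the same as it would with our learning implementations, which is why swapping in the JDK class is usually a small change.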

That’s all.

