Producer Consumer Problem using Blocking Queue in Java

Upasana | August 31, 2019 | Multithreading and Concurrency


Producer Consumer Problem

The producer-consumer problem is a classic multi-threading synchronization problem. It is a must-know if you want to delve into Java concurrency and multi-threading concepts.

Problem Description

The problem describes two entities, the producer and the consumer, that share a common, fixed-size buffer used as a queue. The producer’s job is to generate data, put it into the buffer, and start again. At the same time, the consumer removes data from the buffer one piece at a time and processes it. The problem is to make sure that the producer won’t try to add data to a full buffer and that the consumer won’t try to remove data from an empty buffer, while keeping access to the buffer thread-safe.

Figure: Producer-consumer illustration

Steps to Solve the Problem

  1. The solution for the producer is to either go to sleep or discard data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, who starts to fill the buffer again.

  2. In the same way, the consumer can go to sleep if it finds the buffer to be empty. The next time the producer puts data into the buffer, it wakes up the sleeping consumer.

  3. While doing all of this, ensure thread safety.

Did you know?

JMS (Java Message Service) is a messaging API built around the producer-consumer pattern. Multiple producers and multiple consumers can connect to a JMS queue and distribute the work.

Solving Producer Consumer Problem in Core Java

There are multiple ways to solve this problem:

  1. Using semaphores to coordinate between producers and consumers.

  2. Using synchronization to solve the problem.

  3. Using non-blocking algorithms to solve this problem.

  4. Using BlockingQueue to solve the problem.
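For comparison, the semaphore approach from the list above can be sketched roughly as follows. This is only an illustration: the class name `SemaphoreBuffer` and its layout are assumptions, not code from this article. One semaphore counts free slots, another counts available items, and a small synchronized block protects the underlying queue.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical class for illustration; not part of the article's code.
class SemaphoreBuffer<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final Semaphore emptySlots;                      // permits = free slots
    private final Semaphore filledSlots = new Semaphore(0);  // permits = items available
    private final Object mutex = new Object();               // guards the queue itself

    SemaphoreBuffer(int capacity) {
        emptySlots = new Semaphore(capacity);
    }

    void put(E x) throws InterruptedException {
        Objects.requireNonNull(x);      // ArrayDeque does not accept null elements
        emptySlots.acquire();           // blocks while the buffer is full
        synchronized (mutex) {
            items.add(x);
        }
        filledSlots.release();          // one more item is available to consumers
    }

    E take() throws InterruptedException {
        filledSlots.acquire();          // blocks while the buffer is empty
        E x;
        synchronized (mutex) {
            x = items.remove();
        }
        emptySlots.release();           // one more slot is free for producers
        return x;
    }
}
```

The semaphores handle the waiting, so there is no explicit wait/notify loop; the mutex is only held for the short queue operation.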

In this article we will focus only on the blocking queue approach.

Interviewers usually want to see the producer-consumer problem solved from scratch, as a way to evaluate your multi-threading skills, so we will implement a simple version of a blocking queue ourselves.
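Outside an interview setting, the JDK already provides ready-made blocking queues in `java.util.concurrent`. The sketch below uses `ArrayBlockingQueue` to show the blocking `put()` and `take()` calls; the class `JdkQueueDemo`, the method `produceAndSum` and the capacity of 4 are made up for this example. Note that the JDK interface is also called `BlockingQueue`, the same simple name as the hand-written class in this article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; names and the capacity are illustrative assumptions.
class JdkQueueDemo {
    // Produces 0..n-1 on a separate thread and sums them on the calling thread.
    static int produceAndSum(int n) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.put(i);       // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += queue.take();        // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }
}
```

Because the consumer takes exactly `n` items, no separate shutdown signal is needed in this small example.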

To solve this we need three different components:

  1. A blocking queue

  2. Producer thread(s)

  3. Consumer thread(s)

We can implement our own simple thread-safe version of BlockingQueue using synchronization, as shown in the code below:

BlockingQueue (Using Intrinsic Locking)
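package com.shunya;

// A bounded FIFO queue guarded by the object's intrinsic lock
// (synchronized methods plus wait/notifyAll).
class BlockingQueue<E> {
    private final Object[] items = new Object[100];   // circular buffer
    private int putptr, takeptr, count;
    private boolean closed = false;

    // Adds an item, blocking while the buffer is full.
    public synchronized void put(E x) throws InterruptedException {
        while (count == items.length)   // re-check the condition after every wakeup
            wait();
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notifyAll();                    // wake waiting threads; a consumer can now proceed
    }

    // Removes the oldest item, blocking while the buffer is empty.
    @SuppressWarnings("unchecked")
    public synchronized E take() throws InterruptedException {
        while (count == 0)
            wait();
        E x = (E) items[takeptr];
        items[takeptr] = null;          // drop the reference so it can be garbage collected
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notifyAll();                    // wake waiting threads; a producer can now proceed
        return x;
    }

    public synchronized boolean isClosed() {
        return closed;
    }

    public synchronized void setClosed(boolean closed) {
        this.closed = closed;
    }
}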
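With intrinsic locking, `notifyAll()` wakes every waiting thread, producers and consumers alike. As a hedged alternative, the same bounded buffer can be sketched with an explicit `ReentrantLock` and two `Condition` objects, so that a `put()` signals only a waiting consumer and a `take()` signals only a waiting producer. The class name `ConditionQueue` and its constructor are assumptions made for this sketch.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of the queue above, using explicit locks.
class ConditionQueue<E> {
    private final Object[] items;
    private int putptr, takeptr, count;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();   // producers wait here
    private final Condition notEmpty = lock.newCondition();  // consumers wait here

    ConditionQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(E x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();          // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            E x = (E) items[takeptr];
            items[takeptr] = null;
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();           // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

The buffer logic is unchanged; only the waiting and signalling differ from the synchronized version.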

A producer is a thread that keeps putting tasks into the BlockingQueue and waits whenever the queue is full.

Producer Thread
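package com.shunya;

import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable {
    private final BlockingQueue<SquareTask> queue;

    Producer(BlockingQueue<SquareTask> q) {
        queue = q;
    }

    @Override
    public void run() {
        try {
            // Keep producing until the queue is closed; put() blocks while the queue is full.
            while (!queue.isClosed()) {
                queue.put(produce());
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and let the thread exit.
            Thread.currentThread().interrupt();
        }
    }

    private SquareTask produce() {
        // Random number in the range [1, 200)
        return new SquareTask(ThreadLocalRandom.current().nextInt(1, 200));
    }
}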

A consumer takes tasks from the BlockingQueue and processes them, waiting whenever the queue is empty.

Consumer Thread
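package com.shunya;

public class Consumer implements Runnable {
    private final BlockingQueue<SquareTask> queue;

    Consumer(BlockingQueue<SquareTask> q) {
        queue = q;
    }

    @Override
    public void run() {
        try {
            // Keep consuming until the queue is closed; take() blocks while the queue is empty.
            while (!queue.isClosed()) {
                consume(queue.take());
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and let the thread exit.
            Thread.currentThread().interrupt();
        }
    }

    private void consume(SquareTask x) {
        System.out.println(x.execute());
    }
}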
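
The listings use a `SquareTask` class that is not shown in this article. A minimal sketch of what it might look like is given below; the field name and the `int` return type of `execute()` are assumptions, chosen only so that the producer and consumer code compiles.

```java
// Hypothetical task type: the article refers to SquareTask but does not define it.
class SquareTask {
    private final int number;

    SquareTask(int number) {
        this.number = number;
    }

    // Returns the square of the number passed to the constructor.
    int execute() {
        return number * number;
    }
}
```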
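
Main Program
package com.shunya;

public class ProducerConsumer {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<SquareTask> boundedBuffer = new BlockingQueue<>();
        // One producer and two consumers share the same bounded buffer.
        Thread[] workers = {
                new Thread(new Producer(boundedBuffer)),
                new Thread(new Consumer(boundedBuffer)),
                new Thread(new Consumer(boundedBuffer))
        };
        for (Thread t : workers) t.start();

        Thread.sleep(100);              // let the workers run for a short while
        boundedBuffer.setClosed(true);  // ask the run() loops to stop
        // A worker blocked inside put() or take() never re-checks the closed flag,
        // so interrupt each one to release it from wait().
        for (Thread t : workers) t.interrupt();
    }
}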

Why is the producer-consumer problem important?

  1. It can be used to distribute the work among various workers that can be scaled up or down as per the load requirements.

  2. It decouples producers and consumers through a shared pipeline. The producer does not need to know about the consumer, so the two sides are separated by concern, which leads to a better object-oriented design.

  3. Producers and consumers do not need to be available at the same time. A consumer can pick up tasks that a producer created earlier.

