Producer Consumer Problem using Blocking Queue in Java

Munish Chandel | July 31, 2018 at 04:34 AM | 165 views | Multithreading and Concurrency


Producer Consumer Problem

The Producer Consumer Problem is a classic example of a multi-thread (or multi-process) synchronization problem. It is a must-know problem if you want to delve into Java concurrency and multi-threading.

Problem Description

The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer’s job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won’t try to add data into the buffer if it’s full and that the consumer won’t try to remove data from an empty buffer.

[Figure: producer consumer problem]

Steps to Solve problem

  1. The solution for the producer is to either go to sleep or discard data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, who starts to fill the buffer again.

  2. In the same way, the consumer can go to sleep if it finds the buffer to be empty. The next time the producer puts data into the buffer, it wakes up the sleeping consumer.
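The two steps above can be sketched with counting semaphores: one semaphore counts free slots (the producer sleeps on it when the buffer is full) and another counts filled slots (the consumer sleeps on it when the buffer is empty). This is only an illustrative sketch; the class name `SemaphoreBuffer` and the helper `sumThroughBuffer` are hypothetical and not part of the original article.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical class: a bounded buffer coordinated by two semaphores.
class SemaphoreBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final Semaphore freeSlots;                       // permits = empty slots
    private final Semaphore filledSlots = new Semaphore(0);  // permits = items available

    SemaphoreBuffer(int capacity) {
        freeSlots = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        freeSlots.acquire();        // producer sleeps here while the buffer is full
        synchronized (items) {      // protect the queue itself
            items.add(item);
        }
        filledSlots.release();      // wakes up a sleeping consumer, if any
    }

    T take() throws InterruptedException {
        filledSlots.acquire();      // consumer sleeps here while the buffer is empty
        T item;
        synchronized (items) {
            item = items.remove();
        }
        freeSlots.release();        // wakes up a sleeping producer, if any
        return item;
    }

    // Demo helper: a producer thread sends 1..n, the caller consumes and sums them.
    static int sumThroughBuffer(int n, int capacity) {
        SemaphoreBuffer<Integer> buffer = new SemaphoreBuffer<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return sum;
    }
}
```

For example, `SemaphoreBuffer.sumThroughBuffer(100, 5)` pushes 100 numbers through a 5-slot buffer, so the producer has to sleep and be woken repeatedly along the way.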

Did you know?

JMS (Java Message Service) is a real-world application of the producer-consumer pattern. Multiple producers and multiple consumers can connect to a JMS provider and distribute the work.

Solving Producer Consumer Problem in Core Java

There are multiple ways to solve this problem:

  1. Using semaphores to coordinate between producers and consumers.

  2. Using synchronization (intrinsic locks with wait() and notifyAll()).

  3. Using non-blocking algorithms.

  4. Using a BlockingQueue.
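The BlockingQueue option is the shortest of the four, because `java.util.concurrent.ArrayBlockingQueue` already blocks in `put()` when full and in `take()` when empty. Below is a minimal sketch, not the article's own listing: the class name `BlockingQueueDemo`, the helper `produceAndConsume`, and the `POISON_PILL` end-of-stream marker are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one producer, one consumer, one bounded queue.
class BlockingQueueDemo {
    // Assumed end-of-stream marker; real values here are always positive.
    private static final int POISON_PILL = -1;

    // Producer sends 1..n, consumer squares each value and sums the results.
    static long produceAndConsume(int n, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);           // blocks while the queue is full
                }
                queue.put(POISON_PILL);     // tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long sumOfSquares = 0;
        try {
            while (true) {
                int value = queue.take();   // blocks while the queue is empty
                if (value == POISON_PILL) break;
                sumOfSquares += (long) value * value;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return sumOfSquares;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10, 3)); // prints 385
    }
}
```

With more than one consumer, one marker per consumer would be needed, since each consumer stops on the first marker it takes.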

Interviewers are usually interested in seeing the Producer Consumer problem solved from scratch, since that lets them evaluate your multi-threading skills.

To solve this we need three different components:
  1. A BoundedBuffer

  2. Producer thread

  3. Consumer thread

We can implement our own BoundedBuffer using basic synchronization, as shown in the code below:

BoundedBuffer (Using Intrinsic Locking)
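class BoundedBuffer<T> {
    // volatile so that isClosed() sees the latest value without locking
    private volatile boolean closed = false;

    // Circular array; the fields below are guarded by this object's intrinsic lock
    private final Object[] items = new Object[100];
    private int putptr, takeptr, count;

    public synchronized void put(T x) throws InterruptedException {
        // Loop, not if: the condition must be re-checked after every wake-up
        while (count == items.length)
            wait();
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notifyAll(); // wake consumers waiting for an item
    }

    @SuppressWarnings("unchecked") // only T instances are ever stored in items
    public synchronized T take() throws InterruptedException {
        while (count == 0)
            wait();
        Object x = items[takeptr];
        items[takeptr] = null; // drop the reference so it can be garbage collected
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notifyAll(); // wake producers waiting for a free slot
        return (T) x;
    }

    public boolean isClosed() {
        return closed;
    }

    // Only sets a flag; threads already blocked in put() or take() are not woken
    public void setClosed(boolean closed) {
        this.closed = closed;
    }
}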
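
As an alternative to intrinsic locking, the same buffer can be sketched with an explicit `ReentrantLock` and two `Condition` objects, following the pattern shown in the `java.util.concurrent.locks.Condition` documentation. The class name `ConditionBoundedBuffer` and the capacity constructor are assumptions for this sketch; the close flag is left out to keep it short.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of BoundedBuffer using explicit locks and conditions.
class ConditionBoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();   // producers wait here
    private final Condition notEmpty = lock.newCondition();  // consumers wait here
    private final Object[] items;
    private int putptr, takeptr, count;

    ConditionBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();            // releases the lock while waiting
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();              // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked") // only T instances are ever stored in items
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            T x = (T) items[takeptr];
            items[takeptr] = null;          // allow garbage collection
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();               // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

The design difference is that producers and consumers wait on separate conditions, so a `signal()` reaches only the kind of thread that can make progress, whereas `notifyAll()` on a single monitor wakes every waiting thread.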

A producer is simply a thread that keeps putting tasks into the BoundedBuffer, blocking whenever the buffer is full.

Producer Thread
import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable {
    private final BoundedBuffer<SquareTask> queue;

    Producer(BoundedBuffer<SquareTask> q) {
        queue = q;
    }

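    public void run() {
        try {
            // Keep producing until the buffer is marked closed
            while (!queue.isClosed()) {
                queue.put(produce()); // blocks while the buffer is full
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread exit
            Thread.currentThread().interrupt();
        }
    }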

    private SquareTask produce() {
        return new SquareTask(ThreadLocalRandom.current().nextInt(1, 200));
    }
}

A consumer listens on the BoundedBuffer and keeps consuming tasks, waiting whenever the buffer is empty.

Consumer Thread
public class Consumer implements Runnable {
    private final BoundedBuffer<SquareTask> queue;

    Consumer(BoundedBuffer<SquareTask> q) {
        queue = q;
    }

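    public void run() {
        try {
            // Keep consuming until the buffer is marked closed
            while (!queue.isClosed()) {
                consume(queue.take()); // blocks while the buffer is empty
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread exit
            Thread.currentThread().interrupt();
        }
    }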

    private void consume(SquareTask x) {
        System.out.println(x.execute());
    }
}

Main Program
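public class ProducerConsumer {

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<SquareTask> boundedBuffer = new BoundedBuffer<>();
        // One producer and two consumers share the same buffer
        Thread[] workers = {
            new Thread(new Producer(boundedBuffer)),
            new Thread(new Consumer(boundedBuffer)),
            new Thread(new Consumer(boundedBuffer))
        };
        for (Thread t : workers) t.start();

        Thread.sleep(100);               // let the workers run briefly
        boundedBuffer.setClosed(true);   // ask the loops to stop
        // Closing alone does not wake a thread blocked in put() or take(),
        // so interrupt the workers and wait for them to finish.
        for (Thread t : workers) t.interrupt();
        for (Thread t : workers) t.join();
    }
}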
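
The listings above use a `SquareTask` class that the article never shows. The following is only a guess at its shape, inferred from how it is used (`new SquareTask(int)` in the producer and `execute()` in the consumer); the field name and the `int` return type are assumptions.

```java
// Hypothetical reconstruction: the original SquareTask is not shown in the article.
class SquareTask {
    private final int number; // assumed field name

    SquareTask(int number) {
        this.number = number;
    }

    // Assumed behaviour: return the square of the number passed to the constructor.
    public int execute() {
        return number * number;
    }
}
```

The producer only generates values between 1 and 199, so the square always fits in an `int`.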

Why is the Producer Consumer Problem Important?

  1. It can be used to distribute the work among various workers that can be scaled up or down as per the load requirements.

  2. It decouples producers from consumers through a shared pipeline. The producer does not need to know about the consumer, so producing and consuming work items are separate concerns. This leads to a better OOP design.

  3. The producer and consumer do not need to be available at the same time. A consumer can pick up tasks at a later time than the producer created them.
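
The first point, distributing work among a configurable number of workers, can be illustrated with a thread pool, which is itself a producer-consumer arrangement: the submitting thread produces tasks into a bounded work queue and the pool threads consume them. This is a rough sketch rather than anything from the original article; the class name `WorkerPoolDemo`, the helper `squareAll`, and the queue size of 16 are arbitrary assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the number of consumers is just a constructor argument.
class WorkerPoolDemo {
    // Squares 1..tasks on a pool of `workers` threads and returns the sum.
    static int squareAll(int tasks, int workers) {
        ExecutorService pool = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(16),                 // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy());   // when full, the submitter runs the task itself
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final int n = i;
                results.add(pool.submit(() -> n * n));        // produce a task
            }
            int sum = 0;
            for (Future<Integer> f : results) {
                sum += f.get();                               // wait for each result
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Changing the `workers` argument scales the consumer side up or down without touching the code that submits the tasks.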

