Queue latency benchmark: the Disruptor style vs the ArrayBlockingQueue style

Queue latency is the main performance metric by which the Disruptor sets itself apart from traditional queue implementations such as ArrayBlockingQueue. To my understanding, the Disruptor is based on polling a volatile variable rather than on the blocking wait() and notify() mechanism. In this post, we benchmark the Disruptor polling style and the ArrayBlockingQueue blocking style using a simple queue with just one producer and one consumer. We define two latency metrics: the producing latency, measured from when the producer starts producing an item to when it finishes producing that item, and the queue latency, measured from when the producer starts producing an item to when the consumer starts consuming the same item. By definition, the queue latency includes the producing latency.

Our test case for the benchmark is that the producer offers n Integers, from 0 to n - 1, into the queue, and the consumer takes the Integers in order. Each Integer is not only the item being produced and consumed, but also an identifier used to track when it is produced and when it is consumed. We put our setup into an abstract class QueueLatencyBenchmark. Subclasses only need to implement the abstract methods void offer(int i, Integer datum) and Integer take(int i) to provide the polling and blocking variants.

public abstract class QueueLatencyBenchmark {

  protected abstract void offer(int i, Integer datum);
  protected abstract Integer take(int i);

  private final int capacity;     // large enough
  private final long[] prepared;  // start producing
  private final long[] committed; // end producing
  private final long[] consumed;  // start consuming
  private final Integer[] pool;   // pre-allocated Integers

  private final Thread producer = new Thread(new Runnable() {

    @Override
    public void run() {
      for(int i = 0; i < capacity; ++i) {
        final Integer datum = pool[i];
        prepared[i] = System.nanoTime();
        offer(i, datum);
        committed[i] = System.nanoTime();
      }
    }
  });

  private final Thread consumer = new Thread(new Runnable() {

    @Override
    public void run() {
      for(int i = 0; i < capacity; ++i) {
        final int datum = take(i).intValue();
        consumed[datum] = System.nanoTime();
      }
    }
  });

  protected QueueLatencyBenchmark(final int capacity) {
    this.capacity = capacity;
    prepared = ArrayUtility.newUniform(capacity, Long.MIN_VALUE);
    committed = ArrayUtility.newUniform(capacity, Long.MIN_VALUE);
    consumed = ArrayUtility.newUniform(capacity, Long.MIN_VALUE);
    pool = ArrayUtility.box(ArrayUtility.newSequence(capacity, 0, 1));
  }

  public final void run() {
    consumer.start();
    producer.start();
    ThreadUtility.joinUninterruptedly(producer);
    ThreadUtility.joinUninterruptedly(consumer);

    // calculate the averages of the producing latency and the queue latency
    // only from the second half of the prepared, committed and consumed.
    long producingSum = 0L;
    long queueSum = 0L;
    final int from = capacity / 2;
    for(int i = from; i < capacity; ++i) {
      producingSum += committed[i] - prepared[i];
      queueSum += consumed[i] - prepared[i];
    }
    final int sample = capacity - from;
    System.out.println("average producing latency (ns): " + (double) producingSum / sample);
    System.out.println("average queue latency (ns): " + (double) queueSum / sample);
  }
}

For the validity of the benchmark, we ensure that the queue has enough capacity to hold all the Integers, so it is essentially unbounded, and that all the Integers are pre-allocated in the pool, so no objects are allocated while producing and consuming.
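The ArrayUtility and ThreadUtility helpers referenced above are not shown in the post; the following is a hypothetical minimal sketch consistent with how they are used, not the author's actual code:

```java
import java.util.Arrays;

final class ArrayUtility {

  // a new long[n] with every element set to value
  static long[] newUniform(final int n, final long value) {
    final long[] a = new long[n];
    Arrays.fill(a, value);
    return a;
  }

  // {start, start + step, start + 2 * step, ...} of length n
  static int[] newSequence(final int n, final int start, final int step) {
    final int[] a = new int[n];
    for(int i = 0; i < n; ++i) a[i] = start + i * step;
    return a;
  }

  // box each int into an Integer up front, so no boxing happens later
  static Integer[] box(final int[] a) {
    final Integer[] boxed = new Integer[a.length];
    for(int i = 0; i < a.length; ++i) boxed[i] = Integer.valueOf(a[i]);
    return boxed;
  }
}

final class ThreadUtility {

  // join the thread, deferring any interrupt until the join completes
  static void joinUninterruptedly(final Thread t) {
    boolean interrupted = false;
    while(true) {
      try {
        t.join();
        break;
      } catch(InterruptedException unused) {
        interrupted = true;
      }
    }
    if(interrupted) Thread.currentThread().interrupt();
  }
}
```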

Here are the subclasses implementing polling and blocking:

public final class VolatilePollingBenchmark extends QueueLatencyBenchmark {

  private final Integer[] buffer;
  private volatile int produced = 0;

  public VolatilePollingBenchmark(final int capacity) {
    super(capacity);
    buffer = new Integer[capacity];
  }

  @Override
  protected void offer(final int i, final Integer datum) {
    buffer[i] = datum;
    produced = i + 1; // write barrier
  }

  @Override
  protected Integer take(final int i) {
    while(i == produced); // read barrier
    return buffer[i];
  }
}
public final class IntrinsicMonitorBenchmark extends QueueLatencyBenchmark {

  private final Integer[] buffer;
  private int produced = 0;

  public IntrinsicMonitorBenchmark(final int capacity) {
    super(capacity);
    buffer = new Integer[capacity];
  }

  @Override
  protected void offer(final int i, final Integer datum) {
    synchronized(buffer) {
      buffer[i] = datum;
      produced = i + 1;
      buffer.notify();
    }
  }

  @Override
  protected Integer take(final int i) {
    boolean interrupted = false;
    synchronized(buffer) {
      while(i == produced) {
        try {
          buffer.wait();
        } catch(InterruptedException unused) {
          interrupted = true;
        }
      }
    }
    if(interrupted) {
      Thread.currentThread().interrupt();
    }
    return buffer[i];
  }
}
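As an aside, on Java 9+ the same publish-then-poll handoff can also be expressed with VarHandle release/acquire ordering, which is a weaker (and on some platforms cheaper) guarantee than a full volatile while still ensuring the buffer write is visible before the index update. The class below is a hypothetical sketch of that variant, not part of the original benchmark:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class AcquireReleaseHandoff {

  private static final VarHandle PRODUCED;
  static {
    try {
      PRODUCED = MethodHandles.lookup()
          .findVarHandle(AcquireReleaseHandoff.class, "produced", int.class);
    } catch(ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  private final Integer[] buffer;
  private int produced = 0; // accessed only through PRODUCED

  AcquireReleaseHandoff(final int capacity) {
    buffer = new Integer[capacity];
  }

  void offer(final int i, final Integer datum) {
    buffer[i] = datum;
    // release write: the store to buffer[i] cannot be reordered after this
    PRODUCED.setRelease(this, i + 1);
  }

  Integer take(final int i) {
    // acquire read: once the new index is seen, buffer[i] is visible too
    while((int) PRODUCED.getAcquire(this) == i);
    return buffer[i];
  }
}
```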

We run the benchmark as shown below; the results are listed in Table 1:

public class QueueLatency {

  public static void main(String[] args) {
    new VolatilePollingBenchmark(10000000).run();
    new IntrinsicMonitorBenchmark(10000000).run();
  }
}
Table 1. Queue Latency Benchmark (sample = 5000000)

Implementation             Average Producing Latency (ns)  Average Queue Latency (ns)
VolatilePollingBenchmark                      101.8750828                 413.4643006
IntrinsicMonitorBenchmark                     403.6898356                98473.444669

The queue latency of polling is indeed orders of magnitude lower than that of blocking. Where latency matters, polling is definitely the way to go. However, be aware that polling comes with 100% CPU consumption (see the busy-wait loop in take() of VolatilePollingBenchmark), which pretty much limits its use to latency-sensitive situations.

The producing latency of polling is also several times lower than that of blocking, even though, technically, the producing latency does not depend on the consumer's polling strategy. If we switch to a poll-with-pause strategy, we can eliminate the high CPU consumption and obtain a queue with fast producing and intermediate queue latency. An example use case is a write buffer backed by a queue supporting multiple producers and one consumer, where the consumer only needs to drain the queue at a fixed interval.
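The poll-with-pause idea can be sketched as follows; the class name and the one-microsecond pause are assumptions for illustration, not values from the benchmark above:

```java
import java.util.concurrent.locks.LockSupport;

// Single-producer, single-consumer queue where the consumer parks briefly
// between polls instead of busy-spinning: some queue latency is traded
// for far lower CPU use, while the producing path stays as fast as polling.
final class PausedPollingQueue {

  private final Integer[] buffer;
  private volatile int produced = 0;

  PausedPollingQueue(final int capacity) {
    buffer = new Integer[capacity];
  }

  void offer(final int i, final Integer datum) {
    buffer[i] = datum;
    produced = i + 1; // volatile write publishes buffer[i]
  }

  Integer take(final int i) {
    while(i == produced) {
      LockSupport.parkNanos(1_000L); // pause instead of spinning hot
    }
    return buffer[i];
  }
}
```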
