An algorithm for a concurrent queue using only thread local and volatile fields

November 02, 2017

In the following, I want to show you an algorithm for a concurrent queue which supports one reading thread and multiple writing threads. To publish an event to the queue, a writing thread needs only a read from a thread local field and a write to a volatile field. Unlike the standard JDK concurrent queues, writing needs no "compare and swap" operations, which makes the algorithm simpler and potentially faster. A usage example is a background thread writing log events asynchronously to a file.

Writing

The main idea is to use not one single queue but many. We use one queue per writing thread, stored in a thread local field. Each queue is then a simple linked list with a volatile field for the next element and a final field for the value:

public class LinkedList<T> implements Consumer<T> {
	volatile ListElementPointer<T> lastRead; // set once by the writer, then used by the reader
	private LinkedListElement<T> lastWritten; // accessed by the writing thread only
	// Constructor omitted
	@Override
	public void accept(T event) {
		// Queue stopped logic omitted
		LinkedListElement<T> linkedListElement = new LinkedListElement<T>(event);
		if( lastWritten == null )
		{
			lastWritten = linkedListElement;
			lastRead = new ListElementPointer<T>(lastWritten);
		}
		else
		{
			lastWritten.next = linkedListElement;
			lastWritten = lastWritten.next;
		}
	}
}
class LinkedListElement<T> {
	volatile LinkedListElement<T> next;
	final T event;
	// Constructor omitted
}

Writing an element to the queue is implemented in the accept method, line 6. When it is the first element written, lastWritten and lastRead are set to the new LinkedListElement, lines 10 to 13. Otherwise, the list is extended by the new LinkedListElement and lastWritten is moved to the end of the list, lines 16 and 17.

And here is the class that stores each queue, typed as a Consumer, in a thread local field:

public class ThreadLocalConsumer<T> implements Consumer<T> {
	private final EventBus<T> theBus;
	private ThreadLocal<Consumer<T>> threadLocal = new ThreadLocal<Consumer<T>>();
	// Constructor omitted 
	public void accept(T event) {	
		Consumer<T>  consumer = threadLocal.get();
		if( consumer == null )
		{
			consumer = theBus.newConsumerForThreadLocalStorage(Thread.currentThread());
			threadLocal.set( consumer );
		}
		consumer.accept(event);
	}
}
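
The lazy initialization above can also be expressed with ThreadLocal.withInitial, which runs a supplier once per thread on that thread's first get(). The following is only a sketch under assumptions: PerThreadConsumer, PerThreadConsumerDemo and the factory parameter are hypothetical names that stand in for ThreadLocalConsumer and theBus.newConsumerForThreadLocalStorage, and a plain ArrayList takes the place of the per-thread linked list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the article's ThreadLocalConsumer; not the project's code.
final class PerThreadConsumer<T> implements Consumer<T> {
    private final ThreadLocal<Consumer<T>> local;

    PerThreadConsumer(Supplier<Consumer<T>> factory) {
        // The factory is invoked once per thread, on that thread's first get().
        this.local = ThreadLocal.withInitial(factory);
    }

    @Override
    public void accept(T event) {
        // One thread local read, then a call into the thread's private consumer.
        local.get().accept(event);
    }
}

final class PerThreadConsumerDemo {
    // Returns one list per writing thread, in registration order.
    static List<List<String>> run() {
        // Registry of all per-thread lists; this is how a reader could find them.
        List<List<String>> perThread = new CopyOnWriteArrayList<>();
        PerThreadConsumer<String> consumer = new PerThreadConsumer<String>(() -> {
            List<String> own = new ArrayList<>(); // written by one thread only
            perThread.add(own);
            return event -> own.add(event);
        });
        Runnable task = () -> {
            String name = Thread.currentThread().getName();
            for (int i = 0; i < 3; i++) {
                consumer.accept(name + "-" + i);
            }
        };
        Thread a = new Thread(task, "a");
        Thread b = new Thread(task, "b");
        a.start();
        b.start();
        try {
            a.join(); // join makes the writers' lists visible to the caller
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return perThread;
    }
}
```

Calling PerThreadConsumerDemo.run() yields two lists, one per thread, each holding only the events of its own thread in the order they were written. The order of the two lists in the registry depends on which thread wrote first.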

Reading

When reading the elements we must remember the last element we read. We do this in the field lastRead in the LinkedList. The following shows the method used for reading elements from one queue:

public void prozessWithoutReadCount(EventSink<T> eventSink) {
	if( list.lastRead != null )
	{
		// First element read
		if( ! list.lastRead.isRead )
		{
			eventSink.execute( list.lastRead.element.event );
			list.lastRead.isRead = true;
		}
		// lastRead.element has already been delivered, so advance before delivering
		LinkedListElement<T> current = list.lastRead.element;
		while( current.next != null )
		{
			current = current.next;
			eventSink.execute( current.event );
		}
		list.lastRead.element = current;
	}
}

And here is the class ListElementPointer used to store the last read element:

public class ListElementPointer<T> {
	LinkedListElement<T> element;
	boolean isRead;
    // Constructor omitted 
}

The field lastRead is initialized by the writing thread and afterward only modified by the reading thread.
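
To make the interplay of the writing and the reading side concrete, here is a condensed, runnable sketch of one list with one writer and one reader. It follows the structure described above, but it is not the project's code: all class and method names (SketchElement, SketchPointer, SketchList, drainTo, SketchListDemo) are made up for this illustration, and a java.util.function.Consumer is assumed in place of EventSink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names; a condensed sketch, not the project's classes.
final class SketchElement<T> {
    final T event;                    // immutable payload
    volatile SketchElement<T> next;   // volatile write publishes the successor
    SketchElement(T event) { this.event = event; }
}

final class SketchPointer<T> {
    SketchElement<T> element; // last element handed to the sink
    boolean isRead;           // false until the very first element was delivered
    SketchPointer(SketchElement<T> element) { this.element = element; }
}

final class SketchList<T> {
    volatile SketchPointer<T> lastRead;   // set once by the writer, then used by the reader
    private SketchElement<T> lastWritten; // writing thread only

    // Must be called by exactly one writing thread.
    void accept(T event) {
        SketchElement<T> e = new SketchElement<>(event);
        if (lastWritten == null) {
            lastWritten = e;
            lastRead = new SketchPointer<>(e); // volatile write publishes the first element
        } else {
            lastWritten.next = e;              // volatile write publishes every later element
            lastWritten = e;
        }
    }

    // Must be called by exactly one reading thread.
    void drainTo(Consumer<? super T> sink) {
        SketchPointer<T> pointer = lastRead;   // volatile read
        if (pointer == null) {
            return;                            // nothing written yet
        }
        if (!pointer.isRead) {
            sink.accept(pointer.element.event);
            pointer.isRead = true;
        }
        SketchElement<T> current = pointer.element;
        SketchElement<T> next;
        while ((next = current.next) != null) { // volatile read
            sink.accept(next.event);
            current = next;
        }
        pointer.element = current;
    }
}

final class SketchListDemo {
    // One writer thread appends 0..n-1 while the calling thread drains.
    static List<Integer> run(int n) {
        SketchList<Integer> list = new SketchList<>();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                list.accept(i);
            }
        });
        writer.start();
        List<Integer> received = new ArrayList<>();
        while (received.size() < n) {
            list.drainTo(received::add);
            Thread.yield();
        }
        return received;
    }
}
```

The fields of SketchPointer can stay non-volatile because the writer fills them before the volatile write to lastRead, and from then on only the reader touches them. The reader receives the events in exactly the order in which the writer appended them.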

I skip the logic for creating new LinkedLists and reading multiple LinkedLists in the reading thread. You can see the complete source code here.
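
As a rough idea of what the omitted parts could look like, the following sketch registers one queue per writing thread and lets a single reader visit all of them in turn. It is an assumption about one possible design, not the code from the repository: SketchBus, BusQueue, BusNode, publish and drainAll are hypothetical names, a CopyOnWriteArrayList is assumed as the registry, and a dummy first node replaces the lastRead/isRead pair to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical names throughout; a sketch of the omitted logic, not the project's code.
final class BusNode<T> {
    final T event;
    volatile BusNode<T> next;
    BusNode(T event) { this.event = event; }
}

// One queue per writing thread: 'tail' is writer-only, 'cursor' is reader-only.
final class BusQueue<T> {
    BusNode<T> tail = new BusNode<>(null); // dummy node, never delivered
    BusNode<T> cursor = tail;              // last node the reader has consumed
}

final class SketchBus<T> {
    // Registry of all per-thread queues; adding to it safely publishes a new queue.
    private final List<BusQueue<T>> queues = new CopyOnWriteArrayList<>();
    private final ThreadLocal<BusQueue<T>> local = ThreadLocal.withInitial(() -> {
        BusQueue<T> q = new BusQueue<>();
        queues.add(q);
        return q;
    });

    // May be called from any number of writing threads.
    void publish(T event) {
        BusQueue<T> q = local.get();       // thread local read
        BusNode<T> node = new BusNode<>(event);
        q.tail.next = node;                // volatile write
        q.tail = node;
    }

    // Must be called by exactly one reading thread; returns the number of events delivered.
    int drainAll(Consumer<? super T> sink) {
        int delivered = 0;
        for (BusQueue<T> q : queues) {
            BusNode<T> next;
            while ((next = q.cursor.next) != null) { // volatile read
                sink.accept(next.event);
                q.cursor = next;
                delivered++;
            }
        }
        return delivered;
    }
}

final class SketchBusDemo {
    // Starts 'writers' threads that publish "id:seq" strings; the caller drains them all.
    static List<String> run(int writers, int perWriter) {
        SketchBus<String> bus = new SketchBus<>();
        for (int w = 0; w < writers; w++) {
            final int id = w;
            new Thread(() -> {
                for (int i = 0; i < perWriter; i++) {
                    bus.publish(id + ":" + i);
                }
            }).start();
        }
        List<String> received = new ArrayList<>();
        while (received.size() < writers * perWriter) {
            bus.drainAll(received::add);
            Thread.yield();
        }
        return received;
    }
}
```

With this design the events of each writing thread arrive in the order they were published, but there is no ordering guarantee between events of different threads.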

Usage

The queue is open source and the source code is available on GitHub here. We use this queue to write events asynchronously to a file for later analysis.
