Second draft implementation of bounded and unbounded ConcurrentArrayBlockingQueue.

This commit is contained in:
Simone Bordet 2013-03-13 22:57:37 +01:00
parent e40e2a9913
commit 4a8b7409af
3 changed files with 38 additions and 2 deletions

View File

@ -27,6 +27,13 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Common functionality for a blocking version of {@link ConcurrentArrayQueue}.
*
* @see Unbounded
* @see Bounded
* @param <T>
*/
public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQueue<T> implements BlockingQueue<T>
{
private final Lock _lock = new ReentrantLock();
@ -163,6 +170,11 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
signalProducers();
}
/**
* An unbounded, blocking version of {@link ConcurrentArrayQueue}.
*
* @param <E>
*/
public static class Unbounded<E> extends ConcurrentArrayBlockingQueue<E>
{
private static final int HEAD_OFFSET = MemoryUtils.getLongsPerCacheLine() - 1;
@ -245,6 +257,11 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
}
/**
* A bounded, blocking version of {@link ConcurrentArrayQueue}.
*
* @param <E>
*/
public static class Bounded<E> extends ConcurrentArrayBlockingQueue<E>
{
private final AtomicInteger _size = new AtomicInteger();

View File

@ -25,9 +25,23 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
* to store elements.
* <p/>
* This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
* but producing less garbage because arrays are used to store elements rather than nodes.
* <p/>
* The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
* (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
*
* @param <T>
*/
public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
{
public static final int DEFAULT_BLOCK_SIZE = 512;

View File

@ -27,15 +27,20 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class ConcurrentArrayBlockingQueueTest extends ConcurrentArrayQueueTest
public class ConcurrentArrayBlockingQueueUnboundedTest extends ConcurrentArrayQueueTest
{
@Rule
public final TestTracker tracker = new TestTracker();
@Override
protected ConcurrentArrayBlockingQueue<Integer> newConcurrentArrayQueue(int blockSize)
{
return new ConcurrentArrayBlockingQueue<>(blockSize);
return new ConcurrentArrayBlockingQueue.Unbounded<>(blockSize);
}
@Test