Merge pull request #428 from metamx/fix-bytesboundedLinkedQueue

fix concurrency issues in bytesboundedlinkedqueue
This commit is contained in:
fjy 2014-03-12 13:30:53 -06:00
commit a6fc82e267
2 changed files with 104 additions and 13 deletions

View File

@ -22,9 +22,11 @@ package io.druid.client.cache;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@ -36,17 +38,18 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
{
private final LinkedList<E> delegate;
private final Queue<E> delegate;
private final AtomicLong currentSize = new AtomicLong(0);
private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final AtomicInteger elementCount = new AtomicInteger(0);
private long capacity;
public BytesBoundedLinkedQueue(long capacity)
{
delegate = new LinkedList<>();
delegate = new ConcurrentLinkedQueue<>();
this.capacity = capacity;
}
@ -71,11 +74,13 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
public void elementAdded(E e)
{
currentSize.addAndGet(getBytesSize(e));
elementCount.getAndIncrement();
}
public void elementRemoved(E e)
{
currentSize.addAndGet(-1 * getBytesSize(e));
elementCount.getAndDecrement();
}
private void fullyUnlock()
@ -115,7 +120,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
@Override
public int size()
{
return delegate.size();
return elementCount.get();
}
@Override
@ -163,7 +168,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
E e;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
while (elementCount.get() == 0) {
notEmpty.await();
}
e = delegate.remove();
@ -181,8 +186,16 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
@Override
public int remainingCapacity()
{
int delegateSize = delegate.size();
long currentByteSize = currentSize.get();
int delegateSize;
long currentByteSize;
fullyLock();
try {
delegateSize = elementCount.get();
currentByteSize = currentSize.get();
}
finally {
fullyUnlock();
}
// return approximate remaining capacity based on current data
if (delegateSize == 0) {
return (int) Math.min(capacity, Integer.MAX_VALUE);
@ -214,13 +227,13 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
int n = 0;
takeLock.lock();
try {
n = Math.min(maxElements, delegate.size());
// elementCount.get provides visibility to first n Nodes
n = Math.min(maxElements, elementCount.get());
if (n < 0) {
return 0;
}
// count.get provides visibility to first n Nodes
for (int i = 0; i < n; i++) {
E e = delegate.remove(0);
E e = delegate.remove();
elementRemoved(e);
c.add(e);
}
@ -287,7 +300,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
E e = null;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
while (elementCount.get() == 0) {
if (nanos <= 0) {
return null;
}

View File

@ -24,6 +24,8 @@ import com.metamx.common.ISE;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@ -33,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class BytesBoundedLinkedQueueTest
@ -171,13 +174,88 @@ public class BytesBoundedLinkedQueueTest
}
}
@Test public void testAddedObjectExceedsCapacity() throws Exception {
@Test
public void testAddedObjectExceedsCapacity() throws Exception
{
BlockingQueue<TestObject> q = getQueue(4);
Assert.assertTrue(q.offer(new TestObject(3)));
Assert.assertFalse(q.offer(new TestObject(2)));
Assert.assertFalse(q.offer(new TestObject(2), delayMS, TimeUnit.MILLISECONDS));
}
// @Test
public void testConcurrentOperations() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(Integer.MAX_VALUE);
long duration = TimeUnit.SECONDS.toMillis(10);
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean stopTest = new AtomicBoolean(false);
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
while (!stopTest.get()) {
q.add(new TestObject(1));
q.add(new TestObject(2));
}
return true;
}
}
)
);
}
for (int i = 0; i < 10; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws InterruptedException
{
while (!stopTest.get()) {
q.poll(100,TimeUnit.MILLISECONDS);
q.offer(new TestObject(2));
}
return true;
}
}
)
);
}
for (int i = 0; i < 5; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
while (!stopTest.get()) {
System.out
.println("drained elements : " + q.drainTo(new ArrayList<TestObject>(), Integer.MAX_VALUE));
}
return true;
}
}
)
);
}
Thread.sleep(duration);
stopTest.set(true);
for (Future<Boolean> future : futures) {
Assert.assertTrue(future.get());
}
}
public static class TestObject
{
public final int size;