fix concurrency issues

This commit is contained in:
nishantmonu51 2014-03-12 20:54:05 +05:30
parent 9033472fe7
commit f10ac73eef
2 changed files with 104 additions and 13 deletions

View File

@ -22,9 +22,11 @@ package io.druid.client.cache;
import java.util.AbstractQueue; import java.util.AbstractQueue;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -36,17 +38,18 @@ import java.util.concurrent.locks.ReentrantLock;
*/ */
public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
{ {
private final LinkedList<E> delegate; private final Queue<E> delegate;
private final AtomicLong currentSize = new AtomicLong(0); private final AtomicLong currentSize = new AtomicLong(0);
private final Lock putLock = new ReentrantLock(); private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition(); private final Condition notFull = putLock.newCondition();
private final Lock takeLock = new ReentrantLock(); private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition(); private final Condition notEmpty = takeLock.newCondition();
private final AtomicInteger elementCount = new AtomicInteger(0);
private long capacity; private long capacity;
public BytesBoundedLinkedQueue(long capacity) public BytesBoundedLinkedQueue(long capacity)
{ {
delegate = new LinkedList<>(); delegate = new ConcurrentLinkedQueue<>();
this.capacity = capacity; this.capacity = capacity;
} }
@ -71,11 +74,13 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
public void elementAdded(E e) public void elementAdded(E e)
{ {
currentSize.addAndGet(getBytesSize(e)); currentSize.addAndGet(getBytesSize(e));
elementCount.getAndIncrement();
} }
public void elementRemoved(E e) public void elementRemoved(E e)
{ {
currentSize.addAndGet(-1 * getBytesSize(e)); currentSize.addAndGet(-1 * getBytesSize(e));
elementCount.getAndDecrement();
} }
private void fullyUnlock() private void fullyUnlock()
@ -115,7 +120,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
@Override @Override
public int size() public int size()
{ {
return delegate.size(); return elementCount.get();
} }
@Override @Override
@ -163,7 +168,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
E e; E e;
takeLock.lockInterruptibly(); takeLock.lockInterruptibly();
try { try {
while (delegate.size() == 0) { while (elementCount.get() == 0) {
notEmpty.await(); notEmpty.await();
} }
e = delegate.remove(); e = delegate.remove();
@ -181,8 +186,16 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
@Override @Override
public int remainingCapacity() public int remainingCapacity()
{ {
int delegateSize = delegate.size(); int delegateSize;
long currentByteSize = currentSize.get(); long currentByteSize;
fullyLock();
try {
delegateSize = elementCount.get();
currentByteSize = currentSize.get();
}
finally {
fullyUnlock();
}
// return approximate remaining capacity based on current data // return approximate remaining capacity based on current data
if (delegateSize == 0) { if (delegateSize == 0) {
return (int) Math.min(capacity, Integer.MAX_VALUE); return (int) Math.min(capacity, Integer.MAX_VALUE);
@ -214,13 +227,13 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
int n = 0; int n = 0;
takeLock.lock(); takeLock.lock();
try { try {
n = Math.min(maxElements, delegate.size()); // elementCount.get provides visibility to first n Nodes
n = Math.min(maxElements, elementCount.get());
if (n < 0) { if (n < 0) {
return 0; return 0;
} }
// count.get provides visibility to first n Nodes
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
E e = delegate.remove(0); E e = delegate.remove();
elementRemoved(e); elementRemoved(e);
c.add(e); c.add(e);
} }
@ -287,7 +300,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
E e = null; E e = null;
takeLock.lockInterruptibly(); takeLock.lockInterruptibly();
try { try {
while (delegate.size() == 0) { while (elementCount.get() == 0) {
if (nanos <= 0) { if (nanos <= 0) {
return null; return null;
} }

View File

@ -24,6 +24,8 @@ import com.metamx.common.ISE;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -33,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class BytesBoundedLinkedQueueTest public class BytesBoundedLinkedQueueTest
@ -171,13 +174,88 @@ public class BytesBoundedLinkedQueueTest
} }
} }
@Test public void testAddedObjectExceedsCapacity() throws Exception { @Test
public void testAddedObjectExceedsCapacity() throws Exception
{
BlockingQueue<TestObject> q = getQueue(4); BlockingQueue<TestObject> q = getQueue(4);
Assert.assertTrue(q.offer(new TestObject(3))); Assert.assertTrue(q.offer(new TestObject(3)));
Assert.assertFalse(q.offer(new TestObject(2))); Assert.assertFalse(q.offer(new TestObject(2)));
Assert.assertFalse(q.offer(new TestObject(2), delayMS, TimeUnit.MILLISECONDS)); Assert.assertFalse(q.offer(new TestObject(2), delayMS, TimeUnit.MILLISECONDS));
} }
// @Test
public void testConcurrentOperations() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(Integer.MAX_VALUE);
long duration = TimeUnit.SECONDS.toMillis(10);
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean stopTest = new AtomicBoolean(false);
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
while (!stopTest.get()) {
q.add(new TestObject(1));
q.add(new TestObject(2));
}
return true;
}
}
)
);
}
for (int i = 0; i < 10; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws InterruptedException
{
while (!stopTest.get()) {
q.poll(100,TimeUnit.MILLISECONDS);
q.offer(new TestObject(2));
}
return true;
}
}
)
);
}
for (int i = 0; i < 5; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
while (!stopTest.get()) {
System.out
.println("drained elements : " + q.drainTo(new ArrayList<TestObject>(), Integer.MAX_VALUE));
}
return true;
}
}
)
);
}
Thread.sleep(duration);
stopTest.set(true);
for (Future<Boolean> future : futures) {
Assert.assertTrue(future.get());
}
}
public static class TestObject public static class TestObject
{ {
public final int size; public final int size;