From f10ac73eef90de3928e1c162952c79fc2c33aad1 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 12 Mar 2014 20:54:05 +0530 Subject: [PATCH] fix concurrency issues --- .../client/cache/BytesBoundedLinkedQueue.java | 35 +++++--- .../cache/BytesBoundedLinkedQueueTest.java | 82 ++++++++++++++++++- 2 files changed, 104 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java index 84c5f83d6a2..2d892bc00d9 100644 --- a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -22,9 +22,11 @@ package io.druid.client.cache; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -36,17 +38,18 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implements BlockingQueue { - private final LinkedList delegate; + private final Queue delegate; private final AtomicLong currentSize = new AtomicLong(0); private final Lock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); private final Lock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); + private final AtomicInteger elementCount = new AtomicInteger(0); private long capacity; public BytesBoundedLinkedQueue(long capacity) { - delegate = new LinkedList<>(); + delegate = new ConcurrentLinkedQueue<>(); this.capacity = capacity; } @@ -71,11 +74,13 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem public void elementAdded(E e) { currentSize.addAndGet(getBytesSize(e)); + elementCount.getAndIncrement(); } public void elementRemoved(E e) { currentSize.addAndGet(-1 * getBytesSize(e)); + elementCount.getAndDecrement(); } private void fullyUnlock() @@ -115,7 +120,7 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem @Override public int size() { - return delegate.size(); + return elementCount.get(); } @Override @@ -163,7 +168,7 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem E e; takeLock.lockInterruptibly(); try { - while (delegate.size() == 0) { + while (elementCount.get() == 0) { notEmpty.await(); } e = delegate.remove(); @@ -181,8 +186,16 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem @Override public int remainingCapacity() { - int delegateSize = delegate.size(); - long currentByteSize = currentSize.get(); + int delegateSize; + long currentByteSize; + fullyLock(); + try { + delegateSize = elementCount.get(); + currentByteSize = currentSize.get(); + } + finally { + fullyUnlock(); + } // return approximate remaining capacity based on current data if (delegateSize == 0) { return (int) Math.min(capacity, Integer.MAX_VALUE); @@ -214,13 +227,13 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem int n = 0; takeLock.lock(); try { - n = Math.min(maxElements, delegate.size()); + // elementCount.get provides visibility to first n Nodes + n = Math.min(maxElements, elementCount.get()); if (n < 0) { return 0; } - // count.get provides visibility to first n Nodes for (int i = 0; i < n; i++) { - E e = delegate.remove(0); + E e = delegate.remove(); elementRemoved(e); c.add(e); } @@ -287,7 +300,7 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem E e = null; takeLock.lockInterruptibly(); try { - while (delegate.size() == 0) { + while (elementCount.get() == 0) { if (nanos <= 0) { return null; } diff --git a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java index 67a863ff8a1..f98fc34ffa6 100644 --- a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java +++ b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java @@ -24,6 +24,8 @@ import com.metamx.common.ISE; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -33,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; public class BytesBoundedLinkedQueueTest @@ -171,11 +174,86 @@ public class BytesBoundedLinkedQueueTest } } - @Test public void testAddedObjectExceedsCapacity() throws Exception { + @Test + public void testAddedObjectExceedsCapacity() throws Exception + { BlockingQueue q = getQueue(4); Assert.assertTrue(q.offer(new TestObject(3))); Assert.assertFalse(q.offer(new TestObject(2))); - Assert.assertFalse(q.offer(new TestObject(2),delayMS, TimeUnit.MILLISECONDS)); + Assert.assertFalse(q.offer(new TestObject(2), delayMS, TimeUnit.MILLISECONDS)); + } + + // @Test + public void testConcurrentOperations() throws Exception + { + final BlockingQueue q = getQueue(Integer.MAX_VALUE); + long duration = TimeUnit.SECONDS.toMillis(10); + ExecutorService executor = Executors.newCachedThreadPool(); + final AtomicBoolean stopTest = new AtomicBoolean(false); + List futures = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + futures.add( + executor.submit( + new Callable() + { + @Override + public Boolean call() + { + while (!stopTest.get()) { + q.add(new TestObject(1)); + q.add(new TestObject(2)); + } + return true; + + } + } + ) + ); + } + + for (int i = 0; i < 10; i++) { + futures.add( + executor.submit( + new Callable() + { + @Override + public Boolean call() throws InterruptedException + { + while (!stopTest.get()) { + q.poll(100,TimeUnit.MILLISECONDS); + q.offer(new TestObject(2)); + } + return true; + + } + } + ) + ); + } + + for (int i = 0; i < 5; i++) { + futures.add( + executor.submit( + new Callable() + { + @Override + public Boolean call() + { + while (!stopTest.get()) { + System.out + .println("drained elements : " + q.drainTo(new ArrayList(), Integer.MAX_VALUE)); + } + return true; + } + } + ) + ); + } + Thread.sleep(duration); + stopTest.set(true); + for (Future future : futures) { + Assert.assertTrue(future.get()); + } } public static class TestObject