From d27274463b91ed5f0ed0b0c65deb036309812c29 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 7 Feb 2014 16:52:07 +0530 Subject: [PATCH] add separate putLock and takeLocks + test --- .../client/cache/BytesBoundedLinkedQueue.java | 201 +++++++++++------- .../cache/BytesBoundedLinkedQueueTest.java | 175 +++++++++++++++ 2 files changed, 305 insertions(+), 71 deletions(-) create mode 100644 server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java index 94d3acbe788..50ca475c0d5 100644 --- a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -35,14 +36,15 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implements BlockingQueue { - private LinkedList delegate; - private int capacity; - private int currentSize; - private Lock lock = new ReentrantLock(); - private Condition notFull = lock.newCondition(); - private Condition notEmpty = lock.newCondition(); + private final LinkedList delegate; + private final AtomicLong currentSize = new AtomicLong(0); + private final Lock putLock = new ReentrantLock(); + private final Condition notFull = putLock.newCondition(); + private final Lock takeLock = new ReentrantLock(); + private final Condition notEmpty = takeLock.newCondition(); + private long capacity; - public BytesBoundedLinkedQueue(int capacity) + public BytesBoundedLinkedQueue(long capacity) { delegate = new LinkedList<>(); this.capacity = capacity; @@ -57,28 +59,54 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem public abstract long getBytesSize(E e); - public void operationAdded(E e) + public void elementAdded(E e) { - currentSize += getBytesSize(e); - notEmpty.signalAll(); + currentSize.addAndGet(getBytesSize(e)); } - public void operationRemoved(E e) + public void elementRemoved(E e) { - currentSize -= getBytesSize(e); - notFull.signalAll(); + currentSize.addAndGet(-1 * getBytesSize(e)); + } + + private void fullyUnlock() + { + takeLock.unlock(); + putLock.unlock(); + } + + private void fullyLock() + { + takeLock.lock(); + putLock.lock(); + } + + private void signalNotEmpty() + { + takeLock.lock(); + try { + notEmpty.signal(); + } + finally { + takeLock.unlock(); + } + } + + private void signalNotFull() + { + putLock.lock(); + try { + notFull.signal(); + } + finally { + putLock.unlock(); + } } @Override public int size() { - lock.lock(); - try { - return delegate.size(); - } - finally { - lock.unlock(); - } + return delegate.size(); } @Override @@ -96,57 +124,66 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem { checkNotNull(e); long nanos = unit.toNanos(timeout); - lock.lockInterruptibly(); + boolean added = false; + putLock.lockInterruptibly(); try { - while (currentSize > capacity) { + while (currentSize.get() >= capacity) { if (nanos <= 0) { - return false; + break; } nanos = notFull.awaitNanos(nanos); } delegate.add(e); - operationAdded(e); - return true; + elementAdded(e); + added = true; } finally { - lock.unlock(); + putLock.unlock(); } + if (added) { + signalNotEmpty(); + } + return added; } @Override public E take() throws InterruptedException { - lock.lockInterruptibly(); + E e; + takeLock.lockInterruptibly(); try { while (delegate.size() == 0) { notEmpty.await(); } - E e = delegate.remove(); - operationRemoved(e); - return e; + e = delegate.remove(); + elementRemoved(e); } finally { - lock.unlock(); + takeLock.unlock(); } + if (e != null) { + signalNotFull(); + } + return e; } @Override public int remainingCapacity() { - lock.lock(); - try { - // return approximate remaining capacity based on current data - if (delegate.size() == 0) { - return capacity; - } else { - int averageByteSize = currentSize / delegate.size(); - return (capacity - currentSize) / averageByteSize; - } - } - finally { - lock.unlock(); + int delegateSize = delegate.size(); + long currentByteSize = currentSize.get(); + // return approximate remaining capacity based on current data + if (delegateSize == 0) { + return (int) Math.min(capacity, Integer.MAX_VALUE); + } else if (capacity > currentByteSize) { + long averageElementSize = currentByteSize / delegateSize; + return (int) ((capacity - currentByteSize) / averageElementSize); + } else { + // queue full + return 0; } + } @Override @@ -164,67 +201,80 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem if (c == this) { throw new IllegalArgumentException(); } - lock.lock(); + int n = 0; + takeLock.lock(); try { - int n = Math.min(maxElements, delegate.size()); + n = Math.min(maxElements, delegate.size()); if (n < 0) { return 0; } // count.get provides visibility to first n Nodes for (int i = 0; i < n; i++) { E e = delegate.remove(0); - operationRemoved(e); + elementRemoved(e); c.add(e); } - return n; } finally { - lock.unlock(); + takeLock.unlock(); } + if (n > 0) { + signalNotFull(); + } + return n; } @Override public boolean offer(E e) { checkNotNull(e); - lock.lock(); + boolean added = false; + putLock.lock(); try { - if (currentSize > capacity) { + if (currentSize.get() >= capacity) { return false; } else { - boolean added = delegate.add(e); + added = delegate.add(e); if (added) { - operationAdded(e); + elementAdded(e); } - return added; } } finally { - lock.unlock(); + putLock.unlock(); } + if (added) { + signalNotEmpty(); + } + return added; } @Override public E poll() { - lock.lock(); + E e = null; + takeLock.lock(); try { - E e = delegate.poll(); + e = delegate.poll(); if (e != null) { - operationRemoved(e); + elementRemoved(e); } - return e; } finally { - lock.unlock(); + takeLock.unlock(); } + if (e != null) { + signalNotFull(); + } + return e; } @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); - lock.lockInterruptibly(); + E e = null; + takeLock.lockInterruptibly(); try { while (delegate.size() == 0) { if (nanos <= 0) { @@ -232,22 +282,30 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem } nanos = notEmpty.awaitNanos(nanos); } - return delegate.poll(); + e = delegate.poll(); + if (e != null) { + elementRemoved(e); + } } finally { - lock.unlock(); + takeLock.unlock(); } + if (e != null) { + signalNotFull(); + } + return e; + } @Override public E peek() { - lock.lock(); + takeLock.lock(); try { return delegate.peek(); } finally { - lock.unlock(); + takeLock.unlock(); } } @@ -271,42 +329,43 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem @Override public boolean hasNext() { - lock.lock(); + fullyLock(); try { return delegate.hasNext(); } finally { - lock.unlock(); + fullyUnlock(); } } @Override public E next() { - lock.lock(); + fullyLock(); try { this.lastReturned = delegate.next(); return lastReturned; } finally { - lock.unlock(); + fullyUnlock(); } } @Override public void remove() { - lock.lock(); + fullyLock(); try { if (this.lastReturned == null) { throw new IllegalStateException(); } delegate.remove(); - operationRemoved(lastReturned); + elementRemoved(lastReturned); + signalNotFull(); lastReturned = null; } finally { - lock.unlock(); + fullyUnlock(); } } } diff --git a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java new file mode 100644 index 00000000000..5d6c4ea2b1f --- /dev/null +++ b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java @@ -0,0 +1,175 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + + +import com.metamx.common.ISE; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class BytesBoundedLinkedQueueTest +{ + private static int delayMS = 50; + private ExecutorService exec = Executors.newCachedThreadPool(); + + private static BlockingQueue getQueue(final int capacity) + { + return new BytesBoundedLinkedQueue(capacity) + { + @Override + public long getBytesSize(TestObject o) + { + return o.getSize(); + } + }; + } + + @Test + public void testPoll() throws InterruptedException + { + final BlockingQueue q = getQueue(10); + long startTime = System.nanoTime(); + Assert.assertNull(q.poll(delayMS, TimeUnit.MILLISECONDS)); + Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= delayMS); + TestObject obj = new TestObject(2); + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertSame(obj, q.poll(delayMS, TimeUnit.MILLISECONDS)); + + Thread.currentThread().interrupt(); + try { + q.poll(delayMS, TimeUnit.MILLISECONDS); + throw new ISE("FAIL"); + } + catch (InterruptedException success) { + } + Assert.assertFalse(Thread.interrupted()); + } + + @Test + public void testTake() throws Exception + { + final BlockingQueue q = getQueue(10); + Thread.currentThread().interrupt(); + try { + q.take(); + Assert.fail(); + } + catch (InterruptedException success) { + // + } + final CountDownLatch latch = new CountDownLatch(1); + final TestObject object = new TestObject(4); + Future future = exec.submit( + new Callable() + { + @Override + public TestObject call() throws Exception + { + latch.countDown(); + return q.take(); + + } + } + ); + latch.await(); + // test take blocks on empty queue + try { + future.get(delayMS, TimeUnit.MILLISECONDS); + Assert.fail(); + } + catch (TimeoutException success) { + + } + + q.offer(object); + Assert.assertEquals(object, future.get()); + } + + @Test + public void testOfferAndPut() throws Exception + { + final BlockingQueue q = getQueue(10); + try { + q.offer(null); + Assert.fail(); + } + catch (NullPointerException success) { + + } + + final TestObject obj = new TestObject(2); + while (q.remainingCapacity() > 0) { + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + } + // queue full + Assert.assertEquals(0, q.remainingCapacity()); + Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertFalse(q.offer(obj)); + final CyclicBarrier barrier = new CyclicBarrier(2); + + Future future = exec.submit( + new Callable() + { + @Override + public Boolean call() throws Exception + { + barrier.await(); + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertEquals(q.remainingCapacity(), 0); + barrier.await(); + q.put(obj); + return true; + } + } + ); + barrier.await(); + q.take(); + barrier.await(); + q.take(); + Assert.assertTrue(future.get()); + + } + + public static class TestObject + { + public final int size; + + TestObject(int size) + { + this.size = size; + } + + public int getSize() + { + return size; + } + } +} \ No newline at end of file