add separate putLock and takeLocks + test

This commit is contained in:
nishantmonu51 2014-02-07 16:52:07 +05:30
parent 7bbb6ba063
commit d27274463b
2 changed files with 305 additions and 71 deletions

View File

@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -35,14 +36,15 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
{
private LinkedList<E> delegate;
private int capacity;
private int currentSize;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
private final LinkedList<E> delegate;
private final AtomicLong currentSize = new AtomicLong(0);
private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private long capacity;
public BytesBoundedLinkedQueue(int capacity)
public BytesBoundedLinkedQueue(long capacity)
{
delegate = new LinkedList<>();
this.capacity = capacity;
@ -57,29 +59,55 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
public abstract long getBytesSize(E e);
public void operationAdded(E e)
public void elementAdded(E e)
{
currentSize += getBytesSize(e);
notEmpty.signalAll();
currentSize.addAndGet(getBytesSize(e));
}
public void operationRemoved(E e)
public void elementRemoved(E e)
{
currentSize -= getBytesSize(e);
notFull.signalAll();
currentSize.addAndGet(-1 * getBytesSize(e));
}
private void fullyUnlock()
{
takeLock.unlock();
putLock.unlock();
}
private void fullyLock()
{
takeLock.lock();
putLock.lock();
}
private void signalNotEmpty()
{
takeLock.lock();
try {
notEmpty.signal();
}
finally {
takeLock.unlock();
}
}
private void signalNotFull()
{
putLock.lock();
try {
notFull.signal();
}
finally {
putLock.unlock();
}
}
@Override
public int size()
{
lock.lock();
try {
return delegate.size();
}
finally {
lock.unlock();
}
}
@Override
public void put(E e) throws InterruptedException
@ -96,57 +124,66 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
{
checkNotNull(e);
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
boolean added = false;
putLock.lockInterruptibly();
try {
while (currentSize > capacity) {
while (currentSize.get() >= capacity) {
if (nanos <= 0) {
return false;
break;
}
nanos = notFull.awaitNanos(nanos);
}
delegate.add(e);
operationAdded(e);
return true;
elementAdded(e);
added = true;
}
finally {
lock.unlock();
putLock.unlock();
}
if (added) {
signalNotEmpty();
}
return added;
}
@Override
public E take() throws InterruptedException
{
lock.lockInterruptibly();
E e;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
notEmpty.await();
}
E e = delegate.remove();
operationRemoved(e);
return e;
e = delegate.remove();
elementRemoved(e);
}
finally {
lock.unlock();
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public int remainingCapacity()
{
lock.lock();
try {
int delegateSize = delegate.size();
long currentByteSize = currentSize.get();
// return approximate remaining capacity based on current data
if (delegate.size() == 0) {
return capacity;
if (delegateSize == 0) {
return (int) Math.min(capacity, Integer.MAX_VALUE);
} else if (capacity > currentByteSize) {
long averageElementSize = currentByteSize / delegateSize;
return (int) ((capacity - currentByteSize) / averageElementSize);
} else {
int averageByteSize = currentSize / delegate.size();
return (capacity - currentSize) / averageByteSize;
}
}
finally {
lock.unlock();
// queue full
return 0;
}
}
@Override
@ -164,67 +201,80 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
if (c == this) {
throw new IllegalArgumentException();
}
lock.lock();
int n = 0;
takeLock.lock();
try {
int n = Math.min(maxElements, delegate.size());
n = Math.min(maxElements, delegate.size());
if (n < 0) {
return 0;
}
// count.get provides visibility to first n Nodes
for (int i = 0; i < n; i++) {
E e = delegate.remove(0);
operationRemoved(e);
elementRemoved(e);
c.add(e);
}
return n;
}
finally {
lock.unlock();
takeLock.unlock();
}
if (n > 0) {
signalNotFull();
}
return n;
}
@Override
public boolean offer(E e)
{
checkNotNull(e);
lock.lock();
boolean added = false;
putLock.lock();
try {
if (currentSize > capacity) {
if (currentSize.get() >= capacity) {
return false;
} else {
boolean added = delegate.add(e);
added = delegate.add(e);
if (added) {
operationAdded(e);
elementAdded(e);
}
return added;
}
}
finally {
lock.unlock();
putLock.unlock();
}
if (added) {
signalNotEmpty();
}
return added;
}
@Override
public E poll()
{
lock.lock();
E e = null;
takeLock.lock();
try {
E e = delegate.poll();
e = delegate.poll();
if (e != null) {
operationRemoved(e);
elementRemoved(e);
}
return e;
}
finally {
lock.unlock();
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException
{
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
E e = null;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
if (nanos <= 0) {
@ -232,22 +282,30 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
}
nanos = notEmpty.awaitNanos(nanos);
}
return delegate.poll();
e = delegate.poll();
if (e != null) {
elementRemoved(e);
}
}
finally {
lock.unlock();
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public E peek()
{
lock.lock();
takeLock.lock();
try {
return delegate.peek();
}
finally {
lock.unlock();
takeLock.unlock();
}
}
@ -271,42 +329,43 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
@Override
public boolean hasNext()
{
lock.lock();
fullyLock();
try {
return delegate.hasNext();
}
finally {
lock.unlock();
fullyUnlock();
}
}
@Override
public E next()
{
lock.lock();
fullyLock();
try {
this.lastReturned = delegate.next();
return lastReturned;
}
finally {
lock.unlock();
fullyUnlock();
}
}
@Override
public void remove()
{
lock.lock();
fullyLock();
try {
if (this.lastReturned == null) {
throw new IllegalStateException();
}
delegate.remove();
operationRemoved(lastReturned);
elementRemoved(lastReturned);
signalNotFull();
lastReturned = null;
}
finally {
lock.unlock();
fullyUnlock();
}
}
}

View File

@ -0,0 +1,175 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import com.metamx.common.ISE;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class BytesBoundedLinkedQueueTest
{
private static int delayMS = 50;
private ExecutorService exec = Executors.newCachedThreadPool();
private static BlockingQueue<TestObject> getQueue(final int capacity)
{
return new BytesBoundedLinkedQueue<TestObject>(capacity)
{
@Override
public long getBytesSize(TestObject o)
{
return o.getSize();
}
};
}
@Test
public void testPoll() throws InterruptedException
{
final BlockingQueue q = getQueue(10);
long startTime = System.nanoTime();
Assert.assertNull(q.poll(delayMS, TimeUnit.MILLISECONDS));
Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= delayMS);
TestObject obj = new TestObject(2);
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertSame(obj, q.poll(delayMS, TimeUnit.MILLISECONDS));
Thread.currentThread().interrupt();
try {
q.poll(delayMS, TimeUnit.MILLISECONDS);
throw new ISE("FAIL");
}
catch (InterruptedException success) {
}
Assert.assertFalse(Thread.interrupted());
}
@Test
public void testTake() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(10);
Thread.currentThread().interrupt();
try {
q.take();
Assert.fail();
}
catch (InterruptedException success) {
//
}
final CountDownLatch latch = new CountDownLatch(1);
final TestObject object = new TestObject(4);
Future<TestObject> future = exec.submit(
new Callable<TestObject>()
{
@Override
public TestObject call() throws Exception
{
latch.countDown();
return q.take();
}
}
);
latch.await();
// test take blocks on empty queue
try {
future.get(delayMS, TimeUnit.MILLISECONDS);
Assert.fail();
}
catch (TimeoutException success) {
}
q.offer(object);
Assert.assertEquals(object, future.get());
}
@Test
public void testOfferAndPut() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(10);
try {
q.offer(null);
Assert.fail();
}
catch (NullPointerException success) {
}
final TestObject obj = new TestObject(2);
while (q.remainingCapacity() > 0) {
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
}
// queue full
Assert.assertEquals(0, q.remainingCapacity());
Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertFalse(q.offer(obj));
final CyclicBarrier barrier = new CyclicBarrier(2);
Future<Boolean> future = exec.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
barrier.await();
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertEquals(q.remainingCapacity(), 0);
barrier.await();
q.put(obj);
return true;
}
}
);
barrier.await();
q.take();
barrier.await();
q.take();
Assert.assertTrue(future.get());
}
public static class TestObject
{
public final int size;
TestObject(int size)
{
this.size = size;
}
public int getSize()
{
return size;
}
}
}