mirror of https://github.com/apache/druid.git
Merge pull request #390 from metamx/bard-oome-fix
APP-2782 Bard oome fix
This commit is contained in:
commit
ef6208d192
|
@ -0,0 +1,383 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.cache;
|
||||||
|
|
||||||
|
import java.util.AbstractQueue;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract implementation of a BlockingQueue bounded by the size of elements,
|
||||||
|
* works similar to LinkedBlockingQueue except that capacity is bounded by the size in bytes of the elements in the queue.
|
||||||
|
*/
|
||||||
|
public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
|
||||||
|
{
|
||||||
|
private final LinkedList<E> delegate;
|
||||||
|
private final AtomicLong currentSize = new AtomicLong(0);
|
||||||
|
private final Lock putLock = new ReentrantLock();
|
||||||
|
private final Condition notFull = putLock.newCondition();
|
||||||
|
private final Lock takeLock = new ReentrantLock();
|
||||||
|
private final Condition notEmpty = takeLock.newCondition();
|
||||||
|
private long capacity;
|
||||||
|
|
||||||
|
public BytesBoundedLinkedQueue(long capacity)
|
||||||
|
{
|
||||||
|
delegate = new LinkedList<>();
|
||||||
|
this.capacity = capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void checkNotNull(Object v)
|
||||||
|
{
|
||||||
|
if (v == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkSize(E e)
|
||||||
|
{
|
||||||
|
if (getBytesSize(e) > capacity) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("cannot add element of size[%d] greater than capacity[%d]", getBytesSize(e), capacity)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract long getBytesSize(E e);
|
||||||
|
|
||||||
|
public void elementAdded(E e)
|
||||||
|
{
|
||||||
|
currentSize.addAndGet(getBytesSize(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void elementRemoved(E e)
|
||||||
|
{
|
||||||
|
currentSize.addAndGet(-1 * getBytesSize(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fullyUnlock()
|
||||||
|
{
|
||||||
|
takeLock.unlock();
|
||||||
|
putLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fullyLock()
|
||||||
|
{
|
||||||
|
takeLock.lock();
|
||||||
|
putLock.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void signalNotEmpty()
|
||||||
|
{
|
||||||
|
takeLock.lock();
|
||||||
|
try {
|
||||||
|
notEmpty.signal();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
takeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void signalNotFull()
|
||||||
|
{
|
||||||
|
putLock.lock();
|
||||||
|
try {
|
||||||
|
notFull.signal();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
putLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size()
|
||||||
|
{
|
||||||
|
return delegate.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(E e) throws InterruptedException
|
||||||
|
{
|
||||||
|
while (!offer(e, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
|
||||||
|
// keep trying until added successfully
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean offer(
|
||||||
|
E e, long timeout, TimeUnit unit
|
||||||
|
) throws InterruptedException
|
||||||
|
{
|
||||||
|
checkNotNull(e);
|
||||||
|
checkSize(e);
|
||||||
|
long nanos = unit.toNanos(timeout);
|
||||||
|
boolean added = false;
|
||||||
|
putLock.lockInterruptibly();
|
||||||
|
try {
|
||||||
|
while (currentSize.get() + getBytesSize(e) > capacity) {
|
||||||
|
if (nanos <= 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
nanos = notFull.awaitNanos(nanos);
|
||||||
|
}
|
||||||
|
delegate.add(e);
|
||||||
|
elementAdded(e);
|
||||||
|
added = true;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
putLock.unlock();
|
||||||
|
}
|
||||||
|
if (added) {
|
||||||
|
signalNotEmpty();
|
||||||
|
}
|
||||||
|
return added;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E take() throws InterruptedException
|
||||||
|
{
|
||||||
|
E e;
|
||||||
|
takeLock.lockInterruptibly();
|
||||||
|
try {
|
||||||
|
while (delegate.size() == 0) {
|
||||||
|
notEmpty.await();
|
||||||
|
}
|
||||||
|
e = delegate.remove();
|
||||||
|
elementRemoved(e);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
takeLock.unlock();
|
||||||
|
}
|
||||||
|
if (e != null) {
|
||||||
|
signalNotFull();
|
||||||
|
}
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int remainingCapacity()
|
||||||
|
{
|
||||||
|
int delegateSize = delegate.size();
|
||||||
|
long currentByteSize = currentSize.get();
|
||||||
|
// return approximate remaining capacity based on current data
|
||||||
|
if (delegateSize == 0) {
|
||||||
|
return (int) Math.min(capacity, Integer.MAX_VALUE);
|
||||||
|
} else if (capacity > currentByteSize) {
|
||||||
|
long averageElementSize = currentByteSize / delegateSize;
|
||||||
|
return (int) ((capacity - currentByteSize) / averageElementSize);
|
||||||
|
} else {
|
||||||
|
// queue full
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int drainTo(Collection<? super E> c)
|
||||||
|
{
|
||||||
|
return drainTo(c, Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int drainTo(Collection<? super E> c, int maxElements)
|
||||||
|
{
|
||||||
|
if (c == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
if (c == this) {
|
||||||
|
throw new IllegalArgumentException();
|
||||||
|
}
|
||||||
|
int n = 0;
|
||||||
|
takeLock.lock();
|
||||||
|
try {
|
||||||
|
n = Math.min(maxElements, delegate.size());
|
||||||
|
if (n < 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// count.get provides visibility to first n Nodes
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
E e = delegate.remove(0);
|
||||||
|
elementRemoved(e);
|
||||||
|
c.add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
takeLock.unlock();
|
||||||
|
}
|
||||||
|
if (n > 0) {
|
||||||
|
signalNotFull();
|
||||||
|
}
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean offer(E e)
|
||||||
|
{
|
||||||
|
checkNotNull(e);
|
||||||
|
checkSize(e);
|
||||||
|
boolean added = false;
|
||||||
|
putLock.lock();
|
||||||
|
try {
|
||||||
|
if (currentSize.get() + getBytesSize(e) > capacity) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
added = delegate.add(e);
|
||||||
|
if (added) {
|
||||||
|
elementAdded(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
putLock.unlock();
|
||||||
|
}
|
||||||
|
if (added) {
|
||||||
|
signalNotEmpty();
|
||||||
|
}
|
||||||
|
return added;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E poll()
|
||||||
|
{
|
||||||
|
E e = null;
|
||||||
|
takeLock.lock();
|
||||||
|
try {
|
||||||
|
e = delegate.poll();
|
||||||
|
if (e != null) {
|
||||||
|
elementRemoved(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
takeLock.unlock();
|
||||||
|
}
|
||||||
|
if (e != null) {
|
||||||
|
signalNotFull();
|
||||||
|
}
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E poll(long timeout, TimeUnit unit) throws InterruptedException
|
||||||
|
{
|
||||||
|
long nanos = unit.toNanos(timeout);
|
||||||
|
E e = null;
|
||||||
|
takeLock.lockInterruptibly();
|
||||||
|
try {
|
||||||
|
while (delegate.size() == 0) {
|
||||||
|
if (nanos <= 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
nanos = notEmpty.awaitNanos(nanos);
|
||||||
|
}
|
||||||
|
e = delegate.poll();
|
||||||
|
if (e != null) {
|
||||||
|
elementRemoved(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
takeLock.unlock();
|
||||||
|
}
|
||||||
|
if (e != null) {
|
||||||
|
signalNotFull();
|
||||||
|
}
|
||||||
|
return e;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E peek()
|
||||||
|
{
|
||||||
|
takeLock.lock();
|
||||||
|
try {
|
||||||
|
return delegate.peek();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
takeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<E> iterator()
|
||||||
|
{
|
||||||
|
return new Itr(delegate.iterator());
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Itr implements Iterator<E>
|
||||||
|
{
|
||||||
|
|
||||||
|
private final Iterator<E> delegate;
|
||||||
|
private E lastReturned;
|
||||||
|
|
||||||
|
Itr(Iterator<E> delegate)
|
||||||
|
{
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext()
|
||||||
|
{
|
||||||
|
fullyLock();
|
||||||
|
try {
|
||||||
|
return delegate.hasNext();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
fullyUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E next()
|
||||||
|
{
|
||||||
|
fullyLock();
|
||||||
|
try {
|
||||||
|
this.lastReturned = delegate.next();
|
||||||
|
return lastReturned;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
fullyUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
fullyLock();
|
||||||
|
try {
|
||||||
|
if (this.lastReturned == null) {
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
delegate.remove();
|
||||||
|
elementRemoved(lastReturned);
|
||||||
|
signalNotFull();
|
||||||
|
lastReturned = null;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
fullyUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -56,7 +56,7 @@ public class MemcachedCache implements Cache
|
||||||
|
|
||||||
// always use compression
|
// always use compression
|
||||||
transcoder.setCompressionThreshold(0);
|
transcoder.setCompressionThreshold(0);
|
||||||
|
MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize());
|
||||||
return new MemcachedCache(
|
return new MemcachedCache(
|
||||||
new MemcachedClient(
|
new MemcachedClient(
|
||||||
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
|
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
|
||||||
|
@ -68,6 +68,7 @@ public class MemcachedCache implements Cache
|
||||||
.setShouldOptimize(true)
|
.setShouldOptimize(true)
|
||||||
.setOpQueueMaxBlockTime(config.getTimeout())
|
.setOpQueueMaxBlockTime(config.getTimeout())
|
||||||
.setOpTimeout(config.getTimeout())
|
.setOpTimeout(config.getTimeout())
|
||||||
|
.setOpQueueFactory(queueFactory)
|
||||||
.build(),
|
.build(),
|
||||||
AddrUtil.getAddresses(config.getHosts())
|
AddrUtil.getAddresses(config.getHosts())
|
||||||
),
|
),
|
||||||
|
|
|
@ -27,19 +27,17 @@ public class MemcachedCacheConfig
|
||||||
{
|
{
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private int expiration = 2592000; // What is this number?
|
private int expiration = 2592000; // What is this number?
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private int timeout = 500;
|
private int timeout = 500;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@NotNull
|
@NotNull
|
||||||
private String hosts;
|
private String hosts;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private int maxObjectSize = 50 * 1024 * 1024;
|
private int maxObjectSize = 50 * 1024 * 1024;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private String memcachedPrefix = "druid";
|
private String memcachedPrefix = "druid";
|
||||||
|
@JsonProperty
|
||||||
|
private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB
|
||||||
|
|
||||||
public int getExpiration()
|
public int getExpiration()
|
||||||
{
|
{
|
||||||
|
@ -65,4 +63,9 @@ public class MemcachedCacheConfig
|
||||||
{
|
{
|
||||||
return memcachedPrefix;
|
return memcachedPrefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getMaxOperationQueueSize()
|
||||||
|
{
|
||||||
|
return maxOperationQueueSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
48
server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java
vendored
Normal file
48
server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java
vendored
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.cache;
|
||||||
|
|
||||||
|
import net.spy.memcached.ops.Operation;
|
||||||
|
import net.spy.memcached.ops.OperationQueueFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
|
||||||
|
public class MemcachedOperationQueueFactory implements OperationQueueFactory
|
||||||
|
{
|
||||||
|
public final long maxQueueSize;
|
||||||
|
|
||||||
|
public MemcachedOperationQueueFactory(long maxQueueSize)
|
||||||
|
{
|
||||||
|
this.maxQueueSize = maxQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlockingQueue<Operation> create()
|
||||||
|
{
|
||||||
|
return new BytesBoundedLinkedQueue<Operation>(maxQueueSize)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public long getBytesSize(Operation operation)
|
||||||
|
{
|
||||||
|
return operation.getBuffer().remaining();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
195
server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java
vendored
Normal file
195
server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java
vendored
Normal file
|
@ -0,0 +1,195 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.cache;
|
||||||
|
|
||||||
|
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
|
||||||
|
public class BytesBoundedLinkedQueueTest
|
||||||
|
{
|
||||||
|
private static int delayMS = 50;
|
||||||
|
private ExecutorService exec = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
|
private static BlockingQueue<TestObject> getQueue(final int capacity)
|
||||||
|
{
|
||||||
|
return new BytesBoundedLinkedQueue<TestObject>(capacity)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public long getBytesSize(TestObject o)
|
||||||
|
{
|
||||||
|
return o.getSize();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPoll() throws InterruptedException
|
||||||
|
{
|
||||||
|
final BlockingQueue q = getQueue(10);
|
||||||
|
long startTime = System.nanoTime();
|
||||||
|
Assert.assertNull(q.poll(delayMS, TimeUnit.MILLISECONDS));
|
||||||
|
Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= delayMS);
|
||||||
|
TestObject obj = new TestObject(2);
|
||||||
|
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
|
||||||
|
Assert.assertSame(obj, q.poll(delayMS, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
try {
|
||||||
|
q.poll(delayMS, TimeUnit.MILLISECONDS);
|
||||||
|
throw new ISE("FAIL");
|
||||||
|
}
|
||||||
|
catch (InterruptedException success) {
|
||||||
|
}
|
||||||
|
Assert.assertFalse(Thread.interrupted());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTake() throws Exception
|
||||||
|
{
|
||||||
|
final BlockingQueue<TestObject> q = getQueue(10);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
try {
|
||||||
|
q.take();
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
catch (InterruptedException success) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
final TestObject object = new TestObject(4);
|
||||||
|
Future<TestObject> future = exec.submit(
|
||||||
|
new Callable<TestObject>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public TestObject call() throws Exception
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
return q.take();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
latch.await();
|
||||||
|
// test take blocks on empty queue
|
||||||
|
try {
|
||||||
|
future.get(delayMS, TimeUnit.MILLISECONDS);
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
catch (TimeoutException success) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
q.offer(object);
|
||||||
|
Assert.assertEquals(object, future.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOfferAndPut() throws Exception
|
||||||
|
{
|
||||||
|
final BlockingQueue<TestObject> q = getQueue(10);
|
||||||
|
try {
|
||||||
|
q.offer(null);
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
catch (NullPointerException success) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
final TestObject obj = new TestObject(2);
|
||||||
|
while (q.remainingCapacity() > 0) {
|
||||||
|
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
// queue full
|
||||||
|
Assert.assertEquals(0, q.remainingCapacity());
|
||||||
|
Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
|
||||||
|
Assert.assertFalse(q.offer(obj));
|
||||||
|
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||||
|
|
||||||
|
Future<Boolean> future = exec.submit(
|
||||||
|
new Callable<Boolean>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Boolean call() throws Exception
|
||||||
|
{
|
||||||
|
barrier.await();
|
||||||
|
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
|
||||||
|
Assert.assertEquals(q.remainingCapacity(), 0);
|
||||||
|
barrier.await();
|
||||||
|
q.put(obj);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
barrier.await();
|
||||||
|
q.take();
|
||||||
|
barrier.await();
|
||||||
|
q.take();
|
||||||
|
Assert.assertTrue(future.get());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddBiggerElementThanCapacityFails()
|
||||||
|
{
|
||||||
|
BlockingQueue<TestObject> q = getQueue(5);
|
||||||
|
try {
|
||||||
|
q.offer(new TestObject(10));
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
catch (IllegalArgumentException success) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testAddedObjectExceedsCapacity() throws Exception {
|
||||||
|
BlockingQueue<TestObject> q = getQueue(4);
|
||||||
|
Assert.assertTrue(q.offer(new TestObject(3)));
|
||||||
|
Assert.assertFalse(q.offer(new TestObject(2)));
|
||||||
|
Assert.assertFalse(q.offer(new TestObject(2),delayMS, TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestObject
|
||||||
|
{
|
||||||
|
public final int size;
|
||||||
|
|
||||||
|
TestObject(int size)
|
||||||
|
{
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSize()
|
||||||
|
{
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue