mirror of https://github.com/apache/druid.git
add checks for elements larger than capacity and account for the size of element being added
This commit is contained in:
parent
f8d8290a1e
commit
78eddc50ee
|
@ -57,6 +57,15 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
|
|||
}
|
||||
}
|
||||
|
||||
private void checkSize(E e)
|
||||
{
|
||||
if (getBytesSize(e) > capacity) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("cannot add element of size[%d] greater than capacity[%d]", getBytesSize(e), capacity)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract long getBytesSize(E e);
|
||||
|
||||
public void elementAdded(E e)
|
||||
|
@ -123,11 +132,12 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
|
|||
) throws InterruptedException
|
||||
{
|
||||
checkNotNull(e);
|
||||
checkSize(e);
|
||||
long nanos = unit.toNanos(timeout);
|
||||
boolean added = false;
|
||||
putLock.lockInterruptibly();
|
||||
try {
|
||||
while (currentSize.get() >= capacity) {
|
||||
while (currentSize.get() + getBytesSize(e) > capacity) {
|
||||
if (nanos <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -228,10 +238,11 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
|
|||
public boolean offer(E e)
|
||||
{
|
||||
checkNotNull(e);
|
||||
checkSize(e);
|
||||
boolean added = false;
|
||||
putLock.lock();
|
||||
try {
|
||||
if (currentSize.get() >= capacity) {
|
||||
if (currentSize.get() + getBytesSize(e) > capacity) {
|
||||
return false;
|
||||
} else {
|
||||
added = delegate.add(e);
|
||||
|
|
|
@ -158,6 +158,26 @@ public class BytesBoundedLinkedQueueTest
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddBiggerElementThanCapacityFails()
|
||||
{
|
||||
BlockingQueue<TestObject> q = getQueue(5);
|
||||
try {
|
||||
q.offer(new TestObject(10));
|
||||
Assert.fail();
|
||||
}
|
||||
catch (IllegalArgumentException success) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test public void testAddedObjectExceedsCapacity() throws Exception {
|
||||
BlockingQueue<TestObject> q = getQueue(4);
|
||||
Assert.assertTrue(q.offer(new TestObject(3)));
|
||||
Assert.assertFalse(q.offer(new TestObject(2)));
|
||||
Assert.assertFalse(q.offer(new TestObject(2),delayMS, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
public static class TestObject
|
||||
{
|
||||
public final int size;
|
||||
|
|
Loading…
Reference in New Issue