diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java index d6d41311355..84c5f83d6a2 100644 --- a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -57,6 +57,15 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem } } + private void checkSize(E e) + { + if (getBytesSize(e) > capacity) { + throw new IllegalArgumentException( + String.format("cannot add element of size[%d] greater than capacity[%d]", getBytesSize(e), capacity) + ); + } + } + public abstract long getBytesSize(E e); public void elementAdded(E e) @@ -123,11 +132,12 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem ) throws InterruptedException { checkNotNull(e); + checkSize(e); long nanos = unit.toNanos(timeout); boolean added = false; putLock.lockInterruptibly(); try { - while (currentSize.get() >= capacity) { + while (currentSize.get() + getBytesSize(e) > capacity) { if (nanos <= 0) { return false; } @@ -228,10 +238,11 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem public boolean offer(E e) { checkNotNull(e); + checkSize(e); boolean added = false; putLock.lock(); try { - if (currentSize.get() >= capacity) { + if (currentSize.get() + getBytesSize(e) > capacity) { return false; } else { added = delegate.add(e); diff --git a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java index 2a05eac2291..67a863ff8a1 100644 --- a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java +++ b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java @@ -158,6 +158,26 @@ public class BytesBoundedLinkedQueueTest } + @Test + public void testAddBiggerElementThanCapacityFails() + { + BlockingQueue q = getQueue(5); + try { + q.offer(new TestObject(10)); + Assert.fail(); + } + catch (IllegalArgumentException success) { + + } + } + + @Test public void testAddedObjectExceedsCapacity() throws Exception { + BlockingQueue q = getQueue(4); + Assert.assertTrue(q.offer(new TestObject(3))); + Assert.assertFalse(q.offer(new TestObject(2))); + Assert.assertFalse(q.offer(new TestObject(2),delayMS, TimeUnit.MILLISECONDS)); + } + public static class TestObject { public final int size;