From 7bbb6ba063cc87f121b0eebe68c41b4966250d8a Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 6 Feb 2014 23:33:32 +0530 Subject: [PATCH 01/12] support for bounding operation queue by the byte size --- .../client/cache/BytesBoundedLinkedQueue.java | 313 ++++++++++++++++++ .../io/druid/client/cache/MemcachedCache.java | 1 + .../client/cache/MemcachedCacheConfig.java | 11 +- .../cache/MemcachedOperationQueueFactory.java | 48 +++ 4 files changed, 369 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java create mode 100644 server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java new file mode 100644 index 00000000000..94d3acbe788 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -0,0 +1,313 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Abstract implementation of a BlockingQueue bounded by the size of elements, + * works similar to LinkedBlockingQueue except that capacity is bounded by the size in bytes of the elements in the queue. + */ +public abstract class BytesBoundedLinkedQueue extends AbstractQueue implements BlockingQueue +{ + private LinkedList delegate; + private int capacity; + private int currentSize; + private Lock lock = new ReentrantLock(); + private Condition notFull = lock.newCondition(); + private Condition notEmpty = lock.newCondition(); + + public BytesBoundedLinkedQueue(int capacity) + { + delegate = new LinkedList<>(); + this.capacity = capacity; + } + + private static void checkNotNull(Object v) + { + if (v == null) { + throw new NullPointerException(); + } + } + + public abstract long getBytesSize(E e); + + public void operationAdded(E e) + { + currentSize += getBytesSize(e); + notEmpty.signalAll(); + } + + public void operationRemoved(E e) + { + currentSize -= getBytesSize(e); + notFull.signalAll(); + } + + @Override + public int size() + { + lock.lock(); + try { + return delegate.size(); + } + finally { + lock.unlock(); + } + } + + @Override + public void put(E e) throws InterruptedException + { + while (!offer(e, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { + // keep trying until added successfully + } + } + + @Override + public boolean offer( + E e, long timeout, TimeUnit unit + ) throws InterruptedException + { + checkNotNull(e); + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (currentSize > capacity) { + if (nanos <= 0) { + return false; + } + nanos = notFull.awaitNanos(nanos); + } + delegate.add(e); + operationAdded(e); + return true; + } + finally { + lock.unlock(); + } + + } + + @Override + public E take() throws InterruptedException + { + lock.lockInterruptibly(); + try { + while (delegate.size() == 0) { + notEmpty.await(); + } + E e = delegate.remove(); + operationRemoved(e); + return e; + } + finally { + lock.unlock(); + } + } + + @Override + public int remainingCapacity() + { + lock.lock(); + try { + // return approximate remaining capacity based on current data + if (delegate.size() == 0) { + return capacity; + } else { + int averageByteSize = currentSize / delegate.size(); + return (capacity - currentSize) / averageByteSize; + } + } + finally { + lock.unlock(); + } + } + + @Override + public int drainTo(Collection c) + { + return drainTo(c, Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection c, int maxElements) + { + if (c == null) { + throw new NullPointerException(); + } + if (c == this) { + throw new IllegalArgumentException(); + } + lock.lock(); + try { + int n = Math.min(maxElements, delegate.size()); + if (n < 0) { + return 0; + } + // count.get provides visibility to first n Nodes + for (int i = 0; i < n; i++) { + E e = delegate.remove(0); + operationRemoved(e); + c.add(e); + } + return n; + } + finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E e) + { + checkNotNull(e); + lock.lock(); + try { + if (currentSize > capacity) { + return false; + } else { + boolean added = delegate.add(e); + if (added) { + operationAdded(e); + } + return added; + } + } + finally { + lock.unlock(); + } + } + + @Override + public E poll() + { + lock.lock(); + try { + E e = delegate.poll(); + if (e != null) { + operationRemoved(e); + } + return e; + } + finally { + lock.unlock(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException + { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (delegate.size() == 0) { + if (nanos <= 0) { + return null; + } + nanos = notEmpty.awaitNanos(nanos); + } + return delegate.poll(); + } + finally { + lock.unlock(); + } + } + + @Override + public E peek() + { + lock.lock(); + try { + return delegate.peek(); + } + finally { + lock.unlock(); + } + } + + @Override + public Iterator iterator() + { + return new Itr(delegate.iterator()); + } + + private class Itr implements Iterator + { + + private final Iterator delegate; + private E lastReturned; + + Itr(Iterator delegate) + { + this.delegate = delegate; + } + + @Override + public boolean hasNext() + { + lock.lock(); + try { + return delegate.hasNext(); + } + finally { + lock.unlock(); + } + } + + @Override + public E next() + { + lock.lock(); + try { + this.lastReturned = delegate.next(); + return lastReturned; + } + finally { + lock.unlock(); + } + } + + @Override + public void remove() + { + lock.lock(); + try { + if (this.lastReturned == null) { + throw new IllegalStateException(); + } + delegate.remove(); + operationRemoved(lastReturned); + lastReturned = null; + } + finally { + lock.unlock(); + } + } + } +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 155c75e3f86..2186adc8e24 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -68,6 +68,7 @@ public class MemcachedCache implements Cache .setShouldOptimize(true) .setOpQueueMaxBlockTime(config.getTimeout()) .setOpTimeout(config.getTimeout()) + .setOpQueueFactory(new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize())) .build(), AddrUtil.getAddresses(config.getHosts()) ), diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index 2cc06cf3637..f66aaa3f333 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -27,19 +27,17 @@ public class MemcachedCacheConfig { @JsonProperty private int expiration = 2592000; // What is this number? - @JsonProperty private int timeout = 500; - @JsonProperty @NotNull private String hosts; - @JsonProperty private int maxObjectSize = 50 * 1024 * 1024; - @JsonProperty private String memcachedPrefix = "druid"; + @JsonProperty + private int maxOperationQueueSize = 256 * 1024 * 1024; // 256 MB public int getExpiration() { @@ -65,4 +63,9 @@ public class MemcachedCacheConfig { return memcachedPrefix; } + + public int getMaxOperationQueueSize() + { + return maxOperationQueueSize; + } } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java new file mode 100644 index 00000000000..a91a162c371 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationQueueFactory; + +import java.util.concurrent.BlockingQueue; + +public class MemcachedOperationQueueFactory implements OperationQueueFactory +{ + public final int maxQueueSize; + + public MemcachedOperationQueueFactory(int maxQueueSize) + { + this.maxQueueSize = maxQueueSize; + } + + @Override + public BlockingQueue create() + { + return new BytesBoundedLinkedQueue(maxQueueSize) + { + @Override + public long getBytesSize(Operation operation) + { + return operation.getBuffer().capacity(); + } + }; + } +} \ No newline at end of file From d27274463b91ed5f0ed0b0c65deb036309812c29 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 7 Feb 2014 16:52:07 +0530 Subject: [PATCH 02/12] add separate putLock and takeLocks + test --- .../client/cache/BytesBoundedLinkedQueue.java | 201 +++++++++++------- .../cache/BytesBoundedLinkedQueueTest.java | 175 +++++++++++++++ 2 files changed, 305 insertions(+), 71 deletions(-) create mode 100644 server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java index 94d3acbe788..50ca475c0d5 100644 --- a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -35,14 +36,15 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implements BlockingQueue { - private LinkedList delegate; - private int capacity; - private int currentSize; - private Lock lock = new ReentrantLock(); - private Condition notFull = lock.newCondition(); - private Condition notEmpty = lock.newCondition(); + private final LinkedList delegate; + private final AtomicLong currentSize = new AtomicLong(0); + private final Lock putLock = new ReentrantLock(); + private final Condition notFull = putLock.newCondition(); + private final Lock takeLock = new ReentrantLock(); + private final Condition notEmpty = takeLock.newCondition(); + private long capacity; - public BytesBoundedLinkedQueue(int capacity) + public BytesBoundedLinkedQueue(long capacity) { delegate = new LinkedList<>(); this.capacity = capacity; @@ -57,28 +59,54 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem public abstract long getBytesSize(E e); - public void operationAdded(E e) + public void elementAdded(E e) { - currentSize += getBytesSize(e); - notEmpty.signalAll(); + currentSize.addAndGet(getBytesSize(e)); } - public void operationRemoved(E e) + public void elementRemoved(E e) { - currentSize -= getBytesSize(e); - notFull.signalAll(); + currentSize.addAndGet(-1 * getBytesSize(e)); + } + + private void fullyUnlock() + { + takeLock.unlock(); + putLock.unlock(); + } + + private void fullyLock() + { + takeLock.lock(); + putLock.lock(); + } + + private void signalNotEmpty() + { + takeLock.lock(); + try { + notEmpty.signal(); + } + finally { + takeLock.unlock(); + } + } + + private void signalNotFull() + { + putLock.lock(); + try { + notFull.signal(); + } + finally { + putLock.unlock(); + } } @Override public int size() { - lock.lock(); - try { - return delegate.size(); - } - finally { - lock.unlock(); - } + return delegate.size(); } @Override @@ -96,57 +124,66 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem { checkNotNull(e); long nanos = unit.toNanos(timeout); - lock.lockInterruptibly(); + boolean added = false; + putLock.lockInterruptibly(); try { - while (currentSize > capacity) { + while (currentSize.get() >= capacity) { if (nanos <= 0) { - return false; + break; } nanos = notFull.awaitNanos(nanos); } delegate.add(e); - operationAdded(e); - return true; + elementAdded(e); + added = true; } finally { - lock.unlock(); + putLock.unlock(); } + if (added) { + signalNotEmpty(); + } + return added; } @Override public E take() throws InterruptedException { - lock.lockInterruptibly(); + E e; + takeLock.lockInterruptibly(); try { while (delegate.size() == 0) { notEmpty.await(); } - E e = delegate.remove(); - operationRemoved(e); - return e; + e = delegate.remove(); + elementRemoved(e); } finally { - lock.unlock(); + takeLock.unlock(); } + if (e != null) { + signalNotFull(); + } + return e; } @Override public int remainingCapacity() { - lock.lock(); - try { - // return approximate remaining capacity based on current data - if (delegate.size() == 0) { - return capacity; - } else { - int averageByteSize = currentSize / delegate.size(); - return (capacity - currentSize) / averageByteSize; - } - } - finally { - lock.unlock(); + int delegateSize = delegate.size(); + long currentByteSize = currentSize.get(); + // return approximate remaining capacity based on current data + if (delegateSize == 0) { + return (int) Math.min(capacity, Integer.MAX_VALUE); + } else if (capacity > currentByteSize) { + long averageElementSize = currentByteSize / delegateSize; + return (int) ((capacity - currentByteSize) / averageElementSize); + } else { + // queue full + return 0; } + } @Override @@ -164,67 +201,80 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem if (c == this) { throw new IllegalArgumentException(); } - lock.lock(); + int n = 0; + takeLock.lock(); try { - int n = Math.min(maxElements, delegate.size()); + n = Math.min(maxElements, delegate.size()); if (n < 0) { return 0; } // count.get provides visibility to first n Nodes for (int i = 0; i < n; i++) { E e = delegate.remove(0); - operationRemoved(e); + elementRemoved(e); c.add(e); } - return n; } finally { - lock.unlock(); + takeLock.unlock(); } + if (n > 0) { + signalNotFull(); + } + return n; } @Override public boolean offer(E e) { checkNotNull(e); - lock.lock(); + boolean added = false; + putLock.lock(); try { - if (currentSize > capacity) { + if (currentSize.get() >= capacity) { return false; } else { - boolean added = delegate.add(e); + added = delegate.add(e); if (added) { - operationAdded(e); + elementAdded(e); } - return added; } } finally { - lock.unlock(); + putLock.unlock(); } + if (added) { + signalNotEmpty(); + } + return added; } @Override public E poll() { - lock.lock(); + E e = null; + takeLock.lock(); try { - E e = delegate.poll(); + e = delegate.poll(); if (e != null) { - operationRemoved(e); + elementRemoved(e); } - return e; } finally { - lock.unlock(); + takeLock.unlock(); } + if (e != null) { + signalNotFull(); + } + return e; } @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); - lock.lockInterruptibly(); + E e = null; + takeLock.lockInterruptibly(); try { while (delegate.size() == 0) { if (nanos <= 0) { @@ -232,22 +282,30 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem } nanos = notEmpty.awaitNanos(nanos); } - return delegate.poll(); + e = delegate.poll(); + if (e != null) { + elementRemoved(e); + } } finally { - lock.unlock(); + takeLock.unlock(); } + if (e != null) { + signalNotFull(); + } + return e; + } @Override public E peek() { - lock.lock(); + takeLock.lock(); try { return delegate.peek(); } finally { - lock.unlock(); + takeLock.unlock(); } } @@ -271,42 +329,43 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem @Override public boolean hasNext() { - lock.lock(); + fullyLock(); try { return delegate.hasNext(); } finally { - lock.unlock(); + fullyUnlock(); } } @Override public E next() { - lock.lock(); + fullyLock(); try { this.lastReturned = delegate.next(); return lastReturned; } finally { - lock.unlock(); + fullyUnlock(); } } @Override public void remove() { - lock.lock(); + fullyLock(); try { if (this.lastReturned == null) { throw new IllegalStateException(); } delegate.remove(); - operationRemoved(lastReturned); + elementRemoved(lastReturned); + signalNotFull(); lastReturned = null; } finally { - lock.unlock(); + fullyUnlock(); } } } diff --git a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java new file mode 100644 index 00000000000..5d6c4ea2b1f --- /dev/null +++ b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java @@ -0,0 +1,175 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + + +import com.metamx.common.ISE; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class BytesBoundedLinkedQueueTest +{ + private static int delayMS = 50; + private ExecutorService exec = Executors.newCachedThreadPool(); + + private static BlockingQueue getQueue(final int capacity) + { + return new BytesBoundedLinkedQueue(capacity) + { + @Override + public long getBytesSize(TestObject o) + { + return o.getSize(); + } + }; + } + + @Test + public void testPoll() throws InterruptedException + { + final BlockingQueue q = getQueue(10); + long startTime = System.nanoTime(); + Assert.assertNull(q.poll(delayMS, TimeUnit.MILLISECONDS)); + Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= delayMS); + TestObject obj = new TestObject(2); + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertSame(obj, q.poll(delayMS, TimeUnit.MILLISECONDS)); + + Thread.currentThread().interrupt(); + try { + q.poll(delayMS, TimeUnit.MILLISECONDS); + throw new ISE("FAIL"); + } + catch (InterruptedException success) { + } + Assert.assertFalse(Thread.interrupted()); + } + + @Test + public void testTake() throws Exception + { + final BlockingQueue q = getQueue(10); + Thread.currentThread().interrupt(); + try { + q.take(); + Assert.fail(); + } + catch (InterruptedException success) { + // + } + final CountDownLatch latch = new CountDownLatch(1); + final TestObject object = new TestObject(4); + Future future = exec.submit( + new Callable() + { + @Override + public TestObject call() throws Exception + { + latch.countDown(); + return q.take(); + + } + } + ); + latch.await(); + // test take blocks on empty queue + try { + future.get(delayMS, TimeUnit.MILLISECONDS); + Assert.fail(); + } + catch (TimeoutException success) { + + } + + q.offer(object); + Assert.assertEquals(object, future.get()); + } + + @Test + public void testOfferAndPut() throws Exception + { + final BlockingQueue q = getQueue(10); + try { + q.offer(null); + Assert.fail(); + } + catch (NullPointerException success) { + + } + + final TestObject obj = new TestObject(2); + while (q.remainingCapacity() > 0) { + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + } + // queue full + Assert.assertEquals(0, q.remainingCapacity()); + Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertFalse(q.offer(obj)); + final CyclicBarrier barrier = new CyclicBarrier(2); + + Future future = exec.submit( + new Callable() + { + @Override + public Boolean call() throws Exception + { + barrier.await(); + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertEquals(q.remainingCapacity(), 0); + barrier.await(); + q.put(obj); + return true; + } + } + ); + barrier.await(); + q.take(); + barrier.await(); + q.take(); + Assert.assertTrue(future.get()); + + } + + public static class TestObject + { + public final int size; + + TestObject(int size) + { + this.size = size; + } + + public int getSize() + { + return size; + } + } +} \ No newline at end of file From 61d99bb8f2f401778079b9add75fb87a6f34d938 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 7 Feb 2014 16:56:15 +0530 Subject: [PATCH 03/12] change byte size config to long --- .../src/main/java/io/druid/client/cache/MemcachedCache.java | 5 +++-- .../java/io/druid/client/cache/MemcachedCacheConfig.java | 4 ++-- .../druid/client/cache/MemcachedOperationQueueFactory.java | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 2186adc8e24..a52e1b82e4b 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -56,7 +56,7 @@ public class MemcachedCache implements Cache // always use compression transcoder.setCompressionThreshold(0); - + MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize()); return new MemcachedCache( new MemcachedClient( new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) @@ -68,7 +68,8 @@ public class MemcachedCache implements Cache .setShouldOptimize(true) .setOpQueueMaxBlockTime(config.getTimeout()) .setOpTimeout(config.getTimeout()) - .setOpQueueFactory(new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize())) + .setOpQueueFactory(queueFactory) + .setWriteOpQueueFactory(queueFactory) .build(), AddrUtil.getAddresses(config.getHosts()) ), diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index f66aaa3f333..2d8674cdd24 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -37,7 +37,7 @@ public class MemcachedCacheConfig @JsonProperty private String memcachedPrefix = "druid"; @JsonProperty - private int maxOperationQueueSize = 256 * 1024 * 1024; // 256 MB + private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB public int getExpiration() { @@ -64,7 +64,7 @@ public class MemcachedCacheConfig return memcachedPrefix; } - public int getMaxOperationQueueSize() + public long getMaxOperationQueueSize() { return maxOperationQueueSize; } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java index a91a162c371..6f765a5ab28 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java @@ -26,9 +26,9 @@ import java.util.concurrent.BlockingQueue; public class MemcachedOperationQueueFactory implements OperationQueueFactory { - public final int maxQueueSize; + public final long maxQueueSize; - public MemcachedOperationQueueFactory(int maxQueueSize) + public MemcachedOperationQueueFactory(long maxQueueSize) { this.maxQueueSize = maxQueueSize; } From db93e45bef99cdb41678326e5e7c1aa0c5f7781b Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 7 Feb 2014 17:52:52 +0530 Subject: [PATCH 04/12] minor fix --- .../java/io/druid/client/cache/BytesBoundedLinkedQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java index 50ca475c0d5..f0ab9763f6b 100644 --- a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -129,7 +129,7 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem try { while (currentSize.get() >= capacity) { if (nanos <= 0) { - break; + return false; } nanos = notFull.awaitNanos(nanos); } From dc097081a64e2fbef2a0b40c6645a94f41cd59f8 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 7 Feb 2014 20:00:25 +0530 Subject: [PATCH 05/12] remove writeOpQueueFactory for writeQueueFactory the buffer of the element is nulled by memcache before remove is called, thus we are not able to determine the size of element while removing, We can potentially work around this by adding a wrapper to keep track of size but that will have its extra overhead, Also verified from the heap dump that the growing queue was the queue for opQueue. --- server/src/main/java/io/druid/client/cache/MemcachedCache.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index a52e1b82e4b..abbfb54139c 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -69,7 +69,6 @@ public class MemcachedCache implements Cache .setOpQueueMaxBlockTime(config.getTimeout()) .setOpTimeout(config.getTimeout()) .setOpQueueFactory(queueFactory) - .setWriteOpQueueFactory(queueFactory) .build(), AddrUtil.getAddresses(config.getHosts()) ), From ec2a32acfc8b3258cf6a21c2e24e9f8faa8d6aea Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 10 Feb 2014 12:53:36 +0530 Subject: [PATCH 06/12] review comments --- .../io/druid/client/cache/MemcachedOperationQueueFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java index 6f765a5ab28..8118d869731 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java @@ -41,8 +41,8 @@ public class MemcachedOperationQueueFactory implements OperationQueueFactory @Override public long getBytesSize(Operation operation) { - return operation.getBuffer().capacity(); + return operation.getBuffer().remaining(); } }; } -} \ No newline at end of file +} From f8d8290a1eea162f2119369b7fb34773af207e38 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 10 Feb 2014 15:44:28 +0530 Subject: [PATCH 07/12] add newline at the end of file --- .../java/io/druid/client/cache/BytesBoundedLinkedQueue.java | 2 +- .../java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java index f0ab9763f6b..d6d41311355 100644 --- a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -369,4 +369,4 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem } } } -} \ No newline at end of file +} diff --git a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java index 5d6c4ea2b1f..2a05eac2291 100644 --- a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java +++ b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java @@ -172,4 +172,4 @@ public class BytesBoundedLinkedQueueTest return size; } } -} \ No newline at end of file +} From 533a263fbd6c4e2f4326003034d500224b2af5b3 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 10 Feb 2014 22:32:26 +0530 Subject: [PATCH 08/12] initial working version --- .../query/ChainedExecutionQueryRunner.java | 63 +++++- .../io/druid/query/ParallelQueryRunner.java | 14 ++ .../druid/query/aggregation/Aggregators.java | 56 +++++ .../groupby/GroupByQueryQueryToolChest.java | 48 +++-- .../segment/incremental/IncrementalIndex.java | 192 ++++++++++-------- 5 files changed, 260 insertions(+), 113 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/ParallelQueryRunner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/Aggregators.java diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 316c8d8675e..0339c016125 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -25,6 +25,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.Sequence; @@ -51,10 +52,9 @@ import java.util.concurrent.Future; * That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B * must be fully cached in memory before the results for Aa and Ab are computed. */ -public class ChainedExecutionQueryRunner implements QueryRunner +public class ChainedExecutionQueryRunner implements ParallelQueryRunner { private static final Logger log = new Logger(ChainedExecutionQueryRunner.class); - private final Iterable> queryables; private final ExecutorService exec; private final Ordering ordering; @@ -157,4 +157,63 @@ public class ChainedExecutionQueryRunner implements QueryRunner } ); } + + @Override + public OutType runAndAccumulate( + final Query query, + final OutType outType, final Accumulator outTypeTAccumulator + ) + { + final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + + if (Iterables.isEmpty(queryables)) { + log.warn("No queryables found."); + return outType; + } + List> futures = Lists.newArrayList( + Iterables.transform( + queryables, + new Function, Future>() + { + @Override + public Future apply(final QueryRunner input) + { + return exec.submit( + new PrioritizedCallable(priority) + { + @Override + public Boolean call() throws Exception + { + try { + input.run(query).accumulate(outType, outTypeTAccumulator); + return true; + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } + } + ); + } + } + ) + ); + + // Let the runners complete + for (Future future : futures) { + try { + future.get(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + + return outType; + } } diff --git a/processing/src/main/java/io/druid/query/ParallelQueryRunner.java b/processing/src/main/java/io/druid/query/ParallelQueryRunner.java new file mode 100644 index 00000000000..28f095b5460 --- /dev/null +++ b/processing/src/main/java/io/druid/query/ParallelQueryRunner.java @@ -0,0 +1,14 @@ +package io.druid.query; + +public interface ParallelQueryRunner extends QueryRunner +{ + + /** + * accumulator passed should be thread safe + */ + OutType runAndAccumulate( + Query query, + OutType outType, + com.metamx.common.guava.Accumulator outTypeTAccumulator + ); +} diff --git a/processing/src/main/java/io/druid/query/aggregation/Aggregators.java b/processing/src/main/java/io/druid/query/aggregation/Aggregators.java new file mode 100644 index 00000000000..871667d4b46 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/Aggregators.java @@ -0,0 +1,56 @@ +package io.druid.query.aggregation; + +public class Aggregators +{ + + public static Aggregator synchronizedAggregator(Aggregator aggregator){ + return new SynchronizedAggregator(aggregator); + } + + private static class SynchronizedAggregator implements Aggregator + { + + private final Aggregator delegate; + + SynchronizedAggregator(Aggregator delegate) + { + this.delegate = delegate; + } + + @Override + public synchronized void aggregate() + { + delegate.aggregate(); + } + + @Override + public synchronized void reset() + { + delegate.reset(); + } + + @Override + public synchronized Object get() + { + return delegate.get(); + } + + @Override + public synchronized float getFloat() + { + return delegate.getFloat(); + } + + @Override + public synchronized String getName() + { + return delegate.getName(); + } + + @Override + public synchronized void close() + { + delegate.close(); + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 9bae22699c6..ceafc550250 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -38,6 +38,7 @@ import io.druid.data.input.Row; import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; import io.druid.query.IntervalChunkingQueryRunner; +import io.druid.query.ParallelQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -61,7 +62,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false"); - private final Supplier configSupplier; @Inject @@ -121,28 +121,32 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() - { - @Override - public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) - { - if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) { - throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); - } - - return accumulated; - } - } + IncrementalIndex index = new IncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]) ); + Accumulator accumulator = new Accumulator() + { + @Override + public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) + { + if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) { + throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); + } + + return accumulated; + } + }; + + + if (runner instanceof ParallelQueryRunner && Boolean.getBoolean("optimize")) { + index = (IncrementalIndex) ((ParallelQueryRunner) runner).runAndAccumulate(query, index, accumulator); + } else { + index = runner.run(query).accumulate(index, accumulator); + } // convert millis back to timestamp according to granularity to preserve time zone information Sequence retVal = Sequences.map( diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index c13a1c3c588..7ff46fb703b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -40,6 +40,7 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.Aggregators; import io.druid.query.aggregation.PostAggregator; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; @@ -62,6 +63,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -69,11 +71,9 @@ public class IncrementalIndex implements Iterable { private static final Logger log = new Logger(IncrementalIndex.class); private static final Joiner JOINER = Joiner.on(","); - private final long minTimestamp; private final QueryGranularity gran; private final AggregatorFactory[] metrics; - private final Map metricIndexes; private final Map metricTypes; private final ImmutableList metricNames; @@ -83,9 +83,7 @@ public class IncrementalIndex implements Iterable private final SpatialDimensionRowFormatter spatialDimensionRowFormatter; private final DimensionHolder dimValues; private final ConcurrentSkipListMap facts; - - private volatile int numEntries = 0; - + private volatile AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() by a (hopefully) single thread. private InputRow in; @@ -162,15 +160,22 @@ public class IncrementalIndex implements Iterable dimension = dimension.toLowerCase(); List dimensionValues = row.getDimension(dimension); - final Integer index = dimensionOrder.get(dimension); + Integer index = dimensionOrder.get(dimension); if (index == null) { - dimensionOrder.put(dimension, dimensionOrder.size()); - dimensions.add(dimension); + synchronized (dimensionOrder) { + index = dimensionOrder.get(dimension); + if (index == null) { + dimensionOrder.put(dimension, dimensionOrder.size()); + dimensions.add(dimension); - if (overflow == null) { - overflow = Lists.newArrayList(); + if (overflow == null) { + overflow = Lists.newArrayList(); + } + overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + } else { + dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + } } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); } else { dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); } @@ -188,118 +193,128 @@ public class IncrementalIndex implements Iterable TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - in = row; Aggregator[] aggs = facts.get(key); if (aggs == null) { aggs = new Aggregator[metrics.length]; for (int i = 0; i < metrics.length; ++i) { final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorize( - new ColumnSelectorFactory() - { - @Override - public TimestampColumnSelector makeTimestampColumnSelector() - { - return new TimestampColumnSelector() + aggs[i] = + agg.factorize( + new ColumnSelectorFactory() { @Override - public long getTimestamp() + public TimestampColumnSelector makeTimestampColumnSelector() { - return in.getTimestampFromEpoch(); - } - }; - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - final String metricName = columnName.toLowerCase(); - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.getFloatMetric(metricName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - final String typeName = agg.getTypeName(); - final String columnName = column.toLowerCase(); - - if (typeName.equals("float")) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() + return new TimestampColumnSelector() { - return Float.TYPE; + @Override + public long getTimestamp() + { + return in.getTimestampFromEpoch(); + } + }; + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + final String metricName = columnName.toLowerCase(); + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.getFloatMetric(metricName); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String typeName = agg.getTypeName(); + final String columnName = column.toLowerCase(); + + if (typeName.equals("float")) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return in.getFloatMetric(columnName); + } + }; } - @Override - public Float get() - { - return in.getFloatMetric(columnName); + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); } - }; - } - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + final ComplexMetricExtractor extractor = serde.getExtractor(); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } - final ComplexMetricExtractor extractor = serde.getExtractor(); - - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); + @Override + public Object get() + { + return extractor.extractValue(in, columnName); + } + }; } @Override - public Object get() + public DimensionSelector makeDimensionSelector(String dimension) { - return extractor.extractValue(in, columnName); + // we should implement this, but this is going to be rewritten soon anyways + throw new UnsupportedOperationException( + "Incremental index aggregation does not support dimension selectors" + ); } - }; - } + } - @Override - public DimensionSelector makeDimensionSelector(String dimension) { - // we should implement this, but this is going to be rewritten soon anyways - throw new UnsupportedOperationException("Incremental index aggregation does not support dimension selectors"); - } - } ); } - facts.put(key, aggs); - ++numEntries; + Aggregator[] prev = facts.putIfAbsent(key, aggs); + if (prev != null) { + aggs = prev; + } + numEntries.incrementAndGet(); } - for (Aggregator agg : aggs) { - agg.aggregate(); + synchronized (this) { + in = row; + for (Aggregator agg : aggs) { + agg.aggregate(); + } + in = null; } - in = null; - return numEntries; + return numEntries.get(); } public boolean isEmpty() { - return numEntries == 0; + return numEntries.get() == 0; } public int size() { - return numEntries; + return numEntries.get(); } public long getMinTimeMillis() @@ -585,7 +600,6 @@ public class IncrementalIndex implements Iterable private final Map poorMansInterning = Maps.newConcurrentMap(); private final Map falseIds; private final Map falseIdsReverse; - private volatile String[] sortedVals = null; public DimDim() @@ -658,4 +672,4 @@ public class IncrementalIndex implements Iterable } } } -} +} \ No newline at end of file From 3302cff6dbaf297f4aa16308dbadfd5835a4f6a6 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 12 Feb 2014 00:51:10 +0530 Subject: [PATCH 09/12] groupBy improvements --- .../query/ChainedExecutionQueryRunner.java | 63 +--------- .../query/GroupByParallelQueryRunner.java | 117 ++++++++++++++++++ .../io/druid/query/ParallelQueryRunner.java | 14 --- .../druid/query/aggregation/Aggregators.java | 56 --------- .../query/groupby/GroupByQueryHelper.java | 99 +++++++++++++++ .../groupby/GroupByQueryQueryToolChest.java | 79 ++---------- .../groupby/GroupByQueryRunnerFactory.java | 9 +- .../segment/incremental/IncrementalIndex.java | 6 +- .../main/java/io/druid/cli/CliRealtime.java | 2 +- 9 files changed, 238 insertions(+), 207 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java delete mode 100644 processing/src/main/java/io/druid/query/ParallelQueryRunner.java delete mode 100644 processing/src/main/java/io/druid/query/aggregation/Aggregators.java create mode 100644 processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 0339c016125..316c8d8675e 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -25,7 +25,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.Sequence; @@ -52,9 +51,10 @@ import java.util.concurrent.Future; * That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B * must be fully cached in memory before the results for Aa and Ab are computed. */ -public class ChainedExecutionQueryRunner implements ParallelQueryRunner +public class ChainedExecutionQueryRunner implements QueryRunner { private static final Logger log = new Logger(ChainedExecutionQueryRunner.class); + private final Iterable> queryables; private final ExecutorService exec; private final Ordering ordering; @@ -157,63 +157,4 @@ public class ChainedExecutionQueryRunner implements ParallelQueryRunner } ); } - - @Override - public OutType runAndAccumulate( - final Query query, - final OutType outType, final Accumulator outTypeTAccumulator - ) - { - final int priority = Integer.parseInt(query.getContextValue("priority", "0")); - - if (Iterables.isEmpty(queryables)) { - log.warn("No queryables found."); - return outType; - } - List> futures = Lists.newArrayList( - Iterables.transform( - queryables, - new Function, Future>() - { - @Override - public Future apply(final QueryRunner input) - { - return exec.submit( - new PrioritizedCallable(priority) - { - @Override - public Boolean call() throws Exception - { - try { - input.run(query).accumulate(outType, outTypeTAccumulator); - return true; - } - catch (Exception e) { - log.error(e, "Exception with one of the sequences!"); - throw Throwables.propagate(e); - } - } - } - ); - } - } - ) - ); - - // Let the runners complete - for (Future future : futures) { - try { - future.get(); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - - return outType; - } } diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java new file mode 100644 index 00000000000..f1d381f3187 --- /dev/null +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -0,0 +1,117 @@ +package io.druid.query; + +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.metamx.common.Pair; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import io.druid.data.input.Row; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryHelper; +import io.druid.segment.incremental.IncrementalIndex; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + + +public class GroupByParallelQueryRunner implements QueryRunner +{ + private static final Logger log = new Logger(GroupByParallelQueryRunner.class); + private final Iterable> queryables; + private final ExecutorService exec; + private final Ordering ordering; + private final Supplier configSupplier; + + public GroupByParallelQueryRunner( + ExecutorService exec, + Ordering ordering, Supplier configSupplier, + QueryRunner... queryables + ) + { + this(exec, ordering, configSupplier, Arrays.asList(queryables)); + } + + public GroupByParallelQueryRunner( + ExecutorService exec, + Ordering ordering, Supplier configSupplier, + Iterable> queryables + ) + { + this.exec = exec; + this.ordering = ordering; + this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); + this.configSupplier = configSupplier; + } + + @Override + public Sequence run(final Query queryParam) + { + + final GroupByQuery query = (GroupByQuery) queryParam; + final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( + query, + configSupplier.get() + ); + final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + + if (Iterables.isEmpty(queryables)) { + log.warn("No queryables found."); + } + List> futures = Lists.newArrayList( + Iterables.transform( + queryables, + new Function, Future>() + { + @Override + public Future apply(final QueryRunner input) + { + return exec.submit( + new PrioritizedCallable(priority) + { + @Override + public Boolean call() throws Exception + { + try { + input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + return true; + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } + } + ); + } + } + ) + ); + + // Let the runners complete + for (Future future : futures) { + try { + future.get(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null)); + } + +} diff --git a/processing/src/main/java/io/druid/query/ParallelQueryRunner.java b/processing/src/main/java/io/druid/query/ParallelQueryRunner.java deleted file mode 100644 index 28f095b5460..00000000000 --- a/processing/src/main/java/io/druid/query/ParallelQueryRunner.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.druid.query; - -public interface ParallelQueryRunner extends QueryRunner -{ - - /** - * accumulator passed should be thread safe - */ - OutType runAndAccumulate( - Query query, - OutType outType, - com.metamx.common.guava.Accumulator outTypeTAccumulator - ); -} diff --git a/processing/src/main/java/io/druid/query/aggregation/Aggregators.java b/processing/src/main/java/io/druid/query/aggregation/Aggregators.java deleted file mode 100644 index 871667d4b46..00000000000 --- a/processing/src/main/java/io/druid/query/aggregation/Aggregators.java +++ /dev/null @@ -1,56 +0,0 @@ -package io.druid.query.aggregation; - -public class Aggregators -{ - - public static Aggregator synchronizedAggregator(Aggregator aggregator){ - return new SynchronizedAggregator(aggregator); - } - - private static class SynchronizedAggregator implements Aggregator - { - - private final Aggregator delegate; - - SynchronizedAggregator(Aggregator delegate) - { - this.delegate = delegate; - } - - @Override - public synchronized void aggregate() - { - delegate.aggregate(); - } - - @Override - public synchronized void reset() - { - delegate.reset(); - } - - @Override - public synchronized Object get() - { - return delegate.get(); - } - - @Override - public synchronized float getFloat() - { - return delegate.getFloat(); - } - - @Override - public synchronized String getName() - { - return delegate.getName(); - } - - @Override - public synchronized void close() - { - delegate.close(); - } - } -} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java new file mode 100644 index 00000000000..cb1f0279d13 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -0,0 +1,99 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.groupby; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.Pair; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.data.input.Rows; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.dimension.DimensionSpec; +import io.druid.segment.incremental.IncrementalIndex; + +import javax.annotation.Nullable; +import java.util.List; + +public class GroupByQueryHelper +{ + public static Pair> createIndexAccumulatorPair( + final GroupByQuery query, + final GroupByQueryConfig config + ) + { + final QueryGranularity gran = query.getGranularity(); + final long timeStart = query.getIntervals().get(0).getStartMillis(); + + // use gran.iterable instead of gran.truncate so that + // AllGranularity returns timeStart instead of Long.MIN_VALUE + final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next(); + + final List aggs = Lists.transform( + query.getAggregatorSpecs(), + new Function() + { + @Override + public AggregatorFactory apply(@Nullable AggregatorFactory input) + { + return input.getCombiningFactory(); + } + } + ); + final List dimensions = Lists.transform( + query.getDimensions(), + new Function() + { + @Override + public String apply(@Nullable DimensionSpec input) + { + return input.getOutputName(); + } + } + ); + IncrementalIndex index = new IncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]) + ); + + Accumulator accumulator = new Accumulator() + { + @Override + public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) + { + if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) { + throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); + } + + return accumulated; + } + }; + return new Pair>(index, accumulator); + } + +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index ceafc550250..2b215b8ec7d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -24,10 +24,9 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; -import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.ConcatSequence; import com.metamx.common.guava.Sequence; @@ -35,22 +34,16 @@ import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; -import io.druid.data.input.Rows; -import io.druid.granularity.QueryGranularity; import io.druid.query.IntervalChunkingQueryRunner; -import io.druid.query.ParallelQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; -import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import org.joda.time.Interval; import org.joda.time.Minutes; -import javax.annotation.Nullable; -import java.util.List; import java.util.Map; /** @@ -92,64 +85,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeGroupByResults(final GroupByQuery query, QueryRunner runner) { final GroupByQueryConfig config = configSupplier.get(); - final QueryGranularity gran = query.getGranularity(); - final long timeStart = query.getIntervals().get(0).getStartMillis(); - - // use gran.iterable instead of gran.truncate so that - // AllGranularity returns timeStart instead of Long.MIN_VALUE - final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next(); - - final List aggs = Lists.transform( - query.getAggregatorSpecs(), - new Function() - { - @Override - public AggregatorFactory apply(@Nullable AggregatorFactory input) - { - return input.getCombiningFactory(); - } - } + Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( + query, + config ); - final List dimensions = Lists.transform( - query.getDimensions(), - new Function() - { - @Override - public String apply(@Nullable DimensionSpec input) - { - return input.getOutputName(); - } - } - ); - IncrementalIndex index = new IncrementalIndex( - // use granularity truncated min timestamp - // since incoming truncated timestamps may precede timeStart - granTimeStart, - gran, - aggs.toArray(new AggregatorFactory[aggs.size()]) - ); - Accumulator accumulator = new Accumulator() - { - @Override - public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) - { - if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) { - throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); - } - return accumulated; - } - }; - - - if (runner instanceof ParallelQueryRunner && Boolean.getBoolean("optimize")) { - index = (IncrementalIndex) ((ParallelQueryRunner) runner).runAndAccumulate(query, index, accumulator); - } else { - index = runner.run(query).accumulate(index, accumulator); - } - - // convert millis back to timestamp according to granularity to preserve time zone information - Sequence retVal = Sequences.map( + IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + Sequence sequence = Sequences.map( Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), new Function() { @@ -157,12 +99,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest(queryExecutor, new RowOrdering(), queryRunners); + } else { + return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryRunners); } } @@ -142,7 +141,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(Query input) { - if (! (input instanceof GroupByQuery)) { + if (!(input instanceof GroupByQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 7ff46fb703b..c5aab625ed5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -40,7 +40,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.Aggregators; import io.druid.query.aggregation.PostAggregator; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; @@ -84,7 +83,7 @@ public class IncrementalIndex implements Iterable private final DimensionHolder dimValues; private final ConcurrentSkipListMap facts; private volatile AtomicInteger numEntries = new AtomicInteger(); - // This is modified on add() by a (hopefully) single thread. + // This is modified on add() in a critical section. private InputRow in; public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema) @@ -287,7 +286,7 @@ public class IncrementalIndex implements Iterable } } - ); + ); } Aggregator[] prev = facts.putIfAbsent(key, aggs); @@ -601,6 +600,7 @@ public class IncrementalIndex implements Iterable private final Map falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; + private final AtomicInteger falseIDSize = new AtomicInteger(); public DimDim() { diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java index c42bc14b461..59cf5bf0623 100644 --- a/services/src/main/java/io/druid/cli/CliRealtime.java +++ b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -34,7 +34,7 @@ import java.util.List; ) public class CliRealtime extends ServerRunnable { - private static final Logger log = new Logger(CliBroker.class); + private static final Logger log = new Logger(CliRealtime.class); public CliRealtime() { From 74966eb2efe6f284030800722af87f63ca3f711b Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 12 Feb 2014 01:00:38 +0530 Subject: [PATCH 10/12] add copyright --- .../query/GroupByParallelQueryRunner.java | 19 +++++++++++++++++++ .../segment/incremental/IncrementalIndex.java | 3 +-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index f1d381f3187..81510b12623 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.query; import com.google.common.base.Function; diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index c5aab625ed5..a2f2f87ba4c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -600,7 +600,6 @@ public class IncrementalIndex implements Iterable private final Map falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; - private final AtomicInteger falseIDSize = new AtomicInteger(); public DimDim() { @@ -672,4 +671,4 @@ public class IncrementalIndex implements Iterable } } } -} \ No newline at end of file +} From 78eddc50ee87b41924661fe1d378363db33dbfd3 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Sat, 15 Feb 2014 00:12:21 +0530 Subject: [PATCH 11/12] add checks for elements larger than capacity and account for the size of element being added --- .../client/cache/BytesBoundedLinkedQueue.java | 15 ++++++++++++-- .../cache/BytesBoundedLinkedQueueTest.java | 20 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java index d6d41311355..84c5f83d6a2 100644 --- a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -57,6 +57,15 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem } } + private void checkSize(E e) + { + if (getBytesSize(e) > capacity) { + throw new IllegalArgumentException( + String.format("cannot add element of size[%d] greater than capacity[%d]", getBytesSize(e), capacity) + ); + } + } + public abstract long getBytesSize(E e); public void elementAdded(E e) @@ -123,11 +132,12 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem ) throws InterruptedException { checkNotNull(e); + checkSize(e); long nanos = unit.toNanos(timeout); boolean added = false; putLock.lockInterruptibly(); try { - while (currentSize.get() >= capacity) { + while (currentSize.get() + getBytesSize(e) > capacity) { if (nanos <= 0) { return false; } @@ -228,10 +238,11 @@ public abstract class BytesBoundedLinkedQueue extends AbstractQueue implem public boolean offer(E e) { checkNotNull(e); + checkSize(e); boolean added = false; putLock.lock(); try { - if (currentSize.get() >= capacity) { + if (currentSize.get() + getBytesSize(e) > capacity) { return false; } else { added = delegate.add(e); diff --git a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java index 2a05eac2291..67a863ff8a1 100644 --- a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java +++ b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java @@ -158,6 +158,26 @@ public class BytesBoundedLinkedQueueTest } + @Test + public void testAddBiggerElementThanCapacityFails() + { + BlockingQueue q = getQueue(5); + try { + q.offer(new TestObject(10)); + Assert.fail(); + } + catch (IllegalArgumentException success) { + + } + } + + @Test public void testAddedObjectExceedsCapacity() throws Exception { + BlockingQueue q = getQueue(4); + Assert.assertTrue(q.offer(new TestObject(3))); + Assert.assertFalse(q.offer(new TestObject(2))); + Assert.assertFalse(q.offer(new TestObject(2),delayMS, TimeUnit.MILLISECONDS)); + } + public static class TestObject { public final int size; From 5be5cadd7b3e7c789e4a6907eadb027733d4f2a9 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 19 Feb 2014 21:55:18 +0530 Subject: [PATCH 12/12] minor refactoring --- .../main/java/io/druid/query/GroupByParallelQueryRunner.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 81510b12623..fb98968fb43 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -54,7 +54,8 @@ public class GroupByParallelQueryRunner implements QueryRunner public GroupByParallelQueryRunner( ExecutorService exec, - Ordering ordering, Supplier configSupplier, + Ordering ordering, + Supplier configSupplier, QueryRunner... queryables ) {