From 7bbb6ba063cc87f121b0eebe68c41b4966250d8a Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 6 Feb 2014 23:33:32 +0530 Subject: [PATCH] support for bounding operation queue by the byte size --- .../client/cache/BytesBoundedLinkedQueue.java | 313 ++++++++++++++++++ .../io/druid/client/cache/MemcachedCache.java | 1 + .../client/cache/MemcachedCacheConfig.java | 11 +- .../cache/MemcachedOperationQueueFactory.java | 48 +++ 4 files changed, 369 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java create mode 100644 server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java new file mode 100644 index 00000000000..94d3acbe788 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -0,0 +1,313 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Abstract implementation of a BlockingQueue bounded by the size of elements, + * works similar to LinkedBlockingQueue except that capacity is bounded by the size in bytes of the elements in the queue. + */ +public abstract class BytesBoundedLinkedQueue extends AbstractQueue implements BlockingQueue +{ + private LinkedList delegate; + private int capacity; + private int currentSize; + private Lock lock = new ReentrantLock(); + private Condition notFull = lock.newCondition(); + private Condition notEmpty = lock.newCondition(); + + public BytesBoundedLinkedQueue(int capacity) + { + delegate = new LinkedList<>(); + this.capacity = capacity; + } + + private static void checkNotNull(Object v) + { + if (v == null) { + throw new NullPointerException(); + } + } + + public abstract long getBytesSize(E e); + + public void operationAdded(E e) + { + currentSize += getBytesSize(e); + notEmpty.signalAll(); + } + + public void operationRemoved(E e) + { + currentSize -= getBytesSize(e); + notFull.signalAll(); + } + + @Override + public int size() + { + lock.lock(); + try { + return delegate.size(); + } + finally { + lock.unlock(); + } + } + + @Override + public void put(E e) throws InterruptedException + { + while (!offer(e, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { + // keep trying until added successfully + } + } + + @Override + public boolean offer( + E e, long timeout, TimeUnit unit + ) throws InterruptedException + { + checkNotNull(e); + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (currentSize > capacity) { + if (nanos <= 0) { + return false; + } + nanos = notFull.awaitNanos(nanos); + } + delegate.add(e); + operationAdded(e); + return true; + } + finally { + lock.unlock(); + } + + } + + @Override + public E take() throws InterruptedException + { + lock.lockInterruptibly(); + try { + while (delegate.size() == 0) { + notEmpty.await(); + } + E e = delegate.remove(); + operationRemoved(e); + return e; + } + finally { + lock.unlock(); + } + } + + @Override + public int remainingCapacity() + { + lock.lock(); + try { + // return approximate remaining capacity based on current data + if (delegate.size() == 0) { + return capacity; + } else { + int averageByteSize = currentSize / delegate.size(); + return (capacity - currentSize) / averageByteSize; + } + } + finally { + lock.unlock(); + } + } + + @Override + public int drainTo(Collection c) + { + return drainTo(c, Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection c, int maxElements) + { + if (c == null) { + throw new NullPointerException(); + } + if (c == this) { + throw new IllegalArgumentException(); + } + lock.lock(); + try { + int n = Math.min(maxElements, delegate.size()); + if (n < 0) { + return 0; + } + // count.get provides visibility to first n Nodes + for (int i = 0; i < n; i++) { + E e = delegate.remove(0); + operationRemoved(e); + c.add(e); + } + return n; + } + finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E e) + { + checkNotNull(e); + lock.lock(); + try { + if (currentSize > capacity) { + return false; + } else { + boolean added = delegate.add(e); + if (added) { + operationAdded(e); + } + return added; + } + } + finally { + lock.unlock(); + } + } + + @Override + public E poll() + { + lock.lock(); + try { + E e = delegate.poll(); + if (e != null) { + operationRemoved(e); + } + return e; + } + finally { + lock.unlock(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException + { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (delegate.size() == 0) { + if (nanos <= 0) { + return null; + } + nanos = notEmpty.awaitNanos(nanos); + } + return delegate.poll(); + } + finally { + lock.unlock(); + } + } + + @Override + public E peek() + { + lock.lock(); + try { + return delegate.peek(); + } + finally { + lock.unlock(); + } + } + + @Override + public Iterator iterator() + { + return new Itr(delegate.iterator()); + } + + private class Itr implements Iterator + { + + private final Iterator delegate; + private E lastReturned; + + Itr(Iterator delegate) + { + this.delegate = delegate; + } + + @Override + public boolean hasNext() + { + lock.lock(); + try { + return delegate.hasNext(); + } + finally { + lock.unlock(); + } + } + + @Override + public E next() + { + lock.lock(); + try { + this.lastReturned = delegate.next(); + return lastReturned; + } + finally { + lock.unlock(); + } + } + + @Override + public void remove() + { + lock.lock(); + try { + if (this.lastReturned == null) { + throw new IllegalStateException(); + } + delegate.remove(); + operationRemoved(lastReturned); + lastReturned = null; + } + finally { + lock.unlock(); + } + } + } +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 155c75e3f86..2186adc8e24 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -68,6 +68,7 @@ public class MemcachedCache implements Cache .setShouldOptimize(true) .setOpQueueMaxBlockTime(config.getTimeout()) .setOpTimeout(config.getTimeout()) + .setOpQueueFactory(new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize())) .build(), AddrUtil.getAddresses(config.getHosts()) ), diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index 2cc06cf3637..f66aaa3f333 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -27,19 +27,17 @@ public class MemcachedCacheConfig { @JsonProperty private int expiration = 2592000; // What is this number? - @JsonProperty private int timeout = 500; - @JsonProperty @NotNull private String hosts; - @JsonProperty private int maxObjectSize = 50 * 1024 * 1024; - @JsonProperty private String memcachedPrefix = "druid"; + @JsonProperty + private int maxOperationQueueSize = 256 * 1024 * 1024; // 256 MB public int getExpiration() { @@ -65,4 +63,9 @@ public class MemcachedCacheConfig { return memcachedPrefix; } + + public int getMaxOperationQueueSize() + { + return maxOperationQueueSize; + } } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java new file mode 100644 index 00000000000..a91a162c371 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationQueueFactory; + +import java.util.concurrent.BlockingQueue; + +public class MemcachedOperationQueueFactory implements OperationQueueFactory +{ + public final int maxQueueSize; + + public MemcachedOperationQueueFactory(int maxQueueSize) + { + this.maxQueueSize = maxQueueSize; + } + + @Override + public BlockingQueue create() + { + return new BytesBoundedLinkedQueue(maxQueueSize) + { + @Override + public long getBytesSize(Operation operation) + { + return operation.getBuffer().capacity(); + } + }; + } +} \ No newline at end of file