make ObjectStrategy cacheable

This commit is contained in:
Xavier Léauté 2014-05-19 22:23:40 -07:00
parent 276e48e564
commit be92c322e3
3 changed files with 107 additions and 12 deletions

View File

@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.data;
/**
* Implementing CacheableObjectStrategy instead of ObjectSrategy indicates
* that a column scan may cache the results of fromByteBuffer
*/
public interface CacheableObjectStrategy<T> extends ObjectStrategy<T> {}

View File

@ -20,6 +20,7 @@
package io.druid.segment.data;
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.metamx.common.IAE;
@ -32,6 +33,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. If input
@ -49,6 +52,7 @@ import java.util.Iterator;
public class GenericIndexed<T> implements Indexed<T>
{
private static final byte version = 0x1;
private int indexOffset;
public static <T> GenericIndexed<T> fromArray(T[] objects, ObjectStrategy<T> strategy)
{
@ -114,11 +118,50 @@ public class GenericIndexed<T> implements Indexed<T>
return new GenericIndexed<T>(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup);
}
private static class SizedLRUMap<K, V> extends LinkedHashMap<K, V>
{
final Map<K, Integer> sizes = Maps.newHashMap();
int numBytes = 0;
int maxBytes = 0;
public SizedLRUMap(int initialCapacity, int maxBytes)
{
super(initialCapacity, 0.75f, true);
this.maxBytes = maxBytes;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
{
if (numBytes > maxBytes) {
numBytes -= sizes.remove(eldest.getKey());
return true;
}
return false;
}
public V put(K key, V value, int size)
{
numBytes += size;
sizes.put(key, size);
return super.put(key, value);
}
}
private final ByteBuffer theBuffer;
private final ObjectStrategy<T> strategy;
private final boolean allowReverseLookup;
private final int size;
private final boolean cacheable;
private final ThreadLocal<SizedLRUMap<Integer, T>> cachedValues = new ThreadLocal<SizedLRUMap<Integer, T>>() {
@Override
protected SizedLRUMap<Integer, T> initialValue()
{
return new SizedLRUMap<>(16384, 1024 * 1024);
}
};
private final int valuesOffset;
GenericIndexed(
@ -132,7 +175,10 @@ public class GenericIndexed<T> implements Indexed<T>
this.allowReverseLookup = allowReverseLookup;
size = theBuffer.getInt();
indexOffset = theBuffer.position();
valuesOffset = theBuffer.position() + (size << 2);
this.cacheable = strategy instanceof CacheableObjectStrategy;
}
@Override
@ -157,24 +203,38 @@ public class GenericIndexed<T> implements Indexed<T>
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
}
ByteBuffer myBuffer = theBuffer.asReadOnlyBuffer();
int startOffset = 4;
int endOffset;
if(cacheable) {
final T cached = cachedValues.get().get(index);
if (cached != null) {
return cached;
}
}
final int startOffset;
final int endOffset;
if (index == 0) {
endOffset = myBuffer.getInt();
startOffset = 4;
endOffset = theBuffer.getInt(indexOffset);
} else {
myBuffer.position(myBuffer.position() + ((index - 1) * 4));
startOffset = myBuffer.getInt() + 4;
endOffset = myBuffer.getInt();
final int position = indexOffset + ((index - 1) * 4);
startOffset = theBuffer.getInt(position) + 4;
endOffset = theBuffer.getInt(position + Ints.BYTES);
}
if (startOffset == endOffset) {
return null;
}
myBuffer.position(valuesOffset + startOffset);
return strategy.fromByteBuffer(myBuffer, endOffset - startOffset);
final ByteBuffer copyBuffer = this.theBuffer.asReadOnlyBuffer();
copyBuffer.position(valuesOffset + startOffset);
final int size = endOffset - startOffset;
final T value = strategy.fromByteBuffer(copyBuffer, size);
if(cacheable) {
cachedValues.get().put(index, value, size);
}
return value;
}
@Override
@ -241,7 +301,7 @@ public class GenericIndexed<T> implements Indexed<T>
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
public static ObjectStrategy<String> stringStrategy = new ObjectStrategy<String>()
public static ObjectStrategy<String> stringStrategy = new CacheableObjectStrategy<String>()
{
@Override
public Class<? extends String> getClazz()

View File

@ -22,11 +22,20 @@ package io.druid.segment.data;
import java.nio.ByteBuffer;
import java.util.Comparator;
/**
*/
public interface ObjectStrategy<T> extends Comparator<T>
{
public Class<? extends T> getClazz();
/**
* Convert values from their underlying byte representation.
*
* Implementations of this method must not change the given buffer mark, or limit, but may modify its position.
* Use buffer.asReadOnlyBuffer() or buffer.duplicate() if mark or limit need to be set.
*
* @param buffer buffer to read value from
* @param numBytes number of bytes used to store the value, starting at buffer.position()
* @return
*/
public T fromByteBuffer(ByteBuffer buffer, int numBytes);
public byte[] toBytes(T val);
}