Merge pull request #1076 from metamx/remove-threadlocals

remove thread-locals in GenericIndexed in favor of wrapped objects
This commit is contained in:
Fangjin Yang 2015-02-02 20:02:33 -08:00
commit 71b4c5fa86
6 changed files with 290 additions and 192 deletions

View File

@ -17,7 +17,7 @@
package io.druid.segment.column;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.CachingIndexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
@ -30,17 +30,17 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
{
private final VSizeIndexedInts column;
private final VSizeIndexed multiValueColumn;
private final GenericIndexed<String> lookups;
private final CachingIndexed<String> cachedLookups;
public SimpleDictionaryEncodedColumn(
VSizeIndexedInts singleValueColumn,
VSizeIndexed multiValueColumn,
GenericIndexed<String> lookups
CachingIndexed<String> cachedLookups
)
{
this.column = singleValueColumn;
this.multiValueColumn = multiValueColumn;
this.lookups = lookups;
this.cachedLookups = cachedLookups;
}
@Override
@ -70,24 +70,24 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
@Override
public String lookupName(int id)
{
return lookups.get(id);
return cachedLookups.get(id);
}
@Override
public int lookupId(String name)
{
return lookups.indexOf(name);
return cachedLookups.indexOf(name);
}
@Override
public int getCardinality()
{
return lookups.size();
return cachedLookups.size();
}
@Override
public void close() throws IOException
{
lookups.close();
cachedLookups.close();
}
}

View File

@ -0,0 +1,143 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.data;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
public class CachingIndexed<T> implements Indexed<T>, Closeable
{
public static final int INITIAL_CACHE_CAPACITY = 16384;
private static final Logger log = new Logger(CachingIndexed.class);
private final GenericIndexed<T>.BufferIndexed delegate;
private final SizedLRUMap<Integer, T> cachedValues;
/**
* Creates a CachingIndexed wrapping the given GenericIndexed with a value lookup cache
*
* CachingIndexed objects are not thread safe and should only be used by a single thread at a time.
* CachingIndexed objects must be closed to release any underlying cache resources.
*
* @param delegate the GenericIndexed to wrap with a lookup cache.
* @param lookupCacheSize maximum size in bytes of the lookup cache if greater than zero
*/
public CachingIndexed(GenericIndexed<T> delegate, final int lookupCacheSize)
{
this.delegate = delegate.singleThreaded();
if(lookupCacheSize > 0) {
log.debug("Allocating column cache of max size[%d]", lookupCacheSize);
cachedValues = new SizedLRUMap<>(INITIAL_CACHE_CAPACITY, lookupCacheSize);
} else {
cachedValues = null;
}
}
@Override
public Class<? extends T> getClazz()
{
return delegate.getClazz();
}
@Override
public int size()
{
return delegate.size();
}
@Override
public T get(int index)
{
if(cachedValues != null) {
final T cached = cachedValues.getValue(index);
if (cached != null) {
return cached;
}
final T value = delegate.get(index);
cachedValues.put(index, value, delegate.getLastValueSize());
return value;
} else {
return delegate.get(index);
}
}
@Override
public int indexOf(T value)
{
return delegate.indexOf(value);
}
@Override
public Iterator<T> iterator()
{
return delegate.iterator();
}
@Override
public void close() throws IOException
{
if (cachedValues != null) {
log.debug("Closing column cache");
cachedValues.clear();
}
}
private static class SizedLRUMap<K, V> extends LinkedHashMap<K, Pair<Integer, V>>
{
private final int maxBytes;
private int numBytes = 0;
public SizedLRUMap(int initialCapacity, int maxBytes)
{
super(initialCapacity, 0.75f, true);
this.maxBytes = maxBytes;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, Pair<Integer, V>> eldest)
{
if (numBytes > maxBytes) {
numBytes -= eldest.getValue().lhs;
return true;
}
return false;
}
public void put(K key, V value, int size)
{
final int totalSize = size + 48; // add approximate object overhead
numBytes += totalSize;
super.put(key, new Pair<>(totalSize, value));
}
public V getValue(Object key)
{
final Pair<Integer, V> sizeValuePair = super.get(key);
return sizeValuePair == null ? null : sizeValuePair.rhs;
}
}
}

View File

@ -231,6 +231,8 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
private class CompressedIndexedFloats implements IndexedFloats
{
final Indexed<ResourceHolder<FloatBuffer>> singleThreadedFloatBuffers = baseFloatBuffers.singleThreaded();
int currIndex = -1;
ResourceHolder<FloatBuffer> holder;
FloatBuffer buffer;
@ -288,7 +290,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
protected void loadBuffer(int bufferNum)
{
CloseQuietly.close(holder);
holder = baseFloatBuffers.get(bufferNum);
holder = singleThreadedFloatBuffers.get(bufferNum);
buffer = holder.get();
currIndex = bufferNum;
}
@ -299,7 +301,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
return "CompressedFloatsIndexedSupplier_Anonymous{" +
"currIndex=" + currIndex +
", sizePer=" + sizePer +
", numChunks=" + baseFloatBuffers.size() +
", numChunks=" + singleThreadedFloatBuffers.size() +
", totalSize=" + totalSize +
'}';
}

View File

@ -216,6 +216,8 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
private class CompressedIndexedLongs implements IndexedLongs
{
final Indexed<ResourceHolder<LongBuffer>> singleThreadedLongBuffers = baseLongBuffers.singleThreaded();
int currIndex = -1;
ResourceHolder<LongBuffer> holder;
LongBuffer buffer;
@ -273,7 +275,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
protected void loadBuffer(int bufferNum)
{
CloseQuietly.close(holder);
holder = baseLongBuffers.get(bufferNum);
holder = singleThreadedLongBuffers.get(bufferNum);
buffer = holder.get();
currIndex = bufferNum;
}
@ -296,7 +298,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
return "CompressedLongsIndexedSupplier_Anonymous{" +
"currIndex=" + currIndex +
", sizePer=" + sizePer +
", numChunks=" + baseLongBuffers.size() +
", numChunks=" + singleThreadedLongBuffers.size() +
", totalSize=" + totalSize +
'}';
}

View File

@ -20,9 +20,7 @@ package io.druid.segment.data;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
@ -31,8 +29,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. If input
@ -47,14 +43,10 @@ import java.util.Map;
* bytes 10-((numElements * 4) + 10): integers representing *end* offsets of byte serialized values
* bytes ((numElements * 4) + 10)-(numBytesUsed + 2): 4-byte integer representing length of value, followed by bytes for value
*/
public class GenericIndexed<T> implements Indexed<T>, Closeable
public class GenericIndexed<T> implements Indexed<T>
{
private static final Logger log = new Logger(GenericIndexed.class);
private static final byte version = 0x1;
public static final int INITIAL_CACHE_CAPACITY = 16384;
private int indexOffset;
public static <T> GenericIndexed<T> fromArray(T[] objects, ObjectStrategy<T> strategy)
@ -121,39 +113,34 @@ public class GenericIndexed<T> implements Indexed<T>, Closeable
return new GenericIndexed<T>(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup);
}
private static class SizedLRUMap<K, V> extends LinkedHashMap<K, Pair<Integer, V>>
@Override
public Class<? extends T> getClazz()
{
private final int maxBytes;
private int numBytes = 0;
return bufferIndexed.getClazz();
}
public SizedLRUMap(int initialCapacity, int maxBytes)
{
super(initialCapacity, 0.75f, true);
this.maxBytes = maxBytes;
}
@Override
public int size()
{
return bufferIndexed.size();
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, Pair<Integer, V>> eldest)
{
if (numBytes > maxBytes) {
numBytes -= eldest.getValue().lhs;
return true;
}
return false;
}
@Override
public T get(int index)
{
return bufferIndexed.get(index);
}
public void put(K key, V value, int size)
{
final int totalSize = size + 48; // add approximate object overhead
numBytes += totalSize;
super.put(key, new Pair<>(totalSize, value));
}
@Override
public int indexOf(T value)
{
return bufferIndexed.indexOf(value);
}
public V getValue(Object key)
{
final Pair<Integer, V> sizeValuePair = super.get(key);
return sizeValuePair == null ? null : sizeValuePair.rhs;
}
@Override
public Iterator<T> iterator()
{
return bufferIndexed.iterator();
}
private final ByteBuffer theBuffer;
@ -161,10 +148,8 @@ public class GenericIndexed<T> implements Indexed<T>, Closeable
private final boolean allowReverseLookup;
private final int size;
private final boolean cacheable;
private final ThreadLocal<ByteBuffer> cachedBuffer;
private final ThreadLocal<SizedLRUMap<Integer, T>> cachedValues;
private final int valuesOffset;
private final BufferIndexed bufferIndexed;
GenericIndexed(
ByteBuffer buffer,
@ -179,133 +164,108 @@ public class GenericIndexed<T> implements Indexed<T>, Closeable
size = theBuffer.getInt();
indexOffset = theBuffer.position();
valuesOffset = theBuffer.position() + (size << 2);
bufferIndexed = new BufferIndexed();
}
this.cachedBuffer = new ThreadLocal<ByteBuffer>()
class BufferIndexed implements Indexed<T>
{
int lastReadSize;
@Override
public Class<? extends T> getClazz()
{
@Override
protected ByteBuffer initialValue()
{
return theBuffer.asReadOnlyBuffer();
return strategy.getClazz();
}
@Override
public int size()
{
return size;
}
@Override
public T get(final int index)
{
return _get(theBuffer.asReadOnlyBuffer(), index);
}
protected T _get(final ByteBuffer copyBuffer, final int index)
{
if (index < 0) {
throw new IAE("Index[%s] < 0", index);
}
};
this.cacheable = false;
this.cachedValues = new ThreadLocal<>();
}
/**
* Creates a copy of the given indexed with the given cache size
* The resulting copy must be closed to release resources used by the cache
*/
GenericIndexed(GenericIndexed<T> other, final int maxBytes)
{
this.theBuffer = other.theBuffer;
this.strategy = other.strategy;
this.allowReverseLookup = other.allowReverseLookup;
this.size = other.size;
this.indexOffset = other.indexOffset;
this.valuesOffset = other.valuesOffset;
this.cachedBuffer = other.cachedBuffer;
this.cachedValues = new ThreadLocal<SizedLRUMap<Integer, T>>() {
@Override
protected SizedLRUMap<Integer, T> initialValue()
{
log.debug("Allocating column cache of max size[%d]", maxBytes);
return new SizedLRUMap<>(INITIAL_CACHE_CAPACITY, maxBytes);
}
};
this.cacheable = strategy instanceof CacheableObjectStrategy;
}
@Override
public Class<? extends T> getClazz()
{
return strategy.getClazz();
}
@Override
public int size()
{
return size;
}
@Override
public T get(int index)
{
if (index < 0) {
throw new IAE("Index[%s] < 0", index);
}
if (index >= size) {
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
}
if(cacheable) {
final T cached = cachedValues.get().getValue(index);
if (cached != null) {
return cached;
}
}
// using a cached copy of the buffer instead of making a read-only copy every time get() is called is faster
final ByteBuffer copyBuffer = this.cachedBuffer.get();
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = 4;
endOffset = copyBuffer.getInt(indexOffset);
} else {
copyBuffer.position(indexOffset + ((index - 1) * 4));
startOffset = copyBuffer.getInt() + 4;
endOffset = copyBuffer.getInt();
}
if (startOffset == endOffset) {
return null;
}
copyBuffer.position(valuesOffset + startOffset);
final int size = endOffset - startOffset;
// fromByteBuffer must not modify the buffer limit
final T value = strategy.fromByteBuffer(copyBuffer, size);
if(cacheable) {
cachedValues.get().put(index, value, size);
}
return value;
}
@Override
public int indexOf(T value)
{
if (!allowReverseLookup) {
throw new UnsupportedOperationException("Reverse lookup not allowed.");
}
value = (value != null && value.equals("")) ? null : value;
int minIndex = 0;
int maxIndex = size - 1;
while (minIndex <= maxIndex) {
int currIndex = (minIndex + maxIndex) >>> 1;
T currValue = get(currIndex);
int comparison = strategy.compare(currValue, value);
if (comparison == 0) {
return currIndex;
if (index >= size) {
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
}
if (comparison < 0) {
minIndex = currIndex + 1;
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = 4;
endOffset = copyBuffer.getInt(indexOffset);
} else {
maxIndex = currIndex - 1;
copyBuffer.position(indexOffset + ((index - 1) * 4));
startOffset = copyBuffer.getInt() + 4;
endOffset = copyBuffer.getInt();
}
if (startOffset == endOffset) {
return null;
}
copyBuffer.position(valuesOffset + startOffset);
final int size = endOffset - startOffset;
lastReadSize = size;
// fromByteBuffer must not modify the buffer limit
final T value = strategy.fromByteBuffer(copyBuffer, size);
return value;
}
return -(minIndex + 1);
/**
* This method makes no guarantees with respect to thread safety
* @return the size in bytes of the last value read
*/
public int getLastValueSize() {
return lastReadSize;
}
@Override
public int indexOf(T value)
{
if (!allowReverseLookup) {
throw new UnsupportedOperationException("Reverse lookup not allowed.");
}
value = (value != null && value.equals("")) ? null : value;
int minIndex = 0;
int maxIndex = size - 1;
while (minIndex <= maxIndex) {
int currIndex = (minIndex + maxIndex) >>> 1;
T currValue = get(currIndex);
int comparison = strategy.compare(currValue, value);
if (comparison == 0) {
return currIndex;
}
if (comparison < 0) {
minIndex = currIndex + 1;
} else {
maxIndex = currIndex - 1;
}
}
return -(minIndex + 1);
}
@Override
public Iterator<T> iterator()
{
return IndexedIterable.create(this).iterator();
}
}
public long getSerializedSize()
@ -322,24 +282,20 @@ public class GenericIndexed<T> implements Indexed<T>, Closeable
}
/**
* The returned GenericIndexed must be closed to release the underlying memory
* Create a non-thread-safe Indexed, which may perform better than the underlying Indexed.
*
* @param maxBytes maximum size in bytes of the lookup cache
* @return a copy of this GenericIndexed with a lookup cache.
* @return a non-thread-safe Indexed
*/
public GenericIndexed<T> withCache(int maxBytes)
public GenericIndexed<T>.BufferIndexed singleThreaded()
{
return new GenericIndexed<>(this, maxBytes);
}
@Override
public void close() throws IOException
{
if(cacheable) {
log.debug("Closing column cache");
cachedValues.get().clear();
cachedValues.remove();
}
final ByteBuffer copyBuffer = theBuffer.asReadOnlyBuffer();
return new BufferIndexed() {
@Override
public T get(int index)
{
return _get(copyBuffer, index);
}
};
}
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy)
@ -392,10 +348,4 @@ public class GenericIndexed<T> implements Indexed<T>, Closeable
return Ordering.natural().nullsFirst().compare(o1, o2);
}
};
@Override
public Iterator<T> iterator()
{
return IndexedIterable.create(this).iterator();
}
}

View File

@ -20,6 +20,7 @@ package io.druid.segment.serde;
import com.google.common.base.Supplier;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
import io.druid.segment.data.CachingIndexed;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
@ -52,7 +53,7 @@ public class DictionaryEncodedColumnSupplier implements Supplier<DictionaryEncod
return new SimpleDictionaryEncodedColumn(
singleValuedColumn,
multiValuedColumn,
lookupCacheSize > 0 ? dictionary.withCache(lookupCacheSize) : dictionary
new CachingIndexed<>(dictionary, lookupCacheSize)
);
}
}