mirror of https://github.com/apache/druid.git
make GenericIndexed release cache resources on close
This commit is contained in:
parent
be25d67894
commit
8840af5977
|
@ -187,6 +187,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
{
|
{
|
||||||
final Offset baseOffset = offset.clone();
|
final Offset baseOffset = offset.clone();
|
||||||
|
|
||||||
|
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
|
||||||
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
|
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
|
||||||
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
|
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
|
||||||
final Map<String, Object> objectColumnCache = Maps.newHashMap();
|
final Map<String, Object> objectColumnCache = Maps.newHashMap();
|
||||||
|
@ -271,12 +272,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
public DimensionSelector makeDimensionSelector(String dimension)
|
public DimensionSelector makeDimensionSelector(String dimension)
|
||||||
{
|
{
|
||||||
final String dimensionName = dimension.toLowerCase();
|
final String dimensionName = dimension.toLowerCase();
|
||||||
|
|
||||||
|
DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimensionName);
|
||||||
final Column columnDesc = index.getColumn(dimensionName);
|
final Column columnDesc = index.getColumn(dimensionName);
|
||||||
if (columnDesc == null) {
|
|
||||||
return null;
|
if (cachedColumn == null && columnDesc != null) {
|
||||||
|
cachedColumn = columnDesc.getDictionaryEncoding();
|
||||||
|
dictionaryColumnCache.put(dimensionName, cachedColumn);
|
||||||
}
|
}
|
||||||
|
|
||||||
final DictionaryEncodedColumn column = columnDesc.getDictionaryEncoding();
|
final DictionaryEncodedColumn column = cachedColumn;
|
||||||
|
|
||||||
if (column == null) {
|
if (column == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -557,6 +562,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
public void close() throws IOException
|
public void close() throws IOException
|
||||||
{
|
{
|
||||||
CloseQuietly.close(timestamps);
|
CloseQuietly.close(timestamps);
|
||||||
|
for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) {
|
||||||
|
CloseQuietly.close(column);
|
||||||
|
}
|
||||||
for (GenericColumn column : genericColumnCache.values()) {
|
for (GenericColumn column : genericColumnCache.values()) {
|
||||||
CloseQuietly.close(column);
|
CloseQuietly.close(column);
|
||||||
}
|
}
|
||||||
|
@ -641,6 +649,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
*/
|
*/
|
||||||
public Sequence<Cursor> build()
|
public Sequence<Cursor> build()
|
||||||
{
|
{
|
||||||
|
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
|
||||||
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
|
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
|
||||||
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
|
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
|
||||||
final Map<String, Object> objectColumnCache = Maps.newHashMap();
|
final Map<String, Object> objectColumnCache = Maps.newHashMap();
|
||||||
|
@ -718,41 +727,45 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
public DimensionSelector makeDimensionSelector(String dimension)
|
public DimensionSelector makeDimensionSelector(String dimension)
|
||||||
{
|
{
|
||||||
final String dimensionName = dimension.toLowerCase();
|
final String dimensionName = dimension.toLowerCase();
|
||||||
final Column column = index.getColumn(dimensionName);
|
|
||||||
if (column == null) {
|
DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimensionName);
|
||||||
return null;
|
final Column columnDesc = index.getColumn(dimensionName);
|
||||||
|
|
||||||
|
if (cachedColumn == null && columnDesc != null) {
|
||||||
|
cachedColumn = columnDesc.getDictionaryEncoding();
|
||||||
|
dictionaryColumnCache.put(dimensionName, cachedColumn);
|
||||||
}
|
}
|
||||||
|
|
||||||
final DictionaryEncodedColumn dict = column.getDictionaryEncoding();
|
final DictionaryEncodedColumn column = cachedColumn;
|
||||||
|
|
||||||
if (dict == null) {
|
if (column == null) {
|
||||||
return null;
|
return null;
|
||||||
} else if (column.getCapabilities().hasMultipleValues()) {
|
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
|
||||||
return new DimensionSelector()
|
return new DimensionSelector()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public IndexedInts getRow()
|
public IndexedInts getRow()
|
||||||
{
|
{
|
||||||
return dict.getMultiValueRow(currRow);
|
return column.getMultiValueRow(currRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getValueCardinality()
|
public int getValueCardinality()
|
||||||
{
|
{
|
||||||
return dict.getCardinality();
|
return column.getCardinality();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String lookupName(int id)
|
public String lookupName(int id)
|
||||||
{
|
{
|
||||||
final String retVal = dict.lookupName(id);
|
final String retVal = column.lookupName(id);
|
||||||
return retVal == null ? "" : retVal;
|
return retVal == null ? "" : retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int lookupId(String name)
|
public int lookupId(String name)
|
||||||
{
|
{
|
||||||
return dict.lookupId(name);
|
return column.lookupId(name);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
|
@ -773,13 +786,13 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
@Override
|
@Override
|
||||||
public int get(int index)
|
public int get(int index)
|
||||||
{
|
{
|
||||||
return dict.getSingleValueRow(currRow);
|
return column.getSingleValueRow(currRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<Integer> iterator()
|
public Iterator<Integer> iterator()
|
||||||
{
|
{
|
||||||
return Iterators.singletonIterator(dict.getSingleValueRow(currRow));
|
return Iterators.singletonIterator(column.getSingleValueRow(currRow));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -787,19 +800,19 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
@Override
|
@Override
|
||||||
public int getValueCardinality()
|
public int getValueCardinality()
|
||||||
{
|
{
|
||||||
return dict.getCardinality();
|
return column.getCardinality();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String lookupName(int id)
|
public String lookupName(int id)
|
||||||
{
|
{
|
||||||
return dict.lookupName(id);
|
return column.lookupName(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int lookupId(String name)
|
public int lookupId(String name)
|
||||||
{
|
{
|
||||||
return dict.lookupId(name);
|
return column.lookupId(name);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -1004,6 +1017,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
||||||
public void close() throws IOException
|
public void close() throws IOException
|
||||||
{
|
{
|
||||||
CloseQuietly.close(timestamps);
|
CloseQuietly.close(timestamps);
|
||||||
|
for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) {
|
||||||
|
CloseQuietly.close(column);
|
||||||
|
}
|
||||||
for (GenericColumn column : genericColumnCache.values()) {
|
for (GenericColumn column : genericColumnCache.values()) {
|
||||||
CloseQuietly.close(column);
|
CloseQuietly.close(column);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
package io.druid.segment.column;
|
||||||
|
|
||||||
|
public interface ColumnConfig
|
||||||
|
{
|
||||||
|
}
|
|
@ -49,9 +49,12 @@ import java.util.Map;
|
||||||
* bytes 10-((numElements * 4) + 10): integers representing *end* offsets of byte serialized values
|
* bytes 10-((numElements * 4) + 10): integers representing *end* offsets of byte serialized values
|
||||||
* bytes ((numElements * 4) + 10)-(numBytesUsed + 2): 4-byte integer representing length of value, followed by bytes for value
|
* bytes ((numElements * 4) + 10)-(numBytesUsed + 2): 4-byte integer representing length of value, followed by bytes for value
|
||||||
*/
|
*/
|
||||||
public class GenericIndexed<T> implements Indexed<T>
|
public class GenericIndexed<T> implements Indexed<T>, Closeable
|
||||||
{
|
{
|
||||||
private static final byte version = 0x1;
|
private static final byte version = 0x1;
|
||||||
|
|
||||||
|
public static final int INITIAL_CACHE_CAPACITY = 16384;
|
||||||
|
|
||||||
private int indexOffset;
|
private int indexOffset;
|
||||||
|
|
||||||
public static <T> GenericIndexed<T> fromArray(T[] objects, ObjectStrategy<T> strategy)
|
public static <T> GenericIndexed<T> fromArray(T[] objects, ObjectStrategy<T> strategy)
|
||||||
|
@ -155,14 +158,7 @@ public class GenericIndexed<T> implements Indexed<T>
|
||||||
|
|
||||||
private final boolean cacheable;
|
private final boolean cacheable;
|
||||||
private final ThreadLocal<ByteBuffer> cachedBuffer;
|
private final ThreadLocal<ByteBuffer> cachedBuffer;
|
||||||
private final ThreadLocal<SizedLRUMap<Integer, T>> cachedValues = new ThreadLocal<SizedLRUMap<Integer, T>>() {
|
private final ThreadLocal<SizedLRUMap<Integer, T>> cachedValues;
|
||||||
@Override
|
|
||||||
protected SizedLRUMap<Integer, T> initialValue()
|
|
||||||
{
|
|
||||||
return new SizedLRUMap<>(16384, 1024 * 1024); // 1MB cache per column, per thread
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private final int valuesOffset;
|
private final int valuesOffset;
|
||||||
|
|
||||||
GenericIndexed(
|
GenericIndexed(
|
||||||
|
@ -188,6 +184,35 @@ public class GenericIndexed<T> implements Indexed<T>
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
this.cacheable = false;
|
||||||
|
this.cachedValues = new ThreadLocal<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a copy of the given indexed with the given cache
|
||||||
|
* The resulting copy must be closed to release resources used by the cache
|
||||||
|
*
|
||||||
|
* @param other
|
||||||
|
* @param cache
|
||||||
|
*/
|
||||||
|
GenericIndexed(GenericIndexed<T> other, final SizedLRUMap<Integer, T> cache)
|
||||||
|
{
|
||||||
|
this.theBuffer = other.theBuffer;
|
||||||
|
this.strategy = other.strategy;
|
||||||
|
this.allowReverseLookup = other.allowReverseLookup;
|
||||||
|
this.size = other.size;
|
||||||
|
this.indexOffset = other.indexOffset;
|
||||||
|
this.valuesOffset = other.valuesOffset;
|
||||||
|
this.cachedBuffer = other.cachedBuffer;
|
||||||
|
|
||||||
|
this.cachedValues = new ThreadLocal<SizedLRUMap<Integer, T>>() {
|
||||||
|
@Override
|
||||||
|
protected SizedLRUMap<Integer, T> initialValue()
|
||||||
|
{
|
||||||
|
return cache;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
this.cacheable = strategy instanceof CacheableObjectStrategy;
|
this.cacheable = strategy instanceof CacheableObjectStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,6 +318,25 @@ public class GenericIndexed<T> implements Indexed<T>
|
||||||
channel.write(theBuffer.asReadOnlyBuffer());
|
channel.write(theBuffer.asReadOnlyBuffer());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The returned CachedIndexed must be closed to release the underlying memory
|
||||||
|
* @param maxBytes
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public GenericIndexed<T> withCache(int maxBytes)
|
||||||
|
{
|
||||||
|
return new GenericIndexed<>(this, new SizedLRUMap<Integer, T>(INITIAL_CACHE_CAPACITY, maxBytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
if(cacheable) {
|
||||||
|
cachedValues.get().clear();
|
||||||
|
cachedValues.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy)
|
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy)
|
||||||
{
|
{
|
||||||
byte versionFromBuffer = buffer.get();
|
byte versionFromBuffer = buffer.get();
|
||||||
|
|
Loading…
Reference in New Issue