mirror of https://github.com/apache/druid.git
separate offheapIncrementalIndex implementation
parent 358ff915bb
commit ad75a21040
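The change, in outline: IncrementalIndex previously took a useOffheap constructor flag and branched internally between an on-heap ConcurrentSkipListMap and a MapDB direct-memory map for its facts table and dimension dictionaries. This commit drops the flag and the MapDB code from IncrementalIndex, introduces protected createFactsTable() and createDimDim() hooks with on-heap defaults, and moves the off-heap variants into a new OffheapIncrementalIndex subclass. Call sites (IndexGeneratorJob, GroupByQueryHelper, the realtime Sink, and TestIndex) now build the schema once and pick the implementation class themselves.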
@@ -45,6 +45,7 @@ import io.druid.segment.QueryableIndex;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
@@ -636,16 +637,23 @@ public class IndexGeneratorJob implements Jobby
       }
       final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
       int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
-      return new IncrementalIndex(
-          new IncrementalIndexSchema.Builder()
-              .withMinTimestamp(theBucket.time.getMillis())
-              .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
-              .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
-              .withMetrics(aggs)
-              .build(),
-          new OffheapBufferPool(bufferSize),
-          tuningConfig.isIngestOffheap()
-      );
+      final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+          .withMinTimestamp(theBucket.time.getMillis())
+          .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
+          .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
+          .withMetrics(aggs)
+          .build();
+      if (tuningConfig.isIngestOffheap()) {
+        return new OffheapIncrementalIndex(
+            indexSchema,
+            new OffheapBufferPool(bufferSize)
+        );
+      } else {
+        return new IncrementalIndex(
+            indexSchema,
+            new OffheapBufferPool(bufferSize)
+        );
+      }
     }
 
     private void createNewZipEntry(ZipOutputStream out, String name) throws IOException
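The buffer handed to OffheapBufferPool above is sized as the aggregators' total max intermediate size times the row flush boundary. A quick walkthrough with made-up numbers (both inputs really come from the job's aggregators and tuning config; nothing below is from the diff):

class BufferSizeDemo
{
  public static void main(String[] args)
  {
    int aggsSize = 64;              // hypothetical: sum of getMaxIntermediateSize() over all aggregators
    int rowFlushBoundary = 500_000; // hypothetical: tuningConfig.getRowFlushBoundary()
    int bufferSize = aggsSize * rowFlushBoundary;
    System.out.println(bufferSize); // 32000000 bytes per pooled buffer
  }
}

Note the product is plain int arithmetic, so a very large aggregator set combined with a high flush boundary could overflow; the commit leaves that computation unchanged.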
@@ -31,6 +31,7 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -73,17 +74,28 @@ public class GroupByQueryHelper
         }
       }
     );
 
-    IncrementalIndex index = new IncrementalIndex(
-        // use granularity truncated min timestamp
-        // since incoming truncated timestamps may precede timeStart
-        granTimeStart,
-        gran,
-        aggs.toArray(new AggregatorFactory[aggs.size()]),
-        bufferPool,
-        false,
-        query.getContextValue("useOffheap", false)
-    );
+    final IncrementalIndex index;
+    if(query.getContextValue("useOffheap", false)){
+      index = new OffheapIncrementalIndex(
+          // use granularity truncated min timestamp
+          // since incoming truncated timestamps may precede timeStart
+          granTimeStart,
+          gran,
+          aggs.toArray(new AggregatorFactory[aggs.size()]),
+          bufferPool,
+          false
+      );
+    } else {
+      index = new IncrementalIndex(
+          // use granularity truncated min timestamp
+          // since incoming truncated timestamps may precede timeStart
+          granTimeStart,
+          gran,
+          aggs.toArray(new AggregatorFactory[aggs.size()]),
+          bufferPool,
+          false
+      );
+    }
 
     Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
     {
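Unlike the ingestion paths, the groupBy path keys off a per-query context flag rather than a tuning-config setting. A standalone sketch of that selection logic (the map stands in for the query context; the real getContextValue lives on the query object):

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of per-query selection via a context flag.
class ContextFlagDemo
{
  static boolean getContextValue(Map<String, Object> context, String key, boolean defaultValue)
  {
    Object value = context.get(key);
    return value == null ? defaultValue : (Boolean) value;
  }

  public static void main(String[] args)
  {
    Map<String, Object> context = new HashMap<>();
    context.put("useOffheap", true);
    boolean useOffheap = getContextValue(context, "useOffheap", false);
    // mirrors: index = useOffheap ? new OffheapIncrementalIndex(...) : new IncrementalIndex(...)
    System.out.println(useOffheap ? "OffheapIncrementalIndex" : "IncrementalIndex");
  }
}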
@@ -56,29 +56,18 @@ import io.druid.segment.serde.ComplexMetricSerde;
 import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
-import org.mapdb.BTreeKeySerializer;
-import org.mapdb.CC;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
-import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -99,14 +88,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   private final int[] aggPositionOffsets;
   private final int totalAggSize;
   private final LinkedHashMap<String, Integer> dimensionOrder;
-  private final CopyOnWriteArrayList<String> dimensions;
+  protected final CopyOnWriteArrayList<String> dimensions;
   private final DimensionHolder dimValues;
   private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
   private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
   private final ResourceHolder<ByteBuffer> bufferHolder;
-  private final DB db;
-  private final DB factsDb;
-  private final boolean useOffheap;
   private volatile AtomicInteger numEntries = new AtomicInteger();
   // This is modified on add() in a critical section.
   private ThreadLocal<InputRow> in = new ThreadLocal<>();
@@ -123,8 +109,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   public IncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
       StupidPool<ByteBuffer> bufferPool,
-      final boolean deserializeComplexMetrics,
-      final boolean useOffheap
+      final boolean deserializeComplexMetrics
   )
   {
     this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
@@ -327,25 +312,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
     this.bufferHolder = bufferPool.take();
     this.dimValues = new DimensionHolder();
-    this.useOffheap = useOffheap;
-    if (this.useOffheap) {
-      final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
-                                     .transactionDisable()
-                                     .asyncWriteEnable()
-                                     .cacheSoftRefEnable();
-      db = dbMaker.make();
-      factsDb = dbMaker.make();
-      final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
-      this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
-                          .keySerializer(timeAndDimsSerializer)
-                          .comparator(timeAndDimsSerializer.getComparator())
-                          .valueSerializer(Serializer.INTEGER)
-                          .make();
-    } else {
-      db = null;
-      factsDb = null;
-      this.facts = new ConcurrentSkipListMap<>();
-    }
+    this.facts = createFactsTable();
   }
 
+  protected ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable() {
+    return new ConcurrentSkipListMap<>();
+  }
+
   public IncrementalIndex(
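The constructor change above is the heart of the refactor: instead of branching on a flag, the base class delegates facts-table construction to a protected factory method that OffheapIncrementalIndex (added below) overrides. A simplified, self-contained sketch of the pattern, with Long keys standing in for TimeAndDims:

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Template-method sketch: subclasses swap the backing store by overriding
// one protected factory method that the base constructor calls.
class BaseIndex
{
  protected final ConcurrentNavigableMap<Long, Integer> facts;

  BaseIndex()
  {
    facts = createFactsTable();
  }

  protected ConcurrentNavigableMap<Long, Integer> createFactsTable()
  {
    return new ConcurrentSkipListMap<>(); // on-heap default
  }
}

class OffheapIndex extends BaseIndex
{
  @Override
  protected ConcurrentNavigableMap<Long, Integer> createFactsTable()
  {
    // a real implementation would hand back an off-heap map (e.g. MapDB);
    // kept on-heap here so the sketch runs stand-alone
    return new ConcurrentSkipListMap<>();
  }
}

One subtlety: the base constructor invokes the override before the subclass's own fields are initialized, which is presumably why OffheapIncrementalIndex declares its DB fields non-final and lazily initializes them inside the synchronized createFactsTable() rather than in its constructor.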
@@ -361,18 +332,16 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
             .withMetrics(metrics)
             .build(),
         bufferPool,
-        true,
-        false
+        true
     );
   }
 
   public IncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
-      StupidPool<ByteBuffer> bufferPool,
-      boolean useOffheap
+      StupidPool<ByteBuffer> bufferPool
   )
   {
-    this(incrementalIndexSchema, bufferPool, true, useOffheap);
+    this(incrementalIndexSchema, bufferPool, true);
   }
 
   public IncrementalIndex(
@@ -380,8 +349,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
       QueryGranularity gran,
       final AggregatorFactory[] metrics,
       StupidPool<ByteBuffer> bufferPool,
-      boolean deserializeComplexMetrics,
-      boolean useOffheap
+      boolean deserializeComplexMetrics
   )
   {
     this(
@@ -390,8 +358,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
             .withMetrics(metrics)
             .build(),
         bufferPool,
-        deserializeComplexMetrics,
-        useOffheap
+        deserializeComplexMetrics
     );
   }
 
@@ -685,10 +652,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   {
     try {
       bufferHolder.close();
-      if (db != null) {
-        factsDb.close();
-        db.close();
-      }
     }
     catch (IOException e) {
       throw Throwables.propagate(e);
@@ -713,7 +676,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   {
     DimDim holder = dimensions.get(dimension);
     if (holder == null) {
-      holder = new DimDim(dimension);
+      holder = createDimDim(dimension);
       dimensions.put(dimension, holder);
     } else {
       throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
@@ -727,6 +690,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
   }
 
+  protected DimDim createDimDim(String dimension){
+    return new DimDimImpl();
+  }
+
   static class TimeAndDims implements Comparable<TimeAndDims>
   {
     private final long timestamp;
@@ -812,42 +779,51 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
   }
 
-  class DimDim
+  static interface DimDim
   {
-    private final Interner interner;
+    public String get(String value);
+
+    public int getId(String value);
+
+    public String getValue(int id);
+
+    public boolean contains(String value);
+
+    public int size();
+
+    public int add(String value);
+
+    public int getSortedId(String value);
+
+    public String getSortedValue(int index);
+
+    public void sort();
+
+    public boolean compareCannonicalValues(String s1, String s2);
+  }
+
+  private static class DimDimImpl implements DimDim{
     private final Map<String, Integer> falseIds;
     private final Map<Integer, String> falseIdsReverse;
     private volatile String[] sortedVals = null;
-    // size on MapDB is slow so maintain a count here
-    private volatile int size = 0;
+    final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
 
-    public DimDim(String dimName)
+    public DimDimImpl()
     {
-      if (useOffheap) {
-        falseIds = db.createHashMap(dimName)
-                     .keySerializer(Serializer.STRING)
-                     .valueSerializer(Serializer.INTEGER)
-                     .make();
-        falseIdsReverse = db.createHashMap(dimName + "_inverse")
-                            .keySerializer(Serializer.INTEGER)
-                            .valueSerializer(Serializer.STRING)
-                            .make();
-        interner = new WeakInterner();
-      } else {
-        BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
-        falseIds = biMap;
-        falseIdsReverse = biMap.inverse();
-        interner = new StrongInterner();
-      }
+      BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
+      falseIds = biMap;
+      falseIdsReverse = biMap.inverse();
     }
 
     /**
     * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
     * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
     */
-    public String get(String value)
+    public String get(String str)
     {
-      return interner.getCanonicalValue(value);
+      String prev = poorMansInterning.putIfAbsent(str, str);
+      return prev != null ? prev : str;
     }
 
     public int getId(String value)
@@ -867,17 +843,13 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
 
     public int size()
     {
-      return size;
+      return falseIds.size();
    }
 
     public synchronized int add(String value)
     {
-      int id = size++;
+      int id = falseIds.size();
       falseIds.put(value, id);
-      if (useOffheap) {
-        // onheap implementation uses a Bimap.
-        falseIdsReverse.put(id, value);
-      }
       return id;
     }
 
@@ -915,138 +887,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     public boolean compareCannonicalValues(String s1, String s2)
     {
-      return interner.compareCanonicalValues(s1, s2);
+      return s1 == s2;
     }
   }
 
-  private static interface Interner
-  {
-    public String getCanonicalValue(String str);
-
-    public boolean compareCanonicalValues(String s1, String s2);
-  }
-
-  private static class WeakInterner implements Interner
-  {
-    private static final WeakHashMap<String, WeakReference<String>> cache =
-        new WeakHashMap();
-
-    @Override
-    public String getCanonicalValue(String str)
-    {
-      final WeakReference<String> cached = cache.get(str);
-      if (cached != null) {
-        final String value = cached.get();
-        if (value != null) {
-          return value;
-        }
-      }
-      cache.put(str, new WeakReference(str));
-      return str;
-    }
-
-    @Override
-    public boolean compareCanonicalValues(String s1, String s2)
-    {
-      return s1.equals(s2);
-    }
-  }
-
-  private static class StrongInterner implements Interner
-  {
-    final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
-
-    @Override
-    public String getCanonicalValue(String str)
-    {
-      String value = poorMansInterning.get(str);
-      if (value != null) {
-        return value;
-      }
-      poorMansInterning.put(str, str);
-      return str;
-    }
-
-    @Override
-    public boolean compareCanonicalValues(String s1, String s2)
-    {
-      /**
-       * using == here instead of .equals() to speed up lookups
-       */
-      return s1 == s2;
-    }
-  }
-
-  public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
-  {
-    private final TimeAndDimsComparator comparator;
-    private final transient IncrementalIndex incrementalIndex;
-
-    TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
-    {
-      this.comparator = new TimeAndDimsComparator();
-      this.incrementalIndex = incrementalIndex;
-    }
-
-    @Override
-    public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
-    {
-      for (int i = start; i < end; i++) {
-        TimeAndDims timeAndDim = (TimeAndDims) keys[i];
-        out.writeLong(timeAndDim.timestamp);
-        out.writeInt(timeAndDim.dims.length);
-        int index = 0;
-        for (String[] dims : timeAndDim.dims) {
-          if (dims == null) {
-            out.write(-1);
-          } else {
-            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
-            out.writeInt(dims.length);
-            for (String value : dims) {
-              out.writeInt(dimDim.getId(value));
-            }
-          }
-          index++;
-        }
-      }
-    }
-
-    @Override
-    public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
-    {
-      Object[] ret = new Object[size];
-      for (int i = start; i < end; i++) {
-        final long timeStamp = in.readLong();
-        final String[][] dims = new String[in.readInt()][];
-        for (int k = 0; k < dims.length; k++) {
-          int len = in.readInt();
-          if (len != -1) {
-            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
-            String[] col = new String[len];
-            for (int l = 0; l < col.length; l++) {
-              col[l] = dimDim.get(dimDim.getValue(in.readInt()));
-            }
-            dims[k] = col;
-          }
-        }
-        ret[i] = new TimeAndDims(timeStamp, dims);
-      }
-      return ret;
-    }
-
-    @Override
-    public Comparator<TimeAndDims> getComparator()
-    {
-      return comparator;
-    }
-  }
-
-  public static class TimeAndDimsComparator implements Comparator, Serializable
-  {
-    @Override
-    public int compare(Object o1, Object o2)
-    {
-      return ((TimeAndDims) o1).compareTo((TimeAndDims) o2);
-    }
-  }
 }
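With the Interner abstraction gone, DimDimImpl.get() inlines the old StrongInterner behavior: a ConcurrentMap used as a "poor man's" intern table, so equal strings collapse to one canonical instance and compareCannonicalValues can safely use ==. A runnable sketch of just that mechanism:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// putIfAbsent returns the previously stored instance (or null on first insert),
// so every equal string resolves to the same canonical object.
class InternDemo
{
  private static final ConcurrentMap<String, String> canon = new ConcurrentHashMap<>();

  static String intern(String s)
  {
    String prev = canon.putIfAbsent(s, s);
    return prev != null ? prev : s;
  }

  public static void main(String[] args)
  {
    String a = intern(new String("druid"));
    String b = intern(new String("druid"));
    System.out.println(a == b); // true: identity comparison is now safe
  }
}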
@@ -0,0 +1,255 @@
+package io.druid.segment.incremental;
+
+
+import com.metamx.common.ISE;
+import io.druid.collections.StupidPool;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import org.mapdb.BTreeKeySerializer;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+public class OffheapIncrementalIndex extends IncrementalIndex
+{
+  private volatile DB db;
+  private volatile DB factsDb;
+
+  public OffheapIncrementalIndex(
+      IncrementalIndexSchema incrementalIndexSchema,
+      StupidPool<ByteBuffer> bufferPool
+  )
+  {
+    super(incrementalIndexSchema, bufferPool);
+  }
+
+  public OffheapIncrementalIndex(
+      long minTimestamp,
+      QueryGranularity gran,
+      final AggregatorFactory[] metrics,
+      StupidPool<ByteBuffer> bufferPool,
+      boolean deserializeComplexMetrics
+
+  )
+  {
+    super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics);
+  }
+
+  @Override
+  protected synchronized ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable()
+  {
+    if (factsDb == null) {
+      final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
+                                     .transactionDisable()
+                                     .asyncWriteEnable()
+                                     .cacheSoftRefEnable();
+      factsDb = dbMaker.make();
+      db = dbMaker.make();
+    }
+    final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
+    return factsDb.createTreeMap("__facts" + UUID.randomUUID())
+                  .keySerializer(timeAndDimsSerializer)
+                  .comparator(timeAndDimsSerializer.getComparator())
+                  .valueSerializer(Serializer.INTEGER)
+                  .make();
+  }
+
+  @Override
+  protected DimDim createDimDim(String dimension)
+  {
+    return new OffheapDimDim(dimension);
+  }
+
+  public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
+  {
+    private final TimeAndDimsComparator comparator;
+    private final transient IncrementalIndex incrementalIndex;
+
+    TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
+    {
+      this.comparator = new TimeAndDimsComparator();
+      this.incrementalIndex = incrementalIndex;
+    }
+
+    @Override
+    public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
+    {
+      for (int i = start; i < end; i++) {
+        TimeAndDims timeAndDim = (TimeAndDims) keys[i];
+        out.writeLong(timeAndDim.getTimestamp());
+        out.writeInt(timeAndDim.getDims().length);
+        int index = 0;
+        for (String[] dims : timeAndDim.getDims()) {
+          if (dims == null) {
+            out.write(-1);
+          } else {
+            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
+            out.writeInt(dims.length);
+            for (String value : dims) {
+              out.writeInt(dimDim.getId(value));
+            }
+          }
+          index++;
+        }
+      }
+    }
+
+    @Override
+    public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
+    {
+      Object[] ret = new Object[size];
+      for (int i = start; i < end; i++) {
+        final long timeStamp = in.readLong();
+        final String[][] dims = new String[in.readInt()][];
+        for (int k = 0; k < dims.length; k++) {
+          int len = in.readInt();
+          if (len != -1) {
+            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
+            String[] col = new String[len];
+            for (int l = 0; l < col.length; l++) {
+              col[l] = dimDim.get(dimDim.getValue(in.readInt()));
+            }
+            dims[k] = col;
+          }
+        }
+        ret[i] = new TimeAndDims(timeStamp, dims);
+      }
+      return ret;
+    }
+
+    @Override
+    public Comparator<TimeAndDims> getComparator()
+    {
+      return comparator;
+    }
+  }
+
+  public static class TimeAndDimsComparator implements Comparator, Serializable
+  {
+    @Override
+    public int compare(Object o1, Object o2)
+    {
+      return ((TimeAndDims) o1).compareTo((TimeAndDims) o2);
+    }
+  }
+
+  private class OffheapDimDim implements DimDim
+  {
+    private final Map<String, Integer> falseIds;
+    private final Map<Integer, String> falseIdsReverse;
+    private final WeakHashMap<String, WeakReference<String>> cache =
+        new WeakHashMap();
+    private volatile String[] sortedVals = null;
+    // size on MapDB is slow so maintain a count here
+    private volatile int size = 0;
+
+    public OffheapDimDim(String dimension)
+    {
+      falseIds = db.createHashMap(dimension)
+                   .keySerializer(Serializer.STRING)
+                   .valueSerializer(Serializer.INTEGER)
+                   .make();
+      falseIdsReverse = db.createHashMap(dimension + "_inverse")
+                          .keySerializer(Serializer.INTEGER)
+                          .valueSerializer(Serializer.STRING)
+                          .make();
+    }
+
+    /**
+     * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
+     *
+     * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
+     */
+    public String get(String str)
+    {
+      final WeakReference<String> cached = cache.get(str);
+      if (cached != null) {
+        final String value = cached.get();
+        if (value != null) {
+          return value;
+        }
+      }
+      cache.put(str, new WeakReference(str));
+      return str;
+    }
+
+    public int getId(String value)
+    {
+      return falseIds.get(value);
+    }
+
+    public String getValue(int id)
+    {
+      return falseIdsReverse.get(id);
+    }
+
+    public boolean contains(String value)
+    {
+      return falseIds.containsKey(value);
+    }
+
+    public int size()
+    {
+      return size;
+    }
+
+    public synchronized int add(String value)
+    {
+      int id = size++;
+      falseIds.put(value, id);
+      falseIdsReverse.put(id, value);
+      return id;
+    }
+
+    public int getSortedId(String value)
+    {
+      assertSorted();
+      return Arrays.binarySearch(sortedVals, value);
+    }
+
+    public String getSortedValue(int index)
+    {
+      assertSorted();
+      return sortedVals[index];
+    }
+
+    public void sort()
+    {
+      if (sortedVals == null) {
+        sortedVals = new String[falseIds.size()];
+
+        int index = 0;
+        for (String value : falseIds.keySet()) {
+          sortedVals[index++] = value;
+        }
+        Arrays.sort(sortedVals);
+      }
+    }
+
+    private void assertSorted()
+    {
+      if (sortedVals == null) {
+        throw new ISE("Call sort() before calling the getSorted* methods.");
+      }
+    }
+
+    public boolean compareCannonicalValues(String s1, String s2)
+    {
+      return s1.equals(s2);
+    }
+  }
+
+}
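For readers unfamiliar with MapDB 1.x, the builder chain above creates a database whose storage lives in direct (off-heap) memory, and each createTreeMap/createHashMap call materializes a named map inside it. A minimal standalone sketch using the same calls as the class above (stock value serializer; the real code plugs in TimeAndDimsSerializer for keys):

import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;

import java.util.concurrent.ConcurrentNavigableMap;

// Minimal MapDB 1.x sketch mirroring the setup in OffheapIncrementalIndex.
class OffheapMapDemo
{
  public static void main(String[] args)
  {
    DB db = DBMaker.newMemoryDirectDB() // backing store lives outside the Java heap
                   .transactionDisable()
                   .asyncWriteEnable()
                   .cacheSoftRefEnable()
                   .make();
    ConcurrentNavigableMap<String, Integer> facts = db.createTreeMap("facts")
                                                      .valueSerializer(Serializer.INTEGER)
                                                      .make();
    facts.put("row-1", 42);
    System.out.println(facts.get("row-1")); // 42, fetched from direct memory
    db.close();
  }
}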
@@ -38,6 +38,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -53,12 +54,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class TestIndex
 {
-  private static final Logger log = new Logger(TestIndex.class);
-
-  private static IncrementalIndex realtimeIndex = null;
-  private static QueryableIndex mmappedIndex = null;
-  private static QueryableIndex mergedRealtime = null;
-
   public static final String[] COLUMNS = new String[]{
       "ts",
       "provider",
@@ -70,6 +65,7 @@ public class TestIndex
   };
   public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
   public static final String[] METRICS = new String[]{"iNdEx"};
+  private static final Logger log = new Logger(TestIndex.class);
   private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
   private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
       new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
@@ -82,6 +78,10 @@ public class TestIndex
     }
   }
 
+  private static IncrementalIndex realtimeIndex = null;
+  private static QueryableIndex mmappedIndex = null;
+  private static QueryableIndex mergedRealtime = null;
+
   public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap)
   {
     synchronized (log) {
@@ -155,16 +155,23 @@ public class TestIndex
   {
     final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
     log.info("Realtime loading index file[%s]", resource);
 
-    final IncrementalIndex retVal = new IncrementalIndex(
-        new IncrementalIndexSchema.Builder().withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
-                                            .withQueryGranularity(QueryGranularity.NONE)
-                                            .withMetrics(METRIC_AGGS)
-                                            .build(),
-        TestQueryRunners.pool,
-        true,
-        useOffheap
-    );
+    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
+        .withQueryGranularity(QueryGranularity.NONE)
+        .withMetrics(METRIC_AGGS)
+        .build();
+    final IncrementalIndex retVal;
+    if (useOffheap) {
+      retVal = new OffheapIncrementalIndex(
+          schema,
+          TestQueryRunners.pool
+      );
+    } else {
+      retVal = new IncrementalIndex(
+          schema,
+          TestQueryRunners.pool
+      );
+    }
 
     final AtomicLong startTime = new AtomicLong();
     int lineCount;
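As a usage note, a test can exercise both storage paths through the helper above; a hypothetical call-site fragment (assumes the surrounding test's imports):

// Sketch only: true routes through OffheapIncrementalIndex,
// false through the on-heap ConcurrentSkipListMap-backed IncrementalIndex.
IncrementalIndex onheapIndex  = TestIndex.getIncrementalTestIndex(false);
IncrementalIndex offheapIndex = TestIndex.getIncrementalTestIndex(true);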
@@ -31,6 +31,7 @@ import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.FireHydrant;
@@ -180,16 +181,24 @@ public class Sink implements Iterable<FireHydrant>
       aggsSize += agg.getMaxIntermediateSize();
     }
     int bufferSize = aggsSize * config.getMaxRowsInMemory();
-    IncrementalIndex newIndex = new IncrementalIndex(
-        new IncrementalIndexSchema.Builder()
-            .withMinTimestamp(minTimestamp)
-            .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
-            .withDimensionsSpec(schema.getParser())
-            .withMetrics(schema.getAggregators())
-            .build(),
-        new OffheapBufferPool(bufferSize),
-        config.isIngestOffheap()
-    );
+    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(minTimestamp)
+        .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
+        .withDimensionsSpec(schema.getParser())
+        .withMetrics(schema.getAggregators())
+        .build();
+    final IncrementalIndex newIndex;
+    if (config.isIngestOffheap()) {
+      newIndex = new OffheapIncrementalIndex(
+          indexSchema,
+          new OffheapBufferPool(bufferSize)
+      );
+    } else {
+      newIndex = new IncrementalIndex(
+          indexSchema,
+          new OffheapBufferPool(bufferSize)
+      );
+    }
 
     final FireHydrant old;
     synchronized (hydrantLock) {