From ad75a21040def8a7a70a63ef0cf41a2bb203043d Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Wed, 1 Oct 2014 13:58:51 +0530
Subject: [PATCH] separate offheapIncrementalIndex implementation

---
 .../io/druid/indexer/IndexGeneratorJob.java   |  28 +-
 .../query/groupby/GroupByQueryHelper.java     |  20 +-
 .../segment/incremental/IncrementalIndex.java | 259 ++++--------------
 .../incremental/OffheapIncrementalIndex.java  | 255 +++++++++++++++++
 .../test/java/io/druid/segment/TestIndex.java |  39 +--
 .../druid/segment/realtime/plumber/Sink.java  |  29 +-
 6 files changed, 381 insertions(+), 249 deletions(-)
 create mode 100644 processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 74ac61e637c..5f33d677014 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -45,6 +45,7 @@ import io.druid.segment.QueryableIndex;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
@@ -636,16 +637,23 @@ public class IndexGeneratorJob implements Jobby
       }
       final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
       int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
-      return new IncrementalIndex(
-          new IncrementalIndexSchema.Builder()
-              .withMinTimestamp(theBucket.time.getMillis())
-              .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
-              .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
-              .withMetrics(aggs)
-              .build(),
-          new OffheapBufferPool(bufferSize),
-          tuningConfig.isIngestOffheap()
-      );
+      final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+          .withMinTimestamp(theBucket.time.getMillis())
+          .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
+          .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
+          .withMetrics(aggs)
+          .build();
+      if (tuningConfig.isIngestOffheap()) {
+        return new OffheapIncrementalIndex(
+            indexSchema,
+            new OffheapBufferPool(bufferSize)
+        );
+      } else {
+        return new IncrementalIndex(
+            indexSchema,
+            new OffheapBufferPool(bufferSize)
+        );
+      }
     }
 
     private void createNewZipEntry(ZipOutputStream out, String name) throws IOException
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
index 3cb263b5b4d..a3ca6dbee72 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
@@ -31,6 +31,7 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -73,17 +74,28 @@ public class GroupByQueryHelper
         }
       }
     );
-
-    IncrementalIndex index = new IncrementalIndex(
+    final IncrementalIndex index;
+    if (query.getContextValue("useOffheap", false)) {
+      index = new OffheapIncrementalIndex(
+          // use granularity truncated min timestamp
+          // since incoming truncated timestamps may precede timeStart
+          granTimeStart,
+          gran,
+          aggs.toArray(new AggregatorFactory[aggs.size()]),
+          bufferPool,
+          false
+      );
+    } else {
+      index = new IncrementalIndex(
           // use granularity truncated min timestamp
           // since incoming truncated timestamps may precede timeStart
           granTimeStart,
           gran,
           aggs.toArray(new AggregatorFactory[aggs.size()]),
           bufferPool,
-          false,
-          query.getContextValue("useOffheap", false)
+          false
       );
+    }
 
     Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
     {
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index 96584317c62..320f6f8648c 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -56,29 +56,18 @@ import io.druid.segment.serde.ComplexMetricSerde;
 import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
-import org.mapdb.BTreeKeySerializer;
-import org.mapdb.CC;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
-import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -99,14 +88,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   private final int[] aggPositionOffsets;
   private final int totalAggSize;
   private final LinkedHashMap<String, Integer> dimensionOrder;
-  private final CopyOnWriteArrayList<String> dimensions;
+  protected final CopyOnWriteArrayList<String> dimensions;
   private final DimensionHolder dimValues;
   private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
   private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
   private final ResourceHolder<ByteBuffer> bufferHolder;
-  private final DB db;
-  private final DB factsDb;
-  private final boolean useOffheap;
   private volatile AtomicInteger numEntries = new AtomicInteger();
   // This is modified on add() in a critical section.
   private ThreadLocal<InputRow> in = new ThreadLocal<>();
@@ -123,8 +109,7 @@
   public IncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
       StupidPool<ByteBuffer> bufferPool,
-      final boolean deserializeComplexMetrics,
-      final boolean useOffheap
+      final boolean deserializeComplexMetrics
   )
   {
     this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
@@ -327,25 +312,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
     this.bufferHolder = bufferPool.take();
     this.dimValues = new DimensionHolder();
-    this.useOffheap = useOffheap;
-    if (this.useOffheap) {
-      final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
-                                     .transactionDisable()
-                                     .asyncWriteEnable()
-                                     .cacheSoftRefEnable();
-      db = dbMaker.make();
-      factsDb = dbMaker.make();
-      final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
-      this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
-                          .keySerializer(timeAndDimsSerializer)
-                          .comparator(timeAndDimsSerializer.getComparator())
-                          .valueSerializer(Serializer.INTEGER)
-                          .make();
-    } else {
-      db = null;
-      factsDb = null;
-      this.facts = new ConcurrentSkipListMap<>();
-    }
+    this.facts = createFactsTable();
+  }
+
+  protected ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable() {
+    return new ConcurrentSkipListMap<>();
+  }
 
   public IncrementalIndex(
@@ -361,18 +332,16 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
           .withMetrics(metrics)
           .build(),
       bufferPool,
-      true,
-      false
+      true
     );
   }
 
   public IncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
-      StupidPool<ByteBuffer> bufferPool,
-      boolean useOffheap
+      StupidPool<ByteBuffer> bufferPool
   )
   {
-    this(incrementalIndexSchema, bufferPool, true, useOffheap);
+    this(incrementalIndexSchema, bufferPool, true);
   }
 
   public IncrementalIndex(
       long minTimestamp,
       QueryGranularity gran,
       final AggregatorFactory[] metrics,
       StupidPool<ByteBuffer> bufferPool,
-      boolean deserializeComplexMetrics,
-      boolean useOffheap
+      boolean deserializeComplexMetrics
   )
   {
     this(
           .withMetrics(metrics)
           .build(),
       bufferPool,
-      deserializeComplexMetrics,
-      useOffheap
+      deserializeComplexMetrics
     );
   }
@@ -685,10 +652,6 @@
   {
     try {
       bufferHolder.close();
-      if (db != null) {
-        factsDb.close();
-        db.close();
-      }
     }
     catch (IOException e) {
       throw Throwables.propagate(e);
@@ -713,7 +676,7 @@
     {
       DimDim holder = dimensions.get(dimension);
       if (holder == null) {
-        holder = new DimDim(dimension);
+        holder = createDimDim(dimension);
         dimensions.put(dimension, holder);
       } else {
         throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
@@ -727,6 +690,10 @@
     }
   }
 
+  protected DimDim createDimDim(String dimension)
+  {
+    return new DimDimImpl();
+  }
+
   static class TimeAndDims implements Comparable<TimeAndDims>
   {
     private final long timestamp;
@@ -812,42 +779,51 @@
     }
   }
 
-  class DimDim
+  static interface DimDim
   {
-    private final Interner interner;
+    public String get(String value);
+
+    public int getId(String value);
+
+    public String getValue(int id);
+
+    public boolean contains(String value);
+
+    public int size();
+
+    public int add(String value);
+
+    public int getSortedId(String value);
+
+    public String getSortedValue(int index);
+
+    public void sort();
+
+    public boolean compareCannonicalValues(String s1, String s2);
+  }
+
+  private static class DimDimImpl implements DimDim
+  {
     private final Map<String, Integer> falseIds;
     private final Map<Integer, String> falseIdsReverse;
     private volatile String[] sortedVals = null;
-    // size on MapDB is slow so maintain a count here
-    private volatile int size = 0;
+    final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
 
-    public DimDim(String dimName)
+
+    public DimDimImpl()
     {
-      if (useOffheap) {
-        falseIds = db.createHashMap(dimName)
-                     .keySerializer(Serializer.STRING)
-                     .valueSerializer(Serializer.INTEGER)
-                     .make();
-        falseIdsReverse = db.createHashMap(dimName + "_inverse")
-                            .keySerializer(Serializer.INTEGER)
-                            .valueSerializer(Serializer.STRING)
-                            .make();
-        interner = new WeakInterner();
-      } else {
       BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
       falseIds = biMap;
       falseIdsReverse = biMap.inverse();
-        interner = new StrongInterner();
-      }
     }
 
     /**
      * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
      * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
      */
-    public String get(String value)
+    public String get(String str)
     {
-      return interner.getCanonicalValue(value);
+      String prev = poorMansInterning.putIfAbsent(str, str);
+      return prev != null ? prev : str;
     }
 
     public int getId(String value)
@@ -867,17 +843,13 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     public int size()
     {
-      return size;
+      return falseIds.size();
     }
 
     public synchronized int add(String value)
    {
-      int id = size++;
+      int id = falseIds.size();
       falseIds.put(value, id);
-      if (useOffheap) {
-        // onheap implementation uses a Bimap.
-        falseIdsReverse.put(id, value);
-      }
       return id;
     }
@@ -915,138 +887,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
 
     public boolean compareCannonicalValues(String s1, String s2)
     {
-      return interner.compareCanonicalValues(s1, s2);
-    }
-  }
-
-  private static interface Interner
-  {
-    public String getCanonicalValue(String str);
-
-    public boolean compareCanonicalValues(String s1, String s2);
-  }
-
-  private static class WeakInterner implements Interner
-  {
-    private static final WeakHashMap<String, WeakReference<String>> cache =
-        new WeakHashMap();
-
-    @Override
-    public String getCanonicalValue(String str)
-    {
-      final WeakReference<String> cached = cache.get(str);
-      if (cached != null) {
-        final String value = cached.get();
-        if (value != null) {
-          return value;
-        }
-      }
-      cache.put(str, new WeakReference<String>(str));
-      return str;
-    }
-
-    @Override
-    public boolean compareCanonicalValues(String s1, String s2)
-    {
-      return s1.equals(s2);
-    }
-  }
-
-  private static class StrongInterner implements Interner
-  {
-    final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
-
-    @Override
-    public String getCanonicalValue(String str)
-    {
-      String value = poorMansInterning.get(str);
-      if (value != null) {
-        return value;
-      }
-      poorMansInterning.put(str, str);
-      return str;
-    }
-
-    @Override
-    public boolean compareCanonicalValues(String s1, String s2)
-    {
-      /**
-       * using == here instead of .equals() to speed up lookups
-       */
-      return s1 == s2;
-    }
-  }
-
-  public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
-  {
-    private final TimeAndDimsComparator comparator;
-    private final transient IncrementalIndex incrementalIndex;
-
-    TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
-    {
-      this.comparator = new TimeAndDimsComparator();
-      this.incrementalIndex = incrementalIndex;
-    }
-
-    @Override
-    public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
-    {
-      for (int i = start; i < end; i++) {
-        TimeAndDims timeAndDim = (TimeAndDims) keys[i];
-        out.writeLong(timeAndDim.timestamp);
-        out.writeInt(timeAndDim.dims.length);
-        int index = 0;
-        for (String[] dims : timeAndDim.dims) {
-          if (dims == null) {
-            out.write(-1);
-          } else {
-            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
-            out.writeInt(dims.length);
-            for (String value : dims) {
-              out.writeInt(dimDim.getId(value));
-            }
-          }
-          index++;
-        }
-      }
-    }
-
-    @Override
-    public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
-    {
-      Object[] ret = new Object[size];
-      for (int i = start; i < end; i++) {
-        final long timeStamp = in.readLong();
-        final String[][] dims = new String[in.readInt()][];
-        for (int k = 0; k < dims.length; k++) {
-          int len = in.readInt();
-          if (len != -1) {
-            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
-            String[] col = new String[len];
-            for (int l = 0; l < col.length; l++) {
-              col[l] = dimDim.get(dimDim.getValue(in.readInt()));
-            }
-            dims[k] = col;
-          }
-        }
-        ret[i] = new TimeAndDims(timeStamp, dims);
-      }
-      return ret;
-    }
-
-    @Override
-    public Comparator<TimeAndDims> getComparator()
-    {
-      return comparator;
-    }
-  }
-
-  public static class TimeAndDimsComparator implements Comparator, Serializable
-  {
-    @Override
-    public int compare(Object o1, Object o2)
-    {
-      return ((TimeAndDims) o1).compareTo((TimeAndDims) o2);
+      return s1 == s2;
     }
   }
 }
diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
new file mode 100644
index 00000000000..55c388c653a
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -0,0 +1,255 @@
+package io.druid.segment.incremental;
+
+
+import com.metamx.common.ISE;
+import io.druid.collections.StupidPool;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import org.mapdb.BTreeKeySerializer;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+public class OffheapIncrementalIndex extends IncrementalIndex
+{
+  private volatile DB db;
+  private volatile DB factsDb;
+
+  public OffheapIncrementalIndex(
+      IncrementalIndexSchema incrementalIndexSchema,
+      StupidPool<ByteBuffer> bufferPool
+  )
+  {
+    super(incrementalIndexSchema, bufferPool);
+  }
+
+  public OffheapIncrementalIndex(
+      long minTimestamp,
+      QueryGranularity gran,
+      final AggregatorFactory[] metrics,
+      StupidPool<ByteBuffer> bufferPool,
+      boolean deserializeComplexMetrics
+  )
+  {
+    super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics);
+  }
+
+  @Override
+  protected synchronized ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable()
+  {
+    if (factsDb == null) {
+      final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
+                                     .transactionDisable()
+                                     .asyncWriteEnable()
+                                     .cacheSoftRefEnable();
+      factsDb = dbMaker.make();
+      db = dbMaker.make();
+    }
+    final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
+    return factsDb.createTreeMap("__facts" + UUID.randomUUID())
+                  .keySerializer(timeAndDimsSerializer)
+                  .comparator(timeAndDimsSerializer.getComparator())
+                  .valueSerializer(Serializer.INTEGER)
+                  .make();
+  }
+
+  @Override
+  protected DimDim createDimDim(String dimension)
+  {
+    return new OffheapDimDim(dimension);
+  }
+
+  public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
+  {
+    private final TimeAndDimsComparator comparator;
+    private final transient IncrementalIndex incrementalIndex;
+
+    TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
+    {
+      this.comparator = new TimeAndDimsComparator();
+      this.incrementalIndex = incrementalIndex;
+    }
+
+    @Override
+    public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
+    {
+      for (int i = start; i < end; i++) {
+        TimeAndDims timeAndDim = (TimeAndDims) keys[i];
+        out.writeLong(timeAndDim.getTimestamp());
+        out.writeInt(timeAndDim.getDims().length);
+        int index = 0;
+        for (String[] dims : timeAndDim.getDims()) {
+          if (dims == null) {
+            out.write(-1);
+          } else {
+            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
+            out.writeInt(dims.length);
+            for (String value : dims) {
+              out.writeInt(dimDim.getId(value));
+            }
+          }
+          index++;
+        }
+      }
+    }
+
+    @Override
+    public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
+    {
+      Object[] ret = new Object[size];
+      for (int i = start; i < end; i++) {
+        final long timeStamp = in.readLong();
+        final String[][] dims = new String[in.readInt()][];
+        for (int k = 0; k < dims.length; k++) {
+          int len = in.readInt();
+          if (len != -1) {
+            DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
+            String[] col = new String[len];
+            for (int l = 0; l < col.length; l++) {
+              col[l] = dimDim.get(dimDim.getValue(in.readInt()));
+            }
+            dims[k] = col;
+          }
+        }
+        ret[i] = new TimeAndDims(timeStamp, dims);
+      }
+      return ret;
+    }
+
+    @Override
+    public Comparator<TimeAndDims> getComparator()
+    {
+      return comparator;
+    }
+  }
+
+  public static class TimeAndDimsComparator implements Comparator, Serializable
+  {
+    @Override
+    public int compare(Object o1, Object o2)
+    {
+      return ((TimeAndDims) o1).compareTo((TimeAndDims) o2);
+    }
+  }
+
+  private class OffheapDimDim implements DimDim
+  {
+    private final Map<String, Integer> falseIds;
+    private final Map<Integer, String> falseIdsReverse;
+    private final WeakHashMap<String, WeakReference<String>> cache =
+        new WeakHashMap();
+    private volatile String[] sortedVals = null;
+    // size on MapDB is slow so maintain a count here
+    private volatile int size = 0;
+
+    public OffheapDimDim(String dimension)
+    {
+      falseIds = db.createHashMap(dimension)
+                   .keySerializer(Serializer.STRING)
+                   .valueSerializer(Serializer.INTEGER)
+                   .make();
+      falseIdsReverse = db.createHashMap(dimension + "_inverse")
+                          .keySerializer(Serializer.INTEGER)
+                          .valueSerializer(Serializer.STRING)
+                          .make();
+    }
+
+    /**
+     * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
+     *
+     * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
+     */
+    public String get(String str)
+    {
+      final WeakReference<String> cached = cache.get(str);
+      if (cached != null) {
+        final String value = cached.get();
+        if (value != null) {
+          return value;
+        }
+      }
+      cache.put(str, new WeakReference<String>(str));
+      return str;
+    }
+
+    public int getId(String value)
+    {
+      return falseIds.get(value);
+    }
+
+    public String getValue(int id)
+    {
+      return falseIdsReverse.get(id);
+    }
+
+    public boolean contains(String value)
+    {
+      return falseIds.containsKey(value);
+    }
+
+    public int size()
+    {
+      return size;
+    }
+
+    public synchronized int add(String value)
+    {
+      int id = size++;
+      falseIds.put(value, id);
+      falseIdsReverse.put(id, value);
+      return id;
+    }
+
+    public int getSortedId(String value)
+    {
+      assertSorted();
+      return Arrays.binarySearch(sortedVals, value);
+    }
+
+    public String getSortedValue(int index)
+    {
+      assertSorted();
+      return sortedVals[index];
+    }
+
+    public void sort()
+    {
+      if (sortedVals == null) {
+        sortedVals = new String[falseIds.size()];
+
+        int index = 0;
+        for (String value : falseIds.keySet()) {
+          sortedVals[index++] = value;
+        }
+        Arrays.sort(sortedVals);
+      }
+    }
+
+    private void assertSorted()
+    {
+      if (sortedVals == null) {
+        throw new ISE("Call sort() before calling the getSorted* methods.");
+      }
+    }
+
+    public boolean compareCannonicalValues(String s1, String s2)
+    {
+      return s1.equals(s2);
+    }
+  }
+
+}
diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java
index 3f431ba84e6..7180a6edb52 100644
--- a/processing/src/test/java/io/druid/segment/TestIndex.java
+++ b/processing/src/test/java/io/druid/segment/TestIndex.java
@@ -38,6 +38,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -53,12 +54,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class TestIndex
 {
-  private static final Logger log = new Logger(TestIndex.class);
-
-  private static IncrementalIndex realtimeIndex = null;
-  private static QueryableIndex mmappedIndex = null;
-  private static QueryableIndex mergedRealtime = null;
-
   public static final String[] COLUMNS = new String[]{
       "ts",
       "provider",
@@ -70,6 +65,7 @@
   };
   public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
   public static final String[] METRICS = new String[]{"iNdEx"};
+  private static final Logger log = new Logger(TestIndex.class);
   private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
   private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
       new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
@@ -82,6 +78,10 @@
     }
   }
 
+  private static IncrementalIndex realtimeIndex = null;
+  private static QueryableIndex mmappedIndex = null;
+  private static QueryableIndex mergedRealtime = null;
+
   public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap)
   {
     synchronized (log) {
@@ -155,16 +155,23 @@
   {
     final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
     log.info("Realtime loading index file[%s]", resource);
-
-    final IncrementalIndex retVal = new IncrementalIndex(
-        new IncrementalIndexSchema.Builder().withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
-                                            .withQueryGranularity(QueryGranularity.NONE)
-                                            .withMetrics(METRIC_AGGS)
-                                            .build(),
-        TestQueryRunners.pool,
-        true,
-        useOffheap
-    );
+    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
+        .withQueryGranularity(QueryGranularity.NONE)
+        .withMetrics(METRIC_AGGS)
+        .build();
+    final IncrementalIndex retVal;
+    if (useOffheap) {
+      retVal = new OffheapIncrementalIndex(
+          schema,
+          TestQueryRunners.pool
+      );
+    } else {
+      retVal = new IncrementalIndex(
+          schema,
+          TestQueryRunners.pool
+      );
+    }
 
     final AtomicLong startTime = new AtomicLong();
     int lineCount;
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
index f739f0e8bb5..d3e9a0b26a2 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
@@ -31,6 +31,7 @@ import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.FireHydrant;
@@ -180,16 +181,24 @@ public class Sink implements Iterable<FireHydrant>
       aggsSize += agg.getMaxIntermediateSize();
     }
     int bufferSize = aggsSize * config.getMaxRowsInMemory();
-    IncrementalIndex newIndex = new IncrementalIndex(
-        new IncrementalIndexSchema.Builder()
-            .withMinTimestamp(minTimestamp)
-            .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
-            .withDimensionsSpec(schema.getParser())
-            .withMetrics(schema.getAggregators())
-            .build(),
-        new OffheapBufferPool(bufferSize),
-        config.isIngestOffheap()
-    );
+    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(minTimestamp)
+        .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
+        .withDimensionsSpec(schema.getParser())
+        .withMetrics(schema.getAggregators())
+        .build();
+    final IncrementalIndex newIndex;
+    if (config.isIngestOffheap()) {
+      newIndex = new OffheapIncrementalIndex(
+          indexSchema,
+          new OffheapBufferPool(bufferSize)
+      );
+    } else {
+      newIndex = new IncrementalIndex(
+          indexSchema,
+          new OffheapBufferPool(bufferSize)
+      );
+    }
 
     final FireHydrant old;
     synchronized (hydrantLock) {
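
--
Usage note (illustrative, not part of the diff): after this refactoring the useOffheap
boolean no longer flows into IncrementalIndex; callers pick the implementation
explicitly. A minimal sketch of the selection pattern introduced at the call sites
above (IndexGeneratorJob, Sink, TestIndex); the names `schema`, `bufferPool`, and
`ingestOffheap` are placeholders, assumed to be built as in those call sites:

    // Sketch only; mirrors the call sites patched above.
    final IncrementalIndex index;
    if (ingestOffheap) {
      // MapDB-backed facts table and dimension dictionaries in direct memory
      index = new OffheapIncrementalIndex(schema, bufferPool);
    } else {
      // On-heap ConcurrentSkipListMap-backed facts table
      index = new IncrementalIndex(schema, bufferPool);
    }

For group-by queries the same switch is driven by the "useOffheap" query context
value, as in GroupByQueryHelper above.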