diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index 028f97d7db5..fec163e4164 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -166,6 +166,7 @@ public class HadoopIngestionSpec extends IngestionSpec jobProperties; private final boolean combineText; private final boolean persistInHeap; + private final boolean ingestOffheap; @JsonCreator public HadoopTuningConfig( @@ -84,7 +86,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, final @JsonProperty("jobProperties") Map jobProperties, final @JsonProperty("combineText") boolean combineText, - final @JsonProperty("persistInHeap") boolean persistInHeap + final @JsonProperty("persistInHeap") boolean persistInHeap, + final @JsonProperty("ingestOffheap") boolean ingestOffheap ) { this.workingPath = workingPath == null ? null : workingPath; @@ -101,6 +104,7 @@ public class HadoopTuningConfig implements TuningConfig : ImmutableMap.copyOf(jobProperties)); this.combineText = combineText; this.persistInHeap = persistInHeap; + this.ingestOffheap = ingestOffheap; } @JsonProperty @@ -175,6 +179,11 @@ public class HadoopTuningConfig implements TuningConfig return persistInHeap; } + @JsonProperty + public boolean isIngestOffheap(){ + return ingestOffheap; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -189,7 +198,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -207,7 +217,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -225,7 +236,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 1b6878270f4..5f33d677014 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -45,6 +45,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.SegmentUtils; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configurable; @@ -634,16 +635,25 @@ public class IndexGeneratorJob implements Jobby for (AggregatorFactory agg : aggs) { aggsSize += agg.getMaxIntermediateSize(); } - int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary(); - return new IncrementalIndex( - new IncrementalIndexSchema.Builder() - .withMinTimestamp(theBucket.time.getMillis()) - .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) - .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) - .withMetrics(aggs) - .build(), - new OffheapBufferPool(bufferSize) - ); + final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); + int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary(); + final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(theBucket.time.getMillis()) + .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) + .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) + .withMetrics(aggs) + .build(); + if (tuningConfig.isIngestOffheap()) { + return new OffheapIncrementalIndex( + indexSchema, + new OffheapBufferPool(bufferSize) + ); + } else { + return new IncrementalIndex( + indexSchema, + new OffheapBufferPool(bufferSize) + ); + } } private void createNewZipEntry(ZipOutputStream out, String name) throws IOException diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index c477cb5d153..058b3fd027e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -403,7 +403,7 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null), + new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null), metrics ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index d073d88dc32..906e8e7a901 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -144,6 +144,7 @@ public class RealtimeIndexTask extends AbstractTask rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy, maxPendingPersists, spec.getShardSpec(), + false, false ), null, null, null, null diff --git a/pom.xml b/pom.xml index 023d21ab782..5248427356a 100644 --- a/pom.xml +++ b/pom.xml @@ -429,6 +429,11 @@ 2.3.0 provided + + org.mapdb + mapdb + 1.0.6 + diff --git a/processing/pom.xml b/processing/pom.xml index 8b04ea52763..7ccbed8b4da 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -86,6 +86,10 @@ net.jpountz.lz4 lz4 + + org.mapdb + mapdb + diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index aef4ae26098..a3ca6dbee72 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -31,6 +31,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OffheapIncrementalIndex; import java.nio.ByteBuffer; import java.util.List; @@ -73,7 +74,19 @@ public class GroupByQueryHelper } } ); - IncrementalIndex index = new IncrementalIndex( + final IncrementalIndex index; + if(query.getContextValue("useOffheap", false)){ + index = new OffheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + bufferPool, + false + ); + } else { + index = new IncrementalIndex( // use granularity truncated min timestamp // since incoming truncated timestamps may precede timeStart granTimeStart, @@ -82,6 +95,7 @@ public class GroupByQueryHelper bufferPool, false ); + } Accumulator accumulator = new Accumulator() { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 8cf5771aa4e..320f6f8648c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -67,7 +67,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -88,10 +88,10 @@ public class IncrementalIndex implements Iterable, Closeable private final int[] aggPositionOffsets; private final int totalAggSize; private final LinkedHashMap dimensionOrder; - private final CopyOnWriteArrayList dimensions; + protected final CopyOnWriteArrayList dimensions; private final DimensionHolder dimValues; private final Map columnCapabilities; - private final ConcurrentSkipListMap facts; + private final ConcurrentNavigableMap facts; private final ResourceHolder bufferHolder; private volatile AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. @@ -312,7 +312,11 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - this.facts = new ConcurrentSkipListMap<>(); + this.facts = createFactsTable(); + } + + protected ConcurrentNavigableMap createFactsTable() { + return new ConcurrentSkipListMap<>(); } public IncrementalIndex( @@ -437,21 +441,24 @@ public class IncrementalIndex implements Iterable, Closeable } final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - + Integer rowOffset; synchronized (this) { - if (!facts.containsKey(key)) { - int rowOffset = totalAggSize * numEntries.getAndIncrement(); + rowOffset = totalAggSize * numEntries.get(); + final Integer prev = facts.putIfAbsent(key, rowOffset); + if (prev != null) { + rowOffset = prev; + } else { if (rowOffset + totalAggSize > bufferHolder.get().limit()) { + facts.remove(key); throw new ISE("Buffer full, cannot add more rows! Current rowSize[%,d].", numEntries.get()); } + numEntries.incrementAndGet(); for (int i = 0; i < aggs.length; i++) { aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i)); } - facts.put(key, rowOffset); } } in.set(row); - int rowOffset = facts.get(key); for (int i = 0; i < aggs.length; i++) { synchronized (aggs[i]) { aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); @@ -488,11 +495,9 @@ public class IncrementalIndex implements Iterable, Closeable int count = 0; for (String dimValue : dimValues) { String canonicalDimValue = dimLookup.get(dimValue); - if (canonicalDimValue == null) { - canonicalDimValue = dimValue; + if (!dimLookup.contains(canonicalDimValue)) { dimLookup.add(dimValue); } - retVal[count] = canonicalDimValue; count++; } @@ -581,7 +586,7 @@ public class IncrementalIndex implements Iterable, Closeable return columnCapabilities.get(column); } - ConcurrentSkipListMap getFacts() + ConcurrentNavigableMap getFacts() { return facts; } @@ -653,7 +658,7 @@ public class IncrementalIndex implements Iterable, Closeable } } - static class DimensionHolder + class DimensionHolder { private final Map dimensions; @@ -671,7 +676,7 @@ public class IncrementalIndex implements Iterable, Closeable { DimDim holder = dimensions.get(dimension); if (holder == null) { - holder = new DimDim(); + holder = createDimDim(dimension); dimensions.put(dimension, holder); } else { throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); @@ -685,6 +690,10 @@ public class IncrementalIndex implements Iterable, Closeable } } + protected DimDim createDimDim(String dimension){ + return new DimDimImpl(); + } + static class TimeAndDims implements Comparable { private final long timestamp; @@ -770,27 +779,51 @@ public class IncrementalIndex implements Iterable, Closeable } } - static class DimDim + static interface DimDim { - private final Map poorMansInterning = Maps.newConcurrentMap(); + public String get(String value); + + public int getId(String value); + + public String getValue(int id); + + public boolean contains(String value); + + public int size(); + + public int add(String value); + + public int getSortedId(String value); + + public String getSortedValue(int index); + + public void sort(); + + public boolean compareCannonicalValues(String s1, String s2); + } + + private static class DimDimImpl implements DimDim{ private final Map falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; + final ConcurrentMap poorMansInterning = Maps.newConcurrentMap(); - public DimDim() + + public DimDimImpl() { - BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); - falseIds = biMap; - falseIdsReverse = biMap.inverse(); + BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); + falseIds = biMap; + falseIdsReverse = biMap.inverse(); } /** * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) */ - public String get(String value) + public String get(String str) { - return value == null ? null : poorMansInterning.get(value); + String prev = poorMansInterning.putIfAbsent(str, str); + return prev != null ? prev : str; } public int getId(String value) @@ -803,20 +836,21 @@ public class IncrementalIndex implements Iterable, Closeable return falseIdsReverse.get(id); } + public boolean contains(String value) + { + return falseIds.containsKey(value); + } + public int size() { - return poorMansInterning.size(); + return falseIds.size(); } - public Set keySet() + public synchronized int add(String value) { - return poorMansInterning.keySet(); - } - - public synchronized void add(String value) - { - poorMansInterning.put(value, value); - falseIds.put(value, falseIds.size()); + int id = falseIds.size(); + falseIds.put(value, id); + return id; } public int getSortedId(String value) @@ -850,5 +884,10 @@ public class IncrementalIndex implements Iterable, Closeable throw new ISE("Call sort() before calling the getSorted* methods."); } } + + public boolean compareCannonicalValues(String s1, String s2) + { + return s1 ==s2; + } } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 0d7d10d2212..1addc0551e1 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -23,11 +23,9 @@ import com.google.common.base.Function; import com.google.common.collect.Maps; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; -import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.segment.IndexableAdapter; import io.druid.segment.Rowboat; import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ValueType; import io.druid.segment.data.EmptyIndexedInts; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index f52c57269ab..ac7e47ce108 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -29,8 +29,8 @@ import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.QueryInterruptedException; +import io.druid.query.aggregation.BufferAggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcherFactory; @@ -501,8 +501,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter if (dimIndexObject == null) { return new BooleanValueMatcher(false); } - String idObject = index.getDimension(dimension.toLowerCase()).get(value); - if (idObject == null) { + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); + if (!dimDim.contains(value)) { if (value == null || "".equals(value)) { final int dimIndex = dimIndexObject; @@ -523,7 +523,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int dimIndex = dimIndexObject; - final String id = idObject; + final String id = dimDim.get(value); return new ValueMatcher() { @@ -536,11 +536,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } for (String dimVal : dims[dimIndex]) { - /** - * using == here instead of .equals() to speed up lookups made possible by - * {@link io.druid.segment.incremental.IncrementalIndex.DimDim#poorMansInterning} - */ - if (id == dimVal) { + if (dimDim.compareCannonicalValues(id,dimVal)) { return true; } } diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java new file mode 100644 index 00000000000..7e048c37e70 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -0,0 +1,274 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.incremental; + + +import com.metamx.common.ISE; +import io.druid.collections.StupidPool; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import org.mapdb.BTreeKeySerializer; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.UUID; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentNavigableMap; + +public class OffheapIncrementalIndex extends IncrementalIndex +{ + private volatile DB db; + private volatile DB factsDb; + + public OffheapIncrementalIndex( + IncrementalIndexSchema incrementalIndexSchema, + StupidPool bufferPool + ) + { + super(incrementalIndexSchema, bufferPool); + } + + public OffheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics, + StupidPool bufferPool, + boolean deserializeComplexMetrics + + ) + { + super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics); + } + + @Override + protected synchronized ConcurrentNavigableMap createFactsTable() + { + if (factsDb == null) { + final DBMaker dbMaker = DBMaker.newMemoryDirectDB() + .transactionDisable() + .asyncWriteEnable() + .cacheSoftRefEnable(); + factsDb = dbMaker.make(); + db = dbMaker.make(); + } + final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); + return factsDb.createTreeMap("__facts" + UUID.randomUUID()) + .keySerializer(timeAndDimsSerializer) + .comparator(timeAndDimsSerializer.getComparator()) + .valueSerializer(Serializer.INTEGER) + .make(); + } + + @Override + protected DimDim createDimDim(String dimension) + { + return new OffheapDimDim(dimension); + } + + public static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable + { + private final TimeAndDimsComparator comparator; + private final transient IncrementalIndex incrementalIndex; + + TimeAndDimsSerializer(IncrementalIndex incrementalIndex) + { + this.comparator = new TimeAndDimsComparator(); + this.incrementalIndex = incrementalIndex; + } + + @Override + public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException + { + for (int i = start; i < end; i++) { + TimeAndDims timeAndDim = (TimeAndDims) keys[i]; + out.writeLong(timeAndDim.getTimestamp()); + out.writeInt(timeAndDim.getDims().length); + int index = 0; + for (String[] dims : timeAndDim.getDims()) { + if (dims == null) { + out.write(-1); + } else { + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); + out.writeInt(dims.length); + for (String value : dims) { + out.writeInt(dimDim.getId(value)); + } + } + index++; + } + } + } + + @Override + public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException + { + Object[] ret = new Object[size]; + for (int i = start; i < end; i++) { + final long timeStamp = in.readLong(); + final String[][] dims = new String[in.readInt()][]; + for (int k = 0; k < dims.length; k++) { + int len = in.readInt(); + if (len != -1) { + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k)); + String[] col = new String[len]; + for (int l = 0; l < col.length; l++) { + col[l] = dimDim.get(dimDim.getValue(in.readInt())); + } + dims[k] = col; + } + } + ret[i] = new TimeAndDims(timeStamp, dims); + } + return ret; + } + + @Override + public Comparator getComparator() + { + return comparator; + } + } + + public static class TimeAndDimsComparator implements Comparator, Serializable + { + @Override + public int compare(Object o1, Object o2) + { + return ((TimeAndDims) o1).compareTo((TimeAndDims) o2); + } + } + + private class OffheapDimDim implements DimDim + { + private final Map falseIds; + private final Map falseIdsReverse; + private final WeakHashMap> cache = + new WeakHashMap(); + private volatile String[] sortedVals = null; + // size on MapDB is slow so maintain a count here + private volatile int size = 0; + + public OffheapDimDim(String dimension) + { + falseIds = db.createHashMap(dimension) + .keySerializer(Serializer.STRING) + .valueSerializer(Serializer.INTEGER) + .make(); + falseIdsReverse = db.createHashMap(dimension + "_inverse") + .keySerializer(Serializer.INTEGER) + .valueSerializer(Serializer.STRING) + .make(); + } + + /** + * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` + * + * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) + */ + public String get(String str) + { + final WeakReference cached = cache.get(str); + if (cached != null) { + final String value = cached.get(); + if (value != null) { + return value; + } + } + cache.put(str, new WeakReference(str)); + return str; + } + + public int getId(String value) + { + return falseIds.get(value); + } + + public String getValue(int id) + { + return falseIdsReverse.get(id); + } + + public boolean contains(String value) + { + return falseIds.containsKey(value); + } + + public int size() + { + return size; + } + + public synchronized int add(String value) + { + int id = size++; + falseIds.put(value, id); + falseIdsReverse.put(id, value); + return id; + } + + public int getSortedId(String value) + { + assertSorted(); + return Arrays.binarySearch(sortedVals, value); + } + + public String getSortedValue(int index) + { + assertSorted(); + return sortedVals[index]; + } + + public void sort() + { + if (sortedVals == null) { + sortedVals = new String[falseIds.size()]; + + int index = 0; + for (String value : falseIds.keySet()) { + sortedVals[index++] = value; + } + Arrays.sort(sortedVals); + } + } + + private void assertSorted() + { + if (sortedVals == null) { + throw new ISE("Call sort() before calling the getSorted* methods."); + } + } + + public boolean compareCannonicalValues(String s1, String s2) + { + return s1.equals(s2); + } + } + +} diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index deeaff563e8..b47f78cefb1 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -194,9 +194,10 @@ public class QueryRunnerTestHelper ) throws IOException { - final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false); final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); + final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true); return Arrays.asList( new Object[][]{ { @@ -207,6 +208,9 @@ public class QueryRunnerTestHelper }, { makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) + }, + { + makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) } } ); @@ -218,9 +222,11 @@ public class QueryRunnerTestHelper ) throws IOException { - final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false); final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); + final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true); + return Arrays.asList( new Object[][]{ { @@ -231,6 +237,9 @@ public class QueryRunnerTestHelper }, { makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) + }, + { + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) } } ); diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java index d22ba345a0e..503be5e5f6f 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java @@ -50,7 +50,7 @@ public class SegmentAnalyzerTest public void testIncrementalDoesNotWork() throws Exception { final List results = getSegmentAnalysises( - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), null) + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null) ); Assert.assertEquals(0, results.size()); diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index a5c9b42c412..7180a6edb52 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -37,6 +37,8 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -52,12 +54,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class TestIndex { - private static final Logger log = new Logger(TestIndex.class); - - private static IncrementalIndex realtimeIndex = null; - private static QueryableIndex mmappedIndex = null; - private static QueryableIndex mergedRealtime = null; - public static final String[] COLUMNS = new String[]{ "ts", "provider", @@ -69,6 +65,7 @@ public class TestIndex }; public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"}; public static final String[] METRICS = new String[]{"iNdEx"}; + private static final Logger log = new Logger(TestIndex.class); private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"); private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{ new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]), @@ -81,7 +78,11 @@ public class TestIndex } } - public static IncrementalIndex getIncrementalTestIndex() + private static IncrementalIndex realtimeIndex = null; + private static QueryableIndex mmappedIndex = null; + private static QueryableIndex mergedRealtime = null; + + public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap) { synchronized (log) { if (realtimeIndex != null) { @@ -89,7 +90,7 @@ public class TestIndex } } - return realtimeIndex = makeRealtimeIndex("druid.sample.tsv"); + return realtimeIndex = makeRealtimeIndex("druid.sample.tsv", useOffheap); } public static QueryableIndex getMMappedTestIndex() @@ -100,7 +101,7 @@ public class TestIndex } } - IncrementalIndex incrementalIndex = getIncrementalTestIndex(); + IncrementalIndex incrementalIndex = getIncrementalTestIndex(false); mmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex); return mmappedIndex; @@ -114,8 +115,8 @@ public class TestIndex } try { - IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top"); - IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom"); + IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top", false); + IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom", false); File tmpFile = File.createTempFile("yay", "who"); tmpFile.delete(); @@ -150,15 +151,27 @@ public class TestIndex } } - private static IncrementalIndex makeRealtimeIndex(final String resourceFilename) + private static IncrementalIndex makeRealtimeIndex(final String resourceFilename, final boolean useOffheap) { final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); log.info("Realtime loading index file[%s]", resource); - - final IncrementalIndex retVal = new IncrementalIndex( - new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.NONE, METRIC_AGGS, - TestQueryRunners.pool - ); + final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) + .withQueryGranularity(QueryGranularity.NONE) + .withMetrics(METRIC_AGGS) + .build(); + final IncrementalIndex retVal; + if (useOffheap) { + retVal = new OffheapIncrementalIndex( + schema, + TestQueryRunners.pool + ); + } else { + retVal = new IncrementalIndex( + schema, + TestQueryRunners.pool + ); + } final AtomicLong startTime = new AtomicLong(); int lineCount; diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index df37ea25688..4e7ae7d3163 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -124,7 +124,8 @@ public class SpatialFilterBonusTest ) ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); theIndex.add( new MapBasedInputRow( @@ -255,7 +256,8 @@ public class SpatialFilterBonusTest ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); IncrementalIndex second = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -273,8 +275,8 @@ public class SpatialFilterBonusTest ) ) ).build(), - TestQueryRunners.pool - + TestQueryRunners.pool, + false ); IncrementalIndex third = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -293,8 +295,8 @@ public class SpatialFilterBonusTest ) ).build(), - TestQueryRunners.pool - + TestQueryRunners.pool, + false ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index 7f6701c3407..5d94f9958c9 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -124,7 +124,8 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); theIndex.add( new MapBasedInputRow( @@ -269,7 +270,8 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); IncrementalIndex second = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -288,7 +290,8 @@ public class SpatialFilterTest ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); IncrementalIndex third = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -307,7 +310,8 @@ public class SpatialFilterTest ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index a4d60bfe77a..fa0c37b2cfd 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -45,6 +45,8 @@ public class RealtimeTuningConfig implements TuningConfig private static final int defaultMaxPendingPersists = 0; private static final ShardSpec defaultShardSpec = new NoneShardSpec(); private static final boolean defaultPersistInHeap = false; + private static final boolean defaultIngestOffheap = false; + // Might make sense for this to be a builder public static RealtimeTuningConfig makeDefaultTuningConfig() @@ -58,7 +60,8 @@ public class RealtimeTuningConfig implements TuningConfig defaultRejectionPolicyFactory, defaultMaxPendingPersists, defaultShardSpec, - defaultPersistInHeap + defaultPersistInHeap, + defaultIngestOffheap ); } @@ -71,6 +74,7 @@ public class RealtimeTuningConfig implements TuningConfig private final int maxPendingPersists; private final ShardSpec shardSpec; private final boolean persistInHeap; + private final boolean ingestOffheap; @JsonCreator public RealtimeTuningConfig( @@ -82,7 +86,8 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, - @JsonProperty("persistInHeap") Boolean persistInHeap + @JsonProperty("persistInHeap") Boolean persistInHeap, + @JsonProperty("ingestOffheap") Boolean ingestOffheap ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -98,6 +103,7 @@ public class RealtimeTuningConfig implements TuningConfig this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap; + this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap; } @JsonProperty @@ -154,6 +160,11 @@ public class RealtimeTuningConfig implements TuningConfig return persistInHeap; } + @JsonProperty + public boolean isIngestOffheap(){ + return ingestOffheap; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -165,7 +176,8 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -180,7 +192,8 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - persistInHeap + persistInHeap, + ingestOffheap ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java index 6851d693af5..3e2211f51fd 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java @@ -97,6 +97,7 @@ public class FireDepartment extends IngestionSpec aggsSize += agg.getMaxIntermediateSize(); } int bufferSize = aggsSize * config.getMaxRowsInMemory(); - IncrementalIndex newIndex = new IncrementalIndex( - new IncrementalIndexSchema.Builder() - .withMinTimestamp(minTimestamp) - .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity()) - .withDimensionsSpec(schema.getParser()) - .withMetrics(schema.getAggregators()) - .build(), - new OffheapBufferPool(bufferSize) - ); + final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(minTimestamp) + .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity()) + .withDimensionsSpec(schema.getParser()) + .withMetrics(schema.getAggregators()) + .build(); + final IncrementalIndex newIndex; + if (config.isIngestOffheap()) { + newIndex = new OffheapIncrementalIndex( + indexSchema, + new OffheapBufferPool(bufferSize) + ); + } else { + newIndex = new IncrementalIndex( + indexSchema, + new OffheapBufferPool(bufferSize) + ); + } final FireHydrant old; synchronized (hydrantLock) { diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 4f9cb51ecc9..549574b238e 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -77,7 +77,7 @@ public class FireDepartmentTest ) ), new RealtimeTuningConfig( - null, null, null, null, null, null, null, null, false + null, null, null, null, null, null, null, null, false, false ), null, null, null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index d8c86386b8e..b6619b35f38 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -117,6 +117,7 @@ public class RealtimeManagerTest null, null, null, + null, null ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index c120d31451e..c037d5b2b13 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -163,6 +163,7 @@ public class RealtimePlumberSchoolTest rejectionPolicy, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 227f753b114..8fed5962f54 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -64,6 +64,7 @@ public class SinkTest null, null, null, + false, false ); final Sink sink = new Sink(interval, schema, tuningConfig, version);