Merge pull request #752 from metamx/mapdb-branch

Offheap dimensions
fjy 2014-10-01 16:19:34 -06:00
commit 263c45c015
24 changed files with 512 additions and 103 deletions

@@ -166,6 +166,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
         ignoreInvalidRows,
         jobProperties,
         combineText,
+        false,
         false
     );
   }

@@ -54,6 +54,7 @@ public class HadoopTuningConfig implements TuningConfig
         false,
         null,
         false,
+        false,
         false
     );
   }
@@ -70,6 +71,7 @@ public class HadoopTuningConfig implements TuningConfig
   private final Map<String, String> jobProperties;
   private final boolean combineText;
   private final boolean persistInHeap;
+  private final boolean ingestOffheap;

   @JsonCreator
   public HadoopTuningConfig(
@@ -84,7 +86,8 @@ public class HadoopTuningConfig implements TuningConfig
       final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
       final @JsonProperty("jobProperties") Map<String, String> jobProperties,
       final @JsonProperty("combineText") boolean combineText,
-      final @JsonProperty("persistInHeap") boolean persistInHeap
+      final @JsonProperty("persistInHeap") boolean persistInHeap,
+      final @JsonProperty("ingestOffheap") boolean ingestOffheap
   )
   {
     this.workingPath = workingPath == null ? null : workingPath;
@@ -101,6 +104,7 @@ public class HadoopTuningConfig implements TuningConfig
         : ImmutableMap.copyOf(jobProperties));
     this.combineText = combineText;
     this.persistInHeap = persistInHeap;
+    this.ingestOffheap = ingestOffheap;
   }

   @JsonProperty
@@ -175,6 +179,11 @@ public class HadoopTuningConfig implements TuningConfig
     return persistInHeap;
   }

+  @JsonProperty
+  public boolean isIngestOffheap(){
+    return ingestOffheap;
+  }
+
   public HadoopTuningConfig withWorkingPath(String path)
   {
     return new HadoopTuningConfig(
@@ -189,7 +198,8 @@ public class HadoopTuningConfig implements TuningConfig
         ignoreInvalidRows,
         jobProperties,
         combineText,
-        persistInHeap
+        persistInHeap,
+        ingestOffheap
     );
   }
@@ -207,7 +217,8 @@ public class HadoopTuningConfig implements TuningConfig
         ignoreInvalidRows,
         jobProperties,
         combineText,
-        persistInHeap
+        persistInHeap,
+        ingestOffheap
     );
   }
@@ -225,7 +236,8 @@ public class HadoopTuningConfig implements TuningConfig
         ignoreInvalidRows,
         jobProperties,
         combineText,
-        persistInHeap
+        persistInHeap,
+        ingestOffheap
     );
   }
 }
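
With this change, the Hadoop indexer's tuning config accepts a new optional boolean, ingestOffheap (wired through @JsonProperty("ingestOffheap") and defaulting to false, as the constructor calls above show). A minimal, illustrative sketch of how the flag might appear in an indexing spec; the surrounding keys, including the "hadoop" type name, are assumptions for illustration and not taken from this diff:

{
  "tuningConfig": {
    "type": "hadoop",
    "ingestOffheap": true
  }
}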

@@ -45,6 +45,7 @@ import io.druid.segment.QueryableIndex;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
@@ -634,16 +635,25 @@ public class IndexGeneratorJob implements Jobby
       for (AggregatorFactory agg : aggs) {
         aggsSize += agg.getMaxIntermediateSize();
       }
-      int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary();
-      return new IncrementalIndex(
-          new IncrementalIndexSchema.Builder()
+      final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
+      int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
+      final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
           .withMinTimestamp(theBucket.time.getMillis())
           .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
           .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
           .withMetrics(aggs)
-          .build(),
+          .build();
+      if (tuningConfig.isIngestOffheap()) {
+        return new OffheapIncrementalIndex(
+            indexSchema,
           new OffheapBufferPool(bufferSize)
       );
+      } else {
+        return new IncrementalIndex(
+            indexSchema,
+            new OffheapBufferPool(bufferSize)
+        );
+      }
     }

     private void createNewZipEntry(ZipOutputStream out, String name) throws IOException

@@ -403,7 +403,7 @@ public class IndexTask extends AbstractFixedIntervalTask
           tmpDir
       ).findPlumber(
           schema,
-          new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null),
+          new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null),
           metrics
       );

@@ -144,6 +144,7 @@ public class RealtimeIndexTask extends AbstractTask
           rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
           maxPendingPersists,
           spec.getShardSpec(),
+          false,
           false
       ),
       null, null, null, null

@@ -429,6 +429,11 @@
             <version>2.3.0</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mapdb</groupId>
+            <artifactId>mapdb</artifactId>
+            <version>1.0.6</version>
+        </dependency>

         <!-- Test Scope -->
         <dependency>

@@ -86,6 +86,10 @@
         <groupId>net.jpountz.lz4</groupId>
         <artifactId>lz4</artifactId>
     </dependency>
+    <dependency>
+        <groupId>org.mapdb</groupId>
+        <artifactId>mapdb</artifactId>
+    </dependency>

     <!-- Tests -->

@@ -31,6 +31,7 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.OffheapIncrementalIndex;

 import java.nio.ByteBuffer;
 import java.util.List;
@@ -73,7 +74,9 @@ public class GroupByQueryHelper
           }
         }
     );
-    IncrementalIndex index = new IncrementalIndex(
+    final IncrementalIndex index;
+    if(query.getContextValue("useOffheap", false)){
+      index = new OffheapIncrementalIndex(
         // use granularity truncated min timestamp
         // since incoming truncated timestamps may precede timeStart
         granTimeStart,
@@ -82,6 +85,17 @@ public class GroupByQueryHelper
         bufferPool,
         false
     );
+    } else {
+      index = new IncrementalIndex(
+          // use granularity truncated min timestamp
+          // since incoming truncated timestamps may precede timeStart
+          granTimeStart,
+          gran,
+          aggs.toArray(new AggregatorFactory[aggs.size()]),
+          bufferPool,
+          false
+      );
+    }

     Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
     {
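
For groupBy, the off-heap path is selected per query through the query context rather than a tuning config: the helper reads query.getContextValue("useOffheap", false) above. A hedged sketch of a groupBy query carrying that context flag; the dataSource, dimensions, aggregations, and intervals are placeholders:

{
  "queryType": "groupBy",
  "dataSource": "example_datasource",
  "granularity": "all",
  "dimensions": ["dim1"],
  "aggregations": [{ "type": "count", "name": "rows" }],
  "intervals": ["2014-01-01/2014-02-01"],
  "context": { "useOffheap": true }
}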

@@ -67,7 +67,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -88,10 +88,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   private final int[] aggPositionOffsets;
   private final int totalAggSize;
   private final LinkedHashMap<String, Integer> dimensionOrder;
-  private final CopyOnWriteArrayList<String> dimensions;
+  protected final CopyOnWriteArrayList<String> dimensions;
   private final DimensionHolder dimValues;
   private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
-  private final ConcurrentSkipListMap<TimeAndDims, Integer> facts;
+  private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
   private final ResourceHolder<ByteBuffer> bufferHolder;
   private volatile AtomicInteger numEntries = new AtomicInteger();
   // This is modified on add() in a critical section.
@@ -312,7 +312,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
     this.bufferHolder = bufferPool.take();
     this.dimValues = new DimensionHolder();
-    this.facts = new ConcurrentSkipListMap<>();
+    this.facts = createFactsTable();
+  }
+
+  protected ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable() {
+    return new ConcurrentSkipListMap<>();
   }

   public IncrementalIndex(
@@ -437,21 +441,24 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }

     final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
+    Integer rowOffset;
     synchronized (this) {
-      if (!facts.containsKey(key)) {
-        int rowOffset = totalAggSize * numEntries.getAndIncrement();
+      rowOffset = totalAggSize * numEntries.get();
+      final Integer prev = facts.putIfAbsent(key, rowOffset);
+      if (prev != null) {
+        rowOffset = prev;
+      } else {
         if (rowOffset + totalAggSize > bufferHolder.get().limit()) {
+          facts.remove(key);
           throw new ISE("Buffer full, cannot add more rows! Current rowSize[%,d].", numEntries.get());
         }
+        numEntries.incrementAndGet();
         for (int i = 0; i < aggs.length; i++) {
           aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
         }
-        facts.put(key, rowOffset);
       }
     }
     in.set(row);
-    int rowOffset = facts.get(key);
     for (int i = 0; i < aggs.length; i++) {
       synchronized (aggs[i]) {
         aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
@@ -488,11 +495,9 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     int count = 0;
     for (String dimValue : dimValues) {
       String canonicalDimValue = dimLookup.get(dimValue);
-      if (canonicalDimValue == null) {
-        canonicalDimValue = dimValue;
+      if (!dimLookup.contains(canonicalDimValue)) {
         dimLookup.add(dimValue);
       }
       retVal[count] = canonicalDimValue;
       count++;
     }
@@ -581,7 +586,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     return columnCapabilities.get(column);
   }

-  ConcurrentSkipListMap<TimeAndDims, Integer> getFacts()
+  ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
   {
     return facts;
   }
@@ -653,7 +658,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
   }

-  static class DimensionHolder
+  class DimensionHolder
   {
     private final Map<String, DimDim> dimensions;
@@ -671,7 +676,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     {
       DimDim holder = dimensions.get(dimension);
       if (holder == null) {
-        holder = new DimDim();
+        holder = createDimDim(dimension);
         dimensions.put(dimension, holder);
       } else {
         throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
@@ -685,6 +690,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
   }

+  protected DimDim createDimDim(String dimension){
+    return new DimDimImpl();
+  }
+
   static class TimeAndDims implements Comparable<TimeAndDims>
   {
     private final long timestamp;
@@ -770,14 +779,37 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
   }

-  static class DimDim
+  static interface DimDim
   {
-    private final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
+    public String get(String value);
+    public int getId(String value);
+    public String getValue(int id);
+    public boolean contains(String value);
+    public int size();
+    public int add(String value);
+    public int getSortedId(String value);
+    public String getSortedValue(int index);
+    public void sort();
+    public boolean compareCannonicalValues(String s1, String s2);
+  }
+
+  private static class DimDimImpl implements DimDim{
     private final Map<String, Integer> falseIds;
     private final Map<Integer, String> falseIdsReverse;
     private volatile String[] sortedVals = null;
+    final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();

-    public DimDim()
+    public DimDimImpl()
     {
       BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
       falseIds = biMap;
@@ -788,9 +820,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
      * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
      * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
      */
-    public String get(String value)
+    public String get(String str)
     {
-      return value == null ? null : poorMansInterning.get(value);
+      String prev = poorMansInterning.putIfAbsent(str, str);
+      return prev != null ? prev : str;
     }

     public int getId(String value)
@@ -803,20 +836,21 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
       return falseIdsReverse.get(id);
     }

+    public boolean contains(String value)
+    {
+      return falseIds.containsKey(value);
+    }
+
     public int size()
     {
-      return poorMansInterning.size();
+      return falseIds.size();
     }

-    public Set<String> keySet()
+    public synchronized int add(String value)
     {
-      return poorMansInterning.keySet();
-    }
-
-    public synchronized void add(String value)
-    {
-      poorMansInterning.put(value, value);
-      falseIds.put(value, falseIds.size());
+      int id = falseIds.size();
+      falseIds.put(value, id);
+      return id;
     }

     public int getSortedId(String value)
@@ -850,5 +884,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
         throw new ISE("Call sort() before calling the getSorted* methods.");
       }
     }
+
+    public boolean compareCannonicalValues(String s1, String s2)
+    {
+      return s1 ==s2;
+    }
   }
 }

@@ -23,11 +23,9 @@ import com.google.common.base.Function;
 import com.google.common.collect.Maps;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
-import io.druid.data.input.impl.SpatialDimensionSchema;
 import io.druid.segment.IndexableAdapter;
 import io.druid.segment.Rowboat;
 import io.druid.segment.column.ColumnCapabilities;
-import io.druid.segment.column.ValueType;
 import io.druid.segment.data.EmptyIndexedInts;
 import io.druid.segment.data.Indexed;
 import io.druid.segment.data.IndexedInts;

View File

@@ -29,8 +29,8 @@ import com.metamx.collections.spatial.search.Bound;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import io.druid.granularity.QueryGranularity;
-import io.druid.query.aggregation.BufferAggregator;
 import io.druid.query.QueryInterruptedException;
+import io.druid.query.aggregation.BufferAggregator;
 import io.druid.query.filter.Filter;
 import io.druid.query.filter.ValueMatcher;
 import io.druid.query.filter.ValueMatcherFactory;
@@ -501,8 +501,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
       if (dimIndexObject == null) {
         return new BooleanValueMatcher(false);
       }
-      String idObject = index.getDimension(dimension.toLowerCase()).get(value);
-      if (idObject == null) {
+      final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase());
+      if (!dimDim.contains(value)) {
         if (value == null || "".equals(value)) {
           final int dimIndex = dimIndexObject;
@@ -523,7 +523,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
       }

       final int dimIndex = dimIndexObject;
-      final String id = idObject;
+      final String id = dimDim.get(value);
       return new ValueMatcher()
       {
@@ -536,11 +536,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
           }

           for (String dimVal : dims[dimIndex]) {
-            /**
-             * using == here instead of .equals() to speed up lookups made possible by
-             * {@link io.druid.segment.incremental.IncrementalIndex.DimDim#poorMansInterning}
-             */
-            if (id == dimVal) {
+            if (dimDim.compareCannonicalValues(id,dimVal)) {
               return true;
             }
           }

@@ -0,0 +1,274 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.incremental;
import com.metamx.common.ISE;
import io.druid.collections.StupidPool;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import org.mapdb.BTreeKeySerializer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
public class OffheapIncrementalIndex extends IncrementalIndex
{
private volatile DB db;
private volatile DB factsDb;
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, bufferPool);
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics
)
{
super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics);
}
@Override
protected synchronized ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable()
{
if (factsDb == null) {
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheSoftRefEnable();
factsDb = dbMaker.make();
db = dbMaker.make();
}
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
return factsDb.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
}
@Override
protected DimDim createDimDim(String dimension)
{
return new OffheapDimDim(dimension);
}
public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
{
private final TimeAndDimsComparator comparator;
private final transient IncrementalIndex incrementalIndex;
TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
{
this.comparator = new TimeAndDimsComparator();
this.incrementalIndex = incrementalIndex;
}
@Override
public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
{
for (int i = start; i < end; i++) {
TimeAndDims timeAndDim = (TimeAndDims) keys[i];
out.writeLong(timeAndDim.getTimestamp());
out.writeInt(timeAndDim.getDims().length);
int index = 0;
for (String[] dims : timeAndDim.getDims()) {
if (dims == null) {
out.write(-1);
} else {
DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
out.writeInt(dims.length);
for (String value : dims) {
out.writeInt(dimDim.getId(value));
}
}
index++;
}
}
}
@Override
public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
{
Object[] ret = new Object[size];
for (int i = start; i < end; i++) {
final long timeStamp = in.readLong();
final String[][] dims = new String[in.readInt()][];
for (int k = 0; k < dims.length; k++) {
int len = in.readInt();
if (len != -1) {
DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
String[] col = new String[len];
for (int l = 0; l < col.length; l++) {
col[l] = dimDim.get(dimDim.getValue(in.readInt()));
}
dims[k] = col;
}
}
ret[i] = new TimeAndDims(timeStamp, dims);
}
return ret;
}
@Override
public Comparator<TimeAndDims> getComparator()
{
return comparator;
}
}
public static class TimeAndDimsComparator implements Comparator, Serializable
{
@Override
public int compare(Object o1, Object o2)
{
return ((TimeAndDims) o1).compareTo((TimeAndDims) o2);
}
}
private class OffheapDimDim implements DimDim
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private final WeakHashMap<String, WeakReference<String>> cache =
new WeakHashMap();
private volatile String[] sortedVals = null;
// size on MapDB is slow so maintain a count here
private volatile int size = 0;
public OffheapDimDim(String dimension)
{
falseIds = db.createHashMap(dimension)
.keySerializer(Serializer.STRING)
.valueSerializer(Serializer.INTEGER)
.make();
falseIdsReverse = db.createHashMap(dimension + "_inverse")
.keySerializer(Serializer.INTEGER)
.valueSerializer(Serializer.STRING)
.make();
}
/**
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
*
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
*/
public String get(String str)
{
final WeakReference<String> cached = cache.get(str);
if (cached != null) {
final String value = cached.get();
if (value != null) {
return value;
}
}
cache.put(str, new WeakReference(str));
return str;
}
public int getId(String value)
{
return falseIds.get(value);
}
public String getValue(int id)
{
return falseIdsReverse.get(id);
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
}
public int size()
{
return size;
}
public synchronized int add(String value)
{
int id = size++;
falseIds.put(value, id);
falseIdsReverse.put(id, value);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[falseIds.size()];
int index = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
}
Arrays.sort(sortedVals);
}
}
private void assertSorted()
{
if (sortedVals == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
}
}
public boolean compareCannonicalValues(String s1, String s2)
{
return s1.equals(s2);
}
}
}

@@ -194,9 +194,10 @@ public class QueryRunnerTestHelper
   )
       throws IOException
   {
-    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false);
     final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
     final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
+    final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
     return Arrays.asList(
         new Object[][]{
             {
@@ -207,6 +208,9 @@ public class QueryRunnerTestHelper
             },
             {
                 makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
+            },
+            {
+                makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
             }
         }
     );
@@ -218,9 +222,11 @@ public class QueryRunnerTestHelper
   )
       throws IOException
   {
-    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false);
     final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
     final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
+    final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
+
     return Arrays.asList(
         new Object[][]{
             {
@@ -231,6 +237,9 @@ public class QueryRunnerTestHelper
             },
             {
                 makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
+            },
+            {
+                makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
             }
         }
     );

@@ -50,7 +50,7 @@ public class SegmentAnalyzerTest
   public void testIncrementalDoesNotWork() throws Exception
   {
     final List<SegmentAnalysis> results = getSegmentAnalysises(
-        new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), null)
+        new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null)
     );

     Assert.assertEquals(0, results.size());

@@ -37,6 +37,8 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -52,12 +54,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class TestIndex
 {
-  private static final Logger log = new Logger(TestIndex.class);
-
-  private static IncrementalIndex realtimeIndex = null;
-  private static QueryableIndex mmappedIndex = null;
-  private static QueryableIndex mergedRealtime = null;
-
   public static final String[] COLUMNS = new String[]{
       "ts",
       "provider",
@@ -69,6 +65,7 @@ public class TestIndex
   };
   public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
   public static final String[] METRICS = new String[]{"iNdEx"};
+  private static final Logger log = new Logger(TestIndex.class);
   private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
   private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
       new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
@@ -81,7 +78,11 @@ public class TestIndex
     }
   }

-  public static IncrementalIndex getIncrementalTestIndex()
+  private static IncrementalIndex realtimeIndex = null;
+  private static QueryableIndex mmappedIndex = null;
+  private static QueryableIndex mergedRealtime = null;
+
+  public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap)
   {
     synchronized (log) {
       if (realtimeIndex != null) {
@@ -89,7 +90,7 @@ public class TestIndex
       }
     }

-    return realtimeIndex = makeRealtimeIndex("druid.sample.tsv");
+    return realtimeIndex = makeRealtimeIndex("druid.sample.tsv", useOffheap);
   }

   public static QueryableIndex getMMappedTestIndex()
@@ -100,7 +101,7 @@ public class TestIndex
       }
     }

-    IncrementalIndex incrementalIndex = getIncrementalTestIndex();
+    IncrementalIndex incrementalIndex = getIncrementalTestIndex(false);
     mmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex);

     return mmappedIndex;
@@ -114,8 +115,8 @@ public class TestIndex
     }

     try {
-      IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top");
-      IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom");
+      IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top", false);
+      IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom", false);

       File tmpFile = File.createTempFile("yay", "who");
       tmpFile.delete();
@@ -150,15 +151,27 @@ public class TestIndex
     }
   }

-  private static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
+  private static IncrementalIndex makeRealtimeIndex(final String resourceFilename, final boolean useOffheap)
   {
     final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
     log.info("Realtime loading index file[%s]", resource);
-
-    final IncrementalIndex retVal = new IncrementalIndex(
-        new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.NONE, METRIC_AGGS,
+    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
+        .withQueryGranularity(QueryGranularity.NONE)
+        .withMetrics(METRIC_AGGS)
+        .build();
+    final IncrementalIndex retVal;
+    if (useOffheap) {
+      retVal = new OffheapIncrementalIndex(
+          schema,
         TestQueryRunners.pool
     );
+    } else {
+      retVal = new IncrementalIndex(
+          schema,
+          TestQueryRunners.pool
+      );
+    }

     final AtomicLong startTime = new AtomicLong();
     int lineCount;

@@ -124,7 +124,8 @@ public class SpatialFilterBonusTest
                 )
             )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );
     theIndex.add(
         new MapBasedInputRow(
@@ -255,7 +256,8 @@ public class SpatialFilterBonusTest
                 )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );
     IncrementalIndex second = new IncrementalIndex(
         new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -273,8 +275,8 @@ public class SpatialFilterBonusTest
                 )
             )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );
     IncrementalIndex third = new IncrementalIndex(
         new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -293,8 +295,8 @@ public class SpatialFilterBonusTest
                 )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );

@@ -124,7 +124,8 @@ public class SpatialFilterTest
                 )
             )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );
     theIndex.add(
         new MapBasedInputRow(
@@ -269,7 +270,8 @@ public class SpatialFilterTest
                 )
             )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );
     IncrementalIndex second = new IncrementalIndex(
         new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -288,7 +290,8 @@ public class SpatialFilterTest
                 )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );
     IncrementalIndex third = new IncrementalIndex(
         new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -307,7 +310,8 @@ public class SpatialFilterTest
                 )
         ).build(),
-        TestQueryRunners.pool
+        TestQueryRunners.pool,
+        false
     );

@@ -45,6 +45,8 @@ public class RealtimeTuningConfig implements TuningConfig
   private static final int defaultMaxPendingPersists = 0;
   private static final ShardSpec defaultShardSpec = new NoneShardSpec();
   private static final boolean defaultPersistInHeap = false;
+  private static final boolean defaultIngestOffheap = false;

   // Might make sense for this to be a builder
   public static RealtimeTuningConfig makeDefaultTuningConfig()
@@ -58,7 +60,8 @@ public class RealtimeTuningConfig implements TuningConfig
         defaultRejectionPolicyFactory,
         defaultMaxPendingPersists,
         defaultShardSpec,
-        defaultPersistInHeap
+        defaultPersistInHeap,
+        defaultIngestOffheap
     );
   }
@@ -71,6 +74,7 @@ public class RealtimeTuningConfig implements TuningConfig
   private final int maxPendingPersists;
   private final ShardSpec shardSpec;
   private final boolean persistInHeap;
+  private final boolean ingestOffheap;

   @JsonCreator
   public RealtimeTuningConfig(
@@ -82,7 +86,8 @@ public class RealtimeTuningConfig implements TuningConfig
       @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("shardSpec") ShardSpec shardSpec,
-      @JsonProperty("persistInHeap") Boolean persistInHeap
+      @JsonProperty("persistInHeap") Boolean persistInHeap,
+      @JsonProperty("ingestOffheap") Boolean ingestOffheap
   )
   {
     this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@@ -98,6 +103,7 @@ public class RealtimeTuningConfig implements TuningConfig
     this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
     this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
     this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
+    this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
   }

   @JsonProperty
@@ -154,6 +160,11 @@ public class RealtimeTuningConfig implements TuningConfig
     return persistInHeap;
   }

+  @JsonProperty
+  public boolean isIngestOffheap(){
+    return ingestOffheap;
+  }
+
   public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
   {
     return new RealtimeTuningConfig(
@@ -165,7 +176,8 @@ public class RealtimeTuningConfig implements TuningConfig
         rejectionPolicyFactory,
         maxPendingPersists,
         shardSpec,
-        persistInHeap
+        persistInHeap,
+        ingestOffheap
     );
   }
@@ -180,7 +192,8 @@ public class RealtimeTuningConfig implements TuningConfig
         rejectionPolicyFactory,
         maxPendingPersists,
         shardSpec,
-        persistInHeap
+        persistInHeap,
+        ingestOffheap
     );
   }
 }
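
RealtimeTuningConfig gains the same optional ingestOffheap flag (again defaulting to false). A sketch of where it would sit in a realtime tuning config; the "realtime" type name and the maxRowsInMemory value are illustrative assumptions, not taken from this diff:

{
  "tuningConfig": {
    "type": "realtime",
    "maxRowsInMemory": 500000,
    "ingestOffheap": true
  }
}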

@@ -97,6 +97,7 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
           ((RealtimePlumberSchool) plumberSchool).getRejectionPolicyFactory(),
           ((RealtimePlumberSchool) plumberSchool).getMaxPendingPersists(),
           schema.getShardSpec(),
+          false,
           false
       );
     } else {

@@ -31,6 +31,7 @@ import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.FireHydrant;
@@ -180,15 +181,24 @@ public class Sink implements Iterable<FireHydrant>
       aggsSize += agg.getMaxIntermediateSize();
     }
     int bufferSize = aggsSize * config.getMaxRowsInMemory();
-    IncrementalIndex newIndex = new IncrementalIndex(
-        new IncrementalIndexSchema.Builder()
+    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
         .withMinTimestamp(minTimestamp)
         .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
         .withDimensionsSpec(schema.getParser())
         .withMetrics(schema.getAggregators())
-        .build(),
+        .build();
+    final IncrementalIndex newIndex;
+    if (config.isIngestOffheap()) {
+      newIndex = new OffheapIncrementalIndex(
+          indexSchema,
         new OffheapBufferPool(bufferSize)
     );
+    } else {
+      newIndex = new IncrementalIndex(
+          indexSchema,
+          new OffheapBufferPool(bufferSize)
+      );
+    }

     final FireHydrant old;
     synchronized (hydrantLock) {

@@ -77,7 +77,7 @@ public class FireDepartmentTest
             )
         ),
         new RealtimeTuningConfig(
-            null, null, null, null, null, null, null, null, false
+            null, null, null, null, null, null, null, null, false, false
         ),
         null, null, null, null
     );

@@ -117,6 +117,7 @@ public class RealtimeManagerTest
         null,
         null,
         null,
+        null,
         null
     );
     plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

@@ -163,6 +163,7 @@ public class RealtimePlumberSchoolTest
         rejectionPolicy,
         null,
         null,
+        null,
         null
     );

@@ -64,6 +64,7 @@ public class SinkTest
         null,
         null,
         null,
+        false,
         false
     );
     final Sink sink = new Sink(interval, schema, tuningConfig, version);