Merge pull request #921 from metamx/onheap-incremental-index

separate implementations of incremental index
Committed by Fangjin Yang on 2014-12-08 13:59:08 -07:00
commit eb233fe50e
46 changed files with 1326 additions and 665 deletions
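
Taken together, this change splits the old buffer-backed IncrementalIndex into an abstract base class with two concrete implementations: OnheapIncrementalIndex, which keeps plain Aggregator instances on the JVM heap and is capped by a row count, and OffheapIncrementalIndex, which keeps aggregation state in a pooled ByteBuffer plus MapDB structures and is capped by a byte budget. The sketch below is not part of this diff; it only illustrates the caller-side difference, using constructors that appear later in this commit. The granularity, pool size, budget, and row limit are illustrative values.

import io.druid.granularity.QueryGranularity;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;

public class IncrementalIndexChoiceSketch
{
  public static IncrementalIndex makeIndex(boolean ingestOffheap, long minTimestamp)
  {
    final AggregatorFactory[] metrics = new AggregatorFactory[]{new CountAggregatorFactory("rows")};

    if (ingestOffheap) {
      // Off-heap: aggregation state lives in a pooled direct ByteBuffer, facts and dimension
      // dictionaries live in MapDB; growth is limited by a total byte budget.
      final int maxTotalBufferSize = 128 * 1024 * 1024;         // mirrors DEFAULT_BUFFER_SIZE
      final int aggregationBufferSize = maxTotalBufferSize / 2; // mirrors DEFAULT_AGG_BUFFER_RATIO = 0.5
      return new OffheapIncrementalIndex(
          minTimestamp,
          QueryGranularity.NONE,
          metrics,
          new OffheapBufferPool(aggregationBufferSize),
          true,
          maxTotalBufferSize
      );
    } else {
      // On-heap: one Aggregator instance per metric per row, limited by a maximum row count.
      return new OnheapIncrementalIndex(minTimestamp, QueryGranularity.NONE, metrics, 80000);
    }
  }
}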

View File

@ -89,6 +89,12 @@ public class ApproximateHistogramAggregator implements Aggregator
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getLong()");
}
@Override
public String getName()
{

View File

@ -84,7 +84,13 @@ public class ApproximateHistogramFoldingAggregator implements Aggregator
@Override
public float getFloat()
{
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()");
throw new UnsupportedOperationException("ApproximateHistogramFoldingAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingAggregator does not support getLong()");
}
@Override

View File

@ -36,18 +36,21 @@ import java.util.Map;
@JsonTypeName("hadoop")
public class HadoopTuningConfig implements TuningConfig
{
private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> defaultShardSpecs = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int defaultRowFlushBoundary = 80000;
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
return new HadoopTuningConfig(
null,
new DateTime().toString(),
defaultPartitionsSpec,
defaultShardSpecs,
defaultRowFlushBoundary,
DEFAULT_PARTITIONS_SPEC,
DEFAULT_SHARD_SPECS,
DEFAULT_ROW_FLUSH_BOUNDARY,
false,
true,
false,
@ -55,7 +58,9 @@ public class HadoopTuningConfig implements TuningConfig
null,
false,
false,
false
false,
DEFAULT_BUFFER_SIZE,
DEFAULT_AGG_BUFFER_RATIO
);
}
@ -72,6 +77,8 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean combineText;
private final boolean persistInHeap;
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;
@JsonCreator
public HadoopTuningConfig(
@ -87,14 +94,16 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText,
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap
final @JsonProperty("ingestOffheap") boolean ingestOffheap,
final @JsonProperty("bufferSize") Integer bufferSize,
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
)
{
this.workingPath = workingPath == null ? null : workingPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec == null ? defaultPartitionsSpec : partitionsSpec;
this.shardSpecs = shardSpecs == null ? defaultShardSpecs : shardSpecs;
this.rowFlushBoundary = rowFlushBoundary == null ? defaultRowFlushBoundary : rowFlushBoundary;
this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;
this.rowFlushBoundary = rowFlushBoundary == null ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
this.overwriteFiles = overwriteFiles;
@ -105,6 +114,8 @@ public class HadoopTuningConfig implements TuningConfig
this.combineText = combineText;
this.persistInHeap = persistInHeap;
this.ingestOffheap = ingestOffheap;
this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
}
@JsonProperty
@ -184,6 +195,17 @@ public class HadoopTuningConfig implements TuningConfig
return ingestOffheap;
}
@JsonProperty
public int getBufferSize(){
return bufferSize;
}
@JsonProperty
public float getAggregationBufferRatio()
{
return aggregationBufferRatio;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -199,7 +221,9 @@ public class HadoopTuningConfig implements TuningConfig
jobProperties,
combineText,
persistInHeap,
ingestOffheap
ingestOffheap,
bufferSize,
aggregationBufferRatio
);
}
@ -218,7 +242,9 @@ public class HadoopTuningConfig implements TuningConfig
jobProperties,
combineText,
persistInHeap,
ingestOffheap
ingestOffheap,
bufferSize,
aggregationBufferRatio
);
}
@ -237,7 +263,9 @@ public class HadoopTuningConfig implements TuningConfig
jobProperties,
combineText,
persistInHeap,
ingestOffheap
ingestOffheap,
bufferSize,
aggregationBufferRatio
);
}
}
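
The two new tuning properties, bufferSize and aggregationBufferRatio, default to 128 MiB and 0.5 when they are omitted from the JSON spec. The following small sketch is not part of this diff (the io.druid.indexer package name is assumed); it shows how IndexGeneratorJob, later in this commit, derives the aggregation pool size from these two values.

import io.druid.indexer.HadoopTuningConfig;

public class TuningDefaultsSketch
{
  public static void main(String[] args)
  {
    // With bufferSize and aggregationBufferRatio absent (null) in the JSON spec, the defaults apply.
    final HadoopTuningConfig tuning = HadoopTuningConfig.makeDefaultTuningConfig();
    final int maxTotalBufferSize = tuning.getBufferSize();    // 128 * 1024 * 1024
    final float ratio = tuning.getAggregationBufferRatio();   // 0.5f
    // IndexGeneratorJob hands this share to an OffheapBufferPool; the remainder of the budget
    // is left for the off-heap facts table when ingestOffheap is enabled.
    final int aggregationBufferSize = (int) ((double) maxTotalBufferSize * ratio);
    System.out.printf("total=%,d bytes, aggregation pool=%,d bytes%n", maxTotalBufferSize, aggregationBufferSize);
  }
}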

View File

@ -33,6 +33,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.offheap.OffheapBufferPool;
@ -46,6 +47,7 @@ import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
@ -323,8 +325,12 @@ public class IndexGeneratorJob implements Jobby
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize();
final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
* config.getSchema().getTuningConfig().getAggregationBufferRatio());
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
IncrementalIndex index = makeIncrementalIndex(bucket, aggs, bufferPool);
try {
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
@ -347,7 +353,8 @@ public class IndexGeneratorJob implements Jobby
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
if (!index.canAppendRow()) {
log.info(index.getOutOfRowsReason());
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
@ -361,9 +368,9 @@ public class IndexGeneratorJob implements Jobby
context.progress();
persist(index, interval, file, progressIndicator);
// close this index and make a new one
// close this index and make a new one, reusing same buffer
index.close();
index = makeIncrementalIndex(bucket, aggs);
index = makeIncrementalIndex(bucket, aggs, bufferPool);
startTime = System.currentTimeMillis();
++indexCount;
@ -628,14 +635,9 @@ public class IndexGeneratorJob implements Jobby
return numRead;
}
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool)
{
int aggsSize = 0;
for (AggregatorFactory agg : aggs) {
aggsSize += agg.getMaxIntermediateSize();
}
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
@ -643,14 +645,17 @@ public class IndexGeneratorJob implements Jobby
.withMetrics(aggs)
.build();
if (tuningConfig.isIngestOffheap()) {
return new OffheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
bufferPool,
true,
tuningConfig.getBufferSize()
);
} else {
return new IncrementalIndex(
return new OnheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
tuningConfig.getRowFlushBoundary()
);
}
}
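
Instead of sizing a dedicated buffer from rowFlushBoundary and the aggregators' max intermediate sizes, the reducer now creates one OffheapBufferPool up front and asks the index itself whether another row fits, persisting and rebuilding the index against the same pool when it does not. The following self-contained sketch is not part of this diff; it mirrors that add/check/persist/recreate loop using the on-heap index and a deliberately tiny row limit so the spill path is visible, with the persist step reduced to a log line.

import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.OnheapIncrementalIndex;

import java.util.Arrays;

public class SpillLoopSketch
{
  public static void main(String[] args) throws Exception
  {
    final AggregatorFactory[] aggs = new AggregatorFactory[]{new CountAggregatorFactory("rows")};
    final int maxRowCount = 3; // deliberately tiny so the index fills up quickly

    OnheapIncrementalIndex index = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, aggs, maxRowCount);
    try {
      for (int i = 0; i < 10; i++) {
        final InputRow row = new MapBasedInputRow(
            i, Arrays.asList("dim"), ImmutableMap.<String, Object>of("dim", "value-" + i)
        );
        index.add(row); // declared to throw IndexSizeExceededException if the index is already full

        if (!index.canAppendRow()) {
          // The real reducer persists the index to disk here, closes it, and starts a new one
          // backed by the same buffer pool.
          System.out.println(index.getOutOfRowsReason());
          index.close();
          index = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, aggs, maxRowCount);
        }
      }
    }
    finally {
      index.close();
    }
  }
}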

View File

@ -182,7 +182,9 @@ public class HadoopDruidIndexerConfigTest
null,
false,
false,
false
false,
null,
null
)
);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(spec);

View File

@ -38,6 +38,7 @@ import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -107,7 +108,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
@Override
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) {

View File

@ -329,7 +329,7 @@ public class IndexTask extends AbstractFixedIntervalTask
tmpDir
).findPlumber(
schema,
new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null),
new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null, null),
metrics
);

View File

@ -49,6 +49,7 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
@ -292,7 +293,8 @@ public class RealtimeIndexTask extends AbstractTask
}
fireDepartment.getMetrics().incrementProcessed();
if (currCount >= tuningConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}

View File

@ -233,7 +233,8 @@ public class TaskSerdeTest
1,
new NoneShardSpec(),
false,
false
false,
null
)
)
);

View File

@ -39,4 +39,6 @@ public interface Aggregator {
float getFloat();
String getName();
void close();
long getLong();
}
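
Every Aggregator must now expose its current value as a long as well as a float. The numeric aggregators touched in this diff widen or cast their running value, while aggregators whose state has no meaningful long form (histograms, cardinality, hyperUnique) throw UnsupportedOperationException. Below is a toy implementation, not part of this diff, assuming the pre-existing methods aggregate(), reset(), and get() are unchanged by this commit.

import io.druid.query.aggregation.Aggregator;

public class ToyCountAggregator implements Aggregator
{
  private final String name;
  private long count = 0;

  public ToyCountAggregator(String name)
  {
    this.name = name;
  }

  @Override
  public void aggregate()
  {
    ++count;
  }

  @Override
  public void reset()
  {
    count = 0;
  }

  @Override
  public Object get()
  {
    return count;
  }

  @Override
  public float getFloat()
  {
    return (float) count;
  }

  @Override
  public long getLong()
  {
    return count; // the method this PR adds to the interface
  }

  @Override
  public String getName()
  {
    return name;
  }

  @Override
  public void close()
  {
    // no resources to release
  }
}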

View File

@ -64,6 +64,12 @@ public class Aggregators
{
}
@Override
public long getLong()
{
return 0;
}
};
}

View File

@ -64,6 +64,12 @@ public class CountAggregator implements Aggregator
return (float) count;
}
@Override
public long getLong()
{
return count;
}
@Override
public String getName()
{

View File

@ -80,6 +80,12 @@ public class DoubleSumAggregator implements Aggregator
return (float) sum;
}
@Override
public long getLong()
{
return (long) sum;
}
@Override
public String getName()
{

View File

@ -58,6 +58,12 @@ public class FilteredAggregator implements Aggregator
return delegate.getFloat();
}
@Override
public long getLong()
{
return delegate.getLong();
}
@Override
public String getName()
{

View File

@ -74,6 +74,12 @@ public class HistogramAggregator implements Aggregator
throw new UnsupportedOperationException("HistogramAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("HistogramAggregator does not support getLong()");
}
@Override
public String getName()
{

View File

@ -76,6 +76,12 @@ public class JavaScriptAggregator implements Aggregator
return (float) current;
}
@Override
public long getLong()
{
return (long) current;
}
@Override
public String getName()
{

View File

@ -79,6 +79,12 @@ public class LongSumAggregator implements Aggregator
return (float) sum;
}
@Override
public long getLong()
{
return sum;
}
@Override
public String getName()
{

View File

@ -71,6 +71,12 @@ public class MaxAggregator implements Aggregator
return (float) max;
}
@Override
public long getLong()
{
return (long) max;
}
@Override
public String getName()
{

View File

@ -71,6 +71,12 @@ public class MinAggregator implements Aggregator
return (float) min;
}
@Override
public long getLong()
{
return (long) min;
}
@Override
public String getName()
{

View File

@ -126,6 +126,12 @@ public class CardinalityAggregator implements Aggregator
throw new UnsupportedOperationException("CardinalityAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("CardinalityAggregator does not support getLong()");
}
@Override
public String getName()
{

View File

@ -63,7 +63,13 @@ public class HyperUniquesAggregator implements Aggregator
@Override
public float getFloat()
{
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("HyperUniquesAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("HyperUniquesAggregator does not support getLong()");
}
@Override

View File

@ -31,7 +31,9 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List;
@ -75,7 +77,7 @@ public class GroupByQueryHelper
}
);
final IncrementalIndex index;
if(query.getContextValue("useOffheap", false)){
if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
@ -83,18 +85,19 @@ public class GroupByQueryHelper
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
bufferPool,
false
false,
Integer.MAX_VALUE
);
} else {
index = new IncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
bufferPool,
false
);
index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults()
);
}
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
@ -104,9 +107,10 @@ public class GroupByQueryHelper
{
if (in instanceof Row) {
if (accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions))
> config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
try {
accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions));
} catch(IndexSizeExceededException e) {
throw new ISE(e.getMessage());
}
} else {
throw new ISE("Unable to accumulate something of type [%s]", in.getClass());

View File

@ -20,9 +20,6 @@
package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@ -32,15 +29,12 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
@ -60,24 +54,174 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class IncrementalIndex implements Iterable<Row>, Closeable
public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>, Closeable
{
protected static ColumnSelectorFactory makeColumnSelectorFactory(
final AggregatorFactory agg,
final ThreadLocal<InputRow> in,
final boolean deserializeComplexMetrics
)
{
return new ColumnSelectorFactory()
{
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getTimestampFromEpoch();
}
};
}
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getLongMetric(columnName);
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(columnName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(final String column)
{
final String typeName = agg.getTypeName();
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
return in.get().getRaw(column);
}
};
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), column);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimension).get(id);
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimension).indexOf(name);
}
};
}
};
}
private final long minTimestamp;
private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers;
@ -85,16 +229,16 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames;
private final BufferAggregator[] aggs;
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final LinkedHashMap<String, Integer> dimensionOrder;
protected final CopyOnWriteArrayList<String> dimensions;
private final AggregatorType[] aggs;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ResourceHolder<ByteBuffer> bufferHolder;
private final boolean deserializeComplexMetrics;
protected final CopyOnWriteArrayList<String> dimensions;
private volatile AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
@ -102,14 +246,12 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* @param incrementalIndexSchema
* @param bufferPool
* @param incrementalIndexSchema the schema to use for incremental index
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
*/
public IncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool,
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics
)
{
@ -117,165 +259,14 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = Lists.newCopyOnWriteArrayList();
this.deserializeComplexMetrics = deserializeComplexMetrics;
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggs = new BufferAggregator[metrics.length];
this.aggPositionOffsets = new int[metrics.length];
int currAggSize = 0;
this.aggs = initAggs(metrics, in, deserializeComplexMetrics);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
new ColumnSelectorFactory()
{
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
if(columnName.equals(Column.TIME_COLUMN_NAME)){
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getTimestampFromEpoch();
}
};
}
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getLongMetric(columnName);
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(columnName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(final String column)
{
final String typeName = agg.getTypeName();
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
return in.get().getRaw(column);
}
};
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), column);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimension).get(id);
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimension).indexOf(name);
}
};
}
}
);
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
final String metricName = metrics[i].getName();
metricNamesBuilder.add(metricName);
metricIndexesBuilder.put(metricName, i);
@ -285,8 +276,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
metricIndexes = metricIndexesBuilder.build();
metricTypes = metricTypesBuilder.build();
this.totalAggSize = currAggSize;
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<>();
// This should really be more generic
@ -320,56 +309,46 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
capabilities.setHasSpatialIndexes(true);
columnCapabilities.put(spatialDimension.getDimName(), capabilities);
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
this.facts = createFactsTable();
}
protected ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable() {
return new ConcurrentSkipListMap<>();
}
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
bufferPool,
true
);
}
public abstract boolean canAppendRow();
public IncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool
)
{
this(incrementalIndexSchema, bufferPool, true);
}
public abstract String getOutOfRowsReason();
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
protected abstract DimDim makeDimDim(String dimension);
protected abstract AggregatorType[] initAggs(
AggregatorFactory[] metrics,
ThreadLocal<InputRow> in,
boolean deserializeComplexMetrics
)
);
protected abstract Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
) throws IndexSizeExceededException;
protected abstract AggregatorType[] getAggsForRow(int rowOffset);
protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition);
protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);
protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset);
@Override
public void close()
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
bufferPool,
deserializeComplexMetrics
);
// Nothing to close
}
public InputRow formatRow(InputRow row)
@ -396,7 +375,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
@ -438,7 +417,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
@ -450,31 +428,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
Integer rowOffset;
synchronized (this) {
rowOffset = totalAggSize * numEntries.get();
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
if (rowOffset + totalAggSize > bufferHolder.get().limit()) {
facts.remove(key);
throw new ISE("Buffer full, cannot add more rows! Current rowSize[%,d].", numEntries.get());
}
numEntries.incrementAndGet();
for (int i = 0; i < aggs.length; i++) {
aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
}
in.set(row);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
in.set(null);
return numEntries.get();
return addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in);
}
public boolean isEmpty()
@ -487,14 +441,14 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return numEntries.get();
}
public long getMinTimeMillis()
private long getMinTimeMillis()
{
return facts.firstKey().getTimestamp();
return getFacts().firstKey().getTimestamp();
}
public long getMaxTimeMillis()
private long getMaxTimeMillis()
{
return facts.lastKey().getTimestamp();
return getFacts().lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
@ -515,11 +469,21 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return retVal;
}
public AggregatorType[] getAggs()
{
return aggs;
}
public AggregatorFactory[] getMetricAggs()
{
return metrics;
}
public DimensionHolder getDimValues()
{
return dimValues;
}
public List<String> getDimensions()
{
return dimensions;
@ -530,16 +494,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return metricTypes.get(metric);
}
public long getMinTimestamp()
{
return minTimestamp;
}
public QueryGranularity getGranularity()
{
return gran;
}
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
@ -555,54 +509,34 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
}
DimDim getDimension(String dimension)
public DimDim getDimension(String dimension)
{
return isEmpty() ? null : dimValues.get(dimension);
}
Integer getDimensionIndex(String dimension)
public Integer getDimensionIndex(String dimension)
{
return dimensionOrder.get(dimension);
}
List<String> getMetricNames()
public List<String> getMetricNames()
{
return metricNames;
}
Integer getMetricIndex(String metricName)
public Integer getMetricIndex(String metricName)
{
return metricIndexes.get(metricName);
}
int getMetricPosition(int rowOffset, int metricIndex)
{
return rowOffset + aggPositionOffsets[metricIndex];
}
ByteBuffer getMetricBuffer()
{
return bufferHolder.get();
}
BufferAggregator getAggregator(int metricIndex)
{
return aggs[metricIndex];
}
ColumnCapabilities getCapabilities(String column)
public ColumnCapabilities getCapabilities(String column)
{
return columnCapabilities.get(column);
}
ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return facts;
}
ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return facts.subMap(start, end);
return getFacts().subMap(start, end);
}
@Override
@ -619,7 +553,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
public Iterator<Row> iterator()
{
return Iterators.transform(
facts.entrySet().iterator(),
getFacts().entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
@ -638,8 +572,9 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
AggregatorType[] aggs = getAggsForRow(rowOffset);
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i)));
theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
}
if (postAggs != null) {
@ -656,17 +591,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
};
}
@Override
public void close()
{
try {
bufferHolder.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
@ -676,16 +600,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
dimensions = Maps.newConcurrentMap();
}
void reset()
{
dimensions.clear();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = createDimDim(dimension);
holder = makeDimDim(dimension);
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
@ -699,8 +618,27 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
protected DimDim createDimDim(String dimension){
return new DimDimImpl();
static interface DimDim
{
public String get(String value);
public int getId(String value);
public String getValue(int id);
public boolean contains(String value);
public int size();
public int add(String value);
public int getSortedId(String value);
public String getSortedValue(int index);
public void sort();
public boolean compareCannonicalValues(String s1, String s2);
}
static class TimeAndDims implements Comparable<TimeAndDims>
@ -773,134 +711,17 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
"timestamp=" + new DateTime(timestamp) +
", dims=" + Lists.transform(
Arrays.asList(dims), new Function<String[], Object>()
{
@Override
public Object apply(@Nullable String[] input)
{
if (input == null || input.length == 0) {
return Arrays.asList("null");
{
@Override
public Object apply(@Nullable String[] input)
{
if (input == null || input.length == 0) {
return Arrays.asList("null");
}
return Arrays.asList(input);
}
}
return Arrays.asList(input);
}
}
) +
'}';
}
}
static interface DimDim
{
public String get(String value);
public int getId(String value);
public String getValue(int id);
public boolean contains(String value);
public int size();
public int add(String value);
public int getSortedId(String value);
public String getSortedValue(int index);
public void sort();
public boolean compareCannonicalValues(String s1, String s2);
}
private static class DimDimImpl implements DimDim{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
public DimDimImpl()
{
BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
falseIds = biMap;
falseIdsReverse = biMap.inverse();
}
/**
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
*/
public String get(String str)
{
String prev = poorMansInterning.putIfAbsent(str, str);
return prev != null ? prev : str;
}
public int getId(String value)
{
if (value == null) {
value = "";
}
final Integer id = falseIds.get(value);
return id == null ? -1 : id;
}
public String getValue(int id)
{
return falseIdsReverse.get(id);
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
}
public int size()
{
return falseIds.size();
}
public synchronized int add(String value)
{
int id = falseIds.size();
falseIds.put(value, id);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[falseIds.size()];
int index = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
}
Arrays.sort(sortedVals);
}
}
private void assertSorted()
{
if (sortedVals == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
}
}
public boolean compareCannonicalValues(String s1, String s2)
{
return s1 ==s2;
) + '}';
}
}
}

View File

@ -46,11 +46,11 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{
private static final Logger log = new Logger(IncrementalIndexAdapter.class);
private final Interval dataInterval;
private final IncrementalIndex index;
private final IncrementalIndex<Object> index;
private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
public IncrementalIndexAdapter(
Interval dataInterval, IncrementalIndex index, BitmapFactory bitmapFactory
Interval dataInterval, IncrementalIndex<Object> index, BitmapFactory bitmapFactory
)
{
this.dataInterval = dataInterval;
@ -205,8 +205,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
Object[] metrics = new Object[index.getMetricAggs().length];
for (int i = 0; i < metrics.length; i++) {
metrics[i] = index.getAggregator(i)
.get(index.getMetricBuffer(), index.getMetricPosition(rowOffset, i));
metrics[i] = index.getMetricObjectValue(rowOffset, i);
}
return new Rowboat(

View File

@ -1,4 +1,3 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
@ -30,7 +29,6 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory;
@ -352,17 +350,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final int metricIndex = metricIndexInt;
final BufferAggregator agg = index.getAggregator(metricIndex);
return new FloatColumnSelector()
{
@Override
public float get()
{
return agg.getFloat(
index.getMetricBuffer(),
index.getMetricPosition(currEntry.getValue(), metricIndex)
);
return index.getMetricFloatValue(currEntry.getValue(), metricIndex);
}
};
}
@ -370,7 +363,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
if(columnName.equals(Column.TIME_COLUMN_NAME)){
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new LongColumnSelector()
{
@Override
@ -393,16 +386,15 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final int metricIndex = metricIndexInt;
final BufferAggregator agg = index.getAggregator(metricIndex);
return new LongColumnSelector()
{
@Override
public long get()
{
return agg.getLong(
index.getMetricBuffer(),
index.getMetricPosition(currEntry.getValue(), metricIndex)
return index.getMetricLongValue(
currEntry.getValue(),
metricIndex
);
}
};
@ -417,7 +409,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
final int metricIndex = metricIndexInt;
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(column));
final BufferAggregator agg = index.getAggregator(metricIndex);
return new ObjectColumnSelector()
{
@Override
@ -429,9 +420,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Object get()
{
return agg.get(
index.getMetricBuffer(),
index.getMetricPosition(currEntry.getValue(), metricIndex)
return index.getMetricObjectValue(
currEntry.getValue(),
metricIndex
);
}
};
@ -453,7 +444,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public Object get()
{
final String[][] dims = currEntry.getKey().getDims();
if(dimensionIndex >= dims.length) {
if (dimensionIndex >= dims.length) {
return null;
}
final String[] dimVals = dims[dimensionIndex];
@ -562,7 +553,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
for (String dimVal : dims[dimIndex]) {
if (dimDim.compareCannonicalValues(id,dimVal)) {
if (dimDim.compareCannonicalValues(id, dimVal)) {
return true;
}
}

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.incremental;
import java.io.IOException;
public class IndexSizeExceededException extends IOException
{
public IndexSizeExceededException()
{
}
public IndexSizeExceededException(String message)
{
super(message);
}
public IndexSizeExceededException(String message, Throwable cause)
{
super(message, cause);
}
public IndexSizeExceededException(Throwable cause)
{
super(cause);
}
}
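
Because IndexSizeExceededException extends IOException, IncrementalIndex.add() becomes a checked call. This commit uses two conventions around it: the query path (GroupByQueryHelper) translates it into an ISE, while the ingestion paths avoid it by consulting canAppendRow() and persisting first. A small helper sketch of the translate variant, not part of this diff:

import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;

public class AddRowSketch
{
  // Query-time convention: a full index means the query has exceeded its limits, so give up.
  public static int addOrThrowISE(IncrementalIndex index, InputRow row)
  {
    try {
      return index.add(row);
    }
    catch (IndexSizeExceededException e) {
      throw new ISE(e.getMessage());
    }
  }
}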

View File

@ -19,21 +19,27 @@
package io.druid.segment.incremental;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import org.mapdb.BTreeKeySerializer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.mapdb.Store;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
@ -41,18 +47,80 @@ import java.util.Map;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
public class OffheapIncrementalIndex extends IncrementalIndex
/**
*/
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
private volatile DB db;
private volatile DB factsDb;
private static final long STORE_CHUNK_SIZE;
static
{
// MapDB allocates memory in chunks. We need to know CHUNK_SIZE
// in order to get a crude estimate of how much more direct memory
// might be used when adding an additional row.
try {
Field field = Store.class.getDeclaredField("CHUNK_SIZE");
field.setAccessible(true);
STORE_CHUNK_SIZE = field.getLong(null);
} catch(NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Unable to determine MapDB store chunk size", e);
}
}
private final ResourceHolder<ByteBuffer> bufferHolder;
private final DB db;
private final DB factsDb;
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final int maxTotalBufferSize;
private String outOfRowsReason = null;
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics,
int maxTotalBufferSize
)
{
super(incrementalIndexSchema, bufferPool);
super(incrementalIndexSchema, deserializeComplexMetrics);
this.bufferHolder = bufferPool.take();
Preconditions.checkArgument(
maxTotalBufferSize > bufferHolder.get().limit(),
"Maximum total buffer size must be greater than aggregation buffer size"
);
final AggregatorFactory[] metrics = incrementalIndexSchema.getMetrics();
this.aggPositionOffsets = new int[metrics.length];
int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
}
this.totalAggSize = currAggSize;
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheLRUEnable()
.cacheSize(16384);
this.factsDb = dbMaker.make();
this.db = dbMaker.make();
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
this.maxTotalBufferSize = maxTotalBufferSize;
}
public OffheapIncrementalIndex(
@ -60,44 +128,177 @@ public class OffheapIncrementalIndex extends IncrementalIndex
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics
boolean deserializeComplexMetrics,
int maxTotalBufferSize
)
{
super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics);
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
bufferPool,
deserializeComplexMetrics,
maxTotalBufferSize
);
}
@Override
protected synchronized ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable()
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
if (factsDb == null) {
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheSoftRefEnable();
factsDb = dbMaker.make();
db = dbMaker.make();
}
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
return factsDb.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
return facts;
}
@Override
protected DimDim createDimDim(String dimension)
protected DimDim makeDimDim(String dimension)
{
return new OffheapDimDim(dimension);
}
public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
@Override
protected BufferAggregator[] initAggs(
AggregatorFactory[] metrics,
ThreadLocal<InputRow> in,
boolean deserializeComplexMetrics
)
{
BufferAggregator[] aggs = new BufferAggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
makeColumnSelectorFactory(agg, in, deserializeComplexMetrics)
);
}
return aggs;
}
@Override
protected Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
) throws IndexSizeExceededException
{
final BufferAggregator[] aggs = getAggs();
Integer rowOffset;
synchronized (this) {
if (!facts.containsKey(key)) {
if (!canAppendRow(false)) {
throw new IndexSizeExceededException(getOutOfRowsReason());
}
}
rowOffset = totalAggSize * numEntries.get();
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
numEntries.incrementAndGet();
for (int i = 0; i < aggs.length; i++) {
aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
}
in.set(row);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
in.set(null);
return numEntries.get();
}
public boolean canAppendRow() {
return canAppendRow(true);
}
private boolean canAppendRow(boolean includeFudgeFactor)
{
// there is a race condition when checking current MapDB
// when canAppendRow() is called after adding a row it may return true, but on a subsequent call
// to addToFacts that may not be the case anymore because MapDB size may have changed.
// so we add this fudge factor, hoping that will be enough.
final int aggBufferSize = bufferHolder.get().limit();
if ((size() + 1) * totalAggSize > aggBufferSize) {
outOfRowsReason = String.format("Maximum aggregation buffer limit reached [%d bytes].", aggBufferSize);
return false;
}
// hopefully both MapDBs will grow by at most STORE_CHUNK_SIZE each when we add the next row.
if (getCurrentSize() + totalAggSize + 2 * STORE_CHUNK_SIZE + (includeFudgeFactor ? STORE_CHUNK_SIZE : 0) > maxTotalBufferSize) {
outOfRowsReason = String.format("Maximum time and dimension buffer limit reached [%d bytes].", maxTotalBufferSize - aggBufferSize);
return false;
}
return true;
}
public String getOutOfRowsReason() {
return outOfRowsReason;
}
@Override
protected BufferAggregator[] getAggsForRow(int rowOffset)
{
return getAggs();
}
@Override
protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition)
{
return agg.get(bufferHolder.get(), getMetricPosition(rowOffset, aggPosition));
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
return getAggs()[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
return getAggs()[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
return getAggs()[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public void close()
{
try {
bufferHolder.close();
Store.forDB(db).close();
Store.forDB(factsDb).close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
private int getMetricPosition(int rowOffset, int metricIndex)
{
return rowOffset + aggPositionOffsets[metricIndex];
}
private DimDim getDimDim(int dimIndex)
{
return getDimValues().get(getDimensions().get(dimIndex));
}
// MapDB forces serializers to implement serializable, which sucks
private static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
{
private final TimeAndDimsComparator comparator;
private final transient IncrementalIndex incrementalIndex;
private final transient OffheapIncrementalIndex incrementalIndex;
TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
TimeAndDimsSerializer(OffheapIncrementalIndex incrementalIndex)
{
this.comparator = new TimeAndDimsComparator();
this.incrementalIndex = incrementalIndex;
@ -115,7 +316,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex
if (dims == null) {
out.write(-1);
} else {
DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
DimDim dimDim = incrementalIndex.getDimDim(index);
out.writeInt(dims.length);
for (String value : dims) {
out.writeInt(dimDim.getId(value));
@ -136,7 +337,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex
for (int k = 0; k < dims.length; k++) {
int len = in.readInt();
if (len != -1) {
DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
DimDim dimDim = incrementalIndex.getDimDim(k);
String[] col = new String[len];
for (int l = 0; l < col.length; l++) {
col[l] = dimDim.get(dimDim.getValue(in.readInt()));
@ -156,7 +357,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex
}
}
public static class TimeAndDimsComparator implements Comparator, Serializable
private static class TimeAndDimsComparator implements Comparator, Serializable
{
@Override
public int compare(Object o1, Object o2)
@ -169,8 +370,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private final WeakHashMap<String, WeakReference<String>> cache =
new WeakHashMap();
private final WeakHashMap<String, WeakReference<String>> cache = new WeakHashMap();
private volatile String[] sortedVals = null;
// size() on MapDB is slow, so maintain a count here
private volatile int size = 0;
@ -271,4 +472,11 @@ public class OffheapIncrementalIndex extends IncrementalIndex
}
}
private long getCurrentSize()
{
return Store.forDB(db).getCurrSize() +
Store.forDB(factsDb).getCurrSize()
// Size of aggregators
+ size() * totalAggSize;
}
}
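
canAppendRow() above enforces two separate limits: the pooled aggregation ByteBuffer must still hold one more row's worth of aggregator state, and MapDB's two stores plus the aggregation bytes, padded with chunk-growth slack and the extra fudge-factor chunk, must stay under maxTotalBufferSize. The worked example below is not part of this diff and uses purely illustrative sizes; it only restates the arithmetic of the two checks.

public class OffheapBudgetSketch
{
  public static void main(String[] args)
  {
    final long totalAggSize = 16;                // bytes of aggregator state per row (sum of getMaxIntermediateSize)
    final long aggBufferSize = 64L << 20;        // limit of the pooled aggregation ByteBuffer (64 MiB)
    final long maxTotalBufferSize = 128L << 20;  // overall off-heap budget for this index (128 MiB)
    final long storeChunkSize = 1L << 20;        // MapDB's CHUNK_SIZE, read reflectively above (value illustrative)
    final long mapDbSize = 40L << 20;            // Store.forDB(db).getCurrSize() + Store.forDB(factsDb).getCurrSize()
    final long rows = 3_000_000;                 // current size() of the index

    // Check 1: one more row of aggregation state must still fit in the pooled buffer.
    final boolean aggBufferHasRoom = (rows + 1) * totalAggSize <= aggBufferSize;

    // Check 2: getCurrentSize() (both MapDB stores plus the aggregation bytes) plus one more row
    // and chunk-growth slack (2 chunks, plus 1 fudge-factor chunk) must stay under the budget.
    final long currentSize = mapDbSize + rows * totalAggSize;
    final boolean totalBudgetHasRoom = currentSize + totalAggSize + 3 * storeChunkSize <= maxTotalBufferSize;

    System.out.println(aggBufferHasRoom && totalBudgetHasRoom); // true with these numbers
  }
}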

View File

@ -0,0 +1,303 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.incremental;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final List<Aggregator[]> aggList = Lists.newArrayList();
private final int maxRowCount;
private String outOfRowsReason = null;
public OnheapIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, int maxRowCount)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
this.facts = new ConcurrentSkipListMap<>();
this.maxRowCount = maxRowCount;
}
public OnheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
int maxRowCount
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
deserializeComplexMetrics,
maxRowCount
);
}
public OnheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
int maxRowCount
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true,
maxRowCount
);
}
public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
int maxRowCount
)
{
this(incrementalIndexSchema, true, maxRowCount);
}
@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
@Override
protected DimDim makeDimDim(String dimension)
{
return new OnHeapDimDim();
}
@Override
protected Aggregator[] initAggs(
AggregatorFactory[] metrics, ThreadLocal<InputRow> in, boolean deserializeComplexMetrics
)
{
return new Aggregator[metrics.length];
}
@Override
protected Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
) throws IndexSizeExceededException
{
Integer rowOffset;
synchronized (this) {
rowOffset = numEntries.get();
if(rowOffset >= maxRowCount && !facts.containsKey(key)) {
throw new IndexSizeExceededException("Maximum number of rows reached");
}
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
Aggregator[] aggs = new Aggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, in, deserializeComplexMetrics)
);
}
aggList.add(aggs);
numEntries.incrementAndGet();
}
}
in.set(row);
final Aggregator[] aggs = aggList.get(rowOffset);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate();
}
}
in.set(null);
return numEntries.get();
}
@Override
public boolean canAppendRow()
{
final boolean canAdd = size() < maxRowCount;
if (!canAdd) {
outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount);
}
return canAdd;
}
@Override
public String getOutOfRowsReason()
{
return outOfRowsReason;
}
@Override
protected Aggregator[] getAggsForRow(int rowOffset)
{
return aggList.get(rowOffset);
}
@Override
protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition)
{
return agg.get();
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
return aggList.get(rowOffset)[aggOffset].getFloat();
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
return aggList.get(rowOffset)[aggOffset].getLong();
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
return aggList.get(rowOffset)[aggOffset].get();
}
private static class OnHeapDimDim implements DimDim
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
public OnHeapDimDim()
{
BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
falseIds = biMap;
falseIdsReverse = biMap.inverse();
}
/**
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
*
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
*/
public String get(String str)
{
String prev = poorMansInterning.putIfAbsent(str, str);
return prev != null ? prev : str;
}
public int getId(String value)
{
if (value == null) {
value = "";
}
final Integer id = falseIds.get(value);
return id == null ? -1 : id;
}
public String getValue(int id)
{
return falseIdsReverse.get(id);
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
}
public int size()
{
return falseIds.size();
}
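// callers are expected to check contains(value) before calling add(); re-adding a value would assign it a fresh id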
public synchronized int add(String value)
{
int id = falseIds.size();
falseIds.put(value, id);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[falseIds.size()];
int index = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
}
Arrays.sort(sortedVals);
}
}
private void assertSorted()
{
if (sortedVals == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
}
}
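// reference equality is sufficient here because both values are expected to be canonical, i.e. returned by get()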
public boolean compareCannonicalValues(String s1, String s2)
{
return s1 == s2;
}
}
}
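For orientation, a minimal usage sketch of the class above, pieced together from its constructors and the test changes later in this diff. The class name OnheapIncrementalIndexExample, the dimension values, and the "count" metric are illustrative only and not part of the patch.

import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.util.Arrays;

public class OnheapIncrementalIndexExample
{
  public static void main(String[] args)
  {
    // maxRowCount = 1000: adds beyond this bound throw IndexSizeExceededException
    final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
        0L,
        QueryGranularity.NONE,
        new AggregatorFactory[]{new CountAggregatorFactory("count")},
        1000
    );

    try {
      index.add(
          new MapBasedInputRow(
              System.currentTimeMillis(),
              Arrays.asList("dim1", "dim2"),
              ImmutableMap.<String, Object>of("dim1", "a", "dim2", "b")
          )
      );
    }
    catch (IndexSizeExceededException e) {
      // the index is full; realtime callers persist and hand off at this point
    }

    if (!index.canAppendRow()) {
      // getOutOfRowsReason() explains why, e.g. "Maximum number of rows [1000] reached"
      System.out.println(index.getOutOfRowsReason());
    }
  }
}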

View File

@ -39,6 +39,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -52,9 +53,8 @@ public class TimeseriesQueryRunnerBonusTest
@Test
public void testOneRowAtATime() throws Exception
{
final IncrementalIndex oneRowIndex = new IncrementalIndex(
new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{},
TestQueryRunners.pool
final IncrementalIndex oneRowIndex = new OnheapIncrementalIndex(
new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{}, 1000
);
List<Result<TimeseriesResultValue>> results;

View File

@ -28,6 +28,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -48,11 +49,11 @@ public class EmptyIndexTest
}
tmpDir.deleteOnExit();
IncrementalIndex emptyIndex = new IncrementalIndex(
IncrementalIndex emptyIndex = new OnheapIncrementalIndex(
0,
QueryGranularity.NONE,
new AggregatorFactory[0],
TestQueryRunners.pool
1000
);
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(
new Interval("2012-08-01/P3D"),

View File

@ -25,11 +25,12 @@ import com.google.common.collect.Lists;
import com.google.common.io.Files;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
@ -50,7 +51,8 @@ public class IndexMergerTest
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist = IncrementalIndexTest.createCaseInsensitiveIndex(timestamp);
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = Files.createTempDir();
try {
@ -58,7 +60,7 @@ public class IndexMergerTest
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
Assert.assertEquals(2, index.getColumnNames().size());
Assert.assertEquals(3, index.getColumnNames().size());
}
finally {
tempDir.delete();
@ -69,9 +71,10 @@ public class IndexMergerTest
public void testPersistMerge() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(timestamp);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, 1000);
toPersist2.add(
new MapBasedInputRow(
@ -97,25 +100,25 @@ public class IndexMergerTest
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(2, index1.getColumnNames().size());
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(2, index2.getColumnNames().size());
Assert.assertEquals(3, index2.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(index1, index2),
new AggregatorFactory[]{},
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir
)
);
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(2, merged.getColumnNames().size());
Assert.assertEquals(3, merged.getColumnNames().size());
}
finally {
FileUtils.deleteQuietly(tempDir1);
@ -127,8 +130,8 @@ public class IndexMergerTest
@Test
public void testPersistEmptyColumn() throws Exception
{
final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10);
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10);
final File tmpDir1 = Files.createTempDir();
final File tmpDir2 = Files.createTempDir();
final File tmpDir3 = Files.createTempDir();

View File

@ -40,6 +40,8 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -134,7 +136,7 @@ public class SchemalessIndex
final long timestamp = new DateTime(event.get(TIMESTAMP)).getMillis();
if (theIndex == null) {
theIndex = new IncrementalIndex(timestamp, QueryGranularity.MINUTE, METRIC_AGGS, TestQueryRunners.pool);
theIndex = new OnheapIncrementalIndex(timestamp, QueryGranularity.MINUTE, METRIC_AGGS, 1000);
}
final List<String> dims = Lists.newArrayList();
@ -144,7 +146,11 @@ public class SchemalessIndex
}
}
theIndex.add(new MapBasedInputRow(timestamp, dims, event));
try {
theIndex.add(new MapBasedInputRow(timestamp, dims, event));
} catch (IndexSizeExceededException e) {
throw Throwables.propagate(e);
}
count++;
}
@ -330,8 +336,8 @@ public class SchemalessIndex
}
}
final IncrementalIndex rowIndex = new IncrementalIndex(
timestamp, QueryGranularity.MINUTE, METRIC_AGGS, TestQueryRunners.pool
final IncrementalIndex rowIndex = new OnheapIncrementalIndex(
timestamp, QueryGranularity.MINUTE, METRIC_AGGS, 1000
);
rowIndex.add(
@ -360,8 +366,8 @@ public class SchemalessIndex
String filename = resource.getFile();
log.info("Realtime loading index file[%s]", filename);
final IncrementalIndex retVal = new IncrementalIndex(
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.MINUTE, aggs, TestQueryRunners.pool
final IncrementalIndex retVal = new OnheapIncrementalIndex(
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.MINUTE, aggs, 1000
);
try {

View File

@ -39,6 +39,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -156,20 +157,22 @@ public class TestIndex
final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
log.info("Realtime loading index file[%s]", resource);
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(METRIC_AGGS)
.build();
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(METRIC_AGGS)
.build();
final IncrementalIndex retVal;
if (useOffheap) {
retVal = new OffheapIncrementalIndex(
schema,
TestQueryRunners.pool
TestQueryRunners.pool,
true,
100 * 1024 * 1024
);
} else {
retVal = new IncrementalIndex(
retVal = new OnheapIncrementalIndex(
schema,
TestQueryRunners.pool
10000
);
}

View File

@ -27,11 +27,18 @@ import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import junit.framework.Assert;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -41,16 +48,73 @@ import java.util.concurrent.TimeUnit;
/**
*/
@RunWith(Parameterized.class)
public class IncrementalIndexTest
{
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
interface IndexCreator
{
IncrementalIndex index = new IncrementalIndex(
0L, QueryGranularity.NONE, new AggregatorFactory[]{},
TestQueryRunners.pool
);
public IncrementalIndex createIndex();
}
private final IndexCreator indexCreator;
public IncrementalIndexTest(
IndexCreator indexCreator
)
{
this.indexCreator = indexCreator;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return Arrays.asList(
new Object[][]{
{
new IndexCreator()
{
@Override
public IncrementalIndex createIndex()
{
return IncrementalIndexTest.createIndex(true);
}
}
},
{
new IndexCreator()
{
@Override
public IncrementalIndex createIndex()
{
return IncrementalIndexTest.createIndex(false);
}
}
}
}
);
}
public static IncrementalIndex createIndex(boolean offheap)
{
if (offheap) {
return new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool,
true,
100 * 1024 * 1024
);
} else {
return new OnheapIncrementalIndex(
0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, 1000
);
}
}
public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
{
index.add(
new MapBasedInputRow(
timestamp,
@ -66,7 +130,6 @@ public class IncrementalIndexTest
ImmutableMap.<String, Object>of("dim1", "3", "dim2", "4")
)
);
return index;
}
public static MapBasedInputRow getRow(long timestamp, int rowID, int dimensionCount)
@ -84,10 +147,9 @@ public class IncrementalIndexTest
@Test
public void testCaseSensitivity() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex index = createCaseInsensitiveIndex(timestamp);
long timestamp = System.currentTimeMillis();
IncrementalIndex index = indexCreator.createIndex();
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensions());
Assert.assertEquals(2, index.size());
@ -106,12 +168,7 @@ public class IncrementalIndexTest
@Test
public void testConcurrentAdd() throws Exception
{
final IncrementalIndex index = new IncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool
);
final IncrementalIndex index = indexCreator.createIndex();
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
@ -147,9 +204,31 @@ public class IncrementalIndexTest
while (iterator.hasNext()) {
Row row = iterator.next();
Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch());
Assert.assertEquals(Float.valueOf(threadCount), row.getFloatMetric("count"));
Assert.assertEquals(Float.valueOf(threadCount), (Float)row.getFloatMetric("count"));
curr++;
}
Assert.assertEquals(elementsPerThread, curr);
}
@Test
public void testOffheapIndexIsFull() throws IndexSizeExceededException
{
OffheapIncrementalIndex index = new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool,
true,
(10 + 2) * 1024 * 1024
);
int rowCount = 0;
for (int i = 0; i < 500; i++) {
rowCount = index.add(getRow(System.currentTimeMillis(), i, 100));
if (!index.canAppendRow()) {
break;
}
}
Assert.assertTrue("rowCount : " + rowCount, rowCount > 200 && rowCount < 600);
}
}

View File

@ -53,6 +53,7 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
@ -72,6 +73,7 @@ import java.util.Random;
@RunWith(Parameterized.class)
public class SpatialFilterBonusTest
{
public static final int NUM_POINTS = 5000;
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
@ -108,7 +110,7 @@ public class SpatialFilterBonusTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
IncrementalIndex theIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -124,8 +126,8 @@ public class SpatialFilterBonusTest
)
)
).build(),
TestQueryRunners.pool,
false
false,
NUM_POINTS
);
theIndex.add(
new MapBasedInputRow(
@ -202,7 +204,7 @@ public class SpatialFilterBonusTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 6; i < NUM_POINTS; i++) {
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
@ -239,7 +241,7 @@ public class SpatialFilterBonusTest
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
IncrementalIndex first = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -256,10 +258,10 @@ public class SpatialFilterBonusTest
)
).build(),
TestQueryRunners.pool,
false
false,
NUM_POINTS
);
IncrementalIndex second = new IncrementalIndex(
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -275,10 +277,10 @@ public class SpatialFilterBonusTest
)
)
).build(),
TestQueryRunners.pool,
false
false,
NUM_POINTS
);
IncrementalIndex third = new IncrementalIndex(
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -295,8 +297,8 @@ public class SpatialFilterBonusTest
)
).build(),
TestQueryRunners.pool,
false
false,
NUM_POINTS
);
@ -375,7 +377,7 @@ public class SpatialFilterBonusTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 6; i < NUM_POINTS; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),

View File

@ -54,6 +54,7 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
@ -72,6 +73,7 @@ import java.util.Random;
@RunWith(Parameterized.class)
public class SpatialFilterTest
{
public static final int NUM_POINTS = 5000;
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
@ -104,7 +106,7 @@ public class SpatialFilterTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
IncrementalIndex theIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -125,9 +127,10 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
false,
NUM_POINTS
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
@ -233,7 +236,7 @@ public class SpatialFilterTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 8; i < NUM_POINTS; i++) {
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
@ -267,7 +270,7 @@ public class SpatialFilterTest
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
IncrementalIndex first = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -288,10 +291,10 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
false,
1000
);
IncrementalIndex second = new IncrementalIndex(
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -312,10 +315,10 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
false,
1000
);
IncrementalIndex third = new IncrementalIndex(
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -336,8 +339,8 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
false,
NUM_POINTS
);
@ -446,7 +449,7 @@ public class SpatialFilterTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 8; i < NUM_POINTS; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),

View File

@ -49,25 +49,79 @@ import io.druid.segment.DimensionSelector;
import io.druid.segment.filter.SelectorFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
/**
*/
@RunWith(Parameterized.class)
public class IncrementalIndexStorageAdapterTest
{
interface IndexCreator
{
public IncrementalIndex createIndex();
}
private final IndexCreator indexCreator;
public IncrementalIndexStorageAdapterTest(
IndexCreator indexCreator
)
{
this.indexCreator = indexCreator;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return Arrays.asList(
new Object[][]{
{ new IndexCreator()
{
@Override
public IncrementalIndex createIndex()
{
return new OnheapIncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
);
}
}
},
{
new IndexCreator()
{
@Override
public IncrementalIndex createIndex()
{
return new OffheapIncrementalIndex(
0,
QueryGranularity.MINUTE,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool,
true,
100 * 1024 * 1024
);
}
}
}
}
);
}
@Test
public void testSanity() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
);
IncrementalIndex index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
@ -111,10 +165,7 @@ public class IncrementalIndexStorageAdapterTest
@Test
public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, TestQueryRunners.pool
);
IncrementalIndex index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
new DateTime("2014-09-01T00:00:00"),
@ -196,13 +247,9 @@ public class IncrementalIndexStorageAdapterTest
}
@Test
public void testResetSanity() {
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
);
public void testResetSanity() throws IOException {
IncrementalIndex index = indexCreator.createIndex();
DateTime t = DateTime.now();
Interval interval = new Interval(t.minusMinutes(1), t.plusMinutes(1));
@ -248,13 +295,9 @@ public class IncrementalIndexStorageAdapterTest
}
@Test
public void testSingleValueTopN()
public void testSingleValueTopN() throws IOException
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
);
IncrementalIndex index = indexCreator.createIndex();
DateTime t = DateTime.now();
index.add(
new MapBasedInputRow(
@ -306,11 +349,7 @@ public class IncrementalIndexStorageAdapterTest
@Test
public void testFilterByNull() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
);
IncrementalIndex index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),

View File

@ -46,6 +46,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final ShardSpec defaultShardSpec = new NoneShardSpec();
private static final boolean defaultPersistInHeap = false;
private static final boolean defaultIngestOffheap = false;
private static final int defaultBufferSize = 128 * 1024 * 1024; // 128MB
// Might make sense for this to be a builder
@ -61,7 +62,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultMaxPendingPersists,
defaultShardSpec,
defaultPersistInHeap,
defaultIngestOffheap
defaultIngestOffheap,
defaultBufferSize
);
}
@ -75,6 +77,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final ShardSpec shardSpec;
private final boolean persistInHeap;
private final boolean ingestOffheap;
private final int bufferSize;
@JsonCreator
public RealtimeTuningConfig(
@ -87,7 +90,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("persistInHeap") Boolean persistInHeap,
@JsonProperty("ingestOffheap") Boolean ingestOffheap
@JsonProperty("ingestOffheap") Boolean ingestOffheap,
@JsonProperty("buffersize") Integer bufferSize
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -104,6 +108,8 @@ public class RealtimeTuningConfig implements TuningConfig
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
}
@JsonProperty
@ -165,6 +171,11 @@ public class RealtimeTuningConfig implements TuningConfig
return ingestOffheap;
}
@JsonProperty
public int getBufferSize()
{
return bufferSize;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -177,7 +188,8 @@ public class RealtimeTuningConfig implements TuningConfig
maxPendingPersists,
shardSpec,
persistInHeap,
ingestOffheap
ingestOffheap,
bufferSize
);
}
@ -193,7 +205,8 @@ public class RealtimeTuningConfig implements TuningConfig
maxPendingPersists,
shardSpec,
persistInHeap,
ingestOffheap
ingestOffheap,
bufferSize
);
}
}

View File

@ -31,7 +31,6 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Processing;
@ -44,10 +43,11 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -237,19 +237,27 @@ public class RealtimeManager implements QuerySegmentWalker
continue;
}
int currCount = plumber.add(inputRow);
if (currCount == -1) {
boolean lateEvent = false;
boolean indexLimitExceeded = false;
try {
lateEvent = plumber.add(inputRow) == -1;
} catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
indexLimitExceeded = true;
}
if (indexLimitExceeded || lateEvent) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime.plumber;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.incremental.IndexSizeExceededException;
public interface Plumber
{
@ -36,7 +37,7 @@ public interface Plumber
* @return - positive numbers indicate how many summarized rows exist in the index for that timestamp,
* -1 means a row was thrown away because it was too late
*/
public int add(InputRow row);
public int add(InputRow row) throws IndexSizeExceededException;
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
@ -52,4 +53,6 @@ public interface Plumber
* fed into sinks and persisted.
*/
public void finishJob();
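/**
 * @return the sink covering the given timestamp, or null if no sink exists for it
 */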
public Sink getSink(long timestamp);
}
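For orientation, a minimal caller sketch of the updated Plumber contract, modeled on the RealtimeManager change earlier in this diff. The class and method names are illustrative, and persisting when the index is full is an assumption borrowed from that change rather than a requirement of the interface.

import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.realtime.plumber.Plumber;

public class PlumberAddSketch
{
  /**
   * Returns true if the row was indexed, false if it was thrown away
   * (either too late, or the in-memory index was full and had to be persisted).
   */
  static boolean addRow(Plumber plumber, Firehose firehose, InputRow row)
  {
    try {
      // -1 still means the row arrived too late for any open sink
      return plumber.add(row) != -1;
    }
    catch (IndexSizeExceededException e) {
      // new in this patch: the index is full, so persist before moving on
      plumber.persist(firehose.commit());
      return false;
    }
  }
}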

View File

@ -38,6 +38,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -158,7 +159,7 @@ public class RealtimePlumber implements Plumber
}
@Override
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
final Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) {

View File

@ -31,7 +31,9 @@ import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireHydrant;
@ -111,7 +113,7 @@ public class Sink implements Iterable<FireHydrant>
return currHydrant;
}
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
if (currHydrant == null) {
throw new IAE("No currHydrant but given row[%s]", row);
@ -126,6 +128,13 @@ public class Sink implements Iterable<FireHydrant>
}
}
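// true while the index backing the current hydrant still has room for more rows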
public boolean canAppendRow()
{
synchronized (hydrantLock) {
return currHydrant != null && currHydrant.getIndex().canAppendRow();
}
}
public boolean isEmpty()
{
synchronized (hydrantLock) {
@ -176,11 +185,6 @@ public class Sink implements Iterable<FireHydrant>
private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
int aggsSize = 0;
for (AggregatorFactory agg : schema.getAggregators()) {
aggsSize += agg.getMaxIntermediateSize();
}
int bufferSize = aggsSize * config.getMaxRowsInMemory();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
@ -191,12 +195,15 @@ public class Sink implements Iterable<FireHydrant>
if (config.isIngestOffheap()) {
newIndex = new OffheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
// Assuming half space for aggregates
new OffheapBufferPool(config.getBufferSize()),
true,
config.getBufferSize()
);
} else {
newIndex = new IncrementalIndex(
newIndex = new OnheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
config.getMaxRowsInMemory()
);
}

View File

@ -76,7 +76,7 @@ public class FireDepartmentTest
)
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, false, false
null, null, null, null, null, null, null, null, false, false, null
)
);

View File

@ -34,6 +34,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -106,6 +107,7 @@ public class RealtimeManagerTest
null,
null,
null,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
@ -267,7 +269,7 @@ public class RealtimeManagerTest
}
@Override
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
if (row == null) {
return -1;

View File

@ -167,6 +167,7 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
null,
null
);

View File

@ -65,7 +65,8 @@ public class SinkTest
null,
null,
false,
false
false,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);