Allow use of non-threadsafe ObjectCachingColumnSelectorFactory (#4397)

* Adding a flag to indicate when ObjectCachingColumnSelectorFactory need not be threadsafe.

* - Use of computeIfAbsent over putIfAbsent
- Replace Maps.newXXXMap() with normal instantiation
- Documentations on when is thread-safe required.
- Use Builders for On/OffheapIncrementalIndex

* - Optimization on computeIfAbsent
- Constant EMPTY DimensionsSpec
- Improvement on IncrementalIndexSchema.Builder
  - Remove setting of default values
  - Use var args for metrics
- Correction on On/OffheapIncrementalIndex Builders
- Combine On/OffheapIncrementalIndex Builders

* - Removing unused imports.

* - Helper method for testing with IncrementalIndex.Builder

* - Correction on javadoc.

* Style fix
This commit is contained in:
Goh Wei Xiang 2017-06-16 14:04:19 -07:00 committed by Roman Leventov
parent e78d8584a1
commit f68a0693f3
63 changed files with 1119 additions and 1116 deletions

View File

@ -44,10 +44,7 @@ public class DimensionsSpec
private final Set<String> dimensionExclusions;
private final Map<String, DimensionSchema> dimensionSchemaMap;
public static DimensionsSpec ofEmpty()
{
return new DimensionsSpec(null, null, null);
}
public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null);
public static List<DimensionSchema> getDefaultSchemas(List<String> dimNames)
{

View File

@ -29,7 +29,6 @@ import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -72,8 +71,6 @@ import io.druid.segment.filter.Filters;
import io.druid.segment.filter.OrFilter;
import io.druid.segment.filter.SelectorFilter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
@ -228,17 +225,11 @@ public class FilterPartitionBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
@Benchmark

View File

@ -28,7 +28,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.benchmark.query.QueryBenchmarkUtil;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -73,8 +72,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -239,17 +236,11 @@ public class FilteredAggregatorBenchmark
private IncrementalIndex makeIncIndex(AggregatorFactory[] metrics)
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(metrics)
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(metrics)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)

View File

@ -36,7 +36,6 @@ import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -73,8 +72,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -433,17 +430,12 @@ public class GroupByTypeInterfaceBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
@TearDown(Level.Trial)

View File

@ -22,13 +22,11 @@ package io.druid.benchmark;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
@ -120,15 +118,12 @@ public class IncrementalIndexRowTypeBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
0,
Granularities.NONE,
aggs,
false,
false,
true,
maxRows
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggs)
.setDeserializeComplexMetrics(false)
.setReportParseExceptions(false)
.setMaxRowCount(maxRows)
.buildOnheap();
}
@Setup

View File

@ -30,7 +30,6 @@ import io.druid.benchmark.query.QueryBenchmarkUtil;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -70,8 +69,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -314,17 +311,11 @@ public class TopNTypeInterfaceBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)

View File

@ -24,7 +24,6 @@ import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
@ -49,7 +48,6 @@ import io.druid.segment.data.IndexedInts;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -124,18 +122,16 @@ public class IncrementalIndexReadBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
@Benchmark

View File

@ -23,14 +23,11 @@ import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -104,18 +101,16 @@ public class IndexIngestionBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
true,
rowsPerSegment * 2
);
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment * 2)
.buildOnheap();
}
@Benchmark

View File

@ -25,10 +25,8 @@ import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
@ -39,7 +37,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -161,18 +158,16 @@ public class IndexMergeBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
true,
rowsPerSegment
);
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
@Benchmark

View File

@ -25,10 +25,8 @@ import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
@ -38,7 +36,6 @@ import io.druid.segment.IndexSpec;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -155,18 +152,16 @@ public class IndexPersistBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
true,
rowsPerSegment
);
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
@Benchmark

View File

@ -36,7 +36,6 @@ import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -76,7 +75,6 @@ import io.druid.segment.column.ColumnConfig;
import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -476,18 +474,17 @@ public class GroupByBenchmark
private IncrementalIndex makeIncIndex(boolean withRollup)
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(withRollup)
.build(),
true,
false,
true,
rowsPerSegment
);
.build()
)
.setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
@TearDown(Level.Trial)

View File

@ -32,7 +32,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -76,8 +75,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -388,17 +385,11 @@ public class SearchBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)

View File

@ -31,7 +31,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -66,8 +65,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -252,17 +249,11 @@ public class SelectBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)

View File

@ -28,7 +28,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -70,8 +69,6 @@ import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
@ -312,17 +309,11 @@ public class TimeseriesBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)

View File

@ -29,7 +29,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
@ -68,8 +67,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
@ -292,17 +289,11 @@ public class TopNBenchmark
private IncrementalIndex makeIncIndex()
{
return new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.build(),
true,
false,
true,
rowsPerSegment
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)

View File

@ -25,7 +25,6 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@ -40,7 +39,7 @@ import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import org.junit.Test;
import java.util.Arrays;
@ -56,9 +55,17 @@ public class DistinctCountGroupByQueryTest
config.setMaxIntermediateRows(10000);
final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
IncrementalIndex index = new OnheapIncrementalIndex(
0, Granularities.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
);
IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
.withMetrics(new CountAggregatorFactory("cnt"))
.build()
)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.buildOnheap();
String visitor_id = "visitor_id";
String client_type = "client_type";
long timestamp = System.currentTimeMillis();

View File

@ -27,15 +27,14 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.junit.Test;
@ -50,9 +49,16 @@ public class DistinctCountTimeseriesQueryTest
{
TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
IncrementalIndex index = new OnheapIncrementalIndex(
0, Granularities.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
);
IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
.withMetrics(new CountAggregatorFactory("cnt"))
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
String visitor_id = "visitor_id";
String client_type = "client_type";
DateTime time = new DateTime("2016-03-04T00:00:00.000Z");

View File

@ -29,7 +29,6 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
@ -37,8 +36,8 @@ import io.druid.query.topn.TopNQueryEngine;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.junit.Test;
@ -68,9 +67,16 @@ public class DistinctCountTopNQueryTest
)
);
IncrementalIndex index = new OnheapIncrementalIndex(
0, Granularities.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
);
IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
.withMetrics(new CountAggregatorFactory("cnt"))
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
String visitor_id = "visitor_id";
String client_type = "client_type";
DateTime time = new DateTime("2016-03-04T00:00:00.000Z");

View File

@ -39,7 +39,6 @@ import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.commons.io.IOUtils;
@ -150,7 +149,10 @@ public class MultiSegmentScanQueryTest
.withQueryGranularity(Granularities.HOUR)
.withMetrics(TestIndex.METRIC_AGGS)
.build();
return new OnheapIncrementalIndex(schema, true, maxRowCount);
return new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(maxRowCount)
.buildOnheap();
}
@AfterClass

View File

@ -30,7 +30,6 @@ import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
@ -46,7 +45,6 @@ import io.druid.query.select.SelectQueryRunnerFactory;
import io.druid.query.select.SelectResultValue;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@ -87,9 +85,11 @@ public class MapVirtualColumnTest
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.withQueryGranularity(Granularities.NONE)
.build();
final IncrementalIndex index = new OnheapIncrementalIndex(schema, true, 10000);
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
.buildOnheap();
final StringInputRowParser parser = new StringInputRowParser(
new DelimitedParseSpec(

View File

@ -27,7 +27,6 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryContexts;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
@ -96,18 +95,16 @@ public class QuantileSqlAggregatorTest
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new ApproximateHistogramAggregatorFactory(
"hist_m1",
"m1",
null,
null,
null,
null
)
}
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new ApproximateHistogramAggregatorFactory(
"hist_m1",
"m1",
null,
null,
null,
null
)
)
.withRollup(false)
.build()

View File

@ -49,7 +49,6 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
@ -237,11 +236,11 @@ public class IndexGeneratorJob implements Jobby
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.build();
OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
indexSchema,
!tuningConfig.isIgnoreInvalidRows(),
tuningConfig.getRowFlushBoundary()
);
IncrementalIndex newIndex = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows())
.setMaxRowCount(tuningConfig.getRowFlushBoundary())
.buildOnheap();
if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities);

View File

@ -54,10 +54,8 @@ import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
@ -65,8 +63,8 @@ import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -133,21 +131,17 @@ public class IngestSegmentFirehoseFactoryTest
}
);
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMinTimestamp(JodaUtils.MIN_INSTANT)
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME),
new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
}
new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME),
new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
)
.build();
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
schema,
true,
MAX_ROWS * MAX_SHARD_NUMBER
);
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(MAX_ROWS * MAX_SHARD_NUMBER)
.buildOnheap();
for (Integer i = 0; i < MAX_ROWS; ++i) {
index.add(ROW_PARSER.parse(buildRow(i.longValue())));

View File

@ -49,17 +49,15 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.NoopDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
@ -211,16 +209,14 @@ public class IngestSegmentFirehoseFactoryTimelineTest
{
final File persistDir = new File(tmpDir, UUID.randomUUID().toString());
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMinTimestamp(JodaUtils.MIN_INSTANT)
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory(METRICS[0], METRICS[0])
}
)
.withMetrics(new LongSumAggregatorFactory(METRICS[0], METRICS[0]))
.build();
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(schema, true, rows.length);
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(rows.length)
.buildOnheap();
for (InputRow row : rows) {
try {

View File

@ -44,8 +44,6 @@ import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
@ -120,22 +118,21 @@ public class GroupByQueryHelper
.build();
if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
indexSchema,
false,
true,
sortResults,
querySpecificConfig.getMaxResults(),
bufferPool
);
index = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.buildOffheap(bufferPool);
} else {
index = new OnheapIncrementalIndex(
indexSchema,
false,
true,
sortResults,
querySpecificConfig.getMaxResults()
);
index = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.buildOnheap();
}
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
@ -67,6 +68,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -75,6 +77,7 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
@ -210,16 +213,21 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* Set concurrentEventAdd to true to indicate that adding of input row should be thread-safe (for example, groupBy
* where the multiple threads can add concurrently to the IncrementalIndex).
*
* @param incrementalIndexSchema the schema to use for incremental index
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
* @param reportParseExceptions flag whether or not to report ParseExceptions that occur while extracting values
* from input rows
* @param concurrentEventAdd flag whether ot not adding of input rows should be thread-safe
*/
public IncrementalIndex(
protected IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean reportParseExceptions
final boolean reportParseExceptions,
final boolean concurrentEventAdd
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
@ -238,7 +246,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
.setQueryGranularity(this.gran)
.setRollup(this.rollup);
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd);
this.metricDescs = Maps.newLinkedHashMap();
for (AggregatorFactory metric : metrics) {
@ -280,6 +288,113 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
}
public static class Builder
{
private IncrementalIndexSchema incrementalIndexSchema;
private boolean deserializeComplexMetrics;
private boolean reportParseExceptions;
private boolean concurrentEventAdd;
private boolean sortFacts;
private int maxRowCount;
public Builder()
{
incrementalIndexSchema = null;
deserializeComplexMetrics = true;
reportParseExceptions = true;
concurrentEventAdd = false;
sortFacts = true;
maxRowCount = 0;
}
public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
{
this.incrementalIndexSchema = incrementalIndexSchema;
return this;
}
/**
* A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
* that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
* production settings.
*
* @param metrics variable array of {@link AggregatorFactory} metrics
*
* @return this
*/
@VisibleForTesting
public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
{
this.incrementalIndexSchema = new IncrementalIndexSchema.Builder()
.withMetrics(metrics)
.build();
return this;
}
public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
{
this.deserializeComplexMetrics = deserializeComplexMetrics;
return this;
}
public Builder setReportParseExceptions(final boolean reportParseExceptions)
{
this.reportParseExceptions = reportParseExceptions;
return this;
}
public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
{
this.concurrentEventAdd = concurrentEventAdd;
return this;
}
public Builder setSortFacts(final boolean sortFacts)
{
this.sortFacts = sortFacts;
return this;
}
public Builder setMaxRowCount(final int maxRowCount)
{
this.maxRowCount = maxRowCount;
return this;
}
public IncrementalIndex buildOnheap()
{
if (maxRowCount <= 0) {
throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
}
return new OnheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
deserializeComplexMetrics,
reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount
);
}
public IncrementalIndex buildOffheap(final StupidPool<ByteBuffer> bufferPool)
{
if (maxRowCount <= 0) {
throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
}
return new OffheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
deserializeComplexMetrics,
reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount,
Objects.requireNonNull(bufferPool, "bufferPool is null")
);
}
}
public boolean isRollup()
{
return rollup;
@ -294,7 +409,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
protected abstract AggregatorType[] initAggs(
AggregatorFactory[] metrics,
Supplier<InputRow> rowSupplier,
boolean deserializeComplexMetrics
boolean deserializeComplexMetrics,
boolean concurrentEventAdd
);
// Note: This method needs to be thread safe.

View File

@ -109,7 +109,7 @@ public class IncrementalIndexSchema
this.minTimestamp = 0L;
this.gran = Granularities.NONE;
this.virtualColumns = VirtualColumns.EMPTY;
this.dimensionsSpec = new DimensionsSpec(null, null, null);
this.dimensionsSpec = DimensionsSpec.EMPTY;
this.metrics = new AggregatorFactory[]{};
this.rollup = true;
}
@ -152,7 +152,7 @@ public class IncrementalIndexSchema
public Builder withDimensionsSpec(DimensionsSpec dimensionsSpec)
{
this.dimensionsSpec = dimensionsSpec == null ? DimensionsSpec.ofEmpty() : dimensionsSpec;
this.dimensionsSpec = dimensionsSpec == null ? DimensionsSpec.EMPTY : dimensionsSpec;
return this;
}
@ -169,7 +169,7 @@ public class IncrementalIndexSchema
return this;
}
public Builder withMetrics(AggregatorFactory[] metrics)
public Builder withMetrics(AggregatorFactory... metrics)
{
this.metrics = metrics;
return this;

View File

@ -26,7 +26,6 @@ import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.aggregation.AggregatorFactory;
@ -66,16 +65,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private String outOfRowsReason = null;
public OffheapIncrementalIndex(
OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
@ -91,47 +91,6 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
aggBuffers.add(bb);
}
public OffheapIncrementalIndex(
long minTimestamp,
Granularity gran,
boolean rollup,
final AggregatorFactory[] metrics,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.withRollup(rollup)
.build(),
true,
true,
true,
maxRowCount,
bufferPool
);
}
public OffheapIncrementalIndex(
long minTimestamp,
Granularity gran,
final AggregatorFactory[] metrics,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
this(
minTimestamp,
gran,
IncrementalIndexSchema.DEFAULT_ROLLUP,
metrics,
maxRowCount,
bufferPool
);
}
@Override
public FactsHolder getFacts()
{
@ -140,7 +99,10 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
@Override
protected BufferAggregator[] initAggs(
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
final AggregatorFactory[] metrics,
final Supplier<InputRow> rowSupplier,
final boolean deserializeComplexMetrics,
final boolean concurrentEventAdd
)
{
selectors = Maps.newHashMap();
@ -157,7 +119,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
selectors.put(
agg.getName(),
new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory)
new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
);
if (i == 0) {

View File

@ -23,9 +23,7 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.aggregation.Aggregator;
@ -40,9 +38,9 @@ import io.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -59,93 +57,22 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private String outOfRowsReason = null;
public OnheapIncrementalIndex(
OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
this.maxRowCount = maxRowCount;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
: new PlainFactsHolder(sortFacts);
}
public OnheapIncrementalIndex(
long minTimestamp,
Granularity gran,
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP)
.build(),
deserializeComplexMetrics,
reportParseExceptions,
sortFacts,
maxRowCount
);
}
public OnheapIncrementalIndex(
long minTimestamp,
Granularity gran,
boolean rollup,
DimensionsSpec dimensionsSpec,
AggregatorFactory[] metrics,
int maxRowCount
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withDimensionsSpec(dimensionsSpec)
.withMetrics(metrics)
.withRollup(rollup)
.build(),
true,
true,
true,
maxRowCount
);
}
public OnheapIncrementalIndex(
long minTimestamp,
Granularity gran,
final AggregatorFactory[] metrics,
int maxRowCount
)
{
this(
minTimestamp,
gran,
IncrementalIndexSchema.DEFAULT_ROLLUP,
null,
metrics,
maxRowCount
);
}
public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean reportParseExceptions,
int maxRowCount
)
{
this(incrementalIndexSchema, true, reportParseExceptions, true, maxRowCount);
}
@Override
public FactsHolder getFacts()
{
@ -154,14 +81,20 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
@Override
protected Aggregator[] initAggs(
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
final AggregatorFactory[] metrics,
final Supplier<InputRow> rowSupplier,
final boolean deserializeComplexMetrics,
final boolean concurrentEventAdd
)
{
selectors = Maps.newHashMap();
for (AggregatorFactory agg : metrics) {
selectors.put(
agg.getName(),
new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics))
new ObjectCachingColumnSelectorFactory(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics),
concurrentEventAdd
)
);
}
@ -246,7 +179,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
rowContainer.set(row);
for (int i = 0 ; i < aggs.length ; i++) {
for (int i = 0; i < aggs.length; i++) {
final Aggregator agg = aggs[i];
synchronized (agg) {
try {
@ -363,17 +296,28 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
// Caches references to selector objects for each column instead of creating a new object each time in order to save heap space.
// In general the selectorFactory need not to thread-safe.
// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
// If required, set concurrentEventAdd to true to use concurrent hash map instead of vanilla hash map for thread-safe
// operations.
static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
{
private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap();
private final Map<String, LongColumnSelector> longColumnSelectorMap;
private final Map<String, FloatColumnSelector> floatColumnSelectorMap;
private final Map<String, ObjectColumnSelector> objectColumnSelectorMap;
private final ColumnSelectorFactory delegate;
public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate)
public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd)
{
this.delegate = delegate;
if (concurrentEventAdd) {
longColumnSelectorMap = new ConcurrentHashMap<>();
floatColumnSelectorMap = new ConcurrentHashMap<>();
objectColumnSelectorMap = new ConcurrentHashMap<>();
} else {
longColumnSelectorMap = new HashMap<>();
floatColumnSelectorMap = new HashMap<>();
objectColumnSelectorMap = new HashMap<>();
}
}
@Override
@ -385,49 +329,31 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
final FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName);
FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
return floatColumnSelectorMap.computeIfAbsent(columnName, delegate::makeFloatColumnSelector);
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
LongColumnSelector existing = longColumnSelectorMap.get(columnName);
final LongColumnSelector existing = longColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName);
LongColumnSelector prev = longColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
return longColumnSelectorMap.computeIfAbsent(columnName, delegate::makeLongColumnSelector);
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
final ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName);
ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
return objectColumnSelectorMap.computeIfAbsent(columnName, delegate::makeObjectColumnSelector);
}
@Nullable

View File

@ -60,7 +60,6 @@ import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.junit.AfterClass;
@ -113,17 +112,10 @@ public class MultiValuedDimensionTest
@BeforeClass
public static void setupClass() throws Exception
{
incrementalIndex = new OnheapIncrementalIndex(
0,
Granularities.NONE,
new AggregatorFactory[]{
new CountAggregatorFactory("count")
},
true,
true,
true,
5000
);
incrementalIndex = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(5000)
.buildOnheap();
StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(

View File

@ -150,7 +150,7 @@ public class SchemaEvolutionTest
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("cnt")})
.withMetrics(new CountAggregatorFactory("cnt"))
.withRollup(false)
.build()
)
@ -162,11 +162,11 @@ public class SchemaEvolutionTest
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(new AggregatorFactory[]{
.withMetrics(
new CountAggregatorFactory("cnt"),
new LongSumAggregatorFactory("c1", "c1"),
new HyperUniquesAggregatorFactory("uniques", "c2")
})
)
.withRollup(false)
.build()
)
@ -178,11 +178,11 @@ public class SchemaEvolutionTest
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(new AggregatorFactory[]{
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("c1", "c1"),
new HyperUniquesAggregatorFactory("uniques", "c2")
})
)
.withRollup(false)
.build()
)
@ -194,9 +194,7 @@ public class SchemaEvolutionTest
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(new AggregatorFactory[]{
new HyperUniquesAggregatorFactory("c2", "c2")
})
.withMetrics(new HyperUniquesAggregatorFactory("c2", "c2"))
.withRollup(false)
.build()
)

View File

@ -77,7 +77,7 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.junit.rules.TemporaryFolder;
@ -413,7 +413,18 @@ public class AggregationTestHelper
List<File> toMerge = new ArrayList<>();
try {
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build()
)
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
.buildOnheap();
while (rows.hasNext()) {
Object row = rows.next();
if (!index.canAppendRow()) {
@ -421,7 +432,17 @@ public class AggregationTestHelper
toMerge.add(tmp);
indexMerger.persist(index, tmp, new IndexSpec());
index.close();
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build()
)
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
.buildOnheap();
}
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to

View File

@ -27,7 +27,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Druids;
@ -38,11 +37,9 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.LogicalSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -114,10 +111,11 @@ public class DataSourceMetadataQueryTest
@Test
public void testMaxIngestedEventTime() throws Exception
{
final IncrementalIndex rtIndex = new OnheapIncrementalIndex(
0L, Granularities.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, 1000
);
;
final IncrementalIndex rtIndex = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.buildOnheap();
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new DataSourceMetadataQueryRunnerFactory(
new DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),

View File

@ -46,7 +46,6 @@ import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Rule;
import org.junit.Test;
@ -129,17 +128,11 @@ public class GroupByQueryRunnerFactoryTest
private Segment createSegment() throws Exception
{
IncrementalIndex incrementalIndex = new OnheapIncrementalIndex(
0,
Granularities.NONE,
new AggregatorFactory[]{
new CountAggregatorFactory("count")
},
true,
true,
true,
5000
);
IncrementalIndex incrementalIndex = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setConcurrentEventAdd(true)
.setMaxRowCount(5000)
.buildOnheap();
StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(

View File

@ -23,7 +23,6 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
@ -61,7 +60,6 @@ import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -745,13 +743,15 @@ public class SearchQueryRunnerTest
@Test
public void testSearchWithNullValueInDimension() throws Exception
{
IncrementalIndex<Aggregator> index = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.NONE)
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()).build(),
true,
10
);
IncrementalIndex<Aggregator> index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.build()
)
.setMaxRowCount(10)
.buildOnheap();
index.add(
new MapBasedInputRow(
1481871600000L,

View File

@ -43,7 +43,6 @@ import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -188,7 +187,10 @@ public class MultiSegmentSelectQueryTest
.withQueryGranularity(Granularities.HOUR)
.withMetrics(TestIndex.METRIC_AGGS)
.build();
return new OnheapIncrementalIndex(schema, true, maxRowCount);
return new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(maxRowCount)
.buildOnheap();
}
@AfterClass

View File

@ -38,7 +38,6 @@ import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -119,7 +118,10 @@ public class TimeBoundaryQueryRunnerTest
.withQueryGranularity(Granularities.HOUR)
.withMetrics(TestIndex.METRIC_AGGS)
.build();
return new OnheapIncrementalIndex(schema, true, maxRowCount);
return new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(maxRowCount)
.buildOnheap();
}
private static String makeIdentifier(IncrementalIndex index, String version)

View File

@ -37,7 +37,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -69,9 +69,14 @@ public class TimeseriesQueryRunnerBonusTest
@Test
public void testOneRowAtATime() throws Exception
{
final IncrementalIndex oneRowIndex = new OnheapIncrementalIndex(
new DateTime("2012-01-01T00:00:00Z").getMillis(), Granularities.NONE, new AggregatorFactory[]{}, 1000
);
final IncrementalIndex oneRowIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2012-01-01T00:00:00Z").getMillis())
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
List<Result<TimeseriesResultValue>> results;

View File

@ -22,12 +22,10 @@ package io.druid.segment;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.collections.bitmap.ConciseBitmapFactory;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.junit.Assert;
@ -49,12 +47,11 @@ public class EmptyIndexTest
}
try {
IncrementalIndex emptyIndex = new OnheapIncrementalIndex(
0,
Granularities.NONE,
new AggregatorFactory[0],
1000
);
IncrementalIndex emptyIndex = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(1000)
.buildOnheap();
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(
new Interval("2012-08-01/P3D"),
emptyIndex,

View File

@ -30,7 +30,6 @@ import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.io.File;
import java.io.IOException;
@ -47,9 +46,9 @@ public class IndexBuilder
private static final int ROWS_PER_INDEX_FOR_MERGING = 1;
private static final int DEFAULT_MAX_ROWS = Integer.MAX_VALUE;
private IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder().withMetrics(new AggregatorFactory[]{
new CountAggregatorFactory("count")
}).build();
private IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMetrics(new CountAggregatorFactory("count"))
.build();
private IndexMerger indexMerger = TestHelper.getTestIndexMerger();
private File tmpDir;
private IndexSpec indexSpec = new IndexSpec();
@ -203,11 +202,11 @@ public class IndexBuilder
)
{
Preconditions.checkNotNull(schema, "schema");
final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex(
schema,
true,
maxRows
);
final IncrementalIndex incrementalIndex = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(maxRows)
.buildOnheap();
for (InputRow row : rows) {
try {
incrementalIndex.add(row);

View File

@ -30,9 +30,7 @@ import com.google.common.collect.Maps;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.java.util.common.UOE;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
@ -41,7 +39,6 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@ -263,49 +260,39 @@ public class IndexIOTest
this.exception = exception;
}
final IncrementalIndex<Aggregator> incrementalIndex1 = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
.withQueryGranularity(Granularities.NONE)
.withMetrics(
new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
}
)
.withDimensionsSpec(
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)
)
.build(),
true,
1000000
);
final IncrementalIndex<Aggregator> incrementalIndex1 = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
.withMetrics(new CountAggregatorFactory("count"))
.withDimensionsSpec(
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)
)
.build()
)
.setMaxRowCount(1000000)
.buildOnheap();
final IncrementalIndex<Aggregator> incrementalIndex2 = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
.withQueryGranularity(Granularities.NONE)
.withMetrics(
new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
}
)
.withDimensionsSpec(
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)
)
.build(),
true,
1000000
);
final IncrementalIndex<Aggregator> incrementalIndex2 = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
.withMetrics(new CountAggregatorFactory("count"))
.withDimensionsSpec(
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)
)
.build()
)
.setMaxRowCount(1000000)
.buildOnheap();
IndexableAdapter adapter1;
IndexableAdapter adapter2;

View File

@ -58,7 +58,6 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -293,12 +292,10 @@ public class IndexMergerTest
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.buildOnheap();
toPersist2.add(
new MapBasedInputRow(
@ -379,18 +376,16 @@ public class IndexMergerTest
@Test
public void testPersistEmptyColumn() throws Exception
{
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{},
10
);
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{},
10
);
final IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
.buildOnheap();
final IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
.buildOnheap();
final File tmpDir1 = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDir3 = temporaryFolder.newFolder();
@ -924,15 +919,22 @@ public class IndexMergerTest
null,
null
))
.withMinTimestamp(0L)
.withQueryGranularity(Granularities.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withMetrics(new CountAggregatorFactory("count"))
.build();
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, true, 1000);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, true, 1000);
IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, true, 1000);
IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
.buildOnheap();
IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
.buildOnheap();
IncrementalIndex toPersist3 = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
.buildOnheap();
addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2"));
@ -1142,12 +1144,11 @@ public class IndexMergerTest
// d8: 'has null' join 'no null'
// d9: 'no null' join 'no null'
IncrementalIndex toPersistA = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
IncrementalIndex toPersistA = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.buildOnheap();
toPersistA.add(
new MapBasedInputRow(
1,
@ -1167,12 +1168,11 @@ public class IndexMergerTest
)
);
IncrementalIndex toPersistB = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
IncrementalIndex toPersistB = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.buildOnheap();
toPersistB.add(
new MapBasedInputRow(
3,
@ -1283,12 +1283,14 @@ public class IndexMergerTest
// d9: 'no null' join 'no null'
IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0L)
.withQueryGranularity(Granularities.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withMetrics(new CountAggregatorFactory("count"))
.withRollup(false)
.build();
IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000);
IncrementalIndex toPersistA = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
.buildOnheap();
toPersistA.add(
new MapBasedInputRow(
1,
@ -1308,7 +1310,11 @@ public class IndexMergerTest
)
);
IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000);
IncrementalIndex toPersistB = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
.buildOnheap();
toPersistB.add(
new MapBasedInputRow(
3,
@ -1418,12 +1424,14 @@ public class IndexMergerTest
// 3. merge 2 indexes with duplicate rows
IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0L)
.withQueryGranularity(Granularities.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withMetrics(new CountAggregatorFactory("count"))
.withRollup(false)
.build();
IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000);
IncrementalIndex toPersistA = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
.buildOnheap();
toPersistA.add(
new MapBasedInputRow(
1,
@ -1443,7 +1451,11 @@ public class IndexMergerTest
)
);
IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000);
IncrementalIndex toPersistB = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
.buildOnheap();
toPersistB.add(
new MapBasedInputRow(
1,
@ -1543,12 +1555,10 @@ public class IndexMergerTest
IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
IncrementalIndex toPersistBA2 = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.buildOnheap();
toPersistBA2.add(
new MapBasedInputRow(
@ -2117,14 +2127,14 @@ public class IndexMergerTest
private IncrementalIndex getIndexWithDimsFromSchemata(List<DimensionSchema> dims)
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0L)
.withQueryGranularity(Granularities.NONE)
.withDimensionsSpec(new DimensionsSpec(dims, null, null))
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withRollup(true)
.withMetrics(new CountAggregatorFactory("count"))
.build();
return new OnheapIncrementalIndex(schema, true, 1000);
return new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
.buildOnheap();
}
@ -2174,12 +2184,10 @@ public class IndexMergerTest
private IncrementalIndex getIndexD3() throws Exception
{
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.buildOnheap();
toPersist1.add(
new MapBasedInputRow(
@ -2210,12 +2218,10 @@ public class IndexMergerTest
private IncrementalIndex getSingleDimIndex(String dimName, List<String> values) throws Exception
{
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
0L,
Granularities.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.buildOnheap();
addDimValuesToIndex(toPersist1, dimName, values);
return toPersist1;
@ -2237,14 +2243,14 @@ public class IndexMergerTest
private IncrementalIndex getIndexWithDims(List<String> dims)
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0L)
.withQueryGranularity(Granularities.NONE)
.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null))
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withRollup(true)
.withMetrics(new CountAggregatorFactory("count"))
.build();
return new OnheapIncrementalIndex(schema, true, 1000);
return new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
.buildOnheap();
}
private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)

View File

@ -28,14 +28,13 @@ import com.google.common.io.Files;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.junit.After;
@ -166,12 +165,16 @@ public class IndexMergerV9CompatibilityTest
@Before
public void setUp() throws IOException
{
toPersist = new OnheapIncrementalIndex(
JodaUtils.MIN_INSTANT,
Granularities.NONE,
DEFAULT_AGG_FACTORIES,
1000000
);
toPersist = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(JodaUtils.MIN_INSTANT)
.withMetrics(DEFAULT_AGG_FACTORIES)
.build()
)
.setMaxRowCount(1000000)
.buildOnheap();
toPersist.getMetadata().put("key", "value");
for (InputRow event : events) {
toPersist.add(event);

View File

@ -45,7 +45,6 @@ import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -102,30 +101,33 @@ public class IndexMergerV9WithSpatialIndexTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
IncrementalIndex theIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
false,
NUM_POINTS
);
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
theIndex.add(
new MapBasedInputRow(
@ -270,79 +272,89 @@ public class IndexMergerV9WithSpatialIndexTest
private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec)
{
try {
IncrementalIndex first = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
IncrementalIndex first = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
false,
1000
);
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
)
)
).build(),
false,
1000
);
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
IncrementalIndex second = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
false,
NUM_POINTS
);
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
IncrementalIndex third = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
first.add(
new MapBasedInputRow(

View File

@ -39,8 +39,8 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -141,7 +141,16 @@ public class SchemalessIndexTest
final long timestamp = new DateTime(event.get(TIMESTAMP)).getMillis();
if (theIndex == null) {
theIndex = new OnheapIncrementalIndex(timestamp, Granularities.MINUTE, METRIC_AGGS, 1000);
theIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(timestamp)
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(METRIC_AGGS)
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
}
final List<String> dims = Lists.newArrayList();
@ -350,9 +359,16 @@ public class SchemalessIndexTest
}
}
final IncrementalIndex rowIndex = new OnheapIncrementalIndex(
timestamp, Granularities.MINUTE, METRIC_AGGS, 1000
);
final IncrementalIndex rowIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(timestamp)
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(METRIC_AGGS)
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
rowIndex.add(
new MapBasedInputRow(timestamp, dims, event)
@ -380,9 +396,16 @@ public class SchemalessIndexTest
String filename = resource.getFile();
log.info("Realtime loading index file[%s]", filename);
final IncrementalIndex retVal = new OnheapIncrementalIndex(
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), Granularities.MINUTE, aggs, 1000
);
final IncrementalIndex retVal = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(aggs)
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
try {
final List<Object> events = jsonMapper.readValue(new File(filename), List.class);

View File

@ -23,8 +23,6 @@ import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
@ -32,7 +30,7 @@ import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import org.joda.time.Interval;
import org.junit.Rule;
import org.junit.Test;
@ -63,31 +61,27 @@ public class StringDimensionHandlerTest
Map<String, Object> event1,
Map<String, Object> event2
) throws Exception {
IncrementalIndex incrementalIndex1 = new OnheapIncrementalIndex(
TEST_INTERVAL.getStartMillis(),
Granularities.NONE,
true,
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null),
new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
},
1000
);
IncrementalIndex incrementalIndex1 = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(TEST_INTERVAL.getStartMillis())
.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null))
.withMetrics(new CountAggregatorFactory("count"))
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
IncrementalIndex incrementalIndex2 = new OnheapIncrementalIndex(
TEST_INTERVAL.getStartMillis(),
Granularities.NONE,
true,
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null),
new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
},
1000
);
IncrementalIndex incrementalIndex2 = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(TEST_INTERVAL.getStartMillis())
.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null))
.withMetrics(new CountAggregatorFactory("count"))
.build()
)
.setMaxRowCount(1000)
.buildOnheap();
incrementalIndex1.add(new MapBasedInputRow(TEST_INTERVAL.getStartMillis(), dims, event1));
incrementalIndex2.add(new MapBasedInputRow(TEST_INTERVAL.getStartMillis() + 3, dims, event2));

View File

@ -33,7 +33,6 @@ import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.hll.HyperLogLogHash;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
@ -45,7 +44,6 @@ import io.druid.query.expression.TestExprMacroTable;
import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
@ -262,13 +260,15 @@ public class TestIndex
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.withTimestampSpec(new TimestampSpec("ds", "auto", null))
.withQueryGranularity(Granularities.NONE)
.withDimensionsSpec(DIMENSIONS_SPEC)
.withVirtualColumns(VIRTUAL_COLUMNS)
.withMetrics(METRIC_AGGS)
.withRollup(rollup)
.build();
final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000);
final IncrementalIndex retVal = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
.buildOnheap();
try {
return loadIncrementalIndex(retVal, source);

View File

@ -64,8 +64,6 @@ import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
@ -133,20 +131,22 @@ public class IncrementalIndexTest
@Override
public IncrementalIndex createIndex(AggregatorFactory[] factories)
{
return new OffheapIncrementalIndex(
0L, Granularities.NONE, factories, 1000000,
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(factories)
.setMaxRowCount(1000000)
.buildOffheap(
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
}
}
},
@ -166,20 +166,27 @@ public class IncrementalIndexTest
@Override
public IncrementalIndex createIndex(AggregatorFactory[] factories)
{
return new OffheapIncrementalIndex(
0L, Granularities.NONE, false, factories, 1000000,
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(factories)
.withRollup(false)
.build()
)
);
.setMaxRowCount(1000000)
.buildOffheap(
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
}
}
}
@ -200,15 +207,22 @@ public class IncrementalIndexTest
public static IncrementalIndex createIndex(
AggregatorFactory[] aggregatorFactories,
DimensionsSpec dimensionsSpec)
DimensionsSpec dimensionsSpec
)
{
if (null == aggregatorFactories) {
aggregatorFactories = defaultAggregatorFactories;
}
return new OnheapIncrementalIndex(
0L, Granularities.NONE, true, dimensionsSpec, aggregatorFactories, 1000000
);
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(dimensionsSpec)
.withMetrics(aggregatorFactories)
.build()
)
.setMaxRowCount(1000000)
.buildOnheap();
}
public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories)
@ -217,9 +231,10 @@ public class IncrementalIndexTest
aggregatorFactories = defaultAggregatorFactories;
}
return new OnheapIncrementalIndex(
0L, Granularities.NONE, true, null, aggregatorFactories, 1000000
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggregatorFactories)
.setMaxRowCount(1000000)
.buildOnheap();
}
public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories)
@ -228,9 +243,10 @@ public class IncrementalIndexTest
aggregatorFactories = defaultAggregatorFactories;
}
return new OnheapIncrementalIndex(
0L, Granularities.NONE, false, null, aggregatorFactories, 1000000
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggregatorFactories)
.setMaxRowCount(1000000)
.buildOnheap();
}
public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
@ -476,12 +492,12 @@ public class IncrementalIndexTest
for (int i = 0; i < dimensionCount; ++i) {
Assert.assertEquals(
String.format("Failed long sum on dimension %d", i),
2*rows,
2 * rows,
result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue()
);
Assert.assertEquals(
String.format("Failed double sum on dimension %d", i),
2*rows,
2 * rows,
result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue()
);
}
@ -757,26 +773,21 @@ public class IncrementalIndexTest
@Test
public void testgetDimensions()
{
final IncrementalIndex<Aggregator> incrementalIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withQueryGranularity(Granularities.NONE)
.withMetrics(
new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
}
)
.withDimensionsSpec(
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)
)
.build(),
true,
1000000
);
final IncrementalIndex<Aggregator> incrementalIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(new CountAggregatorFactory("count"))
.withDimensionsSpec(
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)
)
.build()
)
.setMaxRowCount(1000000)
.buildOnheap();
closer.closeLater(incrementalIndex);
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
@ -785,11 +796,10 @@ public class IncrementalIndexTest
@Test
public void testDynamicSchemaRollup() throws IndexSizeExceededException
{
IncrementalIndex<Aggregator> index = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withQueryGranularity(Granularities.NONE).build(),
true,
10
);
IncrementalIndex<Aggregator> index = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
.buildOnheap();
closer.closeLater(index);
index.add(
new MapBasedInputRow(

View File

@ -34,7 +34,6 @@ import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.Pair;
import io.druid.js.JavaScriptConfig;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.filter.BoundDimFilter;
@ -108,11 +107,8 @@ public class FloatFilteringTest extends BaseFilterTest
ROWS,
indexBuilder.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new DoubleSumAggregatorFactory(FLOAT_COLUMN, FLOAT_COLUMN)
}
).build()
.withMetrics(new DoubleSumAggregatorFactory(FLOAT_COLUMN, FLOAT_COLUMN))
.build()
),
finisher,
cnf,

View File

@ -29,7 +29,6 @@ import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -95,11 +94,12 @@ public class InvalidFilteringTest extends BaseFilterTest
private static IndexBuilder overrideIndexBuilderSchema(IndexBuilder indexBuilder)
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder().withMetrics(new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new HyperUniquesAggregatorFactory("hyperion", "dim1"),
new DoubleMaxAggregatorFactory("dmax", "dim0")
}).build();
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("count"),
new HyperUniquesAggregatorFactory("hyperion", "dim1"),
new DoubleMaxAggregatorFactory("dmax", "dim0")
).build();
return indexBuilder.schema(schema);
}

View File

@ -34,7 +34,6 @@ import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.Pair;
import io.druid.js.JavaScriptConfig;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.filter.BoundDimFilter;
@ -112,11 +111,8 @@ public class LongFilteringTest extends BaseFilterTest
ROWS,
indexBuilder.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory(LONG_COLUMN, LONG_COLUMN)
}
).build()
.withMetrics(new LongSumAggregatorFactory(LONG_COLUMN, LONG_COLUMN))
.build()
),
finisher,
cnf,

View File

@ -54,7 +54,6 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
@ -116,25 +115,29 @@ public class SpatialFilterBonusTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build(),
false,
NUM_POINTS
);
IncrementalIndex theIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
@ -255,66 +258,76 @@ public class SpatialFilterBonusTest
private static QueryableIndex makeMergedQueryableIndex(final IndexSpec indexSpec)
{
try {
IncrementalIndex first = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
IncrementalIndex first = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build(),
false,
NUM_POINTS
);
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build(),
false,
NUM_POINTS
);
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
).build(),
false,
NUM_POINTS
);
IncrementalIndex second = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
IncrementalIndex third = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
first.add(
new MapBasedInputRow(

View File

@ -53,7 +53,6 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
@ -109,30 +108,33 @@ public class SpatialFilterTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
IncrementalIndex theIndex = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
false,
NUM_POINTS
);
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
theIndex.add(
new MapBasedInputRow(
@ -273,79 +275,86 @@ public class SpatialFilterTest
private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec)
{
try {
IncrementalIndex first = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
IncrementalIndex first = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
)
)
).build(),
false,
1000
);
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
false,
1000
);
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
false,
NUM_POINTS
);
IncrementalIndex second = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(1000)
.buildOnheap();
IncrementalIndex third = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(Granularities.DAY)
.withMetrics(METRIC_AGGS)
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build()
)
.setReportParseExceptions(false)
.setMaxRowCount(NUM_POINTS)
.buildOnheap();
first.add(
new MapBasedInputRow(

View File

@ -77,7 +77,10 @@ public class IncrementalIndexMultiValueSpecTest
return null;
}
};
IncrementalIndex<?> index = new OnheapIncrementalIndex(schema, true, 10000);
IncrementalIndex<?> index = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
.buildOnheap();
index.add(
new MapBasedInputRow(
0, Arrays.asList(

View File

@ -97,9 +97,10 @@ public class IncrementalIndexStorageAdapterTest
@Override
public IncrementalIndex createIndex()
{
return new OnheapIncrementalIndex(
0, Granularities.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
);
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
.setMaxRowCount(1000)
.buildOnheap();
}
}
}

View File

@ -87,11 +87,9 @@ public class IncrementalIndexTest
)
};
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0)
.withQueryGranularity(Granularities.MINUTE)
.withDimensionsSpec(dimensions)
.withMetrics(metrics)
.withRollup(true)
.build();
final List<Object[]> constructors = Lists.newArrayList();
@ -103,7 +101,12 @@ public class IncrementalIndexTest
@Override
public IncrementalIndex createIndex()
{
return new OnheapIncrementalIndex(schema, false, true, sortFacts, 1000);
return new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setDeserializeComplexMetrics(false)
.setSortFacts(sortFacts)
.setMaxRowCount(1000)
.buildOnheap();
}
}
}
@ -115,24 +118,23 @@ public class IncrementalIndexTest
@Override
public IncrementalIndex createIndex()
{
return new OffheapIncrementalIndex(
schema,
true,
true,
sortFacts,
1000000,
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
return new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setSortFacts(sortFacts)
.setMaxRowCount(1000000)
.buildOffheap(
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
}
}
}

View File

@ -110,6 +110,25 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
private final AtomicInteger indexIncrement = new AtomicInteger(0);
ConcurrentHashMap<Integer, Aggregator[]> indexedMap = new ConcurrentHashMap<Integer, Aggregator[]>();
public MapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount
)
{
super(
incrementalIndexSchema,
deserializeComplexMetrics,
reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount
);
}
public MapIncrementalIndex(
long minTimestamp,
Granularity gran,
@ -117,7 +136,18 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
int maxRowCount
)
{
super(minTimestamp, gran, metrics, maxRowCount);
super(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true,
true,
false,
true,
maxRowCount
);
}
@Override
@ -254,12 +284,21 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
final int concurrentThreads = 3;
final int elementsPerThread = 1 << 15;
final OnheapIncrementalIndex incrementalIndex = this.incrementalIndex.getConstructor(
Long.TYPE,
Granularity.class,
AggregatorFactory[].class,
final IncrementalIndex incrementalIndex = this.incrementalIndex.getConstructor(
IncrementalIndexSchema.class,
Boolean.TYPE,
Boolean.TYPE,
Boolean.TYPE,
Boolean.TYPE,
Integer.TYPE
).newInstance(0, Granularities.NONE, factories, elementsPerThread * taskCount);
).newInstance(
new IncrementalIndexSchema.Builder().withMetrics(factories).build(),
true,
true,
false,
true,
elementsPerThread * taskCount
);
final ArrayList<AggregatorFactory> queryAggregatorFactories = new ArrayList<>(dimensionCount + 1);
queryAggregatorFactories.add(new CountAggregatorFactory("rows"));
for (int i = 0; i < dimensionCount; ++i) {

View File

@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregator;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import org.easymock.EasyMock;
@ -42,12 +41,15 @@ public class OnheapIncrementalIndexTest
@Test
public void testMultithreadAddFacts() throws Exception
{
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
0,
Granularities.MINUTE,
new AggregatorFactory[]{new LongMaxAggregatorFactory("max", "max")},
MAX_ROWS
);
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(new LongMaxAggregatorFactory("max", "max"))
.build()
)
.setMaxRowCount(MAX_ROWS)
.buildOnheap();
final Random random = new Random();
final int addThreadCount = 2;
@ -108,12 +110,15 @@ public class OnheapIncrementalIndexTest
mockedAggregator.close();
EasyMock.expectLastCall().times(1);
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
0,
Granularities.MINUTE,
new AggregatorFactory[]{new LongMaxAggregatorFactory("max", "max")},
MAX_ROWS
);
final OnheapIncrementalIndex index = (OnheapIncrementalIndex) new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(new LongMaxAggregatorFactory("max", "max"))
.build()
)
.setMaxRowCount(MAX_ROWS)
.buildOnheap();
index.add(new MapBasedInputRow(
0,

View File

@ -22,8 +22,6 @@ package io.druid.segment.incremental;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
@ -41,9 +39,10 @@ public class TimeAndDimsCompTest
@Test
public void testBasic() throws IndexSizeExceededException
{
IncrementalIndex index = new OnheapIncrementalIndex(
0, Granularities.NONE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
);
IncrementalIndex index = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
.setMaxRowCount(1000)
.buildOnheap();
long time = System.currentTimeMillis();
TimeAndDims td1 = index.toTimeAndDims(toMapRow(time, "billy", "A", "joe", "B"));

View File

@ -36,7 +36,6 @@ import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireHydrant;
import io.druid.timeline.DataSegment;
@ -254,7 +253,11 @@ public class Sink implements Iterable<FireHydrant>
.withMetrics(schema.getAggregators())
.withRollup(schema.getGranularitySpec().isRollup())
.build();
final IncrementalIndex newIndex = new OnheapIncrementalIndex(indexSchema, reportParseExceptions, maxRowsInMemory);
final IncrementalIndex newIndex = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setReportParseExceptions(reportParseExceptions)
.setMaxRowCount(maxRowsInMemory)
.buildOnheap();
final FireHydrant old;
synchronized (hydrantLock) {

View File

@ -31,7 +31,6 @@ import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -46,7 +45,6 @@ import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -105,15 +103,15 @@ public class IngestSegmentFirehoseTest
try (
final QueryableIndex qi = indexIO.loadIndex(segmentDir);
final IncrementalIndex index = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(DIMENSIONS_SPEC_REINDEX)
.withQueryGranularity(Granularities.NONE)
.withMetrics(AGGREGATORS_REINDEX.toArray(new AggregatorFactory[]{}))
.build(),
true,
5000
)
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(DIMENSIONS_SPEC_REINDEX)
.withMetrics(AGGREGATORS_REINDEX.toArray(new AggregatorFactory[]{}))
.build()
)
.setMaxRowCount(5000)
.buildOnheap();
) {
final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
@ -193,15 +191,15 @@ public class IngestSegmentFirehoseTest
);
try (
final IncrementalIndex index = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(parser.getParseSpec().getDimensionsSpec())
.withQueryGranularity(Granularities.NONE)
.withMetrics(AGGREGATORS.toArray(new AggregatorFactory[]{}))
.build(),
true,
5000
)
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(parser.getParseSpec().getDimensionsSpec())
.withMetrics(AGGREGATORS.toArray(new AggregatorFactory[]{}))
.build()
)
.setMaxRowCount(5000)
.buildOnheap();
) {
for (String line : rows) {
index.add(parser.parse(line));

View File

@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.InputRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -93,11 +92,9 @@ public class DruidSchemaTest
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
}
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
)
.withRollup(false)
.build()
@ -110,11 +107,7 @@ public class DruidSchemaTest
.indexMerger(TestHelper.getTestIndexMergerV9())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory("m1", "m1")
}
)
.withMetrics(new LongSumAggregatorFactory("m1", "m1"))
.withRollup(false)
.build()
)

View File

@ -46,7 +46,6 @@ import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -281,11 +280,9 @@ public class CalciteTests
private static final IncrementalIndexSchema INDEX_SCHEMA = new IncrementalIndexSchema.Builder()
.withMetrics(
new AggregatorFactory[]{
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
}
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
)
.withRollup(false)
.build();