diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index b2f1dfda129..db0c9568dac 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -34,6 +34,7 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.data.input.InputRow; import io.druid.data.input.impl.StringInputRowParser; +import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; @@ -320,6 +321,7 @@ public class IndexGeneratorJob implements Jobby } } ); + index.close(); index = makeIncrementalIndex(bucket, aggs); startTime = System.currentTimeMillis(); @@ -378,7 +380,7 @@ public class IndexGeneratorJob implements Jobby } ); } - + index.close(); serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames)); for (File file : toMerge) { @@ -610,13 +612,15 @@ public class IndexGeneratorJob implements Jobby private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs) { + //TODO: review this, add a config for batch ingestion return new IncrementalIndex( new IncrementalIndexSchema.Builder() .withMinTimestamp(theBucket.time.getMillis()) .withSpatialDimensions(config.getSchema().getDataSchema().getParser()) .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) .withMetrics(aggs) - .build() + .build(), + new OffheapBufferPool(1024 * 1024 * 1024) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 5eb50b622c6..944c5b19ea1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -28,6 +28,7 @@ import com.google.common.collect.Multimaps; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.FilteredServerView; +import io.druid.collections.StupidPool; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -46,6 +47,7 @@ import org.joda.time.Interval; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -73,6 +75,7 @@ public class TaskToolbox private final SegmentLoader segmentLoader; private final ObjectMapper objectMapper; private final File taskWorkDir; + private final StupidPool indexPool; public TaskToolbox( TaskConfig config, @@ -90,7 +93,8 @@ public class TaskToolbox MonitorScheduler monitorScheduler, SegmentLoader segmentLoader, ObjectMapper objectMapper, - final File taskWorkDir + final File taskWorkDir, + StupidPool indexPool ) { this.config = config; @@ -109,6 +113,7 @@ public class TaskToolbox this.segmentLoader = segmentLoader; this.objectMapper = objectMapper; this.taskWorkDir = taskWorkDir; + this.indexPool = indexPool; } public TaskConfig getConfig() @@ -210,4 +215,8 @@ public class TaskToolbox { return taskWorkDir; } + + public StupidPool getIndexPool(){ + return indexPool; + } } diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 89d275b9a7f..b950959b473 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -24,6 +24,8 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.FilteredServerView; +import io.druid.collections.StupidPool; +import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Processing; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; @@ -36,6 +38,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.server.coordination.DataSegmentAnnouncer; import java.io.File; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; /** @@ -57,6 +60,7 @@ public class TaskToolboxFactory private final MonitorScheduler monitorScheduler; private final SegmentLoaderFactory segmentLoaderFactory; private final ObjectMapper objectMapper; + private final StupidPool bufferPool; @Inject public TaskToolboxFactory( @@ -73,7 +77,9 @@ @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, SegmentLoaderFactory segmentLoaderFactory, - ObjectMapper objectMapper + ObjectMapper objectMapper, + //TODO: have a separate index pool + @Global StupidPool bufferPool ) { this.config = config; @@ -90,6 +96,7 @@ this.monitorScheduler = monitorScheduler; this.segmentLoaderFactory = segmentLoaderFactory; this.objectMapper = objectMapper; + this.bufferPool = bufferPool; } public TaskToolbox build(Task task) @@ -112,7 +119,8 @@ monitorScheduler, segmentLoaderFactory.manufacturate(taskWorkDir), objectMapper, - taskWorkDir + taskWorkDir, + bufferPool ); } }
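
The toolbox changes above thread a single @Global pool of direct buffers through every task. What the rest of the patch relies on is StupidPool's take()/ResourceHolder contract, visible in how IncrementalIndex uses the pool later in this diff. A minimal lifecycle sketch (the class and method names other than StupidPool, ResourceHolder and OffheapBufferPool are illustrative only):

    import io.druid.collections.ResourceHolder;
    import io.druid.collections.StupidPool;
    import io.druid.offheap.OffheapBufferPool;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    class PoolLifecycleSketch
    {
      static void useOnce(StupidPool<ByteBuffer> pool) throws IOException
      {
        // take() either allocates a new direct buffer or hands back a returned one
        final ResourceHolder<ByteBuffer> holder = pool.take();
        try {
          final ByteBuffer buf = holder.get(); // backing storage for BufferAggregators
          // ... init()/aggregate() write into buf at computed offsets ...
        }
        finally {
          holder.close(); // returns the buffer to the pool
        }
      }

      public static void main(String[] args) throws IOException
      {
        useOnce(new OffheapBufferPool(1024 * 1024)); // 1 MB buffers, as in the tests below
      }
    }

This is why IncrementalIndex now implements Closeable: whoever builds an index owns a pooled buffer until close() is called, and the call sites touched by this patch (IndexGeneratorJob, DeleteTask) each gain a matching close().
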
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 11f6bb2264d..ff9f1549607 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -31,7 +31,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.metamx.common.Granularity; import com.metamx.common.logger.Logger; +import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; +import io.druid.guice.annotations.Global; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.segment.IndexIO; @@ -49,9 +51,10 @@ import io.druid.segment.realtime.plumber.Sink; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.joda.time.Interval; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -65,6 +69,7 @@ public class YeOldePlumberSchool implements PlumberSchool private final String version; private final DataSegmentPusher dataSegmentPusher; private final File tmpSegmentDir; + private final StupidPool bufferPool; private static final Logger log = new Logger(YeOldePlumberSchool.class); @@ -73,13 +78,16 @@ public class YeOldePlumberSchool implements PlumberSchool @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, @JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher, - @JacksonInject("tmpSegmentDir") File tmpSegmentDir + @JacksonInject("tmpSegmentDir") File tmpSegmentDir, + //TODO: review this global annotation + @JacksonInject @Global StupidPool bufferPool ) { this.interval = interval; this.version = version; this.dataSegmentPusher = dataSegmentPusher; this.tmpSegmentDir = tmpSegmentDir; + this.bufferPool = bufferPool; } @Override @@ -96,7 +104,7 @@ public class YeOldePlumberSchool implements PlumberSchool ) { // There can be only one. - final Sink theSink = new Sink(interval, schema, config, version); + final Sink theSink = new Sink(interval, schema, config, version, bufferPool); // Temporary directory to hold spilled segments. final File persistDir = new File(tmpSegmentDir, theSink.getSegment().getIdentifier()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java index 970818a6e9d..90d6eff2775 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java @@ -79,33 +79,43 @@ public class DeleteTask extends AbstractFixedIntervalTask { // Strategy: Create an empty segment covering the interval to be deleted final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); - final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); - final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty); - - // Create DataSegment - final DataSegment segment = - DataSegment.builder() - .dataSource(this.getDataSource()) - .interval(getInterval()) - .version(myLock.getVersion()) - .shardSpec(new NoneShardSpec()) - .build(); - - final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier()); - final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir); - - // Upload the segment - final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment); - - log.info( - "Uploaded tombstone segment for[%s] interval[%s] with version[%s]", - segment.getDataSource(), - segment.getInterval(), - segment.getVersion() + final IncrementalIndex empty = new IncrementalIndex( + 0, + QueryGranularity.NONE, + new AggregatorFactory[0], + toolbox.getIndexPool() ); + try { + final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty); - toolbox.pushSegments(ImmutableList.of(uploadedSegment)); + // Create DataSegment + final DataSegment segment = + DataSegment.builder() + .dataSource(this.getDataSource()) + .interval(getInterval()) + .version(myLock.getVersion()) + .shardSpec(new NoneShardSpec()) + .build(); - return TaskStatus.success(getId()); + final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier()); + final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir); + + // Upload the segment + final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment); + + log.info( + "Uploaded tombstone segment for[%s] interval[%s] with version[%s]", + segment.getDataSource(), + segment.getInterval(), + segment.getVersion() + ); + + toolbox.pushSegments(ImmutableList.of(uploadedSegment)); + + return
TaskStatus.success(getId()); + } + finally { + empty.close(); + } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 32565f1d51a..d4cd8ea0bb2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -385,7 +385,8 @@ public class IndexTask extends AbstractFixedIntervalTask interval, version, wrappedDataSegmentPusher, - tmpDir + tmpDir, + toolbox.getIndexPool() ).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics); // rowFlushBoundary for this job diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index e0fc20d1216..bfc6b35fc3f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -308,6 +308,7 @@ public class RealtimeIndexTask extends AbstractTask segmentPublisher, toolbox.getNewSegmentServerView(), toolbox.getQueryExecutorService(), + toolbox.getIndexPool(), null, null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 30325c9d398..9068535f38a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -43,6 +43,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; +import io.druid.offheap.OffheapBufferPool; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; @@ -205,7 +206,8 @@ public class TaskLifecycleTest } ) ), - new DefaultObjectMapper() + new DefaultObjectMapper(), + new OffheapBufferPool(1024 * 1024) ); tr = new ThreadPoolTaskRunner(tb); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 8d4bf32b870..4f88488e436 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -38,6 +38,7 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.jackson.DefaultObjectMapper; +import io.druid.offheap.OffheapBufferPool; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.OmniSegmentLoader; @@ -138,7 +139,8 @@ public class WorkerTaskMonitorTest } } ) - ), jsonMapper + ), jsonMapper, + new OffheapBufferPool(1024 * 1024) ) ), new WorkerConfig().setCapacity(1) diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 
51c663c6a2e..43e65b9e49a 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -31,12 +31,14 @@ import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; +import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryHelper; import io.druid.segment.incremental.IncrementalIndex; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; @@ -51,20 +53,25 @@ public class GroupByParallelQueryRunner implements QueryRunner private final ExecutorService exec; private final Ordering ordering; private final Supplier configSupplier; + private final StupidPool bufferPool; + public GroupByParallelQueryRunner( ExecutorService exec, Ordering ordering, Supplier configSupplier, + StupidPool bufferPool, QueryRunner... queryables ) { - this(exec, ordering, configSupplier, Arrays.asList(queryables)); + this(exec, ordering, configSupplier, bufferPool, Arrays.asList(queryables)); } public GroupByParallelQueryRunner( ExecutorService exec, - Ordering ordering, Supplier configSupplier, + Ordering ordering, + Supplier configSupplier, + StupidPool bufferPool, Iterable> queryables ) { @@ -72,6 +79,7 @@ public class GroupByParallelQueryRunner implements QueryRunner this.ordering = ordering; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.configSupplier = configSupplier; + this.bufferPool = bufferPool; } @Override @@ -81,7 +89,8 @@ public class GroupByParallelQueryRunner implements QueryRunner final GroupByQuery query = (GroupByQuery) queryParam; final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, - configSupplier.get() + configSupplier.get(), + bufferPool ); final int priority = query.getContextPriority(0); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 00298f18ba0..e7f1fdea912 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; +import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; @@ -32,13 +33,16 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import javax.annotation.Nullable; +import java.nio.ByteBuffer; import java.util.List; public class GroupByQueryHelper { public static Pair> createIndexAccumulatorPair( final GroupByQuery query, - final GroupByQueryConfig config + final GroupByQueryConfig config, + StupidPool bufferPool + ) { final QueryGranularity gran = query.getGranularity(); @@ -75,7 +79,8 @@ public class GroupByQueryHelper // since incoming truncated timestamps may precede timeStart granTimeStart, gran, - aggs.toArray(new AggregatorFactory[aggs.size()]) + aggs.toArray(new AggregatorFactory[aggs.size()]), + bufferPool ); Accumulator accumulator = new 
Accumulator() diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 0e00ceae46d..1cec9a993c7 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -32,8 +32,10 @@ import com.metamx.common.guava.ConcatSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; +import io.druid.guice.annotations.Global; import io.druid.query.DataSource; import io.druid.query.DataSourceUtil; import io.druid.query.IntervalChunkingQueryRunner; @@ -49,6 +51,7 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.Interval; import org.joda.time.Minutes; +import java.nio.ByteBuffer; import java.util.Map; /** @@ -62,15 +65,19 @@ public class GroupByQueryQueryToolChest extends QueryToolChest NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false"); private final Supplier configSupplier; private GroupByQueryEngine engine; // For running the outer query around a subquery + private final StupidPool bufferPool; + @Inject public GroupByQueryQueryToolChest( Supplier configSupplier, - GroupByQueryEngine engine + GroupByQueryEngine engine, + @Global StupidPool bufferPool ) { this.configSupplier = configSupplier; this.engine = engine; + this.bufferPool = bufferPool; } @Override @@ -142,7 +149,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, - config + config, + bufferPool ); return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index 714aad37925..a498a162965 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -29,7 +29,9 @@ import com.metamx.common.ISE; import com.metamx.common.guava.ExecutorExecutingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import io.druid.collections.StupidPool; import io.druid.data.input.Row; +import io.druid.guice.annotations.Global; import io.druid.query.ConcatQueryRunner; import io.druid.query.GroupByParallelQueryRunner; import io.druid.query.Query; @@ -39,6 +41,7 @@ import io.druid.query.QueryToolChest; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; +import java.nio.ByteBuffer; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -51,17 +54,20 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory config; private final GroupByQueryQueryToolChest toolChest; + private final StupidPool bufferPool; @Inject public GroupByQueryRunnerFactory( GroupByQueryEngine engine, Supplier config, - GroupByQueryQueryToolChest toolChest + GroupByQueryQueryToolChest toolChest, + @Global StupidPool bufferPool ) { this.engine = engine; this.config = config; this.toolChest = toolChest; + this.bufferPool = bufferPool; } @Override @@ -117,7 +123,7 @@ 
public class GroupByQueryRunnerFactory implements QueryRunnerFactory +public class IncrementalIndex implements Iterable, Closeable { private static final Logger log = new Logger(IncrementalIndex.class); private static final Joiner JOINER = Joiner.on(","); @@ -76,17 +81,21 @@ public class IncrementalIndex implements Iterable private final Map metricIndexes; private final Map metricTypes; private final ImmutableList metricNames; + private final BufferAggregator[] aggs; + private final int[] aggPositionOffsets; + private final int totalAggSize; private final LinkedHashMap dimensionOrder; private final CopyOnWriteArrayList dimensions; private final List spatialDimensions; private final SpatialDimensionRowFormatter spatialDimensionRowFormatter; private final DimensionHolder dimValues; - private final ConcurrentSkipListMap facts; + private final ConcurrentSkipListMap facts; + private final ResourceHolder bufferHolder; private volatile AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. - private InputRow in; + private ThreadLocal in = new ThreadLocal<>(); - public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema) + public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, StupidPool bufferPool) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); this.gran = incrementalIndexSchema.getGran(); @@ -95,7 +104,100 @@ public class IncrementalIndex implements Iterable final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); + this.aggs = new BufferAggregator[metrics.length]; + this.aggPositionOffsets = new int[metrics.length]; + int currAggSize = 0; for (int i = 0; i < metrics.length; i++) { + final AggregatorFactory agg = metrics[i]; + aggs[i] = agg.factorizeBuffered( + new ColumnSelectorFactory() + { + @Override + public TimestampColumnSelector makeTimestampColumnSelector() + { + return new TimestampColumnSelector() + { + @Override + public long getTimestamp() + { + return in.get().getTimestampFromEpoch(); + } + }; + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + final String metricName = columnName.toLowerCase(); + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.get().getFloatMetric(metricName); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String typeName = agg.getTypeName(); + final String columnName = column.toLowerCase(); + + if (typeName.equals("float")) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return in.get().getFloatMetric(columnName); + } + }; + } + + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } + + final ComplexMetricExtractor extractor = serde.getExtractor(); + + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } + + @Override + public Object get() + { + return extractor.extractValue(in.get(), columnName); + } + }; + } + + @Override + public DimensionSelector makeDimensionSelector(String dimension) + { + // we should implement this, but this is going to be rewritten soon anyways + throw new 
UnsupportedOperationException( + "Incremental index aggregation does not support dimension selectors" + ); + } + } + ); + aggPositionOffsets[i] = currAggSize; + currAggSize += agg.getMaxIntermediateSize(); final String metricName = metrics[i].getName().toLowerCase(); metricNamesBuilder.add(metricName); metricIndexesBuilder.put(metricName, i); @@ -105,6 +207,8 @@ public class IncrementalIndex implements Iterable metricIndexes = metricIndexesBuilder.build(); metricTypes = metricTypesBuilder.build(); + this.totalAggSize = currAggSize; + this.dimensionOrder = Maps.newLinkedHashMap(); this.dimensions = new CopyOnWriteArrayList(); int index = 0; @@ -114,22 +218,24 @@ public class IncrementalIndex implements Iterable } this.spatialDimensions = incrementalIndexSchema.getSpatialDimensions(); this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(spatialDimensions); - + this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - this.facts = new ConcurrentSkipListMap(); + this.facts = new ConcurrentSkipListMap(); } public IncrementalIndex( long minTimestamp, QueryGranularity gran, - final AggregatorFactory[] metrics + final AggregatorFactory[] metrics, + StupidPool bufferPool ) { this( new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) .withQueryGranularity(gran) .withMetrics(metrics) - .build() + .build(), + bufferPool ); } @@ -137,7 +243,7 @@ public class IncrementalIndex implements Iterable * Adds a new row. The row might correspond with another row that already exists, in which case this will * update that row instead of inserting a new one. *
- * This is *not* thread-safe. Calls to add() should always happen on the same thread. + * This method is thread-safe. * * @param row the row of data to add * @@ -188,118 +294,23 @@ public class IncrementalIndex implements Iterable TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - Aggregator[] aggs = facts.get(key); - if (aggs == null) { - aggs = new Aggregator[metrics.length]; - - for (int i = 0; i < metrics.length; ++i) { - final AggregatorFactory agg = metrics[i]; - aggs[i] = - agg.factorize( - new ColumnSelectorFactory() - { - @Override - public TimestampColumnSelector makeTimestampColumnSelector() - { - return new TimestampColumnSelector() - { - @Override - public long getTimestamp() - { - return in.getTimestampFromEpoch(); - } - }; - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - final String metricName = columnName.toLowerCase(); - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.getFloatMetric(metricName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - final String typeName = agg.getTypeName(); - final String columnName = column.toLowerCase(); - - if (typeName.equals("float")) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Float.TYPE; - } - - @Override - public Float get() - { - return in.getFloatMetric(columnName); - } - }; - } - - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); - - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } - - final ComplexMetricExtractor extractor = serde.getExtractor(); - - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); - } - - @Override - public Object get() - { - return extractor.extractValue(in, columnName); - } - }; - } - - @Override - public DimensionSelector makeDimensionSelector(String dimension) - { - // we should implement this, but this is going to be rewritten soon anyways - throw new UnsupportedOperationException( - "Incremental index aggregation does not support dimension selectors" - ); - } - } - - ); - } - - Aggregator[] prev = facts.putIfAbsent(key, aggs); - if (prev != null) { - aggs = prev; - } else { - numEntries.incrementAndGet(); - } - } - synchronized (this) { - in = row; - for (Aggregator agg : aggs) { - agg.aggregate(); + if (!facts.containsKey(key)) { + int rowOffset = totalAggSize * numEntries.getAndIncrement(); + for (int i = 0; i < aggs.length; i++) { + aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i)); + } + facts.put(key, rowOffset); } - in = null; } + in.set(row); + int rowOffset = facts.get(key); + for (int i = 0; i < aggs.length; i++) { + synchronized (aggs[i]) { + aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); + } + } + in.set(null); return numEntries.get(); } @@ -413,12 +424,27 @@ public class IncrementalIndex implements Iterable return metricIndexes.get(metricName); } - ConcurrentSkipListMap getFacts() + int getMetricPosition(int rowOffset, int metricIndex) + { + return rowOffset + aggPositionOffsets[metricIndex]; + } + + ByteBuffer getMetricBuffer() + { + return bufferHolder.get(); + } + + BufferAggregator getAggregator(int metricIndex) + { + return aggs[metricIndex]; + } + + ConcurrentSkipListMap getFacts() { return facts; } - ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims 
end) + ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) { return facts.subMap(start, end); } @@ -438,13 +464,13 @@ public class IncrementalIndex implements Iterable { return Iterators.transform( facts.entrySet().iterator(), - new Function, Row>() + new Function, Row>() { @Override - public Row apply(final Map.Entry input) + public Row apply(final Map.Entry input) { final TimeAndDims timeAndDims = input.getKey(); - final Aggregator[] aggregators = input.getValue(); + final int rowOffset = input.getValue(); String[][] theDims = timeAndDims.getDims(); @@ -456,8 +482,8 @@ public class IncrementalIndex implements Iterable } } - for (int i = 0; i < aggregators.length; ++i) { - theVals.put(metrics[i].getName(), aggregators[i].get()); + for (int i = 0; i < aggs.length; ++i) { + theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i))); } if (postAggs != null) { @@ -474,6 +500,12 @@ public class IncrementalIndex implements Iterable }; } + @Override + public void close() throws IOException + { + bufferHolder.close(); + } + static class DimensionHolder { private final Map dimensions; diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index b72530ba47a..bf44a8b25ac 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -24,7 +24,6 @@ import com.google.common.collect.Maps; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import io.druid.data.input.impl.SpatialDimensionSchema; -import io.druid.query.aggregation.Aggregator; import io.druid.segment.IndexableAdapter; import io.druid.segment.Rowboat; import io.druid.segment.data.EmptyIndexedInts; @@ -45,10 +44,8 @@ import java.util.Map; public class IncrementalIndexAdapter implements IndexableAdapter { private static final Logger log = new Logger(IncrementalIndexAdapter.class); - private final Interval dataInterval; private final IncrementalIndex index; - private final Map> invertedIndexes; public IncrementalIndexAdapter( @@ -171,18 +168,18 @@ public class IncrementalIndexAdapter implements IndexableAdapter return FunctionalIterable .create(index.getFacts().entrySet()) .transform( - new Function, Rowboat>() + new Function, Rowboat>() { int count = 0; @Override public Rowboat apply( - @Nullable Map.Entry input + @Nullable Map.Entry input ) { final IncrementalIndex.TimeAndDims timeAndDims = input.getKey(); final String[][] dimValues = timeAndDims.getDims(); - final Aggregator[] aggs = input.getValue(); + final int rowOffset = input.getValue(); int[][] dims = new int[dimValues.length][]; for (String dimension : index.getDimensions()) { @@ -205,9 +202,10 @@ public class IncrementalIndexAdapter implements IndexableAdapter } } - Object[] metrics = new Object[aggs.length]; - for (int i = 0; i < aggs.length; i++) { - metrics[i] = aggs[i].get(); + Object[] metrics = new Object[index.getMetricAggs().length]; + for (int i = 0; i < metrics.length; i++) { + metrics[i] = index.getAggregator(i) + .get(index.getMetricBuffer(), index.getMetricPosition(rowOffset, i)); } Map description = Maps.newHashMap(); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 
0eddf59ac98..3e12d0726c0 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -29,7 +29,7 @@ import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.BufferAggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcherFactory; @@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentNavigableMap; public class IncrementalIndexStorageAdapter implements StorageAdapter { private static final Splitter SPLITTER = Splitter.on(","); - private final IncrementalIndex index; public IncrementalIndexStorageAdapter( @@ -165,8 +164,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new Cursor() { - private Iterator> baseIter; - private ConcurrentNavigableMap cursorMap; + private Iterator> baseIter; + private ConcurrentNavigableMap cursorMap; final DateTime time; int numAdvanced = -1; boolean done; @@ -355,13 +354,17 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int metricIndex = metricIndexInt; + final BufferAggregator agg = index.getAggregator(metricIndex); return new FloatColumnSelector() { @Override public float get() { - return currEntry.getValue()[metricIndex].getFloat(); + return agg.getFloat( + index.getMetricBuffer(), + index.getMetricPosition(currEntry.getValue(), metricIndex) + ); } }; } @@ -376,7 +379,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter final int metricIndex = metricIndexInt; final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName)); - + final BufferAggregator agg = index.getAggregator(metricIndex); return new ObjectColumnSelector() { @Override @@ -388,7 +391,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Object get() { - return currEntry.getValue()[metricIndex].get(); + return agg.get( + index.getMetricBuffer(), + index.getMetricPosition(currEntry.getValue(), metricIndex) + ); } }; } @@ -411,11 +417,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; if (dimVals.length == 1) { return dimVals[0]; - } - else if (dimVals.length == 0) { + } else if (dimVals.length == 0) { return null; - } - else { + } else { return dimVals; } } @@ -439,14 +443,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter private static class EntryHolder { - Map.Entry currEntry = null; + Map.Entry currEntry = null; - public Map.Entry get() + public Map.Entry get() { return currEntry; } - public void set(Map.Entry currEntry) + public void set(Map.Entry currEntry) { this.currEntry = currEntry; this.currEntry = currEntry; @@ -457,7 +461,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return currEntry.getKey(); } - public Aggregator[] getValue() + public Integer getValue() { return currEntry.getValue(); } diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index c4767c1c6f9..94d263f7cdf 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ 
b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -24,11 +24,10 @@ public class TestQueryRunners @Override public ByteBuffer get() { - return ByteBuffer.allocate(1024 * 10); + return ByteBuffer.allocate(1024 * 1024 * 10); } } ); - public static final TopNQueryConfig topNConfig = new TopNQueryConfig(); public static StupidPool getPool() diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueBufferAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueBufferAggregatorTest.java new file mode 100644 index 00000000000..d2d291ac3b9 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueBufferAggregatorTest.java @@ -0,0 +1,92 @@ +package io.druid.query.aggregation.hyperloglog; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import io.druid.segment.ObjectColumnSelector; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +/** + * Sanity checks that the buffered HyperUniques aggregator produces the same estimate as the on-heap aggregator. + */ +public class HyperUniqueBufferAggregatorTest +{ + private final HashFunction fn = Hashing.murmur3_128(); + private volatile HyperLogLogCollector collector; + + @Test + public void testAggregation() + { + final HyperUniquesBufferAggregator agg = new HyperUniquesBufferAggregator( + new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return HyperLogLogCollector.class; + } + + @Override + public Object get() + { + return collector; + } + } + ); + ByteBuffer byteBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + + for (int i = 0; i < 1000; i++) { + collector = HyperLogLogCollector.makeLatestCollector(); + collector.add(fn.hashInt(i).asBytes()); + agg.aggregate(byteBuffer, 0); + } + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector( + ((HyperLogLogCollector) agg.get( + byteBuffer, + 0 + )).toByteBuffer() + ); + // 1000 distinct values were added; allow generous slack for HyperLogLog estimation error + Assert.assertEquals(1000.0, collector.estimateCardinality(), 100.0); + + } + + @Test + public void testAggregation2() + { + final HyperUniquesAggregator agg = new HyperUniquesAggregator( + "abc", + new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return HyperLogLogCollector.class; + } + + @Override + public Object get() + { + return collector; + } + } + ); + + for (int i = 0; i < 1000; i++) { + collector = HyperLogLogCollector.makeLatestCollector(); + collector.add(fn.hashInt(i).asBytes()); + agg.aggregate(); + } + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector( + ((HyperLogLogCollector) agg.get( + )).toByteBuffer() + ); + // Same input as above, so the on-heap aggregator should land in the same range + Assert.assertEquals(1000.0, collector.estimateCardinality(), 100.0); + + } +}
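
The new test exercises the positional calling convention the offheap rewrite depends on: every BufferAggregator method takes the shared ByteBuffer plus an explicit offset, and implementations must not disturb the buffer's own position or limit. A minimal count-style sketch of that contract, assuming the BufferAggregator interface as it is used throughout this patch (this class is illustrative, not one of Druid's shipped aggregators):

    import io.druid.query.aggregation.BufferAggregator;

    import java.nio.ByteBuffer;

    class CountBufferAggregatorSketch implements BufferAggregator
    {
      @Override
      public void init(ByteBuffer buf, int position)
      {
        // claim 8 bytes at the caller-supplied offset; absolute puts leave buf's position/limit alone
        buf.putLong(position, 0L);
      }

      @Override
      public void aggregate(ByteBuffer buf, int position)
      {
        buf.putLong(position, buf.getLong(position) + 1);
      }

      @Override
      public Object get(ByteBuffer buf, int position)
      {
        return buf.getLong(position);
      }

      @Override
      public float getFloat(ByteBuffer buf, int position)
      {
        return (float) buf.getLong(position);
      }

      @Override
      public void close()
      {
        // no per-aggregator resources to release
      }
    }

Because all state lives in the buffer at the caller-supplied offset, a single aggregator instance can serve every row of the index, which is exactly how the aggs array in the rewritten IncrementalIndex is used above.
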
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 97e64a0ec0c..396c8a5ef33 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -40,6 +40,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -114,7 +115,8 @@ public class GroupByQueryRunnerTest final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( engine, configSupplier, - new GroupByQueryQueryToolChest(configSupplier, engine) + new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool), + TestQueryRunners.pool ); return Lists.newArrayList( @@ -758,7 +760,7 @@ public class GroupByQueryRunnerTest ) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -807,7 +809,7 @@ public class GroupByQueryRunnerTest ) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } @@ -856,7 +858,7 @@ public class GroupByQueryRunnerTest ) ); - QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner); + QueryRunner mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit"); } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 2538e91bc76..b5077cb2dc6 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -32,6 +32,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; +import io.druid.query.TestQueryRunners; import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryRunnerTest; import io.druid.query.timeseries.TimeseriesResultValue; @@ -73,7 +74,8 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( engine, configSupplier, - new GroupByQueryQueryToolChest(configSupplier, engine) + new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool), + TestQueryRunners.pool ); final Collection objects = QueryRunnerTestHelper.makeQueryRunners(factory); diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java index d1497a19026..36eba162090 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -31,6 +31,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.Result; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.IncrementalIndexSegment; @@ -49,7 +50,8 @@ public class
TimeseriesQueryRunnerBonusTest public void testOneRowAtATime() throws Exception { final IncrementalIndex oneRowIndex = new IncrementalIndex( - new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{} + new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{}, + TestQueryRunners.pool ); List> results; diff --git a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java index d4e835566a9..04db1387a37 100644 --- a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java +++ b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java @@ -22,6 +22,7 @@ package io.druid.segment; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.druid.granularity.QueryGranularity; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; @@ -45,7 +46,7 @@ public class EmptyIndexTest } tmpDir.deleteOnExit(); - IncrementalIndex emptyIndex = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); + IncrementalIndex emptyIndex = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0], TestQueryRunners.pool); IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(new Interval("2012-08-01/P3D"), emptyIndex); IndexMerger.merge(Lists.newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index a21eb92c718..ace0ae7c33e 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Lists; import com.google.common.io.Files; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.incremental.IncrementalIndex; @@ -65,7 +66,7 @@ public class IndexMergerTest final long timestamp = System.currentTimeMillis(); IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(timestamp); - IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); + IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool); toPersist2.add( new MapBasedInputRow( @@ -117,8 +118,8 @@ public class IndexMergerTest @Test public void testPersistEmptyColumn() throws Exception { - final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); - final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); + final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool); + final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool); final File tmpDir1 = Files.createTempDir(); final File tmpDir2 = Files.createTempDir(); final File tmpDir3 = Files.createTempDir(); diff --git 
a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 95823210549..14eb6c9db3c 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -31,6 +31,7 @@ import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -155,7 +156,8 @@ public class TestIndex log.info("Realtime loading index file[%s]", resource); final IncrementalIndex retVal = new IncrementalIndex( - new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.NONE, METRIC_AGGS + new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.NONE, METRIC_AGGS, + TestQueryRunners.pool ); final AtomicLong startTime = new AtomicLong(); diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 35fb2b81c0e..7a131901501 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; @@ -45,7 +46,10 @@ public class IncrementalIndexTest public static IncrementalIndex createCaseInsensitiveIndex(long timestamp) { - IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); + IncrementalIndex index = new IncrementalIndex( + 0L, QueryGranularity.NONE, new AggregatorFactory[]{}, + TestQueryRunners.pool + ); index.add( new MapBasedInputRow( @@ -105,7 +109,8 @@ public class IncrementalIndexTest final IncrementalIndex index = new IncrementalIndex( 0L, QueryGranularity.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")} + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + TestQueryRunners.pool ); final int threadCount = 10; final int elementsPerThread = 200; diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 0eb327972ee..3a7d3d39e7c 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -31,6 +31,7 @@ import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.QueryRunner; import io.druid.query.Result; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -108,7 +109,8 @@ public class SpatialFilterBonusTest Lists.newArrayList() ) ) - ).build() + ).build(), + TestQueryRunners.pool ); theIndex.add( new MapBasedInputRow( 
@@ -233,7 +235,8 @@ public class SpatialFilterBonusTest Lists.newArrayList() ) ) - ).build() + ).build(), + TestQueryRunners.pool ); IncrementalIndex second = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -246,7 +249,8 @@ public class SpatialFilterBonusTest Lists.newArrayList() ) ) - ).build() + ).build(), + TestQueryRunners.pool ); IncrementalIndex third = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -259,7 +263,8 @@ public class SpatialFilterBonusTest Lists.newArrayList() ) ) - ).build() + ).build(), + TestQueryRunners.pool ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index d342c12c577..f69166da60d 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -31,6 +31,7 @@ import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.QueryRunner; import io.druid.query.Result; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -108,7 +109,8 @@ public class SpatialFilterTest Arrays.asList("lat", "long") ) ) - ).build() + ).build(), + TestQueryRunners.pool ); theIndex.add( new MapBasedInputRow( @@ -248,7 +250,8 @@ public class SpatialFilterTest Arrays.asList("lat", "long") ) ) - ).build() + ).build(), + TestQueryRunners.pool ); IncrementalIndex second = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -261,7 +264,8 @@ public class SpatialFilterTest Arrays.asList("lat", "long") ) ) - ).build() + ).build(), + TestQueryRunners.pool ); IncrementalIndex third = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -274,7 +278,8 @@ public class SpatialFilterTest Arrays.asList("lat", "long") ) ) - ).build() + ).build(), + TestQueryRunners.pool ); diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index db6fc2f909d..1eee18fe782 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -32,6 +32,7 @@ import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; import io.druid.query.Result; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -62,7 +63,8 @@ public class IncrementalIndexStorageAdapterTest public void testSanity() throws Exception { IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + TestQueryRunners.pool ); index.add( @@ -127,7 +129,8 @@ public class IncrementalIndexStorageAdapterTest @Test public void testResetSanity() { 
IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + TestQueryRunners.pool ); @@ -179,7 +182,8 @@ public class IncrementalIndexStorageAdapterTest public void testSingleValueTopN() { IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + TestQueryRunners.pool ); DateTime t = DateTime.now(); @@ -234,7 +238,8 @@ public class IncrementalIndexStorageAdapterTest public void testFilterByNull() throws Exception { IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + TestQueryRunners.pool ); index.add( diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index 402aded1f74..ccedcc6e110 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -33,6 +33,7 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.StupidPool; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Processing; +import io.druid.offheap.OffheapBufferPool; import io.druid.query.MetricsEmittingExecutorService; import io.druid.query.PrioritizedExecutorService; import io.druid.server.DruidProcessingConfig; @@ -102,31 +103,8 @@ public class DruidProcessingModule implements Module log.warn(e, e.getMessage()); } - return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes()); + return new OffheapBufferPool(config.intermediateComputeSizeBytes()); } - private static class IntermediateProcessingBufferPool extends StupidPool - { - private static final Logger log = new Logger(IntermediateProcessingBufferPool.class); - public IntermediateProcessingBufferPool(final int computationBufferSize) - { - super( - new Supplier() - { - final AtomicLong count = new AtomicLong(0); - - @Override - public ByteBuffer get() - { - log.info( - "Allocating new intermediate processing buffer[%,d] of size[%,d]", - count.getAndIncrement(), computationBufferSize - ); - return ByteBuffer.allocateDirect(computationBufferSize); - } - } - ); - } - } } diff --git a/server/src/main/java/io/druid/offheap/OffheapBufferPool.java b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java new file mode 100644 index 00000000000..77c70f10583 --- /dev/null +++ b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
diff --git a/server/src/main/java/io/druid/offheap/OffheapBufferPool.java b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java
new file mode 100644
index 00000000000..77c70f10583
--- /dev/null
+++ b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java
@@ -0,0 +1,53 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.offheap;
+
+import com.google.common.base.Supplier;
+import com.metamx.common.logger.Logger;
+import io.druid.collections.StupidPool;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class OffheapBufferPool extends StupidPool<ByteBuffer>
+{
+  private static final Logger log = new Logger(OffheapBufferPool.class);
+
+  public OffheapBufferPool(final int computationBufferSize)
+  {
+    super(
+        new Supplier<ByteBuffer>()
+        {
+          final AtomicLong count = new AtomicLong(0);
+
+          @Override
+          public ByteBuffer get()
+          {
+            log.info(
+                "Allocating new intermediate processing buffer[%,d] of size[%,d]",
+                count.getAndIncrement(), computationBufferSize
+            );
+            return ByteBuffer.allocateDirect(computationBufferSize);
+          }
+        }
+    );
+  }
+}
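For reviewers unfamiliar with StupidPool: buffers are leased as closeable holders and returned on close, and the Supplier above is only invoked when the pool is empty, so direct buffers are allocated lazily and reused. A minimal usage sketch, assuming only the existing io.druid.collections API (the 1 MB size is illustrative):

    import io.druid.collections.ResourceHolder;
    import io.druid.offheap.OffheapBufferPool;

    import java.nio.ByteBuffer;

    public class OffheapBufferPoolSketch
    {
      public static void main(String[] args) throws Exception
      {
        final OffheapBufferPool pool = new OffheapBufferPool(1024 * 1024); // illustrative size
        final ResourceHolder<ByteBuffer> holder = pool.take(); // triggers the Supplier on first use
        try {
          final ByteBuffer buf = holder.get();
          buf.putLong(0, 42L); // consumers (e.g. aggregators) write at fixed offsets like this
        }
        finally {
          holder.close(); // hands the buffer back to the pool for reuse
        }
      }
    }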
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
index 083c290c4cc..30babfd57ec 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -25,6 +25,7 @@ import com.metamx.common.Granularity;
 import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
+import io.druid.collections.StupidPool;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.indexing.DataSchema;
@@ -34,6 +35,7 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -62,7 +64,8 @@ public class FlushingPlumber extends RealtimePlumber
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
-      ExecutorService queryExecutorService
+      ExecutorService queryExecutorService,
+      StupidPool<ByteBuffer> bufferPool
   )
   {
     super(
@@ -75,7 +78,8 @@ public class FlushingPlumber extends RealtimePlumber
         queryExecutorService,
         null,
         null,
-        null
+        null,
+        bufferPool
     );
 
     this.flushDuration = flushDuration;
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
index 026af6f43c8..f7de3ff21bd 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.metamx.common.Granularity;
 import com.metamx.emitter.service.ServiceEmitter;
+import io.druid.collections.StupidPool;
+import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Processing;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.indexing.DataSchema;
@@ -35,6 +37,7 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -51,6 +54,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ExecutorService queryExecutorService;
+  private final StupidPool<ByteBuffer> bufferPool;
 
   @JsonCreator
   public FlushingPlumberSchool(
@@ -59,6 +63,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
       @JacksonInject QueryRunnerFactoryConglomerate conglomerate,
       @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
       @JacksonInject @Processing ExecutorService queryExecutorService,
+      //TODO: define separate index pool
+      @JacksonInject @Global StupidPool<ByteBuffer> bufferPool,
       // Backwards compatible
       @JsonProperty("windowPeriod") Period windowPeriod,
       @JsonProperty("basePersistDirectory") File basePersistDirectory,
@@ -76,6 +82,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
         null,
         null,
         queryExecutorService,
+        bufferPool,
         windowPeriod,
         basePersistDirectory,
         segmentGranularity,
@@ -89,6 +96,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
     this.conglomerate = conglomerate;
     this.segmentAnnouncer = segmentAnnouncer;
     this.queryExecutorService = queryExecutorService;
+    this.bufferPool = bufferPool;
   }
 
   @Override
@@ -108,7 +116,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
         emitter,
         conglomerate,
         segmentAnnouncer,
-        queryExecutorService
+        queryExecutorService,
+        bufferPool
     );
   }
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 306350aaaa3..cbe8b597b9f 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -19,10 +19,12 @@ import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.client.FilteredServerView;
 import io.druid.client.ServerView;
+import io.druid.collections.StupidPool;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.common.guava.ThreadRenamingRunnable;
 import io.druid.concurrent.Execs;
 import io.druid.data.input.InputRow;
+import io.druid.guice.annotations.Global;
 import io.druid.query.MetricsEmittingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
@@ -59,6 +61,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -95,6 +98,8 @@ public class RealtimePlumber implements Plumber
   private volatile ExecutorService persistExecutor = null;
   private volatile ExecutorService mergeExecutor = null;
   private volatile ScheduledExecutorService scheduledExecutor = null;
+  private final StupidPool<ByteBuffer> bufferPool;
+
 
   public RealtimePlumber(
       DataSchema schema,
@@ -106,7 +111,9 @@ public class RealtimePlumber implements Plumber
       ExecutorService queryExecutorService,
       DataSegmentPusher dataSegmentPusher,
       SegmentPublisher segmentPublisher,
-      FilteredServerView serverView
+      FilteredServerView serverView,
+      StupidPool<ByteBuffer> bufferPool
+
   )
   {
     this.schema = schema;
@@ -120,6 +127,7 @@ public class RealtimePlumber implements Plumber
     this.dataSegmentPusher = dataSegmentPusher;
     this.segmentPublisher = segmentPublisher;
     this.serverView = serverView;
+    this.bufferPool = bufferPool;
 
     log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
   }
@@ -184,7 +192,7 @@ public class RealtimePlumber implements Plumber
           segmentGranularity.increment(new DateTime(truncatedTime))
       );
 
-      retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));
+      retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), bufferPool);
 
       try {
         segmentAnnouncer.announceSegment(retVal.getSegment());
@@ -535,7 +543,7 @@ public class RealtimePlumber implements Plumber
           );
         }
 
-        Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
+        Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants, bufferPool);
         sinks.put(sinkInterval.getStartMillis(), currSink);
         sinkTimeline.add(
             currSink.getInterval(),
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index eb52a30ba31..2b24fcf4279 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -23,10 +23,14 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.metamx.common.Granularity;
+import com.metamx.common.logger.Logger;
 import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.FilteredServerView;
 import io.druid.client.ServerView;
+import io.druid.collections.StupidPool;
+import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Processing;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.indexing.DataSchema;
@@ -38,7 +42,9 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Period;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  */
@@ -51,6 +57,7 @@ public class RealtimePlumberSchool implements PlumberSchool
   private final SegmentPublisher segmentPublisher;
   private final FilteredServerView serverView;
   private final ExecutorService queryExecutorService;
+  private final StupidPool<ByteBuffer> bufferPool;
 
   // Backwards compatible
   private final Period windowPeriod;
@@ -69,6 +76,8 @@ public class RealtimePlumberSchool implements PlumberSchool
       @JacksonInject SegmentPublisher segmentPublisher,
       @JacksonInject FilteredServerView serverView,
       @JacksonInject @Processing ExecutorService executorService,
+      //TODO: define separate index pool
+      @JacksonInject @Global StupidPool<ByteBuffer> bufferPool,
       // Backwards compatible
       @JsonProperty("windowPeriod") Period windowPeriod,
       @JsonProperty("basePersistDirectory") File basePersistDirectory,
@@ -91,6 +100,7 @@ public class RealtimePlumberSchool implements PlumberSchool
     this.versioningPolicy = versioningPolicy;
     this.rejectionPolicyFactory = rejectionPolicyFactory;
     this.maxPendingPersists = maxPendingPersists;
+    this.bufferPool = bufferPool;
   }
 
   @Deprecated
@@ -149,7 +159,8 @@ public class RealtimePlumberSchool implements PlumberSchool
         queryExecutorService,
         dataSegmentPusher,
         segmentPublisher,
-        serverView
+        serverView,
+        bufferPool
     );
   }
 
@@ -162,4 +173,5 @@ public class RealtimePlumberSchool implements PlumberSchool
     Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
     Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
   }
+
 }
do this action."); } + } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index f3ff140193b..4dc1edf61b8 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -27,8 +27,10 @@ import com.google.common.collect.Lists; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.impl.SpatialDimensionSchema; +import io.druid.guice.annotations.Global; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; @@ -39,6 +41,7 @@ import io.druid.timeline.DataSegment; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -57,18 +60,23 @@ public class Sink implements Iterable private final RealtimeTuningConfig config; private final String version; private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList(); + private final StupidPool bufferPool; + public Sink( Interval interval, DataSchema schema, RealtimeTuningConfig config, - String version + String version, + StupidPool bufferPool + ) { this.schema = schema; this.config = config; this.interval = interval; this.version = version; + this.bufferPool = bufferPool; makeNewCurrIndex(interval.getStartMillis(), schema); } @@ -78,13 +86,15 @@ public class Sink implements Iterable DataSchema schema, RealtimeTuningConfig config, String version, - List hydrants + List hydrants, + StupidPool bufferPool ) { this.schema = schema; this.config = config; this.interval = interval; this.version = version; + this.bufferPool = bufferPool; for (int i = 0; i < hydrants.size(); ++i) { final FireHydrant hydrant = hydrants.get(i); @@ -183,7 +193,8 @@ public class Sink implements Iterable .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity()) .withSpatialDimensions(schema.getParser()) .withMetrics(schema.getAggregators()) - .build() + .build(), + bufferPool ); FireHydrant old; diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 19c104a4ff0..1566013194c 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -27,6 +27,7 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.indexing.DataSchema; @@ -73,7 +74,7 @@ public class FireDepartmentTest new RealtimeIOConfig( null, new RealtimePlumberSchool( - null, null, null, null, null, null, null, null, null, null, null, null, 0 + null, null, null, null, null, null, null, TestQueryRunners.pool, null, null, null, null, null, 0 ) ), new RealtimeTuningConfig( diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 
diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
index 19c104a4ff0..1566013194c 100644
--- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
@@ -27,6 +27,7 @@ import io.druid.data.input.impl.StringInputRowParser;
 import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularity;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
@@ -73,7 +74,7 @@ public class FireDepartmentTest
         new RealtimeIOConfig(
             null,
             new RealtimePlumberSchool(
-                null, null, null, null, null, null, null, null, null, null, null, null, 0
+                null, null, null, null, null, null, null, TestQueryRunners.pool, null, null, null, null, null, 0
             )
         ),
         new RealtimeTuningConfig(
diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
index e29dd187b4b..4a1f692a3f4 100644
--- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
@@ -31,6 +31,7 @@ import io.druid.data.input.impl.InputRowParser;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
@@ -117,7 +118,7 @@ public class RealtimeManagerTest
         null,
         null
     );
-    plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
+    plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString(), TestQueryRunners.pool));
 
     realtimeManager = new RealtimeManager(
         Arrays.asList(
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index ddcb503af58..f2a060ea1cc 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -40,6 +40,7 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import io.druid.query.Query;
 import io.druid.query.QueryRunnerFactory;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
@@ -144,6 +145,7 @@ public class RealtimePlumberSchoolTest
         segmentPublisher,
         serverView,
         MoreExecutors.sameThreadExecutor(),
+        TestQueryRunners.pool,
         new Period("PT10m"),
         tmpDir,
         Granularity.HOUR,
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
index b068a994cb7..e50ac4403dc 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import com.metamx.common.Granularity;
 import io.druid.data.input.InputRow;
 import io.druid.granularity.QueryGranularity;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
@@ -64,7 +65,7 @@ public class SinkTest
         null,
         null
     );
-    final Sink sink = new Sink(interval, schema, tuningConfig, version);
+    final Sink sink = new Sink(interval, schema, tuningConfig, version, TestQueryRunners.pool);
 
     sink.add(
         new InputRow()