unit tests passing with offheap-indexing

nishantmonu51 2014-06-05 17:42:53 +05:30
parent d8338fc51d
commit 01e8a713b6
39 changed files with 567 additions and 258 deletions

View File

@ -34,6 +34,7 @@ import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
@ -320,6 +321,7 @@ public class IndexGeneratorJob implements Jobby
             }
           }
       );
+      index.close();
       index = makeIncrementalIndex(bucket, aggs);

       startTime = System.currentTimeMillis();
@ -378,7 +380,7 @@ public class IndexGeneratorJob implements Jobby
           }
       );
     }
+    index.close();
     serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));

     for (File file : toMerge) {
@ -610,13 +612,15 @@ public class IndexGeneratorJob implements Jobby
   private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
   {
+    //TODO: review this, add a config for batch ingestion
     return new IncrementalIndex(
         new IncrementalIndexSchema.Builder()
             .withMinTimestamp(theBucket.time.getMillis())
             .withSpatialDimensions(config.getSchema().getDataSchema().getParser())
             .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
             .withMetrics(aggs)
-            .build()
+            .build(),
+        new OffheapBufferPool(1024 * 1024 * 1024)
     );
   }

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Multimaps;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.FilteredServerView;
+import io.druid.collections.StupidPool;
 import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
@ -46,6 +47,7 @@ import org.joda.time.Interval;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@ -73,6 +75,7 @@ public class TaskToolbox
   private final SegmentLoader segmentLoader;
   private final ObjectMapper objectMapper;
   private final File taskWorkDir;
+  private final StupidPool<ByteBuffer> indexPool;

   public TaskToolbox(
       TaskConfig config,
@ -90,7 +93,8 @@ public class TaskToolbox
       MonitorScheduler monitorScheduler,
       SegmentLoader segmentLoader,
       ObjectMapper objectMapper,
-      final File taskWorkDir
+      final File taskWorkDir,
+      StupidPool<ByteBuffer> indexPool
   )
   {
     this.config = config;
@ -109,6 +113,7 @@ public class TaskToolbox
     this.segmentLoader = segmentLoader;
     this.objectMapper = objectMapper;
     this.taskWorkDir = taskWorkDir;
+    this.indexPool = indexPool;
   }

   public TaskConfig getConfig()
@ -210,4 +215,8 @@ public class TaskToolbox
   {
     return taskWorkDir;
   }
+
+  public StupidPool<ByteBuffer> getIndexPool()
+  {
+    return indexPool;
+  }
 }

View File

@ -24,6 +24,8 @@ import com.google.inject.Inject;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.FilteredServerView;
+import io.druid.collections.StupidPool;
+import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Processing;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
@ -36,6 +38,7 @@ import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.coordination.DataSegmentAnnouncer;

 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;

 /**
@ -57,6 +60,7 @@ public class TaskToolboxFactory
   private final MonitorScheduler monitorScheduler;
   private final SegmentLoaderFactory segmentLoaderFactory;
   private final ObjectMapper objectMapper;
+  private final StupidPool<ByteBuffer> bufferPool;

   @Inject
   public TaskToolboxFactory(
@ -73,7 +77,9 @@ public class TaskToolboxFactory
       @Processing ExecutorService queryExecutorService,
       MonitorScheduler monitorScheduler,
       SegmentLoaderFactory segmentLoaderFactory,
-      ObjectMapper objectMapper
+      ObjectMapper objectMapper,
+      //TODO: have a separate index pool
+      @Global StupidPool<ByteBuffer> bufferPool
   )
   {
     this.config = config;
@ -90,6 +96,7 @@ public class TaskToolboxFactory
     this.monitorScheduler = monitorScheduler;
     this.segmentLoaderFactory = segmentLoaderFactory;
     this.objectMapper = objectMapper;
+    this.bufferPool = bufferPool;
   }

   public TaskToolbox build(Task task)
@ -112,7 +119,8 @@ public class TaskToolboxFactory
         monitorScheduler,
         segmentLoaderFactory.manufacturate(taskWorkDir),
         objectMapper,
-        taskWorkDir
+        taskWorkDir,
+        bufferPool
     );
   }
 }

View File

@ -31,7 +31,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.Granularity;
 import com.metamx.common.logger.Logger;
+import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
+import io.druid.guice.annotations.Global;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.segment.IndexIO;
@ -49,9 +51,11 @@ import io.druid.segment.realtime.plumber.Sink;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.Interval;
+import sun.misc.JavaNioAccess;

 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
@ -65,6 +69,7 @@ public class YeOldePlumberSchool implements PlumberSchool
   private final String version;
   private final DataSegmentPusher dataSegmentPusher;
   private final File tmpSegmentDir;
+  private final StupidPool<ByteBuffer> bufferPool;

   private static final Logger log = new Logger(YeOldePlumberSchool.class);
@ -73,13 +78,16 @@ public class YeOldePlumberSchool implements PlumberSchool
@JsonProperty("interval") Interval interval, @JsonProperty("interval") Interval interval,
@JsonProperty("version") String version, @JsonProperty("version") String version,
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher, @JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
@JacksonInject("tmpSegmentDir") File tmpSegmentDir @JacksonInject("tmpSegmentDir") File tmpSegmentDir,
//TODO: review this global annotation
@JacksonInject @Global StupidPool<ByteBuffer> bufferPool
) )
{ {
this.interval = interval; this.interval = interval;
this.version = version; this.version = version;
this.dataSegmentPusher = dataSegmentPusher; this.dataSegmentPusher = dataSegmentPusher;
this.tmpSegmentDir = tmpSegmentDir; this.tmpSegmentDir = tmpSegmentDir;
this.bufferPool = bufferPool;
} }
@Override @Override
@ -96,7 +104,7 @@ public class YeOldePlumberSchool implements PlumberSchool
   )
   {
     // There can be only one.
-    final Sink theSink = new Sink(interval, schema, config, version);
+    final Sink theSink = new Sink(interval, schema, config, version, bufferPool);

     // Temporary directory to hold spilled segments.
     final File persistDir = new File(tmpSegmentDir, theSink.getSegment().getIdentifier());

View File

@ -79,33 +79,43 @@ public class DeleteTask extends AbstractFixedIntervalTask
   {
     // Strategy: Create an empty segment covering the interval to be deleted
     final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
-    final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
-    final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
-
-    // Create DataSegment
-    final DataSegment segment =
-        DataSegment.builder()
-                   .dataSource(this.getDataSource())
-                   .interval(getInterval())
-                   .version(myLock.getVersion())
-                   .shardSpec(new NoneShardSpec())
-                   .build();
-
-    final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
-    final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
-
-    // Upload the segment
-    final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment);
-    log.info(
-        "Uploaded tombstone segment for[%s] interval[%s] with version[%s]",
-        segment.getDataSource(),
-        segment.getInterval(),
-        segment.getVersion()
-    );
-
-    toolbox.pushSegments(ImmutableList.of(uploadedSegment));
-
-    return TaskStatus.success(getId());
+    final IncrementalIndex empty = new IncrementalIndex(
+        0,
+        QueryGranularity.NONE,
+        new AggregatorFactory[0],
+        toolbox.getIndexPool()
+    );
+    try {
+      final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
+
+      // Create DataSegment
+      final DataSegment segment =
+          DataSegment.builder()
+                     .dataSource(this.getDataSource())
+                     .interval(getInterval())
+                     .version(myLock.getVersion())
+                     .shardSpec(new NoneShardSpec())
+                     .build();
+
+      final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
+      final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
+
+      // Upload the segment
+      final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment);
+      log.info(
+          "Uploaded tombstone segment for[%s] interval[%s] with version[%s]",
+          segment.getDataSource(),
+          segment.getInterval(),
+          segment.getVersion()
+      );
+
+      toolbox.pushSegments(ImmutableList.of(uploadedSegment));
+
+      return TaskStatus.success(getId());
+    }
+    finally {
+      empty.close();
+    }
   }
 }

View File

@ -385,7 +385,8 @@ public class IndexTask extends AbstractFixedIntervalTask
         interval,
         version,
         wrappedDataSegmentPusher,
-        tmpDir
+        tmpDir,
+        toolbox.getIndexPool()
     ).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);

     // rowFlushBoundary for this job

View File

@ -308,6 +308,7 @@ public class RealtimeIndexTask extends AbstractTask
         segmentPublisher,
         toolbox.getNewSegmentServerView(),
         toolbox.getQueryExecutorService(),
+        toolbox.getIndexPool(),
         null,
         null,
         null,

View File

@ -43,6 +43,7 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.granularity.QueryGranularity;
+import io.druid.offheap.OffheapBufferPool;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.indexing.common.SegmentLoaderFactory;
 import io.druid.indexing.common.TaskLock;
@ -205,7 +206,8 @@ public class TaskLifecycleTest
             }
           )
         ),
-        new DefaultObjectMapper()
+        new DefaultObjectMapper(),
+        new OffheapBufferPool(1024 * 1024)
     );
     tr = new ThreadPoolTaskRunner(tb);
     tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);

View File

@ -38,6 +38,7 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
 import io.druid.indexing.overlord.ThreadPoolTaskRunner;
 import io.druid.indexing.worker.config.WorkerConfig;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.offheap.OffheapBufferPool;
 import io.druid.segment.loading.DataSegmentPuller;
 import io.druid.segment.loading.LocalDataSegmentPuller;
 import io.druid.segment.loading.OmniSegmentLoader;
@ -138,7 +139,8 @@ public class WorkerTaskMonitorTest
                 }
               }
             )
-        ), jsonMapper
+        ), jsonMapper,
+        new OffheapBufferPool(1024 * 1024)
     )
 ),
 new WorkerConfig().setCapacity(1)

View File

@ -31,12 +31,14 @@ import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.logger.Logger;
+import io.druid.collections.StupidPool;
 import io.druid.data.input.Row;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryConfig;
 import io.druid.query.groupby.GroupByQueryHelper;
 import io.druid.segment.incremental.IncrementalIndex;

+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@ -51,20 +53,25 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
   private final ExecutorService exec;
   private final Ordering<Row> ordering;
   private final Supplier<GroupByQueryConfig> configSupplier;
+  private final StupidPool<ByteBuffer> bufferPool;

   public GroupByParallelQueryRunner(
       ExecutorService exec,
       Ordering<Row> ordering,
       Supplier<GroupByQueryConfig> configSupplier,
+      StupidPool<ByteBuffer> bufferPool,
       QueryRunner<Row>... queryables
   )
   {
-    this(exec, ordering, configSupplier, Arrays.asList(queryables));
+    this(exec, ordering, configSupplier, bufferPool, Arrays.asList(queryables));
   }

   public GroupByParallelQueryRunner(
       ExecutorService exec,
-      Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
+      Ordering<Row> ordering,
+      Supplier<GroupByQueryConfig> configSupplier,
+      StupidPool<ByteBuffer> bufferPool,
       Iterable<QueryRunner<Row>> queryables
   )
   {
@ -72,6 +79,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
     this.ordering = ordering;
     this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
     this.configSupplier = configSupplier;
+    this.bufferPool = bufferPool;
   }

   @Override
@ -81,7 +89,8 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
     final GroupByQuery query = (GroupByQuery) queryParam;
     final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
         query,
-        configSupplier.get()
+        configSupplier.get(),
+        bufferPool
     );
     final int priority = query.getContextPriority(0);

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.Accumulator;
+import io.druid.collections.StupidPool;
 import io.druid.data.input.Row;
 import io.druid.data.input.Rows;
 import io.druid.granularity.QueryGranularity;
@ -32,13 +33,16 @@ import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;

 import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
 import java.util.List;

 public class GroupByQueryHelper
 {
   public static Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> createIndexAccumulatorPair(
       final GroupByQuery query,
-      final GroupByQueryConfig config
+      final GroupByQueryConfig config,
+      StupidPool<ByteBuffer> bufferPool
   )
   {
     final QueryGranularity gran = query.getGranularity();
@ -75,7 +79,8 @@ public class GroupByQueryHelper
         // since incoming truncated timestamps may precede timeStart
         granTimeStart,
         gran,
-        aggs.toArray(new AggregatorFactory[aggs.size()])
+        aggs.toArray(new AggregatorFactory[aggs.size()]),
+        bufferPool
     );

     Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()

View File

@ -32,8 +32,10 @@ import com.metamx.common.guava.ConcatSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.collections.StupidPool;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
+import io.druid.guice.annotations.Global;
 import io.druid.query.DataSource;
 import io.druid.query.DataSourceUtil;
 import io.druid.query.IntervalChunkingQueryRunner;
@ -49,6 +51,7 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;

+import java.nio.ByteBuffer;
 import java.util.Map;

 /**
@ -62,15 +65,19 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(GROUP_BY_MERGE_KEY, "false"); private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(GROUP_BY_MERGE_KEY, "false");
private final Supplier<GroupByQueryConfig> configSupplier; private final Supplier<GroupByQueryConfig> configSupplier;
private GroupByQueryEngine engine; // For running the outer query around a subquery private GroupByQueryEngine engine; // For running the outer query around a subquery
private final StupidPool<ByteBuffer> bufferPool;
@Inject @Inject
public GroupByQueryQueryToolChest( public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier, Supplier<GroupByQueryConfig> configSupplier,
GroupByQueryEngine engine GroupByQueryEngine engine,
@Global StupidPool<ByteBuffer> bufferPool
) )
{ {
this.configSupplier = configSupplier; this.configSupplier = configSupplier;
this.engine = engine; this.engine = engine;
this.bufferPool = bufferPool;
} }
@Override @Override
@ -142,7 +149,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
     final GroupByQueryConfig config = configSupplier.get();
     Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
         query,
-        config
+        config,
+        bufferPool
     );

     return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);

View File

@ -29,7 +29,9 @@ import com.metamx.common.ISE;
 import com.metamx.common.guava.ExecutorExecutingSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
+import io.druid.collections.StupidPool;
 import io.druid.data.input.Row;
+import io.druid.guice.annotations.Global;
 import io.druid.query.ConcatQueryRunner;
 import io.druid.query.GroupByParallelQueryRunner;
 import io.druid.query.Query;
@ -39,6 +41,7 @@ import io.druid.query.QueryToolChest;
 import io.druid.segment.Segment;
 import io.druid.segment.StorageAdapter;

+import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@ -51,17 +54,20 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
   private final GroupByQueryEngine engine;
   private final Supplier<GroupByQueryConfig> config;
   private final GroupByQueryQueryToolChest toolChest;
+  private final StupidPool<ByteBuffer> bufferPool;

   @Inject
   public GroupByQueryRunnerFactory(
       GroupByQueryEngine engine,
       Supplier<GroupByQueryConfig> config,
-      GroupByQueryQueryToolChest toolChest
+      GroupByQueryQueryToolChest toolChest,
+      @Global StupidPool<ByteBuffer> bufferPool
   )
   {
     this.engine = engine;
     this.config = config;
     this.toolChest = toolChest;
+    this.bufferPool = bufferPool;
   }

   @Override
@ -117,7 +123,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
           )
       );
     } else {
-      return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryRunners);
+      return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, bufferPool, queryRunners);
     }
   }

View File

@ -33,13 +33,15 @@ import com.google.common.primitives.Longs;
 import com.metamx.common.IAE;
 import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
+import io.druid.collections.ResourceHolder;
+import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.impl.SpatialDimensionSchema;
 import io.druid.granularity.QueryGranularity;
-import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.BufferAggregator;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.DimensionSelector;
@ -53,6 +55,9 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;

 import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@ -66,7 +71,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  */
-public class IncrementalIndex implements Iterable<Row>
+public class IncrementalIndex implements Iterable<Row>, Closeable
 {
   private static final Logger log = new Logger(IncrementalIndex.class);
   private static final Joiner JOINER = Joiner.on(",");
@ -76,17 +81,21 @@ public class IncrementalIndex implements Iterable<Row>
private final Map<String, Integer> metricIndexes; private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes; private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames; private final ImmutableList<String> metricNames;
private final BufferAggregator[] aggs;
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final LinkedHashMap<String, Integer> dimensionOrder; private final LinkedHashMap<String, Integer> dimensionOrder;
private final CopyOnWriteArrayList<String> dimensions; private final CopyOnWriteArrayList<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions; private final List<SpatialDimensionSchema> spatialDimensions;
private final SpatialDimensionRowFormatter spatialDimensionRowFormatter; private final SpatialDimensionRowFormatter spatialDimensionRowFormatter;
private final DimensionHolder dimValues; private final DimensionHolder dimValues;
private final ConcurrentSkipListMap<TimeAndDims, Aggregator[]> facts; private final ConcurrentSkipListMap<TimeAndDims, Integer> facts;
private final ResourceHolder<ByteBuffer> bufferHolder;
private volatile AtomicInteger numEntries = new AtomicInteger(); private volatile AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section. // This is modified on add() in a critical section.
private InputRow in; private ThreadLocal<InputRow> in = new ThreadLocal<>();
public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema) public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, StupidPool<ByteBuffer> bufferPool)
{ {
this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran(); this.gran = incrementalIndexSchema.getGran();
@ -95,7 +104,100 @@ public class IncrementalIndex implements Iterable<Row>
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder(); final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder(); final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder(); final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggs = new BufferAggregator[metrics.length];
this.aggPositionOffsets = new int[metrics.length];
int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) { for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
new ColumnSelectorFactory()
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return in.get().getTimestampFromEpoch();
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(metricName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if (typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return in.get().getFloatMetric(columnName);
}
};
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), columnName);
}
};
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
// we should implement this, but this is going to be rewritten soon anyways
throw new UnsupportedOperationException(
"Incremental index aggregation does not support dimension selectors"
);
}
}
);
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
final String metricName = metrics[i].getName().toLowerCase(); final String metricName = metrics[i].getName().toLowerCase();
metricNamesBuilder.add(metricName); metricNamesBuilder.add(metricName);
metricIndexesBuilder.put(metricName, i); metricIndexesBuilder.put(metricName, i);
@ -105,6 +207,8 @@ public class IncrementalIndex implements Iterable<Row>
     metricIndexes = metricIndexesBuilder.build();
     metricTypes = metricTypesBuilder.build();
+    this.totalAggSize = currAggSize;

     this.dimensionOrder = Maps.newLinkedHashMap();
     this.dimensions = new CopyOnWriteArrayList<String>();
     int index = 0;
@ -114,22 +218,24 @@ public class IncrementalIndex implements Iterable<Row>
     }

     this.spatialDimensions = incrementalIndexSchema.getSpatialDimensions();
     this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(spatialDimensions);
+    this.bufferHolder = bufferPool.take();
     this.dimValues = new DimensionHolder();
-    this.facts = new ConcurrentSkipListMap<TimeAndDims, Aggregator[]>();
+    this.facts = new ConcurrentSkipListMap<TimeAndDims, Integer>();
   }

   public IncrementalIndex(
       long minTimestamp,
       QueryGranularity gran,
-      final AggregatorFactory[] metrics
+      final AggregatorFactory[] metrics,
+      StupidPool<ByteBuffer> bufferPool
   )
   {
     this(
         new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
                                             .withQueryGranularity(gran)
                                             .withMetrics(metrics)
-                                            .build()
+                                            .build(),
+        bufferPool
     );
   }
@ -137,7 +243,7 @@ public class IncrementalIndex implements Iterable<Row>
    * Adds a new row. The row might correspond with another row that already exists, in which case this will
    * update that row instead of inserting a new one.
    * <p/>
-   * This is *not* thread-safe. Calls to add() should always happen on the same thread.
+   * This method is thread-safe.
    *
    * @param row the row of data to add
    *
@ -188,118 +294,23 @@ public class IncrementalIndex implements Iterable<Row>
     TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);

-    Aggregator[] aggs = facts.get(key);
-    if (aggs == null) {
-      aggs = new Aggregator[metrics.length];
-      for (int i = 0; i < metrics.length; ++i) {
-        final AggregatorFactory agg = metrics[i];
-        aggs[i] =
-            agg.factorize(
-                new ColumnSelectorFactory()
-                {
-                  @Override
-                  public TimestampColumnSelector makeTimestampColumnSelector()
-                  {
-                    return new TimestampColumnSelector()
-                    {
-                      @Override
-                      public long getTimestamp()
-                      {
-                        return in.getTimestampFromEpoch();
-                      }
-                    };
-                  }
-
-                  @Override
-                  public FloatColumnSelector makeFloatColumnSelector(String columnName)
-                  {
-                    final String metricName = columnName.toLowerCase();
-                    return new FloatColumnSelector()
-                    {
-                      @Override
-                      public float get()
-                      {
-                        return in.getFloatMetric(metricName);
-                      }
-                    };
-                  }
-
-                  @Override
-                  public ObjectColumnSelector makeObjectColumnSelector(String column)
-                  {
-                    final String typeName = agg.getTypeName();
-                    final String columnName = column.toLowerCase();
-
-                    if (typeName.equals("float")) {
-                      return new ObjectColumnSelector<Float>()
-                      {
-                        @Override
-                        public Class classOfObject()
-                        {
-                          return Float.TYPE;
-                        }
-
-                        @Override
-                        public Float get()
-                        {
-                          return in.getFloatMetric(columnName);
-                        }
-                      };
-                    }
-
-                    final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
-                    if (serde == null) {
-                      throw new ISE("Don't know how to handle type[%s]", typeName);
-                    }
-
-                    final ComplexMetricExtractor extractor = serde.getExtractor();
-                    return new ObjectColumnSelector()
-                    {
-                      @Override
-                      public Class classOfObject()
-                      {
-                        return extractor.extractedClass();
-                      }
-
-                      @Override
-                      public Object get()
-                      {
-                        return extractor.extractValue(in, columnName);
-                      }
-                    };
-                  }
-
-                  @Override
-                  public DimensionSelector makeDimensionSelector(String dimension)
-                  {
-                    // we should implement this, but this is going to be rewritten soon anyways
-                    throw new UnsupportedOperationException(
-                        "Incremental index aggregation does not support dimension selectors"
-                    );
-                  }
-                }
-            );
-      }
-
-      Aggregator[] prev = facts.putIfAbsent(key, aggs);
-      if (prev != null) {
-        aggs = prev;
-      } else {
-        numEntries.incrementAndGet();
-      }
-    }
-
-    synchronized (this) {
-      in = row;
-      for (Aggregator agg : aggs) {
-        agg.aggregate();
-      }
-      in = null;
-    }
+    synchronized (this) {
+      if (!facts.containsKey(key)) {
+        int rowOffset = totalAggSize * numEntries.getAndIncrement();
+        for (int i = 0; i < aggs.length; i++) {
+          aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
+        }
+        facts.put(key, rowOffset);
+      }
+    }
+    in.set(row);
+    int rowOffset = facts.get(key);
+    for (int i = 0; i < aggs.length; i++) {
+      synchronized (aggs[i]) {
+        aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
+      }
+    }
+    in.set(null);
     return numEntries.get();
   }
@ -413,12 +424,27 @@ public class IncrementalIndex implements Iterable<Row>
     return metricIndexes.get(metricName);
   }

+  int getMetricPosition(int rowOffset, int metricIndex)
+  {
+    return rowOffset + aggPositionOffsets[metricIndex];
+  }
+
+  ByteBuffer getMetricBuffer()
+  {
+    return bufferHolder.get();
+  }
+
+  BufferAggregator getAggregator(int metricIndex)
+  {
+    return aggs[metricIndex];
+  }
+
-  ConcurrentSkipListMap<TimeAndDims, Aggregator[]> getFacts()
+  ConcurrentSkipListMap<TimeAndDims, Integer> getFacts()
   {
     return facts;
   }

-  ConcurrentNavigableMap<TimeAndDims, Aggregator[]> getSubMap(TimeAndDims start, TimeAndDims end)
+  ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
   {
     return facts.subMap(start, end);
   }
@ -438,13 +464,13 @@ public class IncrementalIndex implements Iterable<Row>
   {
     return Iterators.transform(
         facts.entrySet().iterator(),
-        new Function<Map.Entry<TimeAndDims, Aggregator[]>, Row>()
+        new Function<Map.Entry<TimeAndDims, Integer>, Row>()
         {
           @Override
-          public Row apply(final Map.Entry<TimeAndDims, Aggregator[]> input)
+          public Row apply(final Map.Entry<TimeAndDims, Integer> input)
           {
             final TimeAndDims timeAndDims = input.getKey();
-            final Aggregator[] aggregators = input.getValue();
+            final int rowOffset = input.getValue();

             String[][] theDims = timeAndDims.getDims();
@ -456,8 +482,8 @@ public class IncrementalIndex implements Iterable<Row>
               }
             }

-            for (int i = 0; i < aggregators.length; ++i) {
-              theVals.put(metrics[i].getName(), aggregators[i].get());
+            for (int i = 0; i < aggs.length; ++i) {
+              theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i)));
             }

             if (postAggs != null) {
@ -474,6 +500,12 @@ public class IncrementalIndex implements Iterable<Row>
         };
   }

+  @Override
+  public void close() throws IOException
+  {
+    bufferHolder.close();
+  }
+
   static class DimensionHolder
   {
     private final Map<String, DimDim> dimensions;

View File

@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import io.druid.data.input.impl.SpatialDimensionSchema;
-import io.druid.query.aggregation.Aggregator;
 import io.druid.segment.IndexableAdapter;
 import io.druid.segment.Rowboat;
 import io.druid.segment.data.EmptyIndexedInts;
@ -45,10 +44,8 @@ import java.util.Map;
 public class IncrementalIndexAdapter implements IndexableAdapter
 {
   private static final Logger log = new Logger(IncrementalIndexAdapter.class);
   private final Interval dataInterval;
   private final IncrementalIndex index;
   private final Map<String, Map<String, ConciseSet>> invertedIndexes;

   public IncrementalIndexAdapter(
@ -171,18 +168,18 @@ public class IncrementalIndexAdapter implements IndexableAdapter
     return FunctionalIterable
         .create(index.getFacts().entrySet())
         .transform(
-            new Function<Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]>, Rowboat>()
+            new Function<Map.Entry<IncrementalIndex.TimeAndDims, Integer>, Rowboat>()
             {
               int count = 0;

               @Override
               public Rowboat apply(
-                  @Nullable Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]> input
+                  @Nullable Map.Entry<IncrementalIndex.TimeAndDims, Integer> input
               )
               {
                 final IncrementalIndex.TimeAndDims timeAndDims = input.getKey();
                 final String[][] dimValues = timeAndDims.getDims();
-                final Aggregator[] aggs = input.getValue();
+                final int rowOffset = input.getValue();

                 int[][] dims = new int[dimValues.length][];
                 for (String dimension : index.getDimensions()) {
@ -205,9 +202,10 @@ public class IncrementalIndexAdapter implements IndexableAdapter
                   }
                 }

-                Object[] metrics = new Object[aggs.length];
-                for (int i = 0; i < aggs.length; i++) {
-                  metrics[i] = aggs[i].get();
+                Object[] metrics = new Object[index.getMetricAggs().length];
+                for (int i = 0; i < metrics.length; i++) {
+                  metrics[i] = index.getAggregator(i)
+                                    .get(index.getMetricBuffer(), index.getMetricPosition(rowOffset, i));
                 }

                 Map<String, String> description = Maps.newHashMap();

View File

@ -29,7 +29,7 @@ import com.metamx.collections.spatial.search.Bound;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import io.druid.granularity.QueryGranularity;
-import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
 import io.druid.query.filter.Filter;
 import io.druid.query.filter.ValueMatcher;
 import io.druid.query.filter.ValueMatcherFactory;
@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentNavigableMap;
 public class IncrementalIndexStorageAdapter implements StorageAdapter
 {
   private static final Splitter SPLITTER = Splitter.on(",");
   private final IncrementalIndex index;

   public IncrementalIndexStorageAdapter(
@ -165,8 +164,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
         return new Cursor()
         {
-          private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]>> baseIter;
-          private ConcurrentNavigableMap<IncrementalIndex.TimeAndDims, Aggregator[]> cursorMap;
+          private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter;
+          private ConcurrentNavigableMap<IncrementalIndex.TimeAndDims, Integer> cursorMap;
           final DateTime time;
           int numAdvanced = -1;
           boolean done;
@ -355,13 +354,17 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
             }

             final int metricIndex = metricIndexInt;
+            final BufferAggregator agg = index.getAggregator(metricIndex);
             return new FloatColumnSelector()
             {
               @Override
               public float get()
               {
-                return currEntry.getValue()[metricIndex].getFloat();
+                return agg.getFloat(
+                    index.getMetricBuffer(),
+                    index.getMetricPosition(currEntry.getValue(), metricIndex)
+                );
               }
             };
           }
@ -376,7 +379,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
             final int metricIndex = metricIndexInt;
             final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName));
+            final BufferAggregator agg = index.getAggregator(metricIndex);
             return new ObjectColumnSelector()
             {
               @Override
@ -388,7 +391,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
               @Override
               public Object get()
               {
-                return currEntry.getValue()[metricIndex].get();
+                return agg.get(
+                    index.getMetricBuffer(),
+                    index.getMetricPosition(currEntry.getValue(), metricIndex)
+                );
               }
             };
           }
@ -411,11 +417,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
             final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
             if (dimVals.length == 1) {
               return dimVals[0];
-            }
-            else if (dimVals.length == 0) {
+            } else if (dimVals.length == 0) {
               return null;
-            }
-            else {
+            } else {
               return dimVals;
             }
           }
@ -439,14 +443,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
   private static class EntryHolder
   {
-    Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]> currEntry = null;
+    Map.Entry<IncrementalIndex.TimeAndDims, Integer> currEntry = null;

-    public Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]> get()
+    public Map.Entry<IncrementalIndex.TimeAndDims, Integer> get()
     {
       return currEntry;
     }

-    public void set(Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]> currEntry)
+    public void set(Map.Entry<IncrementalIndex.TimeAndDims, Integer> currEntry)
     {
       this.currEntry = currEntry;
       this.currEntry = currEntry;
@ -457,7 +461,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
       return currEntry.getKey();
     }

-    public Aggregator[] getValue()
+    public Integer getValue()
     {
       return currEntry.getValue();
     }

View File

@ -24,11 +24,10 @@ public class TestQueryRunners
         @Override
         public ByteBuffer get()
         {
-          return ByteBuffer.allocate(1024 * 10);
+          return ByteBuffer.allocate(1024 * 1024 * 10);
         }
       }
   );

   public static final TopNQueryConfig topNConfig = new TopNQueryConfig();

   public static StupidPool<ByteBuffer> getPool()

View File

@ -0,0 +1,93 @@
package io.druid.query.aggregation.hyperloglog;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.segment.ObjectColumnSelector;
import org.junit.Test;
import java.nio.ByteBuffer;
/**
 * Exercises HyperUniquesBufferAggregator and HyperUniquesAggregator over the same stream
 * of hashed values and prints the resulting cardinality estimates.
 */
public class HyperUniqueBufferAggregatorTest
{
private final HashFunction fn = Hashing.murmur3_128();
private volatile HyperLogLogCollector collector;
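  // both tests below reassign this field on every iteration; the ObjectColumnSelectors hand it to the aggregators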
@Test
public void testAggregation()
{
final HyperUniquesBufferAggregator agg = new HyperUniquesBufferAggregator(
new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return HyperLogLogCollector.class;
}
@Override
public Object get()
{
return collector;
}
}
);
ByteBuffer byteBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage());
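    // each pass swaps in a fresh collector holding a single hashed value and folds it into the buffer at offset 0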
for (int i = 0; i < 1000; i++) {
collector = HyperLogLogCollector.makeLatestCollector();
collector.add(fn.hashInt(i).asBytes());
agg.aggregate(byteBuffer, 0);
}
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
((HyperLogLogCollector) agg.get(
byteBuffer,
0
)).toByteBuffer()
);
System.out.println(collector.estimateCardinality());
}
@Test
public void testAggregation2()
{
final HyperUniquesAggregator agg = new HyperUniquesAggregator(
"abc",
new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return HyperLogLogCollector.class;
}
@Override
public Object get()
{
return collector;
}
}
);
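    // same stream of hashed values, this time run through the on-heap aggregator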
for (int i = 0; i < 1000; i++) {
collector = HyperLogLogCollector.makeLatestCollector();
collector.add(fn.hashInt(i).asBytes());
agg.aggregate();
}
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
((HyperLogLogCollector) agg.get(
)).toByteBuffer()
);
System.out.println(collector.estimateCardinality());
}
}

View File

@ -40,6 +40,7 @@ import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerTestHelper;
 import io.druid.query.QueryToolChest;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -114,7 +115,8 @@ public class GroupByQueryRunnerTest
     final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
         engine,
         configSupplier,
-        new GroupByQueryQueryToolChest(configSupplier, engine)
+        new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool),
+        TestQueryRunners.pool
     );

     return Lists.newArrayList(
@ -758,7 +760,7 @@ public class GroupByQueryRunnerTest
         )
     );

-    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
+    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool).mergeResults(runner);
     TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
   }
@ -807,7 +809,7 @@ public class GroupByQueryRunnerTest
         )
     );

-    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
+    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool).mergeResults(runner);
     TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
   }
@ -856,7 +858,7 @@ public class GroupByQueryRunnerTest
         )
     );

-    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
+    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool).mergeResults(runner);
     TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
   }

View File

@ -32,6 +32,7 @@ import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerTestHelper;
 import io.druid.query.Result;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.timeseries.TimeseriesQuery;
 import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
 import io.druid.query.timeseries.TimeseriesResultValue;
@ -73,7 +74,8 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
     final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
         engine,
         configSupplier,
-        new GroupByQueryQueryToolChest(configSupplier, engine)
+        new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool),
+        TestQueryRunners.pool
     );

     final Collection<?> objects = QueryRunnerTestHelper.makeQueryRunners(factory);

View File

@ -31,6 +31,7 @@ import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactory;
 import io.druid.query.Result;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.IncrementalIndexSegment;
@ -49,7 +50,8 @@ public class TimeseriesQueryRunnerBonusTest
   public void testOneRowAtATime() throws Exception
   {
     final IncrementalIndex oneRowIndex = new IncrementalIndex(
-        new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{}
+        new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{},
+        TestQueryRunners.pool
     );

     List<Result<TimeseriesResultValue>> results;

View File

@ -22,6 +22,7 @@ package io.druid.segment;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import io.druid.granularity.QueryGranularity;
+import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexAdapter;
@ -45,7 +46,7 @@ public class EmptyIndexTest
     }
     tmpDir.deleteOnExit();

-    IncrementalIndex emptyIndex = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
+    IncrementalIndex emptyIndex = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0], TestQueryRunners.pool);
     IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(new Interval("2012-08-01/P3D"), emptyIndex);
     IndexMerger.merge(Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir);


@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.google.common.io.Files; import com.google.common.io.Files;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
@ -65,7 +66,7 @@ public class IndexMergerTest
final long timestamp = System.currentTimeMillis(); final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(timestamp); IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(timestamp);
IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
toPersist2.add( toPersist2.add(
new MapBasedInputRow( new MapBasedInputRow(
@ -117,8 +118,8 @@ public class IndexMergerTest
@Test @Test
public void testPersistEmptyColumn() throws Exception public void testPersistEmptyColumn() throws Exception
{ {
final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
final File tmpDir1 = Files.createTempDir(); final File tmpDir1 = Files.createTempDir();
final File tmpDir2 = Files.createTempDir(); final File tmpDir2 = Files.createTempDir();
final File tmpDir3 = Files.createTempDir(); final File tmpDir3 = Files.createTempDir();


@ -31,6 +31,7 @@ import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec; import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -155,7 +156,8 @@ public class TestIndex
log.info("Realtime loading index file[%s]", resource); log.info("Realtime loading index file[%s]", resource);
final IncrementalIndex retVal = new IncrementalIndex( final IncrementalIndex retVal = new IncrementalIndex(
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.NONE, METRIC_AGGS new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.NONE, METRIC_AGGS,
TestQueryRunners.pool
); );
final AtomicLong startTime = new AtomicLong(); final AtomicLong startTime = new AtomicLong();


@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
@ -45,7 +46,10 @@ public class IncrementalIndexTest
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp) public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
{ {
IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); IncrementalIndex index = new IncrementalIndex(
0L, QueryGranularity.NONE, new AggregatorFactory[]{},
TestQueryRunners.pool
);
index.add( index.add(
new MapBasedInputRow( new MapBasedInputRow(
@ -105,7 +109,8 @@ public class IncrementalIndexTest
final IncrementalIndex index = new IncrementalIndex( final IncrementalIndex index = new IncrementalIndex(
0L, 0L,
QueryGranularity.NONE, QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")} new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool
); );
final int threadCount = 10; final int threadCount = 10;
final int elementsPerThread = 200; final int elementsPerThread = 200;
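The hunk above belongs to a multi-threaded test: several threads add rows to one pool-backed index. A rough sketch of that pattern, keeping the thread and row counts from the diff; the dimension names and executor setup are illustrative, not the actual test body.

```java
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConcurrentAddSketch
{
  public static void main(String[] args) throws Exception
  {
    final IncrementalIndex index = new IncrementalIndex(
        0L,
        QueryGranularity.NONE,
        new AggregatorFactory[]{new CountAggregatorFactory("count")},
        TestQueryRunners.pool
    );

    final int threadCount = 10;        // mirrors the diff
    final int elementsPerThread = 200; // mirrors the diff
    ExecutorService exec = Executors.newFixedThreadPool(threadCount);
    for (int t = 0; t < threadCount; t++) {
      exec.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              for (int i = 0; i < elementsPerThread; i++) {
                // Every thread adds rows to the same pool-backed index.
                index.add(
                    new MapBasedInputRow(
                        System.currentTimeMillis(),
                        Arrays.asList("dim"),
                        ImmutableMap.<String, Object>of("dim", Thread.currentThread().getName())
                    )
                );
              }
            }
          }
      );
    }
    exec.shutdown();
    exec.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```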


@ -31,6 +31,7 @@ import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -108,7 +109,8 @@ public class SpatialFilterBonusTest
Lists.<String>newArrayList() Lists.<String>newArrayList()
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
@ -233,7 +235,8 @@ public class SpatialFilterBonusTest
Lists.<String>newArrayList() Lists.<String>newArrayList()
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );
IncrementalIndex second = new IncrementalIndex( IncrementalIndex second = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@ -246,7 +249,8 @@ public class SpatialFilterBonusTest
Lists.<String>newArrayList() Lists.<String>newArrayList()
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );
IncrementalIndex third = new IncrementalIndex( IncrementalIndex third = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@ -259,7 +263,8 @@ public class SpatialFilterBonusTest
Lists.<String>newArrayList() Lists.<String>newArrayList()
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );


@ -31,6 +31,7 @@ import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -108,7 +109,8 @@ public class SpatialFilterTest
Arrays.asList("lat", "long") Arrays.asList("lat", "long")
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
@ -248,7 +250,8 @@ public class SpatialFilterTest
Arrays.asList("lat", "long") Arrays.asList("lat", "long")
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );
IncrementalIndex second = new IncrementalIndex( IncrementalIndex second = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@ -261,7 +264,8 @@ public class SpatialFilterTest
Arrays.asList("lat", "long") Arrays.asList("lat", "long")
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );
IncrementalIndex third = new IncrementalIndex( IncrementalIndex third = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@ -274,7 +278,8 @@ public class SpatialFilterTest
Arrays.asList("lat", "long") Arrays.asList("lat", "long")
) )
) )
).build() ).build(),
TestQueryRunners.pool
); );


@ -32,6 +32,7 @@ import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -62,7 +63,8 @@ public class IncrementalIndexStorageAdapterTest
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
IncrementalIndex index = new IncrementalIndex( IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
); );
index.add( index.add(
@ -127,7 +129,8 @@ public class IncrementalIndexStorageAdapterTest
@Test @Test
public void testResetSanity() { public void testResetSanity() {
IncrementalIndex index = new IncrementalIndex( IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
); );
@ -179,7 +182,8 @@ public class IncrementalIndexStorageAdapterTest
public void testSingleValueTopN() public void testSingleValueTopN()
{ {
IncrementalIndex index = new IncrementalIndex( IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
); );
DateTime t = DateTime.now(); DateTime t = DateTime.now();
@ -234,7 +238,8 @@ public class IncrementalIndexStorageAdapterTest
public void testFilterByNull() throws Exception public void testFilterByNull() throws Exception
{ {
IncrementalIndex index = new IncrementalIndex( IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
); );
index.add( index.add(


@ -33,6 +33,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.StupidPool; import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.MetricsEmittingExecutorService; import io.druid.query.MetricsEmittingExecutorService;
import io.druid.query.PrioritizedExecutorService; import io.druid.query.PrioritizedExecutorService;
import io.druid.server.DruidProcessingConfig; import io.druid.server.DruidProcessingConfig;
@ -102,31 +103,8 @@ public class DruidProcessingModule implements Module
log.warn(e, e.getMessage()); log.warn(e, e.getMessage());
} }
return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes()); return new OffheapBufferPool(config.intermediateComputeSizeBytes());
} }
private static class IntermediateProcessingBufferPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(IntermediateProcessingBufferPool.class);
public IntermediateProcessingBufferPool(final int computationBufferSize)
{
super(
new Supplier<ByteBuffer>()
{
final AtomicLong count = new AtomicLong(0);
@Override
public ByteBuffer get()
{
log.info(
"Allocating new intermediate processing buffer[%,d] of size[%,d]",
count.getAndIncrement(), computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
);
}
}
} }
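With the private `IntermediateProcessingBufferPool` removed, the provider in `DruidProcessingModule` simply delegates to the new `OffheapBufferPool`. Roughly, assuming a `@Provides`-style provider as below; the method name and the surrounding module are illustrative, and only the return expression comes from the diff.

```java
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global;
import io.druid.offheap.OffheapBufferPool;
import io.druid.server.DruidProcessingConfig;

import java.nio.ByteBuffer;

// Illustrative stand-in for the provider inside DruidProcessingModule.
public class ProcessingBufferPoolModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // DruidProcessingConfig is assumed to be bound elsewhere, as in the real module.
  }

  @Provides
  @Global
  public StupidPool<ByteBuffer> getIntermediateResultsPool(DruidProcessingConfig config)
  {
    // Size each direct buffer from the configured intermediate compute budget.
    return new OffheapBufferPool(config.intermediateComputeSizeBytes());
  }
}
```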


@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.offheap;
import com.google.common.base.Supplier;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
public class OffheapBufferPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferPool.class);
public OffheapBufferPool(final int computationBufferSize)
{
super(
new Supplier<ByteBuffer>()
{
final AtomicLong count = new AtomicLong(0);
@Override
public ByteBuffer get()
{
log.info(
"Allocating new intermediate processing buffer[%,d] of size[%,d]",
count.getAndIncrement(), computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
);
}
}
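`OffheapBufferPool` is just a `StupidPool` whose supplier allocates direct `ByteBuffer`s, so callers check a buffer out and close the holder to return it. A minimal usage sketch, assuming `StupidPool#take()` returns a closeable `ResourceHolder` as elsewhere in the codebase; the 1 MiB size is arbitrary.

```java
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.offheap.OffheapBufferPool;

import java.io.IOException;
import java.nio.ByteBuffer;

class OffheapPoolUsage
{
  public static void main(String[] args) throws IOException
  {
    // One direct 1 MiB buffer per pool entry; the size is arbitrary here.
    StupidPool<ByteBuffer> pool = new OffheapBufferPool(1024 * 1024);

    // take() lends a buffer; closing the holder hands it back for reuse.
    ResourceHolder<ByteBuffer> holder = pool.take();
    try {
      ByteBuffer buf = holder.get();
      buf.putLong(0, 42L);
    }
    finally {
      holder.close();
    }
  }
}
```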


@ -25,6 +25,7 @@ import com.metamx.common.Granularity;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.collections.StupidPool;
import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -34,6 +35,7 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -62,7 +64,8 @@ public class FlushingPlumber extends RealtimePlumber
ServiceEmitter emitter, ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate, QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer, DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService ExecutorService queryExecutorService,
StupidPool<ByteBuffer> bufferPool
) )
{ {
super( super(
@ -75,7 +78,8 @@ public class FlushingPlumber extends RealtimePlumber
queryExecutorService, queryExecutorService,
null, null,
null, null,
null null,
bufferPool
); );
this.flushDuration = flushDuration; this.flushDuration = flushDuration;


@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -35,6 +37,7 @@ import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import java.io.File; import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/** /**
@ -51,6 +54,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final QueryRunnerFactoryConglomerate conglomerate; private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer; private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService; private final ExecutorService queryExecutorService;
private final StupidPool<ByteBuffer> bufferPool;
@JsonCreator @JsonCreator
public FlushingPlumberSchool( public FlushingPlumberSchool(
@ -59,6 +63,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject QueryRunnerFactoryConglomerate conglomerate, @JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService, @JacksonInject @Processing ExecutorService queryExecutorService,
//TODO: define separate index pool
@JacksonInject @Global StupidPool<ByteBuffer> bufferPool,
// Backwards compatible // Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -76,6 +82,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
null, null,
null, null,
queryExecutorService, queryExecutorService,
bufferPool,
windowPeriod, windowPeriod,
basePersistDirectory, basePersistDirectory,
segmentGranularity, segmentGranularity,
@ -89,6 +96,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.conglomerate = conglomerate; this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer; this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService; this.queryExecutorService = queryExecutorService;
this.bufferPool = bufferPool;
} }
@Override @Override
@ -108,7 +116,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
emitter, emitter,
conglomerate, conglomerate,
segmentAnnouncer, segmentAnnouncer,
queryExecutorService queryExecutorService,
bufferPool
); );
} }


@ -19,10 +19,12 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.collections.StupidPool;
import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Global;
import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
@ -59,6 +61,7 @@ import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -95,6 +98,8 @@ public class RealtimePlumber implements Plumber
private volatile ExecutorService persistExecutor = null; private volatile ExecutorService persistExecutor = null;
private volatile ExecutorService mergeExecutor = null; private volatile ExecutorService mergeExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null;
private final StupidPool<ByteBuffer> bufferPool;
public RealtimePlumber( public RealtimePlumber(
DataSchema schema, DataSchema schema,
@ -106,7 +111,9 @@ public class RealtimePlumber implements Plumber
ExecutorService queryExecutorService, ExecutorService queryExecutorService,
DataSegmentPusher dataSegmentPusher, DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher, SegmentPublisher segmentPublisher,
FilteredServerView serverView FilteredServerView serverView,
StupidPool<ByteBuffer> bufferPool
) )
{ {
this.schema = schema; this.schema = schema;
@ -120,6 +127,7 @@ public class RealtimePlumber implements Plumber
this.dataSegmentPusher = dataSegmentPusher; this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher; this.segmentPublisher = segmentPublisher;
this.serverView = serverView; this.serverView = serverView;
this.bufferPool = bufferPool;
log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
} }
@ -184,7 +192,7 @@ public class RealtimePlumber implements Plumber
segmentGranularity.increment(new DateTime(truncatedTime)) segmentGranularity.increment(new DateTime(truncatedTime))
); );
retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval)); retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), bufferPool);
try { try {
segmentAnnouncer.announceSegment(retVal.getSegment()); segmentAnnouncer.announceSegment(retVal.getSegment());
@ -535,7 +543,7 @@ public class RealtimePlumber implements Plumber
); );
} }
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants, bufferPool);
sinks.put(sinkInterval.getStartMillis(), currSink); sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add( sinkTimeline.add(
currSink.getInterval(), currSink.getInterval(),


@ -23,10 +23,14 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -38,7 +42,9 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Period; import org.joda.time.Period;
import java.io.File; import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
@ -51,6 +57,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final SegmentPublisher segmentPublisher; private final SegmentPublisher segmentPublisher;
private final FilteredServerView serverView; private final FilteredServerView serverView;
private final ExecutorService queryExecutorService; private final ExecutorService queryExecutorService;
private final StupidPool<ByteBuffer> bufferPool;
// Backwards compatible // Backwards compatible
private final Period windowPeriod; private final Period windowPeriod;
@ -69,6 +76,8 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject SegmentPublisher segmentPublisher, @JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject FilteredServerView serverView, @JacksonInject FilteredServerView serverView,
@JacksonInject @Processing ExecutorService executorService, @JacksonInject @Processing ExecutorService executorService,
//TODO: define separate index pool
@JacksonInject @Global StupidPool<ByteBuffer> bufferPool,
// Backwards compatible // Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -91,6 +100,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.versioningPolicy = versioningPolicy; this.versioningPolicy = versioningPolicy;
this.rejectionPolicyFactory = rejectionPolicyFactory; this.rejectionPolicyFactory = rejectionPolicyFactory;
this.maxPendingPersists = maxPendingPersists; this.maxPendingPersists = maxPendingPersists;
this.bufferPool = bufferPool;
} }
@Deprecated @Deprecated
@ -149,7 +159,8 @@ public class RealtimePlumberSchool implements PlumberSchool
queryExecutorService, queryExecutorService,
dataSegmentPusher, dataSegmentPusher,
segmentPublisher, segmentPublisher,
serverView serverView,
bufferPool
); );
} }
@ -162,4 +173,5 @@ public class RealtimePlumberSchool implements PlumberSchool
Preconditions.checkNotNull(serverView, "must specify a serverView to do this action."); Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action."); Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
} }
} }


@ -27,8 +27,10 @@ import com.google.common.collect.Lists;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.guice.annotations.Global;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexSchema;
@ -39,6 +41,7 @@ import io.druid.timeline.DataSegment;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -57,18 +60,23 @@ public class Sink implements Iterable<FireHydrant>
private final RealtimeTuningConfig config; private final RealtimeTuningConfig config;
private final String version; private final String version;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>(); private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
private final StupidPool<ByteBuffer> bufferPool;
public Sink( public Sink(
Interval interval, Interval interval,
DataSchema schema, DataSchema schema,
RealtimeTuningConfig config, RealtimeTuningConfig config,
String version String version,
StupidPool<ByteBuffer> bufferPool
) )
{ {
this.schema = schema; this.schema = schema;
this.config = config; this.config = config;
this.interval = interval; this.interval = interval;
this.version = version; this.version = version;
this.bufferPool = bufferPool;
makeNewCurrIndex(interval.getStartMillis(), schema); makeNewCurrIndex(interval.getStartMillis(), schema);
} }
@ -78,13 +86,15 @@ public class Sink implements Iterable<FireHydrant>
DataSchema schema, DataSchema schema,
RealtimeTuningConfig config, RealtimeTuningConfig config,
String version, String version,
List<FireHydrant> hydrants List<FireHydrant> hydrants,
StupidPool<ByteBuffer> bufferPool
) )
{ {
this.schema = schema; this.schema = schema;
this.config = config; this.config = config;
this.interval = interval; this.interval = interval;
this.version = version; this.version = version;
this.bufferPool = bufferPool;
for (int i = 0; i < hydrants.size(); ++i) { for (int i = 0; i < hydrants.size(); ++i) {
final FireHydrant hydrant = hydrants.get(i); final FireHydrant hydrant = hydrants.get(i);
@ -183,7 +193,8 @@ public class Sink implements Iterable<FireHydrant>
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity()) .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
.withSpatialDimensions(schema.getParser()) .withSpatialDimensions(schema.getParser())
.withMetrics(schema.getAggregators()) .withMetrics(schema.getAggregators())
.build() .build(),
bufferPool
); );
FireHydrant old; FireHydrant old;
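All of the `Sink` changes funnel into one spot: `makeNewCurrIndex` now passes the injected pool straight into the `IncrementalIndex` it builds, as the last hunk shows. A simplified sketch of that construction; the granularity and metrics here are placeholders, not the real schema.

```java
import io.druid.collections.StupidPool;
import io.druid.granularity.QueryGranularity;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;

import java.nio.ByteBuffer;

class SinkIndexSketch
{
  // Roughly what Sink#makeNewCurrIndex does after this commit: schema plus an injected pool.
  static IncrementalIndex newCurrIndex(long minTimestamp, StupidPool<ByteBuffer> bufferPool)
  {
    return new IncrementalIndex(
        new IncrementalIndexSchema.Builder()
            .withMinTimestamp(minTimestamp)
            .withQueryGranularity(QueryGranularity.NONE)                              // placeholder granularity
            .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("rows")}) // placeholder metrics
            .build(),
        bufferPool // off-heap buffers for the aggregators
    );
  }

  public static void main(String[] args)
  {
    newCurrIndex(0L, new OffheapBufferPool(1024 * 1024));
  }
}
```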


@ -27,6 +27,7 @@ import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec; import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -73,7 +74,7 @@ public class FireDepartmentTest
new RealtimeIOConfig( new RealtimeIOConfig(
null, null,
new RealtimePlumberSchool( new RealtimePlumberSchool(
null, null, null, null, null, null, null, null, null, null, null, null, 0 null, null, null, null, null, null, null, TestQueryRunners.pool, null, null, null, null, null, 0
) )
), ),
new RealtimeTuningConfig( new RealtimeTuningConfig(


@ -31,6 +31,7 @@ import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -117,7 +118,7 @@ public class RealtimeManagerTest
null, null,
null null
); );
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString(), TestQueryRunners.pool));
realtimeManager = new RealtimeManager( realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList( Arrays.<FireDepartment>asList(


@ -40,6 +40,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactory;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -144,6 +145,7 @@ public class RealtimePlumberSchoolTest
segmentPublisher, segmentPublisher,
serverView, serverView,
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
TestQueryRunners.pool,
new Period("PT10m"), new Period("PT10m"),
tmpDir, tmpDir,
Granularity.HOUR, Granularity.HOUR,


@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -64,7 +65,7 @@ public class SinkTest
null, null,
null null
); );
final Sink sink = new Sink(interval, schema, tuningConfig, version); final Sink sink = new Sink(interval, schema, tuningConfig, version, TestQueryRunners.pool);
sink.add( sink.add(
new InputRow() new InputRow()