From a7e19ad892884996b77afc35e85cdc3ea7bfe9ae Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 12 Jun 2014 19:32:37 +0530 Subject: [PATCH] configure buffer sizes --- .../io/druid/indexer/IndexGeneratorJob.java | 8 +++++-- .../io/druid/indexing/common/TaskToolbox.java | 8 +------ .../indexing/common/TaskToolboxFactory.java | 5 +--- .../common/index/YeOldePlumberSchool.java | 10 +++----- .../indexing/common/task/DeleteTask.java | 3 ++- .../druid/indexing/common/task/IndexTask.java | 3 +-- .../common/task/RealtimeIndexTask.java | 1 - .../groupby/GroupByQueryRunnerFactory.java | 15 ++++++++---- .../groupby/having/GreaterThanHavingSpec.java | 1 - .../segment/incremental/IncrementalIndex.java | 3 +++ .../realtime/plumber/FlushingPlumber.java | 6 ++--- .../plumber/FlushingPlumberSchool.java | 8 +------ .../realtime/plumber/RealtimePlumber.java | 10 +++----- .../plumber/RealtimePlumberSchool.java | 15 +----------- .../druid/segment/realtime/plumber/Sink.java | 24 ++++++++----------- .../segment/realtime/FireDepartmentTest.java | 2 +- .../segment/realtime/RealtimeManagerTest.java | 2 +- .../plumber/RealtimePlumberSchoolTest.java | 1 - .../segment/realtime/plumber/SinkTest.java | 4 ++-- 19 files changed, 49 insertions(+), 80 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index d867dc8fe9a..cb15480da03 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -617,7 +617,11 @@ public class IndexGeneratorJob implements Jobby private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs) { - //TODO: review this, add a config for batch ingestion + int aggsSize = 0; + for (AggregatorFactory agg : aggs) { + aggsSize += agg.getMaxIntermediateSize(); + } + int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary(); return new IncrementalIndex( new IncrementalIndexSchema.Builder() .withMinTimestamp(theBucket.time.getMillis()) @@ -625,7 +629,7 @@ public class IndexGeneratorJob implements Jobby .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) .withMetrics(aggs) .build(), - new OffheapBufferPool(1024 * 1024 * 1024) + new OffheapBufferPool(bufferSize) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 944c5b19ea1..0b19d9fd5e4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -75,7 +75,6 @@ public class TaskToolbox private final SegmentLoader segmentLoader; private final ObjectMapper objectMapper; private final File taskWorkDir; - private final StupidPool indexPool; public TaskToolbox( TaskConfig config, @@ -93,8 +92,7 @@ public class TaskToolbox MonitorScheduler monitorScheduler, SegmentLoader segmentLoader, ObjectMapper objectMapper, - final File taskWorkDir, - StupidPool indexPool + final File taskWorkDir ) { this.config = config; @@ -113,7 +111,6 @@ public class TaskToolbox this.segmentLoader = segmentLoader; this.objectMapper = objectMapper; this.taskWorkDir = taskWorkDir; - this.indexPool = indexPool; } public TaskConfig getConfig() @@ -216,7 +213,4 @@ public class TaskToolbox return taskWorkDir; } - public StupidPool getIndexPool(){ - return indexPool; - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index b950959b473..fd9365dc05d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -60,7 +60,6 @@ public class TaskToolboxFactory private final MonitorScheduler monitorScheduler; private final SegmentLoaderFactory segmentLoaderFactory; private final ObjectMapper objectMapper; - private final StupidPool bufferPool; @Inject public TaskToolboxFactory( @@ -96,7 +95,6 @@ public class TaskToolboxFactory this.monitorScheduler = monitorScheduler; this.segmentLoaderFactory = segmentLoaderFactory; this.objectMapper = objectMapper; - this.bufferPool = bufferPool; } public TaskToolbox build(Task task) @@ -119,8 +117,7 @@ public class TaskToolboxFactory monitorScheduler, segmentLoaderFactory.manufacturate(taskWorkDir), objectMapper, - taskWorkDir, - bufferPool + taskWorkDir ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index de3aa720219..9ed0fc1e062 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -70,7 +70,6 @@ public class YeOldePlumberSchool implements PlumberSchool private final String version; private final DataSegmentPusher dataSegmentPusher; private final File tmpSegmentDir; - private final StupidPool bufferPool; private static final Logger log = new Logger(YeOldePlumberSchool.class); @@ -79,16 +78,13 @@ public class YeOldePlumberSchool implements PlumberSchool @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, @JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher, - @JacksonInject("tmpSegmentDir") File tmpSegmentDir, - //TODO: review this global annotation - @JacksonInject @Global StupidPool bufferPool - ) + @JacksonInject("tmpSegmentDir") File tmpSegmentDir + ) { this.interval = interval; this.version = version; this.dataSegmentPusher = dataSegmentPusher; this.tmpSegmentDir = tmpSegmentDir; - this.bufferPool = bufferPool; } @Override @@ -105,7 +101,7 @@ public class YeOldePlumberSchool implements PlumberSchool ) { // There can be only one. - final Sink theSink = new Sink(interval, schema, config, version, bufferPool); + final Sink theSink = new Sink(interval, schema, config, version); // Temporary directory to hold spilled segments. final File persistDir = new File(tmpSegmentDir, theSink.getSegment().getIdentifier()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java index 90d6eff2775..d5995f9756f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java @@ -32,6 +32,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentInsertAction; +import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexMerger; import io.druid.segment.IndexableAdapter; @@ -83,7 +84,7 @@ public class DeleteTask extends AbstractFixedIntervalTask 0, QueryGranularity.NONE, new AggregatorFactory[0], - toolbox.getIndexPool() + new OffheapBufferPool(0) ); try { final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index d4cd8ea0bb2..32565f1d51a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -385,8 +385,7 @@ public class IndexTask extends AbstractFixedIntervalTask interval, version, wrappedDataSegmentPusher, - tmpDir, - toolbox.getIndexPool() + tmpDir ).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics); // rowFlushBoundary for this job diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index bfc6b35fc3f..e0fc20d1216 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -308,7 +308,6 @@ public class RealtimeIndexTask extends AbstractTask segmentPublisher, toolbox.getNewSegmentServerView(), toolbox.getQueryExecutorService(), - toolbox.getIndexPool(), null, null, null, diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index a498a162965..e603a1997f4 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -54,20 +54,21 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory config; private final GroupByQueryQueryToolChest toolChest; - private final StupidPool bufferPool; + @Global + StupidPool computationBufferPool; @Inject public GroupByQueryRunnerFactory( GroupByQueryEngine engine, Supplier config, GroupByQueryQueryToolChest toolChest, - @Global StupidPool bufferPool + @Global StupidPool computationBufferPool ) { this.engine = engine; this.config = config; this.toolChest = toolChest; - this.bufferPool = bufferPool; + this.computationBufferPool = computationBufferPool; } @Override @@ -123,7 +124,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory 0; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 52106ae8a0c..fdf772db72a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -297,6 +297,9 @@ public class IncrementalIndex implements Iterable, Closeable synchronized (this) { if (!facts.containsKey(key)) { int rowOffset = totalAggSize * numEntries.getAndIncrement(); + if (rowOffset + totalAggSize > bufferHolder.get().limit()) { + throw new ISE("Buffer Full cannot add more rows current rowSize : %d", numEntries.get()); + } for (int i = 0; i < aggs.length; i++) { aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i)); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java index 30babfd57ec..9fc23be052f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -64,8 +64,7 @@ public class FlushingPlumber extends RealtimePlumber ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, - ExecutorService queryExecutorService, - StupidPool bufferPool + ExecutorService queryExecutorService ) { super( @@ -78,8 +77,7 @@ public class FlushingPlumber extends RealtimePlumber queryExecutorService, null, null, - null, - bufferPool + null ); this.flushDuration = flushDuration; diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index f7de3ff21bd..e5a57461f97 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -54,7 +54,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool private final QueryRunnerFactoryConglomerate conglomerate; private final DataSegmentAnnouncer segmentAnnouncer; private final ExecutorService queryExecutorService; - private final StupidPool bufferPool; @JsonCreator public FlushingPlumberSchool( @@ -63,8 +62,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool @JacksonInject QueryRunnerFactoryConglomerate conglomerate, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject @Processing ExecutorService queryExecutorService, - //TODO: define separate index pool - @JacksonInject @Global StupidPool bufferPool, // Backwards compatible @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -82,7 +79,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool null, null, queryExecutorService, - bufferPool, windowPeriod, basePersistDirectory, segmentGranularity, @@ -96,7 +92,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool this.conglomerate = conglomerate; this.segmentAnnouncer = segmentAnnouncer; this.queryExecutorService = queryExecutorService; - this.bufferPool = bufferPool; } @Override @@ -116,8 +111,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool emitter, conglomerate, segmentAnnouncer, - queryExecutorService, - bufferPool + queryExecutorService ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 26b4cc45509..337149aadac 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -99,7 +99,6 @@ public class RealtimePlumber implements Plumber private volatile ExecutorService persistExecutor = null; private volatile ExecutorService mergeExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null; - private final StupidPool bufferPool; public RealtimePlumber( @@ -112,9 +111,7 @@ public class RealtimePlumber implements Plumber ExecutorService queryExecutorService, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, - FilteredServerView serverView, - StupidPool bufferPool - + FilteredServerView serverView ) { this.schema = schema; @@ -128,7 +125,6 @@ public class RealtimePlumber implements Plumber this.dataSegmentPusher = dataSegmentPusher; this.segmentPublisher = segmentPublisher; this.serverView = serverView; - this.bufferPool = bufferPool; log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); } @@ -193,7 +189,7 @@ public class RealtimePlumber implements Plumber segmentGranularity.increment(new DateTime(truncatedTime)) ); - retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), bufferPool); + retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval)); try { segmentAnnouncer.announceSegment(retVal.getSegment()); @@ -544,7 +540,7 @@ public class RealtimePlumber implements Plumber ); } - Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants, bufferPool); + Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); sinks.put(sinkInterval.getStartMillis(), currSink); sinkTimeline.add( currSink.getInterval(), diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 2b24fcf4279..11ebf015b11 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -23,14 +23,9 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import com.metamx.common.Granularity; -import com.metamx.common.logger.Logger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.FilteredServerView; -import io.druid.client.ServerView; -import io.druid.collections.StupidPool; -import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.indexing.DataSchema; @@ -42,9 +37,7 @@ import io.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Period; import java.io.File; -import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; /** */ @@ -57,8 +50,6 @@ public class RealtimePlumberSchool implements PlumberSchool private final SegmentPublisher segmentPublisher; private final FilteredServerView serverView; private final ExecutorService queryExecutorService; - private final StupidPool bufferPool; - // Backwards compatible private final Period windowPeriod; private final File basePersistDirectory; @@ -76,8 +67,6 @@ public class RealtimePlumberSchool implements PlumberSchool @JacksonInject SegmentPublisher segmentPublisher, @JacksonInject FilteredServerView serverView, @JacksonInject @Processing ExecutorService executorService, - //TODO: define separate index pool - @JacksonInject @Global StupidPool bufferPool, // Backwards compatible @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -100,7 +89,6 @@ public class RealtimePlumberSchool implements PlumberSchool this.versioningPolicy = versioningPolicy; this.rejectionPolicyFactory = rejectionPolicyFactory; this.maxPendingPersists = maxPendingPersists; - this.bufferPool = bufferPool; } @Deprecated @@ -159,8 +147,7 @@ public class RealtimePlumberSchool implements PlumberSchool queryExecutorService, dataSegmentPusher, segmentPublisher, - serverView, - bufferPool + serverView ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 4dc1edf61b8..57db2201d87 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -29,8 +29,7 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.SpatialDimensionSchema; -import io.druid.guice.annotations.Global; +import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; @@ -52,31 +51,25 @@ import java.util.concurrent.CopyOnWriteArrayList; public class Sink implements Iterable { private static final Logger log = new Logger(Sink.class); - - private volatile FireHydrant currHydrant; - private final Interval interval; private final DataSchema schema; private final RealtimeTuningConfig config; private final String version; private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList(); - private final StupidPool bufferPool; + private volatile FireHydrant currHydrant; public Sink( Interval interval, DataSchema schema, RealtimeTuningConfig config, - String version, - StupidPool bufferPool - + String version ) { this.schema = schema; this.config = config; this.interval = interval; this.version = version; - this.bufferPool = bufferPool; makeNewCurrIndex(interval.getStartMillis(), schema); } @@ -86,15 +79,13 @@ public class Sink implements Iterable DataSchema schema, RealtimeTuningConfig config, String version, - List hydrants, - StupidPool bufferPool + List hydrants ) { this.schema = schema; this.config = config; this.interval = interval; this.version = version; - this.bufferPool = bufferPool; for (int i = 0; i < hydrants.size(); ++i) { final FireHydrant hydrant = hydrants.get(i); @@ -187,6 +178,11 @@ public class Sink implements Iterable private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) { + int aggsSize = 0; + for (AggregatorFactory agg : schema.getAggregators()) { + aggsSize += agg.getMaxIntermediateSize(); + } + int bufferSize = aggsSize * config.getMaxRowsInMemory(); IncrementalIndex newIndex = new IncrementalIndex( new IncrementalIndexSchema.Builder() .withMinTimestamp(minTimestamp) @@ -194,7 +190,7 @@ public class Sink implements Iterable .withSpatialDimensions(schema.getParser()) .withMetrics(schema.getAggregators()) .build(), - bufferPool + new OffheapBufferPool(bufferSize) ); FireHydrant old; diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 1566013194c..5817ed6a9ac 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -74,7 +74,7 @@ public class FireDepartmentTest new RealtimeIOConfig( null, new RealtimePlumberSchool( - null, null, null, null, null, null, null, TestQueryRunners.pool, null, null, null, null, null, 0 + null, null, null, null, null, null, null, null, null, null, null, null, 0 ) ), new RealtimeTuningConfig( diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 4a1f692a3f4..7abe823594e 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -118,7 +118,7 @@ public class RealtimeManagerTest null, null ); - plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString(), TestQueryRunners.pool)); + plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); realtimeManager = new RealtimeManager( Arrays.asList( diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index f2a060ea1cc..bab7a549b52 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -145,7 +145,6 @@ public class RealtimePlumberSchoolTest segmentPublisher, serverView, MoreExecutors.sameThreadExecutor(), - TestQueryRunners.pool, new Period("PT10m"), tmpDir, Granularity.HOUR, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index e50ac4403dc..0e84660f682 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -56,7 +56,7 @@ public class SinkTest final Interval interval = new Interval("2013-01-01/2013-01-02"); final String version = new DateTime().toString(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( - 1, + 100, new Period("P1Y"), null, null, @@ -65,7 +65,7 @@ public class SinkTest null, null ); - final Sink sink = new Sink(interval, schema, tuningConfig, version, TestQueryRunners.pool); + final Sink sink = new Sink(interval, schema, tuningConfig, version); sink.add( new InputRow()