configure buffer sizes

nishantmonu51 2014-06-12 19:32:37 +05:30
parent 5bdc4a761a
commit a7e19ad892
19 changed files with 49 additions and 80 deletions

View File

@ -617,7 +617,11 @@ public class IndexGeneratorJob implements Jobby
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
{
//TODO: review this, add a config for batch ingestion
int aggsSize = 0;
for (AggregatorFactory agg : aggs) {
aggsSize += agg.getMaxIntermediateSize();
}
int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary();
return new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
@ -625,7 +629,7 @@ public class IndexGeneratorJob implements Jobby
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build(),
new OffheapBufferPool(1024 * 1024 * 1024)
new OffheapBufferPool(bufferSize)
);
}
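
The hard-coded 1 GB pool above is replaced by a size derived from the schema: each in-memory row reserves the worst-case intermediate size of every aggregator, and the pool must hold rowFlushBoundary such rows. A minimal sketch of that sizing rule (the helper name and the example numbers below are illustrative, not part of the commit):

```java
import io.druid.query.aggregation.AggregatorFactory;

class BufferSizing
{
  // Mirrors the computation in makeIncrementalIndex above: the off-heap pool
  // is sized for rowFlushBoundary rows, each holding every aggregator's
  // worst-case intermediate state.
  static int computeBufferSize(AggregatorFactory[] aggs, int rowFlushBoundary)
  {
    int aggsSize = 0;
    for (AggregatorFactory agg : aggs) {
      aggsSize += agg.getMaxIntermediateSize();
    }
    return aggsSize * rowFlushBoundary;
  }
}
```

For example, two 8-byte long-sum aggregators with a rowFlushBoundary of 500,000 would reserve roughly 8 MB, instead of the fixed 1 GB allocated previously.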

View File

@ -75,7 +75,6 @@ public class TaskToolbox
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
private final File taskWorkDir;
private final StupidPool<ByteBuffer> indexPool;
public TaskToolbox(
TaskConfig config,
@ -93,8 +92,7 @@ public class TaskToolbox
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper objectMapper,
final File taskWorkDir,
StupidPool<ByteBuffer> indexPool
final File taskWorkDir
)
{
this.config = config;
@ -113,7 +111,6 @@ public class TaskToolbox
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
this.taskWorkDir = taskWorkDir;
this.indexPool = indexPool;
}
public TaskConfig getConfig()
@ -216,7 +213,4 @@ public class TaskToolbox
return taskWorkDir;
}
public StupidPool<ByteBuffer> getIndexPool(){
return indexPool;
}
}

View File

@ -60,7 +60,6 @@ public class TaskToolboxFactory
private final MonitorScheduler monitorScheduler;
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper objectMapper;
private final StupidPool<ByteBuffer> bufferPool;
@Inject
public TaskToolboxFactory(
@ -96,7 +95,6 @@ public class TaskToolboxFactory
this.monitorScheduler = monitorScheduler;
this.segmentLoaderFactory = segmentLoaderFactory;
this.objectMapper = objectMapper;
this.bufferPool = bufferPool;
}
public TaskToolbox build(Task task)
@ -119,8 +117,7 @@ public class TaskToolboxFactory
monitorScheduler,
segmentLoaderFactory.manufacturate(taskWorkDir),
objectMapper,
taskWorkDir,
bufferPool
taskWorkDir
);
}
}

View File

@ -70,7 +70,6 @@ public class YeOldePlumberSchool implements PlumberSchool
private final String version;
private final DataSegmentPusher dataSegmentPusher;
private final File tmpSegmentDir;
private final StupidPool<ByteBuffer> bufferPool;
private static final Logger log = new Logger(YeOldePlumberSchool.class);
@ -79,16 +78,13 @@ public class YeOldePlumberSchool implements PlumberSchool
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
@JacksonInject("tmpSegmentDir") File tmpSegmentDir,
//TODO: review this global annotation
@JacksonInject @Global StupidPool<ByteBuffer> bufferPool
)
@JacksonInject("tmpSegmentDir") File tmpSegmentDir
)
{
this.interval = interval;
this.version = version;
this.dataSegmentPusher = dataSegmentPusher;
this.tmpSegmentDir = tmpSegmentDir;
this.bufferPool = bufferPool;
}
@Override
@ -105,7 +101,7 @@ public class YeOldePlumberSchool implements PlumberSchool
)
{
// There can be only one.
final Sink theSink = new Sink(interval, schema, config, version, bufferPool);
final Sink theSink = new Sink(interval, schema, config, version);
// Temporary directory to hold spilled segments.
final File persistDir = new File(tmpSegmentDir, theSink.getSegment().getIdentifier());

View File

@ -32,6 +32,7 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexableAdapter;
@ -83,7 +84,7 @@ public class DeleteTask extends AbstractFixedIntervalTask
0,
QueryGranularity.NONE,
new AggregatorFactory[0],
toolbox.getIndexPool()
new OffheapBufferPool(0)
);
try {
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
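
DeleteTask no longer pulls the shared pool from the toolbox; because its empty index is built with new AggregatorFactory[0], the per-row aggregator footprint is zero and a zero-capacity OffheapBufferPool suffices. A small, illustrative check of that arithmetic (the 500,000 row bound is an arbitrary example):

```java
import io.druid.query.aggregation.AggregatorFactory;

class EmptyIndexSizing
{
  public static void main(String[] args)
  {
    // With no aggregators the sizing rule degenerates to zero bytes,
    // so OffheapBufferPool(0) never needs to hand out real buffer space.
    AggregatorFactory[] aggs = new AggregatorFactory[0];
    int aggsSize = 0;
    for (AggregatorFactory agg : aggs) {
      aggsSize += agg.getMaxIntermediateSize();
    }
    System.out.println(aggsSize * 500_000); // prints 0 for any row bound
  }
}
```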

View File

@ -385,8 +385,7 @@ public class IndexTask extends AbstractFixedIntervalTask
interval,
version,
wrappedDataSegmentPusher,
tmpDir,
toolbox.getIndexPool()
tmpDir
).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
// rowFlushBoundary for this job

View File

@ -308,7 +308,6 @@ public class RealtimeIndexTask extends AbstractTask
segmentPublisher,
toolbox.getNewSegmentServerView(),
toolbox.getQueryExecutorService(),
toolbox.getIndexPool(),
null,
null,
null,

View File

@ -54,20 +54,21 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
private final GroupByQueryEngine engine;
private final Supplier<GroupByQueryConfig> config;
private final GroupByQueryQueryToolChest toolChest;
private final StupidPool<ByteBuffer> bufferPool;
@Global
StupidPool<ByteBuffer> computationBufferPool;
@Inject
public GroupByQueryRunnerFactory(
GroupByQueryEngine engine,
Supplier<GroupByQueryConfig> config,
GroupByQueryQueryToolChest toolChest,
@Global StupidPool<ByteBuffer> bufferPool
@Global StupidPool<ByteBuffer> computationBufferPool
)
{
this.engine = engine;
this.config = config;
this.toolChest = toolChest;
this.bufferPool = bufferPool;
this.computationBufferPool = computationBufferPool;
}
@Override
@ -123,7 +124,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
)
);
} else {
return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, bufferPool, queryRunners);
return new GroupByParallelQueryRunner(
queryExecutor,
new RowOrdering(),
config,
computationBufferPool,
queryRunners
);
}
}

View File

@ -58,7 +58,6 @@ public class GreaterThanHavingSpec implements HavingSpec
public boolean eval(Row row)
{
float metricValue = row.getFloatMetric(aggregationName);
return Float.compare(metricValue, value.floatValue()) > 0;
}

View File

@ -297,6 +297,9 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
synchronized (this) {
if (!facts.containsKey(key)) {
int rowOffset = totalAggSize * numEntries.getAndIncrement();
if (rowOffset + totalAggSize > bufferHolder.get().limit()) {
throw new ISE("Buffer full, cannot add more rows; current row count: %d", numEntries.get());
}
for (int i = 0; i < aggs.length; i++) {
aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
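
The new guard above rejects a row once its slot would run past the pooled buffer. A hedged sketch of the bounds arithmetic, assuming the layout implied by the hunk (row N occupies bytes [N * totalAggSize, (N + 1) * totalAggSize) of the buffer):

```java
import java.nio.ByteBuffer;

class OffheapRowBounds
{
  // Illustrative only: returns true when row number `rowNumber` still fits
  // inside the off-heap buffer, false when the write would overrun limit().
  static boolean fits(ByteBuffer buffer, int totalAggSize, int rowNumber)
  {
    int rowOffset = totalAggSize * rowNumber;
    return rowOffset + totalAggSize <= buffer.limit();
  }
}
```

Note that numEntries is incremented before the check, so the count reported by the exception already includes the rejected row.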

View File

@ -64,8 +64,7 @@ public class FlushingPlumber extends RealtimePlumber
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
StupidPool<ByteBuffer> bufferPool
ExecutorService queryExecutorService
)
{
super(
@ -78,8 +77,7 @@ public class FlushingPlumber extends RealtimePlumber
queryExecutorService,
null,
null,
null,
bufferPool
null
);
this.flushDuration = flushDuration;

View File

@ -54,7 +54,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final StupidPool<ByteBuffer> bufferPool;
@JsonCreator
public FlushingPlumberSchool(
@ -63,8 +62,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
//TODO: define separate index pool
@JacksonInject @Global StupidPool<ByteBuffer> bufferPool,
// Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -82,7 +79,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
null,
null,
queryExecutorService,
bufferPool,
windowPeriod,
basePersistDirectory,
segmentGranularity,
@ -96,7 +92,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.bufferPool = bufferPool;
}
@Override
@ -116,8 +111,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
bufferPool
queryExecutorService
);
}

View File

@ -99,7 +99,6 @@ public class RealtimePlumber implements Plumber
private volatile ExecutorService persistExecutor = null;
private volatile ExecutorService mergeExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
private final StupidPool<ByteBuffer> bufferPool;
public RealtimePlumber(
@ -112,9 +111,7 @@ public class RealtimePlumber implements Plumber
ExecutorService queryExecutorService,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
FilteredServerView serverView,
StupidPool<ByteBuffer> bufferPool
FilteredServerView serverView
)
{
this.schema = schema;
@ -128,7 +125,6 @@ public class RealtimePlumber implements Plumber
this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.bufferPool = bufferPool;
log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
}
@ -193,7 +189,7 @@ public class RealtimePlumber implements Plumber
segmentGranularity.increment(new DateTime(truncatedTime))
);
retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), bufferPool);
retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));
try {
segmentAnnouncer.announceSegment(retVal.getSegment());
@ -544,7 +540,7 @@ public class RealtimePlumber implements Plumber
);
}
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants, bufferPool);
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
currSink.getInterval(),

View File

@ -23,14 +23,9 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.metamx.common.Granularity;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
@ -42,9 +37,7 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Period;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -57,8 +50,6 @@ public class RealtimePlumberSchool implements PlumberSchool
private final SegmentPublisher segmentPublisher;
private final FilteredServerView serverView;
private final ExecutorService queryExecutorService;
private final StupidPool<ByteBuffer> bufferPool;
// Backwards compatible
private final Period windowPeriod;
private final File basePersistDirectory;
@ -76,8 +67,6 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject FilteredServerView serverView,
@JacksonInject @Processing ExecutorService executorService,
//TODO: define separate index pool
@JacksonInject @Global StupidPool<ByteBuffer> bufferPool,
// Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -100,7 +89,6 @@ public class RealtimePlumberSchool implements PlumberSchool
this.versioningPolicy = versioningPolicy;
this.rejectionPolicyFactory = rejectionPolicyFactory;
this.maxPendingPersists = maxPendingPersists;
this.bufferPool = bufferPool;
}
@Deprecated
@ -159,8 +147,7 @@ public class RealtimePlumberSchool implements PlumberSchool
queryExecutorService,
dataSegmentPusher,
segmentPublisher,
serverView,
bufferPool
serverView
);
}

View File

@ -29,8 +29,7 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.guice.annotations.Global;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
@ -52,31 +51,25 @@ import java.util.concurrent.CopyOnWriteArrayList;
public class Sink implements Iterable<FireHydrant>
{
private static final Logger log = new Logger(Sink.class);
private volatile FireHydrant currHydrant;
private final Interval interval;
private final DataSchema schema;
private final RealtimeTuningConfig config;
private final String version;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
private final StupidPool<ByteBuffer> bufferPool;
private volatile FireHydrant currHydrant;
public Sink(
Interval interval,
DataSchema schema,
RealtimeTuningConfig config,
String version,
StupidPool<ByteBuffer> bufferPool
String version
)
{
this.schema = schema;
this.config = config;
this.interval = interval;
this.version = version;
this.bufferPool = bufferPool;
makeNewCurrIndex(interval.getStartMillis(), schema);
}
@ -86,15 +79,13 @@ public class Sink implements Iterable<FireHydrant>
DataSchema schema,
RealtimeTuningConfig config,
String version,
List<FireHydrant> hydrants,
StupidPool<ByteBuffer> bufferPool
List<FireHydrant> hydrants
)
{
this.schema = schema;
this.config = config;
this.interval = interval;
this.version = version;
this.bufferPool = bufferPool;
for (int i = 0; i < hydrants.size(); ++i) {
final FireHydrant hydrant = hydrants.get(i);
@ -187,6 +178,11 @@ public class Sink implements Iterable<FireHydrant>
private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
int aggsSize = 0;
for (AggregatorFactory agg : schema.getAggregators()) {
aggsSize += agg.getMaxIntermediateSize();
}
int bufferSize = aggsSize * config.getMaxRowsInMemory();
IncrementalIndex newIndex = new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
@ -194,7 +190,7 @@ public class Sink implements Iterable<FireHydrant>
.withSpatialDimensions(schema.getParser())
.withMetrics(schema.getAggregators())
.build(),
bufferPool
new OffheapBufferPool(bufferSize)
);
FireHydrant old;
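
Sink now builds its own pool for each new hydrant instead of receiving the injected @Global one, sized from the realtime tuning config. A minimal sketch of that per-sink sizing, assuming the getters shown in the hunk (the helper name is illustrative):

```java
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;

class SinkBufferSizing
{
  // Illustrative only: the off-heap pool for a new incremental index is sized
  // for maxRowsInMemory rows of worst-case aggregator state, mirroring
  // makeNewCurrIndex above.
  static OffheapBufferPool makePool(AggregatorFactory[] aggs, int maxRowsInMemory)
  {
    int aggsSize = 0;
    for (AggregatorFactory agg : aggs) {
      aggsSize += agg.getMaxIntermediateSize();
    }
    return new OffheapBufferPool(aggsSize * maxRowsInMemory);
  }
}
```

This is also why the test diffs below drop the TestQueryRunners.pool argument: the Sink constructors no longer take a pool at all.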

View File

@ -74,7 +74,7 @@ public class FireDepartmentTest
new RealtimeIOConfig(
null,
new RealtimePlumberSchool(
null, null, null, null, null, null, null, TestQueryRunners.pool, null, null, null, null, null, 0
null, null, null, null, null, null, null, null, null, null, null, null, 0
)
),
new RealtimeTuningConfig(

View File

@ -118,7 +118,7 @@ public class RealtimeManagerTest
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString(), TestQueryRunners.pool));
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList(

View File

@ -145,7 +145,6 @@ public class RealtimePlumberSchoolTest
segmentPublisher,
serverView,
MoreExecutors.sameThreadExecutor(),
TestQueryRunners.pool,
new Period("PT10m"),
tmpDir,
Granularity.HOUR,

View File

@ -56,7 +56,7 @@ public class SinkTest
final Interval interval = new Interval("2013-01-01/2013-01-02");
final String version = new DateTime().toString();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
1,
100,
new Period("P1Y"),
null,
null,
@ -65,7 +65,7 @@ public class SinkTest
null,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version, TestQueryRunners.pool);
final Sink sink = new Sink(interval, schema, tuningConfig, version);
sink.add(
new InputRow()