mirror of https://github.com/apache/druid.git
fix buffer pool usage
This commit is contained in:
parent
6e03a6245f
commit
40f223215a
|
@ -33,6 +33,7 @@ import com.metamx.common.IAE;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.StringInputRowParser;
|
||||
import io.druid.offheap.OffheapBufferPool;
|
||||
|
@ -324,8 +325,12 @@ public class IndexGeneratorJob implements Jobby
|
|||
|
||||
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
|
||||
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
|
||||
final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize();
|
||||
final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
|
||||
* config.getSchema().getTuningConfig().getAggregationBufferRatio());
|
||||
|
||||
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
|
||||
final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
|
||||
IncrementalIndex index = makeIncrementalIndex(bucket, aggs, bufferPool);
|
||||
try {
|
||||
File baseFlushFile = File.createTempFile("base", "flush");
|
||||
baseFlushFile.delete();
|
||||
|
@ -363,9 +368,9 @@ public class IndexGeneratorJob implements Jobby
|
|||
|
||||
context.progress();
|
||||
persist(index, interval, file, progressIndicator);
|
||||
// close this index and make a new one
|
||||
// close this index and make a new one, reusing same buffer
|
||||
index.close();
|
||||
index = makeIncrementalIndex(bucket, aggs);
|
||||
index = makeIncrementalIndex(bucket, aggs, bufferPool);
|
||||
|
||||
startTime = System.currentTimeMillis();
|
||||
++indexCount;
|
||||
|
@ -630,7 +635,7 @@ public class IndexGeneratorJob implements Jobby
|
|||
return numRead;
|
||||
}
|
||||
|
||||
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
|
||||
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool)
|
||||
{
|
||||
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
|
||||
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
|
||||
|
@ -640,14 +645,12 @@ public class IndexGeneratorJob implements Jobby
|
|||
.withMetrics(aggs)
|
||||
.build();
|
||||
if (tuningConfig.isIngestOffheap()) {
|
||||
final int maxTotalBufferSize = tuningConfig.getBufferSize();
|
||||
final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
|
||||
* tuningConfig.getAggregationBufferRatio());
|
||||
|
||||
return new OffheapIncrementalIndex(
|
||||
indexSchema,
|
||||
new OffheapBufferPool(aggregationBufferSize),
|
||||
bufferPool,
|
||||
true,
|
||||
maxTotalBufferSize
|
||||
tuningConfig.getBufferSize()
|
||||
);
|
||||
} else {
|
||||
return new OnheapIncrementalIndex(
|
||||
|
|
Loading…
Reference in New Issue