diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 3980c18de56..bc8ca44dd33 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -33,6 +33,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.logger.Logger; +import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.impl.StringInputRowParser; import io.druid.offheap.OffheapBufferPool; @@ -324,8 +325,12 @@ public class IndexGeneratorJob implements Jobby final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators(); + final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize(); + final int aggregationBufferSize = (int) ((double) maxTotalBufferSize + * config.getSchema().getTuningConfig().getAggregationBufferRatio()); - IncrementalIndex index = makeIncrementalIndex(bucket, aggs); + final StupidPool bufferPool = new OffheapBufferPool(aggregationBufferSize); + IncrementalIndex index = makeIncrementalIndex(bucket, aggs, bufferPool); try { File baseFlushFile = File.createTempFile("base", "flush"); baseFlushFile.delete(); @@ -363,9 +368,9 @@ public class IndexGeneratorJob implements Jobby context.progress(); persist(index, interval, file, progressIndicator); - // close this index and make a new one + // close this index and make a new one, reusing same buffer index.close(); - index = makeIncrementalIndex(bucket, aggs); + index = makeIncrementalIndex(bucket, aggs, bufferPool); startTime = System.currentTimeMillis(); ++indexCount; @@ -630,7 +635,7 @@ public class IndexGeneratorJob implements Jobby return numRead; } - private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs) + private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool) { final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() @@ -640,14 +645,12 @@ public class IndexGeneratorJob implements Jobby .withMetrics(aggs) .build(); if (tuningConfig.isIngestOffheap()) { - final int maxTotalBufferSize = tuningConfig.getBufferSize(); - final int aggregationBufferSize = (int) ((double) maxTotalBufferSize - * tuningConfig.getAggregationBufferRatio()); + return new OffheapIncrementalIndex( indexSchema, - new OffheapBufferPool(aggregationBufferSize), + bufferPool, true, - maxTotalBufferSize + tuningConfig.getBufferSize() ); } else { return new OnheapIncrementalIndex(