From 4dc0fdba8a20975076a89bc36b1c475bc0cd2afc Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 3 Dec 2014 23:47:30 +0530 Subject: [PATCH] consider mapped size in limit calculation & review comments --- .../io/druid/indexer/HadoopTuningConfig.java | 24 +++++++-------- .../io/druid/indexer/IndexGeneratorJob.java | 3 +- .../query/groupby/GroupByQueryHelper.java | 3 +- .../incremental/OffheapIncrementalIndex.java | 30 +++++++++++++------ .../test/java/io/druid/segment/TestIndex.java | 3 +- .../segment/data/IncrementalIndexTest.java | 3 +- .../IncrementalIndexStorageAdapterTest.java | 3 +- .../druid/segment/realtime/plumber/Sink.java | 4 ++- 8 files changed, 46 insertions(+), 27 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 4bfb95c4347..2cc6d5996ff 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -36,19 +36,19 @@ import java.util.Map; @JsonTypeName("hadoop") public class HadoopTuningConfig implements TuningConfig { - private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec(); - private static final Map> defaultShardSpecs = ImmutableMap.>of(); - private static final int defaultRowFlushBoundary = 80000; - private static final int defaultBufferSize = 128 * 1024 * 1024; + private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec(); + private static final Map> DEFAULT_SHARD_SPECS = ImmutableMap.>of(); + private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000; + private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024; public static HadoopTuningConfig makeDefaultTuningConfig() { return new HadoopTuningConfig( null, new DateTime().toString(), - defaultPartitionsSpec, - defaultShardSpecs, - defaultRowFlushBoundary, + DEFAULT_PARTITIONS_SPEC, + DEFAULT_SHARD_SPECS, + DEFAULT_ROW_FLUSH_BOUNDARY, false, true, false, @@ -57,7 +57,7 @@ public class HadoopTuningConfig implements TuningConfig false, false, false, - defaultBufferSize + DEFAULT_BUFFER_SIZE ); } @@ -96,9 +96,9 @@ public class HadoopTuningConfig implements TuningConfig { this.workingPath = workingPath == null ? null : workingPath; this.version = version == null ? new DateTime().toString() : version; - this.partitionsSpec = partitionsSpec == null ? defaultPartitionsSpec : partitionsSpec; - this.shardSpecs = shardSpecs == null ? defaultShardSpecs : shardSpecs; - this.rowFlushBoundary = rowFlushBoundary == null ? defaultRowFlushBoundary : rowFlushBoundary; + this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec; + this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs; + this.rowFlushBoundary = rowFlushBoundary == null ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary; this.leaveIntermediate = leaveIntermediate; this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure; this.overwriteFiles = overwriteFiles; @@ -109,7 +109,7 @@ public class HadoopTuningConfig implements TuningConfig this.combineText = combineText; this.persistInHeap = persistInHeap; this.ingestOffheap = ingestOffheap; - this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize; + this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize; } @JsonProperty diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 3ca12e131b3..a334fc8b729 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -642,7 +642,8 @@ public class IndexGeneratorJob implements Jobby return new OffheapIncrementalIndex( indexSchema, new OffheapBufferPool(tuningConfig.getBufferSize()), - true + true, + tuningConfig.getBufferSize() ); } else { return new OnheapIncrementalIndex( diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 2d2bdd4d06a..350e1903ebc 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -84,7 +84,8 @@ public class GroupByQueryHelper gran, aggs.toArray(new AggregatorFactory[aggs.size()]), bufferPool, - false + false, + Integer.MAX_VALUE ); } else { index = new OnheapIncrementalIndex( diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 1e79dcc0b9a..9eaa1265162 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -31,6 +31,7 @@ import org.mapdb.BTreeKeySerializer; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; +import org.mapdb.Store; import java.io.DataInput; import java.io.DataOutput; @@ -57,11 +58,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex private final int[] aggPositionOffsets; private final int totalAggSize; private final ConcurrentNavigableMap facts; + private final int sizeLimit; public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, StupidPool bufferPool, - boolean deserializeComplexMetrics + boolean deserializeComplexMetrics, + int sizeLimit ) { super(incrementalIndexSchema, deserializeComplexMetrics); @@ -91,6 +94,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex .comparator(timeAndDimsSerializer.getComparator()) .valueSerializer(Serializer.INTEGER) .make(); + this.sizeLimit = sizeLimit; } public OffheapIncrementalIndex( @@ -98,7 +102,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex QueryGranularity gran, final AggregatorFactory[] metrics, StupidPool bufferPool, - boolean deserializeComplexMetrics + boolean deserializeComplexMetrics, + int sizeLimit ) { this( @@ -107,7 +112,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), bufferPool, - deserializeComplexMetrics + deserializeComplexMetrics, + sizeLimit ); } @@ -220,12 +226,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex } /** - - * @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows. - - */ - public boolean isFull() - { - return (size() + 1) * totalAggSize > bufferHolder.get().limit(); - } + * - * @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows. + * - + */ + public boolean isFull() + { + return (size() + 1) * totalAggSize > bufferHolder.get().limit() || getCurrentSize() > sizeLimit ; + } private int getMetricPosition(int rowOffset, int metricIndex) { @@ -416,4 +423,9 @@ public class OffheapIncrementalIndex extends IncrementalIndex return s1.equals(s2); } } + + private long getCurrentSize() + { + return Store.forDB(db).getCurrSize() + Store.forDB(factsDb).getCurrSize() + bufferHolder.get().limit(); + } } diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index ef271f92e0a..66edb2c7be5 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -166,7 +166,8 @@ public class TestIndex retVal = new OffheapIncrementalIndex( schema, TestQueryRunners.pool, - true + true, + 100 * 1024 * 1024 ); } else { retVal = new OnheapIncrementalIndex( diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 81b6803c578..2b11b7b3132 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -102,7 +102,8 @@ public class IncrementalIndexTest QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, TestQueryRunners.pool, - true + true, + 100 * 1024 * 1024 ); } else { return new OnheapIncrementalIndex( diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 46f72335459..35c0413d7f7 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -107,7 +107,8 @@ public class IncrementalIndexStorageAdapterTest QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, TestQueryRunners.pool, - true + true, + 100 * 1024 * 1024 ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index a07196fda1e..6df37ea529f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -187,8 +187,10 @@ public class Sink implements Iterable if (config.isIngestOffheap()) { newIndex = new OffheapIncrementalIndex( indexSchema, + // Assuming half space for aggregates new OffheapBufferPool(config.getBufferSize()), - true + true, + config.getBufferSize() ); } else { newIndex = new OnheapIncrementalIndex(