From 61c7fd2e6e73d7f993e5e0f82a3b59db09d9579b Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 30 Sep 2014 15:30:02 +0530 Subject: [PATCH] make ingestOffheap tuneable --- .../io/druid/indexer/HadoopIngestionSpec.java | 1 + .../io/druid/indexer/HadoopTuningConfig.java | 20 ++++++++++++++---- .../io/druid/indexer/IndexGeneratorJob.java | 5 +++-- .../druid/indexing/common/task/IndexTask.java | 2 +- .../common/task/RealtimeIndexTask.java | 1 + .../segment/incremental/IncrementalIndex.java | 16 ++++++++------ .../indexing/RealtimeTuningConfig.java | 21 +++++++++++++++---- .../segment/realtime/FireDepartment.java | 1 + .../druid/segment/realtime/plumber/Sink.java | 2 +- .../segment/realtime/FireDepartmentTest.java | 2 +- .../segment/realtime/RealtimeManagerTest.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 1 + .../segment/realtime/plumber/SinkTest.java | 1 + 13 files changed, 55 insertions(+), 19 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index 028f97d7db5..fec163e4164 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -166,6 +166,7 @@ public class HadoopIngestionSpec extends IngestionSpec jobProperties; private final boolean combineText; private final boolean persistInHeap; + private final boolean ingestOffheap; @JsonCreator public HadoopTuningConfig( @@ -84,7 +86,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, final @JsonProperty("jobProperties") Map jobProperties, final @JsonProperty("combineText") boolean combineText, - final @JsonProperty("persistInHeap") boolean persistInHeap + final @JsonProperty("persistInHeap") boolean persistInHeap, + final @JsonProperty("ingestOffheap") boolean ingestOffheap ) { this.workingPath = workingPath == null ? null : workingPath; @@ -101,6 +104,7 @@ public class HadoopTuningConfig implements TuningConfig : ImmutableMap.copyOf(jobProperties)); this.combineText = combineText; this.persistInHeap = persistInHeap; + this.ingestOffheap = ingestOffheap; } @JsonProperty @@ -175,6 +179,11 @@ public class HadoopTuningConfig implements TuningConfig return persistInHeap; } + @JsonProperty + public boolean isIngestOffheap(){ + return ingestOffheap; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -189,7 +198,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -207,7 +217,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -225,7 +236,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index bf1abde54c7..74ac61e637c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -634,7 +634,8 @@ public class IndexGeneratorJob implements Jobby for (AggregatorFactory agg : aggs) { aggsSize += agg.getMaxIntermediateSize(); } - int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary(); + final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); + int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary(); return new IncrementalIndex( new IncrementalIndexSchema.Builder() .withMinTimestamp(theBucket.time.getMillis()) @@ -643,7 +644,7 @@ public class IndexGeneratorJob implements Jobby .withMetrics(aggs) .build(), new OffheapBufferPool(bufferSize), - false + tuningConfig.isIngestOffheap() ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index c477cb5d153..058b3fd027e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -403,7 +403,7 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null), + new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null), metrics ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index d073d88dc32..906e8e7a901 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -144,6 +144,7 @@ public class RealtimeIndexTask extends AbstractTask rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy, maxPendingPersists, spec.getShardSpec(), + false, false ), null, null, null, null diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index e0f47513c05..329e91f02ea 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -57,6 +57,7 @@ import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; import org.mapdb.BTreeKeySerializer; +import org.mapdb.CC; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; @@ -326,17 +327,20 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - this.useOffheap = true; + this.useOffheap = useOffheap; if (this.useOffheap) { - final DBMaker dbMaker = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable(); + final DBMaker dbMaker = DBMaker.newMemoryDirectDB() + .transactionDisable() + .asyncWriteEnable() + .cacheSoftRefEnable(); db = dbMaker.make(); factsDb = dbMaker.make(); final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID()) - .keySerializer(timeAndDimsSerializer) - .comparator(timeAndDimsSerializer.getComparator()) - .valueSerializer(Serializer.INTEGER) - .make(); + .keySerializer(timeAndDimsSerializer) + .comparator(timeAndDimsSerializer.getComparator()) + .valueSerializer(Serializer.INTEGER) + .make(); } else { db = null; factsDb = null; diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index a4d60bfe77a..fa0c37b2cfd 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -45,6 +45,8 @@ public class RealtimeTuningConfig implements TuningConfig private static final int defaultMaxPendingPersists = 0; private static final ShardSpec defaultShardSpec = new NoneShardSpec(); private static final boolean defaultPersistInHeap = false; + private static final boolean defaultIngestOffheap = false; + // Might make sense for this to be a builder public static RealtimeTuningConfig makeDefaultTuningConfig() @@ -58,7 +60,8 @@ public class RealtimeTuningConfig implements TuningConfig defaultRejectionPolicyFactory, defaultMaxPendingPersists, defaultShardSpec, - defaultPersistInHeap + defaultPersistInHeap, + defaultIngestOffheap ); } @@ -71,6 +74,7 @@ public class RealtimeTuningConfig implements TuningConfig private final int maxPendingPersists; private final ShardSpec shardSpec; private final boolean persistInHeap; + private final boolean ingestOffheap; @JsonCreator public RealtimeTuningConfig( @@ -82,7 +86,8 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, - @JsonProperty("persistInHeap") Boolean persistInHeap + @JsonProperty("persistInHeap") Boolean persistInHeap, + @JsonProperty("ingestOffheap") Boolean ingestOffheap ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -98,6 +103,7 @@ public class RealtimeTuningConfig implements TuningConfig this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap; + this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap; } @JsonProperty @@ -154,6 +160,11 @@ public class RealtimeTuningConfig implements TuningConfig return persistInHeap; } + @JsonProperty + public boolean isIngestOffheap(){ + return ingestOffheap; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -165,7 +176,8 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -180,7 +192,8 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - persistInHeap + persistInHeap, + ingestOffheap ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java index 6851d693af5..3e2211f51fd 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java @@ -97,6 +97,7 @@ public class FireDepartment extends IngestionSpec .withMetrics(schema.getAggregators()) .build(), new OffheapBufferPool(bufferSize), - false + config.isIngestOffheap() ); FireHydrant old; diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 4f9cb51ecc9..549574b238e 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -77,7 +77,7 @@ public class FireDepartmentTest ) ), new RealtimeTuningConfig( - null, null, null, null, null, null, null, null, false + null, null, null, null, null, null, null, null, false, false ), null, null, null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index d8c86386b8e..b6619b35f38 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -117,6 +117,7 @@ public class RealtimeManagerTest null, null, null, + null, null ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index c120d31451e..c037d5b2b13 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -163,6 +163,7 @@ public class RealtimePlumberSchoolTest rejectionPolicy, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 227f753b114..8fed5962f54 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -64,6 +64,7 @@ public class SinkTest null, null, null, + false, false ); final Sink sink = new Sink(interval, schema, tuningConfig, version);