From b40c342cd118f3b7a4393c5449b829cbe844c5c8 Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Sat, 23 Jan 2016 00:12:22 -0600
Subject: [PATCH] Make the global StupidPool cache size configurable

---
 .../java/io/druid/collections/StupidPool.java      | 21 +++++-
 docs/content/configuration/broker.md               |  1 +
 docs/content/configuration/historical.md           |  1 +
 .../io/druid/query/DruidProcessingConfig.java      |  6 ++
 .../query/DruidProcessingConfigTest.java           | 66 +++++++++++++++++++
 .../io/druid/guice/DruidProcessingModule.java      |  2 +-
 .../io/druid/offheap/OffheapBufferPool.java        |  5 +-
 7 files changed, 97 insertions(+), 5 deletions(-)
 create mode 100644 processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java

diff --git a/common/src/main/java/io/druid/collections/StupidPool.java b/common/src/main/java/io/druid/collections/StupidPool.java
index 6ba14099a26..8c5937ad335 100644
--- a/common/src/main/java/io/druid/collections/StupidPool.java
+++ b/common/src/main/java/io/druid/collections/StupidPool.java
@@ -38,11 +38,24 @@ public class StupidPool<T>
 
   private final Queue<T> objects = new ConcurrentLinkedQueue<>();
 
+  // This is only the maximum number of entries kept in the cache; the pool can still create as many objects as needed.
+  private final int objectsCacheMaxCount;
+
   public StupidPool(
       Supplier<T> generator
   )
   {
     this.generator = generator;
+    this.objectsCacheMaxCount = Integer.MAX_VALUE;
+  }
+
+  public StupidPool(
+      Supplier<T> generator,
+      int objectsCacheMaxCount
+  )
+  {
+    this.generator = generator;
+    this.objectsCacheMaxCount = objectsCacheMaxCount;
   }
 
   public ResourceHolder<T> take()
@@ -80,8 +93,12 @@ public class StupidPool<T>
       log.warn(new ISE("Already Closed!"), "Already closed");
       return;
     }
-    if (!objects.offer(object)) {
-      log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
+    if (objects.size() < objectsCacheMaxCount) {
+      if (!objects.offer(object)) {
+        log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
+      }
+    } else {
+      log.debug("cache entry count has reached max limit [%s], not returning object to the pool", objectsCacheMaxCount);
     }
   }
 
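To make the new behavior concrete, here is a minimal sketch of the two-argument constructor added above. StupidPool, ResourceHolder, take(), and close() are from the code being patched; the class name, the byte-array generator, and the cap of 2 are illustrative assumptions only. Note that the cap bounds only the return path; take() can always allocate a new object.

import com.google.common.base.Supplier;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;

public class BoundedCacheSketch
{
  public static void main(String[] args) throws Exception
  {
    // A pool whose generator makes 1 KB arrays; at most 2 are cached on return.
    StupidPool<byte[]> pool = new StupidPool<>(
        new Supplier<byte[]>()
        {
          @Override
          public byte[] get()
          {
            return new byte[1024];
          }
        },
        2
    );

    ResourceHolder<byte[]> a = pool.take();
    ResourceHolder<byte[]> b = pool.take();
    ResourceHolder<byte[]> c = pool.take(); // creation is never blocked by the cap

    a.close(); // returned to the cache
    b.close(); // returned to the cache
    c.close(); // cache already holds 2 entries: dropped and left to the GC
  }
}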
diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md
index 9f74731b7a9..8cc0ff031ea 100644
--- a/docs/content/configuration/broker.md
+++ b/docs/content/configuration/broker.md
@@ -55,6 +55,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
+|`druid.processing.buffer.poolCacheMaxCount`|The processing buffer pool caches buffers for later reuse; this is the maximum number of buffers the cache will keep. Note that the pool can still create more buffers than it can cache if necessary.|`Integer.MAX_VALUE`|
 |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
 |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
 |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator or the cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|

diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md
index 86c6330ffb7..57264a844ea 100644
--- a/docs/content/configuration/historical.md
+++ b/docs/content/configuration/historical.md
@@ -53,6 +53,7 @@ Druid uses Jetty to serve HTTP requests.
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
+|`druid.processing.buffer.poolCacheMaxCount`|The processing buffer pool caches buffers for later reuse; this is the maximum number of buffers the cache will keep. Note that the pool can still create more buffers than it can cache if necessary.|`Integer.MAX_VALUE`|
 |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
 |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
 |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator or the cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
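For illustration, the new property in a node's runtime.properties could look like this (the cap of 100 is a hypothetical value, not a recommendation from the patch):

druid.processing.buffer.sizeBytes=1073741824
druid.processing.buffer.poolCacheMaxCount=100

A finite cap lets direct buffers allocated during a load spike become garbage-collectable afterwards instead of being cached forever.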
diff --git a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java
index 6c99e6ec9c0..73f7b007a37 100644
--- a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java
@@ -31,6 +31,12 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig
     return 1024 * 1024 * 1024;
   }
 
+  @Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"})
+  public int poolCacheMaxCount()
+  {
+    return Integer.MAX_VALUE;
+  }
+
   @Override
   @Config(value = "${base_path}.numThreads")
   public int getNumThreads()

diff --git a/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java
new file mode 100644
index 00000000000..9d3a4edfb0b
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query;
+
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.config.Config;
+import org.junit.Assert;
+import org.junit.Test;
+import org.skife.config.ConfigurationObjectFactory;
+
+import java.util.Properties;
+
+/**
+ */
+public class DruidProcessingConfigTest
+{
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    ConfigurationObjectFactory factory = Config.createFactory(new Properties());
+
+    // with defaults
+    DruidProcessingConfig config = factory.build(DruidProcessingConfig.class);
+
+    Assert.assertEquals(1024 * 1024 * 1024, config.intermediateComputeSizeBytes());
+    Assert.assertEquals(Integer.MAX_VALUE, config.poolCacheMaxCount());
+    Assert.assertTrue(config.getNumThreads() < Runtime.getRuntime().availableProcessors());
+    Assert.assertEquals(0, config.columnCacheSizeBytes());
+    Assert.assertFalse(config.isFifo());
+
+    // with non-defaults
+    Properties props = new Properties();
+    props.setProperty("druid.processing.buffer.sizeBytes", "1");
+    props.setProperty("druid.processing.buffer.poolCacheMaxCount", "1");
+    props.setProperty("druid.processing.numThreads", "5");
+    props.setProperty("druid.processing.columnCache.sizeBytes", "1");
+    props.setProperty("druid.processing.fifo", "true");
+
+    factory = Config.createFactory(props);
+    config = factory.buildWithReplacements(DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
+
+    Assert.assertEquals(1, config.intermediateComputeSizeBytes());
+    Assert.assertEquals(1, config.poolCacheMaxCount());
+    Assert.assertEquals(5, config.getNumThreads());
+    Assert.assertEquals(1, config.columnCacheSizeBytes());
+    Assert.assertTrue(config.isFifo());
+  }
+}
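The @Config annotation above lists two keys, which keeps the legacy druid.computation.* name working alongside the ${base_path} form. Here is a sketch of that fallback, reusing the helpers already imported by the test; the class name LegacyKeySketch and the value 42 are illustrative, and the expected output assumes config-magic resolves the listed keys in order, first match winning.

import com.google.common.collect.ImmutableMap;
import com.metamx.common.config.Config;
import io.druid.query.DruidProcessingConfig;
import org.skife.config.ConfigurationObjectFactory;

import java.util.Properties;

public class LegacyKeySketch
{
  public static void main(String[] args)
  {
    // Only the legacy druid.computation.* key is set; since the @Config
    // annotation lists it first, it should still be picked up.
    Properties props = new Properties();
    props.setProperty("druid.computation.buffer.poolCacheMaxCount", "42");

    ConfigurationObjectFactory factory = Config.createFactory(props);
    DruidProcessingConfig config = factory.buildWithReplacements(
        DruidProcessingConfig.class,
        ImmutableMap.of("base_path", "druid.processing")
    );

    System.out.println(config.poolCacheMaxCount()); // expected to print 42
  }
}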
diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
index aafd142a298..15fb776abc9 100644
--- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
@@ -125,7 +125,7 @@ public class DruidProcessingModule implements Module
       log.info(e.getMessage());
     }
 
-    return new OffheapBufferPool(config.intermediateComputeSizeBytes());
+    return new OffheapBufferPool(config.intermediateComputeSizeBytes(), config.poolCacheMaxCount());
   }
 
diff --git a/server/src/main/java/io/druid/offheap/OffheapBufferPool.java b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java
index 4d3df2fc90d..aab3ffa7c18 100644
--- a/server/src/main/java/io/druid/offheap/OffheapBufferPool.java
+++ b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java
@@ -31,7 +31,7 @@ public class OffheapBufferPool extends StupidPool<ByteBuffer>
 {
   private static final Logger log = new Logger(OffheapBufferPool.class);
 
-  public OffheapBufferPool(final int computationBufferSize)
+  public OffheapBufferPool(final int computationBufferSize, final int cacheMaxCount)
   {
     super(
         new Supplier<ByteBuffer>()
@@ -47,7 +47,8 @@ public class OffheapBufferPool extends StupidPool<ByteBuffer>
             );
             return ByteBuffer.allocateDirect(computationBufferSize);
           }
-        }
+        },
+        cacheMaxCount
     );
   }
 }
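Finally, a back-of-the-envelope sketch of why capping the cache matters for direct memory. The numbers are hypothetical, not from the patch: with the previous unbounded behavior, every direct buffer created during a spike of concurrent queries stays referenced by the pool afterwards.

public class CacheCapArithmetic
{
  public static void main(String[] args)
  {
    long bufferSize = 1024L * 1024 * 1024; // the 1 GB sizeBytes default
    int peakBuffers = 12;                  // hypothetical spike of concurrent takes
    int cacheMaxCount = 4;                 // hypothetical poolCacheMaxCount

    // Unbounded cache: all 12 direct buffers stay referenced after the spike.
    System.out.println("retained without cap: " + bufferSize * peakBuffers + " bytes");

    // Bounded cache: at most 4 are kept; the rest become garbage-collectable.
    System.out.println("retained with cap: " + bufferSize * cacheMaxCount + " bytes");
  }
}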