diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 4da79a9912e..aa8f62656bf 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -61,6 +61,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally, |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| +|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| The amount of direct memory needed by Druid is at least `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index b8d2645e217..9d539b50695 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -60,6 +60,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| +|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| The amount of direct memory needed by Druid is at least `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can diff --git a/docs/content/configuration/realtime.md b/docs/content/configuration/realtime.md index a1e18b3dadf..28475bae5fa 100644 --- a/docs/content/configuration/realtime.md +++ b/docs/content/configuration/realtime.md @@ -44,6 +44,7 @@ The realtime node uses several of the global configs in [Configuration](../confi |`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| +|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| The amount of direct memory needed by Druid is at least `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can diff --git a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java index d4308c8ab11..ee7520131be 100644 --- a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java +++ b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java @@ -63,4 +63,9 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem { return false; } + + @Config(value = "${base_path}.tmpDir") + public String getTmpDir() { + return System.getProperty("java.io.tmpdir"); + } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 885186852d6..a6deacf1495 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -81,6 +81,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private final int concurrencyHint; private final BlockingPool mergeBufferPool; private final ObjectMapper spillMapper; + private final String processingTmpDir; public GroupByMergingQueryRunnerV2( GroupByQueryConfig config, @@ -89,7 +90,8 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner Iterable> queryables, int concurrencyHint, BlockingPool mergeBufferPool, - ObjectMapper spillMapper + ObjectMapper spillMapper, + String processingTmpDir ) { this.config = config; @@ -99,6 +101,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner this.concurrencyHint = concurrencyHint; this.mergeBufferPool = mergeBufferPool; this.spillMapper = spillMapper; + this.processingTmpDir = processingTmpDir; } @Override @@ -130,7 +133,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner } final File temporaryStorageDirectory = new File( - System.getProperty("java.io.tmpdir"), + processingTmpDir, String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 3eab688f2e7..b3821767228 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -67,7 +67,8 @@ public class GroupByRowProcessor final Map rowSignature, final GroupByQueryConfig config, final BlockingPool mergeBufferPool, - final ObjectMapper spillMapper + final ObjectMapper spillMapper, + final String processingTmpDir ) { final GroupByQuery query = (GroupByQuery) queryParam; @@ -78,8 +79,9 @@ public class GroupByRowProcessor aggregatorFactories[i] = query.getAggregatorSpecs().get(i); } + final File temporaryStorageDirectory = new File( - System.getProperty("java.io.tmpdir"), + processingTmpDir, String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index b17323fadc9..95df89b550a 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -211,7 +211,8 @@ public class GroupByStrategyV2 implements GroupByStrategy GroupByQueryHelper.rowSignatureFor(subquery), configSupplier.get(), mergeBufferPool, - spillMapper + spillMapper, + processingConfig.getTmpDir() ); return mergeResults(new QueryRunner() { @@ -236,7 +237,8 @@ public class GroupByStrategyV2 implements GroupByStrategy queryRunners, processingConfig.getNumThreads(), mergeBufferPool, - spillMapper + spillMapper, + processingConfig.getTmpDir() ); } diff --git a/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java index 1f937e7dc5c..3f4c8206f58 100644 --- a/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java +++ b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java @@ -49,6 +49,7 @@ public class DruidProcessingConfigTest } Assert.assertEquals(0, config.columnCacheSizeBytes()); Assert.assertFalse(config.isFifo()); + Assert.assertEquals(System.getProperty("java.io.tmpdir"), config.getTmpDir()); //with non-defaults Properties props = new Properties(); @@ -57,6 +58,7 @@ public class DruidProcessingConfigTest props.setProperty("druid.processing.numThreads", "5"); props.setProperty("druid.processing.columnCache.sizeBytes", "1"); props.setProperty("druid.processing.fifo", "true"); + props.setProperty("druid.processing.tmpDir", "/test/path"); factory = Config.createFactory(props); config = factory.buildWithReplacements(DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); @@ -66,5 +68,6 @@ public class DruidProcessingConfigTest Assert.assertEquals(5, config.getNumThreads()); Assert.assertEquals(1, config.columnCacheSizeBytes()); Assert.assertTrue(config.isFifo()); + Assert.assertEquals("/test/path", config.getTmpDir()); } }