Allow configurable temp directory for query processing (#3893)

This commit is contained in:
Jonathan Wei 2017-02-02 10:22:28 -08:00 committed by Fangjin Yang
parent a73f1c9c70
commit 182261f713
8 changed files with 24 additions and 6 deletions

View File

@ -61,6 +61,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|Whether the processing queue should treat tasks of equal priority in a FIFO manner.|`false`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can

View File

@ -60,6 +60,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|Whether the processing queue should treat tasks of equal priority in a FIFO manner.|`false`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can

View File

@ -44,6 +44,7 @@ The realtime node uses several of the global configs in [Configuration](../confi
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can

View File

@ -63,4 +63,9 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
{
return false;
}
/**
 * Directory under which temporary files created during query processing are placed.
 *
 * <p>Bound to the {@code ${base_path}.tmpDir} configuration key. This default
 * implementation returns the value of the JVM's {@code java.io.tmpdir} system property.
 *
 * @return the path of the temporary directory to use for query processing
 */
@Config(value = "${base_path}.tmpDir")
public String getTmpDir()
{
  final String jvmTmpDir = System.getProperty("java.io.tmpdir");
  return jvmTmpDir;
}
}

View File

@ -81,6 +81,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
private final int concurrencyHint;
private final BlockingPool<ByteBuffer> mergeBufferPool;
private final ObjectMapper spillMapper;
private final String processingTmpDir;
public GroupByMergingQueryRunnerV2(
GroupByQueryConfig config,
@ -89,7 +90,8 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
Iterable<QueryRunner<Row>> queryables,
int concurrencyHint,
BlockingPool<ByteBuffer> mergeBufferPool,
ObjectMapper spillMapper
ObjectMapper spillMapper,
String processingTmpDir
)
{
this.config = config;
@ -99,6 +101,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
this.concurrencyHint = concurrencyHint;
this.mergeBufferPool = mergeBufferPool;
this.spillMapper = spillMapper;
this.processingTmpDir = processingTmpDir;
}
@Override
@ -130,7 +133,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
}
final File temporaryStorageDirectory = new File(
System.getProperty("java.io.tmpdir"),
processingTmpDir,
String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
);

View File

@ -67,7 +67,8 @@ public class GroupByRowProcessor
final Map<String, ValueType> rowSignature,
final GroupByQueryConfig config,
final BlockingPool<ByteBuffer> mergeBufferPool,
final ObjectMapper spillMapper
final ObjectMapper spillMapper,
final String processingTmpDir
)
{
final GroupByQuery query = (GroupByQuery) queryParam;
@ -78,8 +79,9 @@ public class GroupByRowProcessor
aggregatorFactories[i] = query.getAggregatorSpecs().get(i);
}
final File temporaryStorageDirectory = new File(
System.getProperty("java.io.tmpdir"),
processingTmpDir,
String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
);

View File

@ -211,7 +211,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
GroupByQueryHelper.rowSignatureFor(subquery),
configSupplier.get(),
mergeBufferPool,
spillMapper
spillMapper,
processingConfig.getTmpDir()
);
return mergeResults(new QueryRunner<Row>()
{
@ -236,7 +237,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
queryRunners,
processingConfig.getNumThreads(),
mergeBufferPool,
spillMapper
spillMapper,
processingConfig.getTmpDir()
);
}

View File

@ -49,6 +49,7 @@ public class DruidProcessingConfigTest
}
Assert.assertEquals(0, config.columnCacheSizeBytes());
Assert.assertFalse(config.isFifo());
Assert.assertEquals(System.getProperty("java.io.tmpdir"), config.getTmpDir());
//with non-defaults
Properties props = new Properties();
@ -57,6 +58,7 @@ public class DruidProcessingConfigTest
props.setProperty("druid.processing.numThreads", "5");
props.setProperty("druid.processing.columnCache.sizeBytes", "1");
props.setProperty("druid.processing.fifo", "true");
props.setProperty("druid.processing.tmpDir", "/test/path");
factory = Config.createFactory(props);
config = factory.buildWithReplacements(DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
@ -66,5 +68,6 @@ public class DruidProcessingConfigTest
Assert.assertEquals(5, config.getNumThreads());
Assert.assertEquals(1, config.columnCacheSizeBytes());
Assert.assertTrue(config.isFifo());
Assert.assertEquals("/test/path", config.getTmpDir());
}
}