groupBy v2: Configurable load factor. (#3437)

Also change defaults:

- bufferGrouperMaxLoadFactor from 0.75 to 0.7.
- maxMergingDictionarySize from 25MB to 100MB, which should be more appropriate
  for most heaps.

Gian Merlino 2016-09-07 12:14:59 -07:00 committed by Himanshu
parent 4f0bcdce36
commit 1e3f94237e
8 changed files with 43 additions and 12 deletions
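Both new defaults stay overridable through the runtime properties documented below. A minimal sketch of such overrides in runtime.properties, assuming you want to pin the pre-change behavior (the values are illustrative, not recommendations):

# Hypothetical operator overrides; property names come from the docs diff below.
druid.query.groupBy.bufferGrouperMaxLoadFactor=0.75
druid.query.groupBy.bufferGrouperInitialBuckets=0
druid.query.groupBy.maxMergingDictionarySize=25000000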

File: groupBy query documentation (markdown)

@@ -172,8 +172,9 @@ When using the "v2" strategy, the following runtime properties apply:

 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1|
-|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to -1 to use a reasonable default.|-1|
-|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|25000000|
+|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
+|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
+|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
 |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|

 Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that
@@ -203,4 +204,5 @@ When using the "v2" strategy, the following query context parameters apply:
 |--------|-----------|
 |`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
 |`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|
+|`bufferGrouperMaxLoadFactor`|Overrides the value of `druid.query.groupBy.bufferGrouperMaxLoadFactor` for this query.|
 |`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
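As a usage sketch (not part of this change), the new context parameter would sit alongside the others in a groupBy query; the dataSource, dimension, interval, and the 0.5 value below are placeholders:

{
  "queryType": "groupBy",
  "dataSource": "sample_datasource",
  "granularity": "all",
  "dimensions": ["dim1"],
  "aggregations": [ { "type": "count", "name": "count" } ],
  "intervals": [ "2016-01-01T00:00:00.000/2016-02-01T00:00:00.000" ],
  "context": {
    "groupByStrategy": "v2",
    "bufferGrouperMaxLoadFactor": 0.5
  }
}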

File: GroupByQueryConfig.java

@@ -31,6 +31,7 @@ public class GroupByQueryConfig
   private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
   private static final String CTX_KEY_MAX_RESULTS = "maxResults";
   private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets";
+  private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor";
   private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
   private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
@@ -51,11 +52,14 @@ public class GroupByQueryConfig
   private int bufferGrouperMaxSize = Integer.MAX_VALUE;

   @JsonProperty
-  private int bufferGrouperInitialBuckets = -1;
+  private float bufferGrouperMaxLoadFactor = 0;
+
+  @JsonProperty
+  private int bufferGrouperInitialBuckets = 0;

   @JsonProperty
   // Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk
-  private long maxMergingDictionarySize = 25_000_000L;
+  private long maxMergingDictionarySize = 100_000_000L;

   @JsonProperty
   // Max on-disk temporary storage, per-query; when exceeded, the query fails
@@ -101,6 +105,11 @@ public class GroupByQueryConfig
     return bufferGrouperMaxSize;
   }

+  public float getBufferGrouperMaxLoadFactor()
+  {
+    return bufferGrouperMaxLoadFactor;
+  }
+
   public int getBufferGrouperInitialBuckets()
   {
     return bufferGrouperInitialBuckets;
@@ -129,14 +138,18 @@ public class GroupByQueryConfig
         query.getContextValue(CTX_KEY_MAX_RESULTS, getMaxResults()),
         getMaxResults()
     );
-    newConfig.bufferGrouperInitialBuckets = query.getContextValue(
-        CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS,
-        getBufferGrouperInitialBuckets()
-    );
     newConfig.bufferGrouperMaxSize = Math.min(
         query.getContextValue(CTX_KEY_BUFFER_GROUPER_MAX_SIZE, getBufferGrouperMaxSize()),
         getBufferGrouperMaxSize()
    );
+    newConfig.bufferGrouperMaxLoadFactor = query.getContextValue(
+        CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR,
+        getBufferGrouperMaxLoadFactor()
+    );
+    newConfig.bufferGrouperInitialBuckets = query.getContextValue(
+        CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS,
+        getBufferGrouperInitialBuckets()
+    );
     newConfig.maxOnDiskStorage = Math.min(
         ((Number)query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(),
         getMaxOnDiskStorage()
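Note the two override styles in this hunk: bufferGrouperMaxLoadFactor and bufferGrouperInitialBuckets take the per-query context value as-is, while bufferGrouperMaxSize and maxOnDiskStorage are combined with Math.min so a query can only lower the configured ceiling. A self-contained sketch of the distinction (class and method names hypothetical, not Druid API; only the semantics mirror the diff):

// Hypothetical sketch contrasting the two override styles used in withOverrides().
public class OverrideSemanticsSketch
{
  // Straight override: the context value wins whenever present
  // (how bufferGrouperMaxLoadFactor and bufferGrouperInitialBuckets behave).
  static float plainOverride(Float contextValue, float configDefault)
  {
    return contextValue != null ? contextValue : configDefault;
  }

  // Clamped override: the context can only lower the configured ceiling
  // (how bufferGrouperMaxSize and maxOnDiskStorage behave).
  static long clampedOverride(Long contextValue, long configCeiling)
  {
    return contextValue == null ? configCeiling : Math.min(contextValue, configCeiling);
  }

  public static void main(String[] args)
  {
    System.out.println(plainOverride(0.5f, 0.7f));     // 0.5: context wins outright
    System.out.println(clampedOverride(5000L, 1000L)); // 1000: clamped to the config ceiling
  }
}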

File: BufferGrouper.java

@@ -62,7 +62,7 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
   private static final int MIN_INITIAL_BUCKETS = 4;
   private static final int DEFAULT_INITIAL_BUCKETS = 1024;
-  private static final float MAX_LOAD_FACTOR = 0.75f;
+  private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
   private static final int HASH_SIZE = Ints.BYTES;

   private final ByteBuffer buffer;
@@ -74,6 +74,7 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
   private final int bucketSize;
   private final int tableArenaSize;
   private final int bufferGrouperMaxSize; // Integer.MAX_VALUE in production, only used for unit tests
+  private final float maxLoadFactor;

   // Buffer pointing to the current table (it moves around as the table grows)
   private ByteBuffer tableBuffer;
@@ -96,6 +97,7 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
       final ColumnSelectorFactory columnSelectorFactory,
       final AggregatorFactory[] aggregatorFactories,
       final int bufferGrouperMaxSize,
+      final float maxLoadFactor,
       final int initialBuckets
   )
   {
@@ -105,8 +107,13 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
     this.aggregators = new BufferAggregator[aggregatorFactories.length];
     this.aggregatorOffsets = new int[aggregatorFactories.length];
     this.bufferGrouperMaxSize = bufferGrouperMaxSize;
+    this.maxLoadFactor = maxLoadFactor > 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR;
     this.initialBuckets = initialBuckets > 0 ? Math.max(MIN_INITIAL_BUCKETS, initialBuckets) : DEFAULT_INITIAL_BUCKETS;

+    if (this.maxLoadFactor >= 1.0f) {
+      throw new IAE("Invalid maxLoadFactor[%f], must be < 1.0", maxLoadFactor);
+    }
+
     int offset = HASH_SIZE + keySize;
     for (int i = 0; i < aggregatorFactories.length; i++) {
       aggregators[i] = aggregatorFactories[i].factorizeBuffered(columnSelectorFactory);
@@ -434,9 +441,9 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
     }
   }

-  private static int maxSizeForBuckets(int buckets)
+  private int maxSizeForBuckets(int buckets)
   {
-    return Math.max(1, (int) (buckets * MAX_LOAD_FACTOR));
+    return Math.max(1, (int) (buckets * maxLoadFactor));
   }

   /**
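The load factor's practical effect is easiest to see in maxSizeForBuckets above: a table with N buckets accepts at most max(1, (int) (N * maxLoadFactor)) entries before it must grow (or, once the table arena is exhausted, spill). A runnable sketch of that arithmetic, with a hypothetical class name:

// Hypothetical sketch of the growth trigger implied by maxSizeForBuckets.
public class LoadFactorSketch
{
  // How many entries an open-addressed table with the given bucket count
  // can hold before exceeding the load factor.
  static int maxSizeForBuckets(int buckets, float maxLoadFactor)
  {
    return Math.max(1, (int) (buckets * maxLoadFactor));
  }

  public static void main(String[] args)
  {
    // With the new 0.7 default, 1024 buckets hold 716 entries before growth;
    // under the old hard-coded 0.75 they held 768.
    System.out.println(maxSizeForBuckets(1024, 0.7f));  // 716
    System.out.println(maxSizeForBuckets(1024, 0.75f)); // 768
  }
}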

File: ConcurrentGrouper.java

@@ -46,6 +46,7 @@ public class ConcurrentGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
       final LimitedTemporaryStorage temporaryStorage,
       final ObjectMapper spillMapper,
       final int bufferGrouperMaxSize,
+      final float bufferGrouperMaxLoadFactor,
       final int bufferGrouperInitialBuckets,
       final KeySerdeFactory<KeyType> keySerdeFactory,
       final ColumnSelectorFactory columnSelectorFactory,
@@ -68,6 +69,7 @@ public class ConcurrentGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
               temporaryStorage,
               spillMapper,
               bufferGrouperMaxSize,
+              bufferGrouperMaxLoadFactor,
               bufferGrouperInitialBuckets
           )
       );

File: GroupByQueryEngineV2.java

@@ -214,6 +214,7 @@ public class GroupByQueryEngineV2
         query.getAggregatorSpecs()
              .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]),
         querySpecificConfig.getBufferGrouperMaxSize(),
+        querySpecificConfig.getBufferGrouperMaxLoadFactor(),
         querySpecificConfig.getBufferGrouperInitialBuckets()
     );

File: RowBasedGrouperHelper.java

@@ -97,6 +97,7 @@ public class RowBasedGrouperHelper
           temporaryStorage,
           spillMapper,
           querySpecificConfig.getBufferGrouperMaxSize(),
+          querySpecificConfig.getBufferGrouperMaxLoadFactor(),
           querySpecificConfig.getBufferGrouperInitialBuckets()
       );
     } else {
@@ -106,6 +107,7 @@ public class RowBasedGrouperHelper
           temporaryStorage,
           spillMapper,
           querySpecificConfig.getBufferGrouperMaxSize(),
+          querySpecificConfig.getBufferGrouperMaxLoadFactor(),
           querySpecificConfig.getBufferGrouperInitialBuckets(),
           keySerdeFactory,
           columnSelectorFactory,

File: SpillingGrouper.java

@@ -69,6 +69,7 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
       final LimitedTemporaryStorage temporaryStorage,
       final ObjectMapper spillMapper,
       final int bufferGrouperMaxSize,
+      final float bufferGrouperMaxLoadFactor,
       final int bufferGrouperInitialBuckets
   )
   {
@@ -79,6 +80,7 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
         columnSelectorFactory,
         aggregatorFactories,
         bufferGrouperMaxSize,
+        bufferGrouperMaxLoadFactor,
         bufferGrouperInitialBuckets
     );
     this.aggregatorFactories = aggregatorFactories;

File: BufferGrouperTest.java

@@ -50,7 +50,8 @@ public class BufferGrouperTest
             new CountAggregatorFactory("count")
         },
         Integer.MAX_VALUE,
-        -1
+        0,
+        0
     );

     columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L)));
@@ -156,6 +157,7 @@ public class BufferGrouperTest
             new CountAggregatorFactory("count")
         },
         Integer.MAX_VALUE,
+        0.75f,
         initialBuckets
     );
   }
} }