mirror of https://github.com/apache/druid.git
Fix dictionarySize overrides in tests (#15354)
I think this is a problem as it discards the false return value when the putToKeyBuffer can't store the value because of the limit Not forwarding the return value at that point may lead to the normal continuation here regardless something was not added to the dictionary like here
This commit is contained in:
parent
a929b9f16e
commit
eb056e23b5
|
@ -378,6 +378,8 @@ Supported query contexts:
|
||||||
|`forceHashAggregation`|Overrides the value of `druid.query.groupBy.forceHashAggregation`|None|
|
|`forceHashAggregation`|Overrides the value of `druid.query.groupBy.forceHashAggregation`|None|
|
||||||
|`intermediateCombineDegree`|Overrides the value of `druid.query.groupBy.intermediateCombineDegree`|None|
|
|`intermediateCombineDegree`|Overrides the value of `druid.query.groupBy.intermediateCombineDegree`|None|
|
||||||
|`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None|
|
|`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None|
|
||||||
|
|`maxSelectorDictionarySize`|Overrides the value of `druid.query.groupBy.maxMergingDictionarySize`|None|
|
||||||
|
|`maxMergingDictionarySize`|Overrides the value of `druid.query.groupBy.maxMergingDictionarySize`|None|
|
||||||
|`mergeThreadLocal`|Whether merge buffers should always be split into thread-local buffers. Setting this to `true` reduces thread contention, but uses memory less efficiently. This tradeoff is beneficial when memory is plentiful. |false|
|
|`mergeThreadLocal`|Whether merge buffers should always be split into thread-local buffers. Setting this to `true` reduces thread contention, but uses memory less efficiently. This tradeoff is beneficial when memory is plentiful. |false|
|
||||||
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false|
|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false|
|
||||||
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|
|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|
|
||||||
|
|
|
@ -217,6 +217,11 @@ public class QueryContext
|
||||||
return QueryContexts.getAsHumanReadableBytes(key, get(key), defaultValue);
|
return QueryContexts.getAsHumanReadableBytes(key, get(key), defaultValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HumanReadableBytes getHumanReadableBytes(final String key, final long defaultBytes)
|
||||||
|
{
|
||||||
|
return QueryContexts.getAsHumanReadableBytes(key, get(key), HumanReadableBytes.valueOf(defaultBytes));
|
||||||
|
}
|
||||||
|
|
||||||
public <E extends Enum<E>> E getEnum(String key, Class<E> clazz, E defaultValue)
|
public <E extends Enum<E>> E getEnum(String key, Class<E> clazz, E defaultValue)
|
||||||
{
|
{
|
||||||
return QueryContexts.getAsEnum(key, get(key), clazz, defaultValue);
|
return QueryContexts.getAsEnum(key, get(key), clazz, defaultValue);
|
||||||
|
|
|
@ -48,6 +48,8 @@ public class GroupByQueryConfig
|
||||||
private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets";
|
private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets";
|
||||||
private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor";
|
private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor";
|
||||||
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
|
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
|
||||||
|
private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE = "maxSelectorDictionarySize";
|
||||||
|
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
|
||||||
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
|
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
|
||||||
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
|
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
|
||||||
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
|
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
|
||||||
|
@ -164,7 +166,7 @@ public class GroupByQueryConfig
|
||||||
*/
|
*/
|
||||||
long getActualMaxSelectorDictionarySize(final long maxHeapSize, final int numConcurrentQueries)
|
long getActualMaxSelectorDictionarySize(final long maxHeapSize, final int numConcurrentQueries)
|
||||||
{
|
{
|
||||||
if (maxSelectorDictionarySize.getBytes() == AUTOMATIC) {
|
if (getConfiguredMaxSelectorDictionarySize() == AUTOMATIC) {
|
||||||
final long heapForDictionaries = (long) (maxHeapSize * SELECTOR_DICTIONARY_HEAP_FRACTION);
|
final long heapForDictionaries = (long) (maxHeapSize * SELECTOR_DICTIONARY_HEAP_FRACTION);
|
||||||
|
|
||||||
return Math.max(
|
return Math.max(
|
||||||
|
@ -175,7 +177,7 @@ public class GroupByQueryConfig
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
return maxSelectorDictionarySize.getBytes();
|
return getConfiguredMaxSelectorDictionarySize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,8 +323,13 @@ public class GroupByQueryConfig
|
||||||
getMaxOnDiskStorage().getBytes()
|
getMaxOnDiskStorage().getBytes()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
newConfig.maxSelectorDictionarySize = maxSelectorDictionarySize; // No overrides
|
|
||||||
newConfig.maxMergingDictionarySize = maxMergingDictionarySize; // No overrides
|
newConfig.maxSelectorDictionarySize = queryContext
|
||||||
|
.getHumanReadableBytes(CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE, getConfiguredMaxSelectorDictionarySize());
|
||||||
|
|
||||||
|
newConfig.maxMergingDictionarySize = queryContext
|
||||||
|
.getHumanReadableBytes(CTX_KEY_MAX_MERGING_DICTIONARY_SIZE, getConfiguredMaxMergingDictionarySize());
|
||||||
|
|
||||||
newConfig.forcePushDownLimit = queryContext.getBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
|
newConfig.forcePushDownLimit = queryContext.getBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
|
||||||
newConfig.applyLimitPushDownToSegment = queryContext.getBoolean(
|
newConfig.applyLimitPushDownToSegment = queryContext.getBoolean(
|
||||||
CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT,
|
CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT,
|
||||||
|
|
|
@ -1921,8 +1921,7 @@ public class RowBasedGrouperHelper
|
||||||
} else {
|
} else {
|
||||||
keyBuffer.put(NullHandling.IS_NOT_NULL_BYTE);
|
keyBuffer.put(NullHandling.IS_NOT_NULL_BYTE);
|
||||||
}
|
}
|
||||||
delegate.putToKeyBuffer(key, idx);
|
return delegate.putToKeyBuffer(key, idx);
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.query.groupby.epinephelinae;
|
||||||
import org.apache.druid.query.groupby.epinephelinae.Grouper.BufferComparator;
|
import org.apache.druid.query.groupby.epinephelinae.Grouper.BufferComparator;
|
||||||
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
|
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
|
||||||
|
|
||||||
|
import javax.annotation.CheckReturnValue;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
interface RowBasedKeySerdeHelper
|
interface RowBasedKeySerdeHelper
|
||||||
|
@ -43,6 +44,7 @@ interface RowBasedKeySerdeHelper
|
||||||
*
|
*
|
||||||
* @return true if the value was added to the key, false otherwise
|
* @return true if the value was added to the key, false otherwise
|
||||||
*/
|
*/
|
||||||
|
@CheckReturnValue
|
||||||
boolean putToKeyBuffer(RowBasedKey key, int idx);
|
boolean putToKeyBuffer(RowBasedKey key, int idx);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.druid.query.groupby.epinephelinae.Grouper;
|
||||||
import org.apache.druid.query.ordering.StringComparator;
|
import org.apache.druid.query.ordering.StringComparator;
|
||||||
import org.apache.druid.segment.ColumnValueSelector;
|
import org.apache.druid.segment.ColumnValueSelector;
|
||||||
|
|
||||||
|
import javax.annotation.CheckReturnValue;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
@ -89,6 +90,7 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
|
||||||
* @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if
|
* @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if
|
||||||
* memory did not increase as a result of this operation. Will not be negative.
|
* memory did not increase as a result of this operation. Will not be negative.
|
||||||
*/
|
*/
|
||||||
|
@CheckReturnValue
|
||||||
int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess);
|
int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -144,6 +146,7 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
|
||||||
* @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if
|
* @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if
|
||||||
* memory did not increase as a result of this operation. Will not be negative.
|
* memory did not increase as a result of this operation. Will not be negative.
|
||||||
*/
|
*/
|
||||||
|
@CheckReturnValue
|
||||||
int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer);
|
int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -102,8 +102,8 @@ public class GroupByQueryConfigTest
|
||||||
Assert.assertEquals(true, config2.isSingleThreaded());
|
Assert.assertEquals(true, config2.isSingleThreaded());
|
||||||
Assert.assertEquals(1, config2.getBufferGrouperInitialBuckets());
|
Assert.assertEquals(1, config2.getBufferGrouperInitialBuckets());
|
||||||
Assert.assertEquals(3_000_000, config2.getMaxOnDiskStorage().getBytes());
|
Assert.assertEquals(3_000_000, config2.getMaxOnDiskStorage().getBytes());
|
||||||
Assert.assertEquals(5 /* Can't override */, config2.getConfiguredMaxSelectorDictionarySize());
|
Assert.assertEquals(3, config2.getConfiguredMaxSelectorDictionarySize());
|
||||||
Assert.assertEquals(6_000_000 /* Can't override */, config2.getConfiguredMaxMergingDictionarySize());
|
Assert.assertEquals(4, config2.getConfiguredMaxMergingDictionarySize());
|
||||||
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
|
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
|
||||||
Assert.assertTrue(config2.isApplyLimitPushDownToSegment());
|
Assert.assertTrue(config2.isApplyLimitPushDownToSegment());
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,8 +60,8 @@ class DruidSqlValidator extends BaseDruidSqlValidator
|
||||||
if (!plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
|
if (!plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
|
||||||
throw buildCalciteContextException(
|
throw buildCalciteContextException(
|
||||||
StringUtils.format(
|
StringUtils.format(
|
||||||
"The query contains window functions; To run these window functions, enable [%s] in query context.",
|
"The query contains window functions; To run these window functions, specify [%s] in query context.",
|
||||||
EngineFeature.WINDOW_FUNCTIONS),
|
PlannerContext.CTX_ENABLE_WINDOW_FNS),
|
||||||
call);
|
call);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14396,7 +14396,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
||||||
.sql("SELECT dim1,ROW_NUMBER() OVER () from druid.foo")
|
.sql("SELECT dim1,ROW_NUMBER() OVER () from druid.foo")
|
||||||
.run());
|
.run());
|
||||||
|
|
||||||
assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, enable [WINDOW_FUNCTIONS] in query context. (line [1], column [13])"));
|
assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, specify [enableWindowing] in query context. (line [1], column [13])"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue