ConcurrentGrouper: Add mergeThreadLocal option, fix bug around the switch to spilling. (#12513)

* ConcurrentGrouper: Add option to always slice up merge buffers thread-locally.

Normally, the ConcurrentGrouper shares merge buffers across processing
threads until spilling starts, and then switches to a thread-local model.
This minimizes memory use and reduces likelihood of spilling, which is
good, but it creates thread contention. The new mergeThreadLocal option
causes a query to start in thread-local mode immediately, and allows us
to experiment with the relative performance of the two modes.

* Fix grammar in docs.

* Fix race in ConcurrentGrouper.

* Fix issue with timeouts.

* Remove unused import.

* Add "tradeoff" to dictionary.
This commit is contained in:
Gian Merlino 2022-05-21 10:28:54 -07:00 committed by GitHub
parent 5073cee73f
commit 37853f8de4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 26 deletions

View File

@ -441,6 +441,7 @@ Supported query contexts:
|`forceHashAggregation`|Overrides the value of `druid.query.groupBy.forceHashAggregation`|None|
|`intermediateCombineDegree`|Overrides the value of `druid.query.groupBy.intermediateCombineDegree`|None|
|`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None|
|`mergeThreadLocal`|Whether merge buffers should always be split into thread-local buffers. Setting this to `true` reduces thread contention, but uses memory less efficiently. This tradeoff is beneficial when memory is plentiful. |false|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|
|`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes (historicals, peons) then limit results during segment scan. This context value can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true|

View File

@ -48,6 +48,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
private static final String CTX_KEY_MERGE_THREAD_LOCAL = "mergeThreadLocal";
@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
@ -102,6 +103,9 @@ public class GroupByQueryConfig
@JsonProperty
private int numParallelCombineThreads = 1;
@JsonProperty
private boolean mergeThreadLocal = false;
@JsonProperty
private boolean vectorize = true;
@ -201,6 +205,11 @@ public class GroupByQueryConfig
return numParallelCombineThreads;
}
public boolean isMergeThreadLocal()
{
return mergeThreadLocal;
}
public boolean isVectorize()
{
return vectorize;
@ -282,6 +291,7 @@ public class GroupByQueryConfig
CTX_KEY_NUM_PARALLEL_COMBINE_THREADS,
getNumParallelCombineThreads()
);
newConfig.mergeThreadLocal = query.getContextBoolean(CTX_KEY_MERGE_THREAD_LOCAL, isMergeThreadLocal());
newConfig.vectorize = query.getContextBoolean(QueryContexts.VECTORIZE_KEY, isVectorize());
newConfig.enableMultiValueUnnesting = query.getContextBoolean(
CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING,

View File

@ -93,6 +93,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private final long maxDictionarySizeForCombiner;
@Nullable
private final ParallelCombiner<KeyType> parallelCombiner;
private final boolean mergeThreadLocal;
private volatile boolean initialized = false;
@ -135,7 +136,8 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
hasQueryTimeout,
queryTimeoutAt,
groupByQueryConfig.getIntermediateCombineDegree(),
groupByQueryConfig.getNumParallelCombineThreads()
groupByQueryConfig.getNumParallelCombineThreads(),
groupByQueryConfig.isMergeThreadLocal()
);
}
@ -159,7 +161,8 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
final boolean hasQueryTimeout,
final long queryTimeoutAt,
final int intermediateCombineDegree,
final int numParallelCombineThreads
final int numParallelCombineThreads,
final boolean mergeThreadLocal
)
{
Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0");
@ -207,6 +210,8 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
} else {
this.parallelCombiner = null;
}
this.mergeThreadLocal = mergeThreadLocal;
}
@Override
@ -237,6 +242,10 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
);
grouper.init();
groupers.add(grouper);
if (mergeThreadLocal) {
grouper.setSpillingAllowed(true);
}
}
initialized = true;
@ -262,31 +271,48 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
throw new ISE("Grouper is closed");
}
if (!spilling) {
final SpillingGrouper<KeyType> hashBasedGrouper = groupers.get(grouperNumberForKeyHash(keyHash));
final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
if (mergeThreadLocal) {
// Always thread-local grouping: expect to get more memory use, but no thread contention.
return tlGrouper.aggregate(key, keyHash);
} else if (spilling) {
// Switch to thread-local grouping after spilling starts. No thread contention.
synchronized (tlGrouper) {
tlGrouper.setSpillingAllowed(true);
return tlGrouper.aggregate(key, keyHash);
}
} else {
// Use keyHash to find a grouper prior to spilling.
// There is potential here for thread contention, but it reduces memory use.
final SpillingGrouper<KeyType> subGrouper = groupers.get(grouperNumberForKeyHash(keyHash));
synchronized (subGrouper) {
if (subGrouper.isSpillingAllowed() && subGrouper != tlGrouper) {
// Another thread already started treating this grouper as its thread-local grouper. So, switch to ours.
// Fall through to release the lock on subGrouper and do the aggregation with tlGrouper.
} else {
final AggregateResult aggregateResult = subGrouper.aggregate(key, keyHash);
synchronized (hashBasedGrouper) {
if (!spilling) {
final AggregateResult aggregateResult = hashBasedGrouper.aggregate(key, keyHash);
if (aggregateResult.isOk()) {
return AggregateResult.ok();
} else {
// Expecting all-or-nothing behavior.
assert aggregateResult.getCount() == 0;
spilling = true;
}
}
}
}
// At this point we know spilling = true
final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
// Fall through to release the lock on subGrouper and do the aggregation with tlGrouper.
}
}
}
synchronized (tlGrouper) {
assert spilling;
tlGrouper.setSpillingAllowed(true);
return tlGrouper.aggregate(key, keyHash);
}
}
}
@Override
public void reset()
@ -318,7 +344,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
getGroupersIterator(sorted);
if (sorted) {
final boolean fullyCombined = !spilling;
final boolean fullyCombined = !spilling && !mergeThreadLocal;
// Parallel combine is used only when data is not fully merged.
if (!fullyCombined && parallelCombiner != null) {
@ -398,8 +424,16 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
ListenableFuture<List<CloseableIterator<Entry<KeyType>>>> future = Futures.allAsList(futures);
try {
if (!hasQueryTimeout) {
return future.get();
} else {
final long timeout = queryTimeoutAt - System.currentTimeMillis();
return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
if (timeout > 0) {
return future.get(timeout, TimeUnit.MILLISECONDS);
} else {
throw new TimeoutException();
}
}
}
catch (InterruptedException | CancellationException e) {
GuavaUtils.cancelAll(true, future, futures);

View File

@ -241,6 +241,11 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
return new ArrayList<>(mergedDictionary);
}
public boolean isSpillingAllowed()
{
return spillingAllowed;
}
public void setSpillingAllowed(final boolean spillingAllowed)
{
this.spillingAllowed = spillingAllowed;

View File

@ -28,6 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryTimeoutException;
@ -61,7 +62,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@ -78,10 +78,11 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
private final Supplier<ByteBuffer> bufferSupplier;
private final int concurrencyHint;
private final int parallelCombineThreads;
private final ExecutorService exec = Executors.newFixedThreadPool(8);
private final ExecutorService exec;
private final boolean mergeThreadLocal;
private final Closer closer = Closer.create();
@Parameters(name = "bufferSize={0}, concurrencyHint={1}, parallelCombineThreads={2}")
@Parameters(name = "bufferSize={0}, concurrencyHint={1}, parallelCombineThreads={2}, mergeThreadLocal={3}")
public static Collection<Object[]> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
@ -89,8 +90,10 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
for (final int bufferSize : new int[]{1024, 1024 * 32, 1024 * 1024}) {
for (final int concurrencyHint : new int[]{1, 8}) {
for (final int parallelCombineThreads : new int[]{0, 8}) {
for (final boolean mergeThreadLocal : new boolean[]{true, false}) {
if (parallelCombineThreads <= concurrencyHint) {
constructors.add(new Object[]{bufferSize, concurrencyHint, parallelCombineThreads});
constructors.add(new Object[]{bufferSize, concurrencyHint, parallelCombineThreads, mergeThreadLocal});
}
}
}
}
@ -115,11 +118,13 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
public ConcurrentGrouperTest(
int bufferSize,
int concurrencyHint,
int parallelCombineThreads
int parallelCombineThreads,
boolean mergeThreadLocal
)
{
this.concurrencyHint = concurrencyHint;
this.parallelCombineThreads = parallelCombineThreads;
this.mergeThreadLocal = mergeThreadLocal;
this.bufferSupplier = new Supplier<ByteBuffer>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
@ -135,6 +140,7 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
return buffer;
}
};
this.exec = Execs.multiThreaded(concurrencyHint, "ConcurrentGrouperTest-%d");
}
@Test()
@ -165,7 +171,8 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
false,
0,
4,
parallelCombineThreads
parallelCombineThreads,
mergeThreadLocal
);
closer.register(grouper);
grouper.init();
@ -195,7 +202,7 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
final CloseableIterator<Entry<LongKey>> iterator = closer.register(grouper.iterator(true));
if (parallelCombineThreads > 1 && temporaryStorage.currentSize() > 0) {
if (parallelCombineThreads > 1 && (mergeThreadLocal || temporaryStorage.currentSize() > 0)) {
// Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly
// configured, or due to spilling).
Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
@ -234,7 +241,8 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
true,
1,
4,
parallelCombineThreads
parallelCombineThreads,
mergeThreadLocal
);
closer.register(grouper);
grouper.init();

View File

@ -1580,6 +1580,7 @@ pre-existing
pushdown
row1
subtotalsSpec
tradeoff
unnested
unnesting
- ../docs/querying/having.md