From 37853f8de4ba801a5f6c429ccceeb58b58762109 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 21 May 2022 10:28:54 -0700 Subject: [PATCH] ConcurrentGrouper: Add mergeThreadLocal option, fix bug around the switch to spilling. (#12513) * ConcurrentGrouper: Add option to always slice up merge buffers thread-locally. Normally, the ConcurrentGrouper shares merge buffers across processing threads until spilling starts, and then switches to a thread-local model. This minimizes memory use and reduces likelihood of spilling, which is good, but it creates thread contention. The new mergeThreadLocal option causes a query to start in thread-local mode immediately, and allows us to experiment with the relative performance of the two modes. * Fix grammar in docs. * Fix race in ConcurrentGrouper. * Fix issue with timeouts. * Remove unused import. * Add "tradeoff" to dictionary. --- docs/querying/groupbyquery.md | 1 + .../query/groupby/GroupByQueryConfig.java | 10 +++ .../epinephelinae/ConcurrentGrouper.java | 68 ++++++++++++++----- .../epinephelinae/SpillingGrouper.java | 5 ++ .../epinephelinae/ConcurrentGrouperTest.java | 26 ++++--- website/.spelling | 1 + 6 files changed, 85 insertions(+), 26 deletions(-) diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md index 9554f1e86ef..41a52f53477 100644 --- a/docs/querying/groupbyquery.md +++ b/docs/querying/groupbyquery.md @@ -441,6 +441,7 @@ Supported query contexts: |`forceHashAggregation`|Overrides the value of `druid.query.groupBy.forceHashAggregation`|None| |`intermediateCombineDegree`|Overrides the value of `druid.query.groupBy.intermediateCombineDegree`|None| |`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None| +|`mergeThreadLocal`|Whether merge buffers should always be split into thread-local buffers. Setting this to `true` reduces thread contention, but uses memory less efficiently. This tradeoff is beneficial when memory is plentiful. |false| |`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false| |`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false| |`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes (historicals, peons) then limit results during segment scan. This context value can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true| diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index bba64333915..3518c0e4fb4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -48,6 +48,7 @@ public class GroupByQueryConfig private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation"; private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree"; private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads"; + private static final String CTX_KEY_MERGE_THREAD_LOCAL = "mergeThreadLocal"; @JsonProperty private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2; @@ -102,6 +103,9 @@ public class GroupByQueryConfig @JsonProperty private int numParallelCombineThreads = 1; + @JsonProperty + private boolean mergeThreadLocal = false; + @JsonProperty private boolean vectorize = true; @@ -201,6 +205,11 @@ public class GroupByQueryConfig return numParallelCombineThreads; } + public boolean isMergeThreadLocal() + { + return mergeThreadLocal; + } + public boolean isVectorize() { return vectorize; @@ -282,6 +291,7 @@ public class GroupByQueryConfig CTX_KEY_NUM_PARALLEL_COMBINE_THREADS, getNumParallelCombineThreads() ); + newConfig.mergeThreadLocal = query.getContextBoolean(CTX_KEY_MERGE_THREAD_LOCAL, isMergeThreadLocal()); newConfig.vectorize = query.getContextBoolean(QueryContexts.VECTORIZE_KEY, isVectorize()); newConfig.enableMultiValueUnnesting = query.getContextBoolean( CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 2ae96e85422..827583dccbf 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -93,6 +93,7 @@ public class ConcurrentGrouper implements Grouper private final long maxDictionarySizeForCombiner; @Nullable private final ParallelCombiner parallelCombiner; + private final boolean mergeThreadLocal; private volatile boolean initialized = false; @@ -135,7 +136,8 @@ public class ConcurrentGrouper implements Grouper hasQueryTimeout, queryTimeoutAt, groupByQueryConfig.getIntermediateCombineDegree(), - groupByQueryConfig.getNumParallelCombineThreads() + groupByQueryConfig.getNumParallelCombineThreads(), + groupByQueryConfig.isMergeThreadLocal() ); } @@ -159,7 +161,8 @@ public class ConcurrentGrouper implements Grouper final boolean hasQueryTimeout, final long queryTimeoutAt, final int intermediateCombineDegree, - final int numParallelCombineThreads + final int numParallelCombineThreads, + final boolean mergeThreadLocal ) { Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0"); @@ -207,6 +210,8 @@ public class ConcurrentGrouper implements Grouper } else { this.parallelCombiner = null; } + + this.mergeThreadLocal = mergeThreadLocal; } @Override @@ -237,6 +242,10 @@ public class ConcurrentGrouper implements Grouper ); grouper.init(); groupers.add(grouper); + + if (mergeThreadLocal) { + grouper.setSpillingAllowed(true); + } } initialized = true; @@ -262,29 +271,46 @@ public class ConcurrentGrouper implements Grouper throw new ISE("Grouper is closed"); } - if (!spilling) { - final SpillingGrouper hashBasedGrouper = groupers.get(grouperNumberForKeyHash(keyHash)); + final SpillingGrouper tlGrouper = threadLocalGrouper.get(); + + if (mergeThreadLocal) { + // Always thread-local grouping: expect to get more memory use, but no thread contention. + return tlGrouper.aggregate(key, keyHash); + } else if (spilling) { + // Switch to thread-local grouping after spilling starts. No thread contention. + synchronized (tlGrouper) { + tlGrouper.setSpillingAllowed(true); + return tlGrouper.aggregate(key, keyHash); + } + } else { + // Use keyHash to find a grouper prior to spilling. + // There is potential here for thread contention, but it reduces memory use. + final SpillingGrouper subGrouper = groupers.get(grouperNumberForKeyHash(keyHash)); + + synchronized (subGrouper) { + if (subGrouper.isSpillingAllowed() && subGrouper != tlGrouper) { + // Another thread already started treating this grouper as its thread-local grouper. So, switch to ours. + // Fall through to release the lock on subGrouper and do the aggregation with tlGrouper. + } else { + final AggregateResult aggregateResult = subGrouper.aggregate(key, keyHash); - synchronized (hashBasedGrouper) { - if (!spilling) { - final AggregateResult aggregateResult = hashBasedGrouper.aggregate(key, keyHash); if (aggregateResult.isOk()) { return AggregateResult.ok(); } else { // Expecting all-or-nothing behavior. assert aggregateResult.getCount() == 0; spilling = true; + + // Fall through to release the lock on subGrouper and do the aggregation with tlGrouper. } } } - } - // At this point we know spilling = true - final SpillingGrouper tlGrouper = threadLocalGrouper.get(); - - synchronized (tlGrouper) { - tlGrouper.setSpillingAllowed(true); - return tlGrouper.aggregate(key, keyHash); + synchronized (tlGrouper) { + assert spilling; + tlGrouper.setSpillingAllowed(true); + return tlGrouper.aggregate(key, keyHash); + } } } @@ -318,7 +344,7 @@ public class ConcurrentGrouper implements Grouper getGroupersIterator(sorted); if (sorted) { - final boolean fullyCombined = !spilling; + final boolean fullyCombined = !spilling && !mergeThreadLocal; // Parallel combine is used only when data is not fully merged. if (!fullyCombined && parallelCombiner != null) { @@ -398,8 +424,16 @@ public class ConcurrentGrouper implements Grouper ListenableFuture>>> future = Futures.allAsList(futures); try { - final long timeout = queryTimeoutAt - System.currentTimeMillis(); - return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); + if (!hasQueryTimeout) { + return future.get(); + } else { + final long timeout = queryTimeoutAt - System.currentTimeMillis(); + if (timeout > 0) { + return future.get(timeout, TimeUnit.MILLISECONDS); + } else { + throw new TimeoutException(); + } + } } catch (InterruptedException | CancellationException e) { GuavaUtils.cancelAll(true, future, futures); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 44b44d70dfb..86365fef12b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -241,6 +241,11 @@ public class SpillingGrouper implements Grouper return new ArrayList<>(mergedDictionary); } + public boolean isSpillingAllowed() + { + return spillingAllowed; + } + public void setSpillingAllowed(final boolean spillingAllowed) { this.spillingAllowed = spillingAllowed; diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 49951cdcb9b..6ab2f1a7bf0 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.QueryTimeoutException; @@ -61,7 +62,6 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -78,10 +78,11 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest private final Supplier bufferSupplier; private final int concurrencyHint; private final int parallelCombineThreads; - private final ExecutorService exec = Executors.newFixedThreadPool(8); + private final ExecutorService exec; + private final boolean mergeThreadLocal; private final Closer closer = Closer.create(); - @Parameters(name = "bufferSize={0}, concurrencyHint={1}, parallelCombineThreads={2}") + @Parameters(name = "bufferSize={0}, concurrencyHint={1}, parallelCombineThreads={2}, mergeThreadLocal={3}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); @@ -89,8 +90,10 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest for (final int bufferSize : new int[]{1024, 1024 * 32, 1024 * 1024}) { for (final int concurrencyHint : new int[]{1, 8}) { for (final int parallelCombineThreads : new int[]{0, 8}) { - if (parallelCombineThreads <= concurrencyHint) { - constructors.add(new Object[]{bufferSize, concurrencyHint, parallelCombineThreads}); + for (final boolean mergeThreadLocal : new boolean[]{true, false}) { + if (parallelCombineThreads <= concurrencyHint) { + constructors.add(new Object[]{bufferSize, concurrencyHint, parallelCombineThreads, mergeThreadLocal}); + } } } } @@ -115,11 +118,13 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest public ConcurrentGrouperTest( int bufferSize, int concurrencyHint, - int parallelCombineThreads + int parallelCombineThreads, + boolean mergeThreadLocal ) { this.concurrencyHint = concurrencyHint; this.parallelCombineThreads = parallelCombineThreads; + this.mergeThreadLocal = mergeThreadLocal; this.bufferSupplier = new Supplier() { private final AtomicBoolean called = new AtomicBoolean(false); @@ -135,6 +140,7 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest return buffer; } }; + this.exec = Execs.multiThreaded(concurrencyHint, "ConcurrentGrouperTest-%d"); } @Test() @@ -165,7 +171,8 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest false, 0, 4, - parallelCombineThreads + parallelCombineThreads, + mergeThreadLocal ); closer.register(grouper); grouper.init(); @@ -195,7 +202,7 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest final CloseableIterator> iterator = closer.register(grouper.iterator(true)); - if (parallelCombineThreads > 1 && temporaryStorage.currentSize() > 0) { + if (parallelCombineThreads > 1 && (mergeThreadLocal || temporaryStorage.currentSize() > 0)) { // Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly // configured, or due to spilling). Assert.assertTrue(TEST_RESOURCE_HOLDER.taken); @@ -234,7 +241,8 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest true, 1, 4, - parallelCombineThreads + parallelCombineThreads, + mergeThreadLocal ); closer.register(grouper); grouper.init(); diff --git a/website/.spelling b/website/.spelling index f0b5b0370cd..fd16c897542 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1580,6 +1580,7 @@ pre-existing pushdown row1 subtotalsSpec +tradeoff unnested unnesting - ../docs/querying/having.md