diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index fdcf7f2356d..99662946e1a 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -19,6 +19,7 @@ package io.druid.collections; +import javax.annotation.Nullable; import java.util.List; public interface BlockingPool @@ -33,6 +34,7 @@ public interface BlockingPool * * @return a resource, or null if the timeout was reached */ + @Nullable ReferenceCountingResourceHolder take(long timeoutMs); /** @@ -49,16 +51,16 @@ public interface BlockingPool * @param elementNum number of resources to take * @param timeoutMs maximum time to wait for resources, in milliseconds. * - * @return a resource, or null if the timeout was reached + * @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available. */ - ReferenceCountingResourceHolder> takeBatch(int elementNum, long timeoutMs); + List> takeBatch(int elementNum, long timeoutMs); /** * Take resources from the pool, waiting if necessary until the elements of the given number become available. * * @param elementNum number of resources to take * - * @return a resource + * @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available. 
*/ - ReferenceCountingResourceHolder> takeBatch(int elementNum); + List> takeBatch(int elementNum); } diff --git a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java index 6b4512bfbb5..e5efc70eacf 100644 --- a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java +++ b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java @@ -22,16 +22,17 @@ package io.druid.collections; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import io.druid.java.util.common.ISE; -import java.io.Closeable; +import javax.annotation.Nullable; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; /** * Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations. @@ -74,6 +75,7 @@ public class DefaultBlockingPool implements BlockingPool } @Override + @Nullable public ReferenceCountingResourceHolder take(final long timeoutMs) { Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); @@ -82,7 +84,7 @@ public class DefaultBlockingPool implements BlockingPool return wrapObject(timeoutMs > 0 ? 
pollObject(timeoutMs) : pollObject()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -94,25 +96,20 @@ public class DefaultBlockingPool implements BlockingPool return wrapObject(takeObject()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } + @Nullable private ReferenceCountingResourceHolder wrapObject(T theObject) { return theObject == null ? null : new ReferenceCountingResourceHolder<>( theObject, - new Closeable() - { - @Override - public void close() - { - offer(theObject); - } - } + () -> offer(theObject) ); } + @Nullable private T pollObject() { final ReentrantLock lock = this.lock; @@ -125,6 +122,7 @@ public class DefaultBlockingPool implements BlockingPool } } + @Nullable private T pollObject(long timeoutMs) throws InterruptedException { long nanos = TIME_UNIT.toNanos(timeoutMs); @@ -160,53 +158,39 @@ public class DefaultBlockingPool implements BlockingPool } @Override - public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeoutMs) + public List> takeBatch(final int elementNum, final long timeoutMs) { Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); try { - return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum)); + final List objects = timeoutMs > 0 ? 
pollObjects(elementNum, timeoutMs) : pollObjects(elementNum); + return objects.stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @Override - public ReferenceCountingResourceHolder> takeBatch(final int elementNum) + public List> takeBatch(final int elementNum) { checkInitialized(); try { - return wrapObjects(takeObjects(elementNum)); + return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } - private ReferenceCountingResourceHolder> wrapObjects(List theObjects) - { - return theObjects == null ? null : new ReferenceCountingResourceHolder<>( - theObjects, - new Closeable() - { - @Override - public void close() - { - offerBatch(theObjects); - } - } - ); - } - private List pollObjects(int elementNum) throws InterruptedException { - final List list = Lists.newArrayListWithCapacity(elementNum); + final List list = new ArrayList<>(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { if (objects.size() < elementNum) { - return null; + return Collections.emptyList(); } else { for (int i = 0; i < elementNum; i++) { list.add(objects.pop()); @@ -222,13 +206,13 @@ public class DefaultBlockingPool implements BlockingPool private List pollObjects(int elementNum, long timeoutMs) throws InterruptedException { long nanos = TIME_UNIT.toNanos(timeoutMs); - final List list = Lists.newArrayListWithCapacity(elementNum); + final List list = new ArrayList<>(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (objects.size() < elementNum) { if (nanos <= 0) { - return null; + return Collections.emptyList(); } nanos = notEnough.awaitNanos(nanos); } @@ -244,7 +228,7 @@ public class DefaultBlockingPool implements BlockingPool private List takeObjects(int elementNum) throws 
InterruptedException { - final List list = Lists.newArrayListWithCapacity(elementNum); + final List list = new ArrayList<>(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { @@ -282,23 +266,4 @@ public class DefaultBlockingPool implements BlockingPool lock.unlock(); } } - - private void offerBatch(List offers) - { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - if (objects.size() + offers.size() <= maxSize) { - for (T offer : offers) { - objects.push(offer); - } - notEnough.signal(); - } else { - throw new ISE("Cannot exceed pre-configured maximum size"); - } - } - finally { - lock.unlock(); - } - } } diff --git a/common/src/main/java/io/druid/collections/DummyBlockingPool.java b/common/src/main/java/io/druid/collections/DummyBlockingPool.java index 2752e68a8ad..e128d5713f7 100644 --- a/common/src/main/java/io/druid/collections/DummyBlockingPool.java +++ b/common/src/main/java/io/druid/collections/DummyBlockingPool.java @@ -57,13 +57,13 @@ public final class DummyBlockingPool implements BlockingPool } @Override - public ReferenceCountingResourceHolder> takeBatch(int elementNum, long timeoutMs) + public List> takeBatch(int elementNum, long timeoutMs) { throw new UnsupportedOperationException(); } @Override - public ReferenceCountingResourceHolder> takeBatch(int elementNum) + public List> takeBatch(int elementNum) { throw new UnsupportedOperationException(); } diff --git a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java index 691281a14e4..46a487d44a2 100644 --- a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java +++ b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java @@ -52,7 +52,7 @@ public class ReferenceCountingResourceHolder implements ResourceHolder @SuppressWarnings("unused") private final Cleaner cleaner; - ReferenceCountingResourceHolder(final 
T object, final Closeable closer) + public ReferenceCountingResourceHolder(final T object, final Closeable closer) { this.object = object; this.closer = closer; @@ -64,6 +64,10 @@ public class ReferenceCountingResourceHolder implements ResourceHolder return new ReferenceCountingResourceHolder<>(object, object); } + /** + * Returns the resource with an initial reference count of 1. More references can be added by + * calling {@link #increment()}. + */ @Override public T get() { @@ -73,6 +77,13 @@ public class ReferenceCountingResourceHolder implements ResourceHolder return object; } + /** + * Increments the reference count by 1 and returns a {@link Releaser}. The returned {@link Releaser} is used to + * decrement the reference count when the caller no longer needs the resource. + * + * {@link Releaser}s are not thread-safe. If multiple threads need references to the same holder, they should + * each acquire their own {@link Releaser}. + */ public Releaser increment() { while (true) { @@ -103,6 +114,9 @@ public class ReferenceCountingResourceHolder implements ResourceHolder }; } + /** + * Decrements the reference count by 1. If it reaches to 0, then closes {@link #closer}. 
+ */ @Override public void close() { diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java index 9b90a844e49..a6459a862f5 100644 --- a/common/src/test/java/io/druid/collections/BlockingPoolTest.java +++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java @@ -83,58 +83,51 @@ public class BlockingPoolTest @Test(timeout = 1000) public void testTakeTimeout() { - final ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 100L); + final List> batchHolder = POOL.takeBatch(10, 100L); final ReferenceCountingResourceHolder holder = POOL.take(100); assertNull(holder); - batchHolder.close(); + batchHolder.forEach(ReferenceCountingResourceHolder::close); } @Test(timeout = 1000) public void testTakeBatch() { - final ReferenceCountingResourceHolder> holder = POOL.takeBatch(6, 100L); + final List> holder = POOL.takeBatch(6, 100L); assertNotNull(holder); - assertEquals(6, holder.get().size()); + assertEquals(6, holder.size()); assertEquals(4, POOL.getPoolSize()); - holder.close(); + holder.forEach(ReferenceCountingResourceHolder::close); assertEquals(10, POOL.getPoolSize()); } @Test(timeout = 1000) public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException { - ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 10); + List> batchHolder = POOL.takeBatch(10, 10); assertNotNull(batchHolder); - assertEquals(10, batchHolder.get().size()); + assertEquals(10, batchHolder.size()); assertEquals(0, POOL.getPoolSize()); - final Future>> future = SERVICE.submit( - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(8, 100); - } - } + final Future>> future = SERVICE.submit( + () -> POOL.takeBatch(8, 100) ); Thread.sleep(20); - batchHolder.close(); + batchHolder.forEach(ReferenceCountingResourceHolder::close); batchHolder = future.get(); assertNotNull(batchHolder); - assertEquals(8, 
batchHolder.get().size()); + assertEquals(8, batchHolder.size()); assertEquals(2, POOL.getPoolSize()); - batchHolder.close(); + batchHolder.forEach(ReferenceCountingResourceHolder::close); assertEquals(10, POOL.getPoolSize()); } @Test(timeout = 1000) public void testTakeBatchTooManyObjects() { - final ReferenceCountingResourceHolder> holder = POOL.takeBatch(100, 100L); - assertNull(holder); + final List> holder = POOL.takeBatch(100, 100L); + assertTrue(holder.isEmpty()); } @Test(timeout = 1000) @@ -227,43 +220,27 @@ public class BlockingPoolTest public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException { final int batch1 = POOL.maxSize() / 2; - final Callable>> c1 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch1, 10); - } - }; + final Callable>> c1 = () -> POOL.takeBatch(batch1, 10); final int batch2 = POOL.maxSize() - batch1 + 1; - final Callable>> c2 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch2, 10); - } - }; + final Callable>> c2 = () -> POOL.takeBatch(batch2, 10); - final Future>> f1 = SERVICE.submit(c1); - final Future>> f2 = SERVICE.submit(c2); + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); - final ReferenceCountingResourceHolder> r1 = f1.get(); - final ReferenceCountingResourceHolder> r2 = f2.get(); + final List> r1 = f1.get(); + final List> r2 = f2.get(); if (r1 != null) { - assertNull(r2); + assertTrue(r2.isEmpty()); assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize()); - assertEquals(batch1, r1.get().size()); - r1.close(); + assertEquals(batch1, r1.size()); + r1.forEach(ReferenceCountingResourceHolder::close); } else { assertNotNull(r2); assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize()); - assertEquals(batch2, r2.get().size()); - r2.close(); + assertEquals(batch2, r2.size()); + r2.forEach(ReferenceCountingResourceHolder::close); } 
assertEquals(POOL.maxSize(), POOL.getPoolSize()); @@ -273,37 +250,21 @@ public class BlockingPoolTest public void testConcurrentBatchClose() throws ExecutionException, InterruptedException { final int batch1 = POOL.maxSize() / 2; - final Callable>> c1 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch1, 10); - } - }; + final Callable>> c1 = () -> POOL.takeBatch(batch1, 10); final int batch2 = POOL.maxSize() - batch1; - final Callable>> c2 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch2, 10); - } - }; + final Callable>> c2 = () -> POOL.takeBatch(batch2, 10); - final Future>> f1 = SERVICE.submit(c1); - final Future>> f2 = SERVICE.submit(c2); + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); - final ReferenceCountingResourceHolder> r1 = f1.get(); - final ReferenceCountingResourceHolder> r2 = f2.get(); + final List> r1 = f1.get(); + final List> r2 = f2.get(); assertNotNull(r1); assertNotNull(r2); - assertEquals(batch1, r1.get().size()); - assertEquals(batch2, r2.get().size()); + assertEquals(batch1, r1.size()); + assertEquals(batch2, r2.size()); assertEquals(0, POOL.getPoolSize()); final Future future1 = SERVICE.submit(new Runnable() @@ -311,7 +272,7 @@ public class BlockingPoolTest @Override public void run() { - r1.close(); + r1.forEach(ReferenceCountingResourceHolder::close); } }); final Future future2 = SERVICE.submit(new Runnable() @@ -319,7 +280,7 @@ public class BlockingPoolTest @Override public void run() { - r2.close(); + r2.forEach(ReferenceCountingResourceHolder::close); } }); @@ -332,19 +293,11 @@ public class BlockingPoolTest @Test(timeout = 1000) public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException { - final ReferenceCountingResourceHolder> r1 = POOL.takeBatch(1, 10); + final List> r1 = POOL.takeBatch(1, 10); - final Callable>> c2 = - new 
Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(10, 100); - } - }; + final Callable>> c2 = () -> POOL.takeBatch(10, 100); - final Future>> f2 = SERVICE.submit(c2); + final Future>> f2 = SERVICE.submit(c2); final Future f1 = SERVICE.submit(new Runnable() { @Override @@ -356,17 +309,17 @@ public class BlockingPoolTest catch (InterruptedException e) { // ignore } - r1.close(); + r1.forEach(ReferenceCountingResourceHolder::close); } }); - final ReferenceCountingResourceHolder> r2 = f2.get(); + final List> r2 = f2.get(); f1.get(); assertNotNull(r2); - assertEquals(10, r2.get().size()); + assertEquals(10, r2.size()); assertEquals(0, POOL.getPoolSize()); - r2.close(); + r2.forEach(ReferenceCountingResourceHolder::close); assertEquals(POOL.maxSize(), POOL.getPoolSize()); } } diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index 52dfddfd24d..4048551f84f 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -204,13 +204,30 @@ The default number of initial buckets is 1024 and the default max load factor of ##### Parallel combine -Once a historical finishes aggregation using the hash table, it sorts aggregates and merge them before sending to the broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging aggregates which is an http thread to send data to brokers. +Once a historical finishes aggregation using the hash table, it sorts the aggregated results and merges them before sending to the +broker for N-way merge aggregation in the broker. 
By default, historicals use all their available processing threads +(configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging +aggregates, namely the HTTP thread that sends data to brokers. -This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared between all submitted queries and they are _not interruptible_. It means, if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query is finished. GroupBy queries usually take longer time than timeseries or topN queries, they should release processing threads as soon as possible. +This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared +between all submitted queries and they are _not interruptible_. This means that if a heavy query takes all available +processing threads, all other queries might be blocked until the heavy query is finished. Because groupBy queries usually take +longer than timeseries or topN queries, they should release processing threads as soon as possible. -However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)). +However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck +of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well. +This is called _parallel combine_. 
To enable parallel combine, see `numParallelCombineThreads` in +[Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when +data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)). -Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the degree of intermeidate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). +Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each +intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge +aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they +need to read data from disk. As a result, fewer threads are used for intermediate nodes by default. You can change the +degree of intermediate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). + +Please note that each historical needs two merge buffers to process a groupBy v2 query with parallel combine: one for +computing intermediate aggregates from each segment and another for combining intermediate aggregates in parallel. 
#### Alternatives diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 52492fc2b69..305f228a682 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -27,7 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.ISE; import io.druid.java.util.common.parsers.CloseableIterator; @@ -96,7 +96,7 @@ public class ConcurrentGrouper implements Grouper public ConcurrentGrouper( final GroupByQueryConfig groupByQueryConfig, final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final KeySerdeFactory keySerdeFactory, final KeySerdeFactory combineKeySerdeFactory, final ColumnSelectorFactory columnSelectorFactory, @@ -114,7 +114,7 @@ public class ConcurrentGrouper implements Grouper { this( bufferSupplier, - combineBufferSupplier, + combineBufferHolder, keySerdeFactory, combineKeySerdeFactory, columnSelectorFactory, @@ -138,7 +138,7 @@ public class ConcurrentGrouper implements Grouper ConcurrentGrouper( final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final KeySerdeFactory keySerdeFactory, final KeySerdeFactory combineKeySerdeFactory, final ColumnSelectorFactory columnSelectorFactory, @@ -191,7 +191,7 @@ public class ConcurrentGrouper implements Grouper if 
(numParallelCombineThreads > 1) { this.parallelCombiner = new ParallelCombiner<>( - combineBufferSupplier, + Preconditions.checkNotNull(combineBufferHolder, "combineBufferHolder"), getCombiningFactories(aggregatorFactories), combineKeySerdeFactory, executor, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 71115147af9..95ac2e11646 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -22,7 +22,6 @@ package io.druid.query.groupby.epinephelinae; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Predicates; -import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -34,10 +33,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; -import io.druid.collections.NonBlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.Releaser; -import io.druid.collections.ResourceHolder; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -82,7 +79,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private final ListeningExecutorService exec; private final QueryWatcher queryWatcher; private final int concurrencyHint; - private final NonBlockingPool processingBufferPool; private final BlockingPool mergeBufferPool; private final ObjectMapper spillMapper; private final String processingTmpDir; @@ 
-94,7 +90,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner QueryWatcher queryWatcher, Iterable> queryables, int concurrencyHint, - NonBlockingPool processingBufferPool, BlockingPool mergeBufferPool, int mergeBufferSize, ObjectMapper spillMapper, @@ -106,7 +101,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.concurrencyHint = concurrencyHint; - this.processingBufferPool = processingBufferPool; this.mergeBufferPool = mergeBufferPool; this.spillMapper = spillMapper; this.processingTmpDir = processingTmpDir; @@ -159,22 +153,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner final boolean hasTimeout = QueryContexts.hasTimeout(query); final long timeoutAt = System.currentTimeMillis() + queryTimeout; - final Supplier> combineBufferSupplier = new Supplier>() - { - private boolean initialized; - private ResourceHolder buffer; - - @Override - public ResourceHolder get() - { - if (!initialized) { - buffer = processingBufferPool.take(); - initialized = true; - } - return buffer; - } - }; - return new BaseSequence<>( new BaseSequence.IteratorMaker>() { @@ -192,40 +170,39 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ReferenceCountingResourceHolder.fromCloseable(temporaryStorage); resources.add(temporaryStorageHolder); - final ReferenceCountingResourceHolder mergeBufferHolder; - try { - // This will potentially block if there are no merge buffers left in the pool. 
- if (hasTimeout) { - final long timeout = timeoutAt - System.currentTimeMillis(); - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new TimeoutException(); - } - } else { - mergeBufferHolder = mergeBufferPool.take(); - } - resources.add(mergeBufferHolder); - } - catch (Exception e) { - throw new QueryInterruptedException(e); - } + // If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining + final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1; - Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( - query, - false, - null, - config, - Suppliers.ofInstance(mergeBufferHolder.get()), - combineBufferSupplier, - concurrencyHint, - temporaryStorage, - spillMapper, - combiningAggregatorFactories, - exec, - priority, + final List> mergeBufferHolders = getMergeBuffersHolder( + numMergeBuffers, hasTimeout, - timeoutAt, - mergeBufferSize + timeoutAt ); + resources.addAll(mergeBufferHolders); + + final ReferenceCountingResourceHolder mergeBufferHolder = mergeBufferHolders.get(0); + final ReferenceCountingResourceHolder combineBufferHolder = numMergeBuffers == 2 ? + mergeBufferHolders.get(1) : + null; + + Pair, Accumulator> pair = + RowBasedGrouperHelper.createGrouperAccumulatorPair( + query, + false, + null, + config, + Suppliers.ofInstance(mergeBufferHolder.get()), + combineBufferHolder, + concurrencyHint, + temporaryStorage, + spillMapper, + combiningAggregatorFactories, + exec, + priority, + hasTimeout, + timeoutAt, + mergeBufferSize + ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; grouper.init(); @@ -256,7 +233,10 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner public AggregateResult call() { try ( + // These variables are used to close releasers automatically. 
+ @SuppressWarnings("unused") Releaser bufferReleaser = mergeBufferHolder.increment(); + @SuppressWarnings("unused") Releaser grouperReleaser = grouperHolder.increment() ) { final AggregateResult retVal = input.run(queryPlusForRunners, responseContext) @@ -332,6 +312,40 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ); } + private List> getMergeBuffersHolder( + int numBuffers, + boolean hasTimeout, + long timeoutAt + ) + { + try { + if (numBuffers > mergeBufferPool.maxSize()) { + throw new ResourceLimitExceededException( + "Query needs " + numBuffers + " merge buffers, but only " + + mergeBufferPool.maxSize() + " merge buffers were configured. " + + "Try raising druid.processing.numMergeBuffers." + ); + } + final List> mergeBufferHolder; + // This will potentially block if there are no merge buffers left in the pool. + if (hasTimeout) { + final long timeout = timeoutAt - System.currentTimeMillis(); + if (timeout <= 0) { + throw new TimeoutException(); + } + if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) { + throw new TimeoutException("Cannot acquire enough merge buffers"); + } + } else { + mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers); + } + return mergeBufferHolder; + } + catch (Exception e) { + throw new QueryInterruptedException(e); + } + } + private void waitForFutureCompletion( GroupByQuery query, ListenableFuture> future, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java index e8db98d31da..0f209753462 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java @@ -26,7 +26,8 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import 
com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.collections.Releaser; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -84,7 +85,7 @@ public class ParallelCombiner // details. private static final int MINIMUM_LEAF_COMBINE_DEGREE = 2; - private final Supplier> combineBufferSupplier; + private final ReferenceCountingResourceHolder combineBufferHolder; private final AggregatorFactory[] combiningFactories; private final KeySerdeFactory combineKeySerdeFactory; private final ListeningExecutorService executor; @@ -98,7 +99,7 @@ public class ParallelCombiner private final int intermediateCombineDegree; public ParallelCombiner( - Supplier> combineBufferSupplier, + ReferenceCountingResourceHolder combineBufferHolder, AggregatorFactory[] combiningFactories, KeySerdeFactory combineKeySerdeFactory, ListeningExecutorService executor, @@ -109,7 +110,7 @@ public class ParallelCombiner int intermediateCombineDegree ) { - this.combineBufferSupplier = combineBufferSupplier; + this.combineBufferHolder = combineBufferHolder; this.combiningFactories = combiningFactories; this.combineKeySerdeFactory = combineKeySerdeFactory; this.executor = executor; @@ -137,9 +138,6 @@ public class ParallelCombiner { // CombineBuffer is initialized when this method is called and closed after the result iterator is done final Closer closer = Closer.create(); - final ResourceHolder combineBufferHolder = combineBufferSupplier.get(); - closer.register(combineBufferHolder); - try { final ByteBuffer combineBuffer = combineBufferHolder.get(); final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity( @@ -172,6 +170,7 @@ public class ParallelCombiner final CloseableIterator> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs); final List 
combineFutures = combineIteratorAndFutures.rhs; + closer.register(() -> checkCombineFutures(combineFutures)); return CloseableIterators.wrap(combineIterator, closer); @@ -278,7 +277,7 @@ public class ParallelCombiner * * @return minimum number of buffers required for combining tree * - * @see #buildCombineTree(List, Supplier, AggregatorFactory[], int, List) + * @see #buildCombineTree */ private int computeRequiredBufferNum(int numChildNodes, int combineDegree) { @@ -405,7 +404,10 @@ public class ParallelCombiner CloseableIterator> mergedIterator = CloseableIterators.mergeSorted( iterators, keyObjComparator - ) + ); + // This variable is used to close releaser automatically. + @SuppressWarnings("unused") + final Releaser releaser = combineBufferHolder.increment() ) { while (mergedIterator.hasNext()) { final Entry next = mergedIterator.next(); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index ad5f12b98fd..722f7b4692d 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -30,7 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.common.utils.IntArrayUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -137,7 +137,7 @@ public class RowBasedGrouperHelper final Map rawInputRowSignature, final GroupByQueryConfig config, final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final int concurrencyHint, 
final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, @@ -216,7 +216,7 @@ public class RowBasedGrouperHelper grouper = new ConcurrentGrouper<>( querySpecificConfig, bufferSupplier, - combineBufferSupplier, + combineBufferHolder, keySerdeFactory, combineKeySerdeFactory, columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java index 03fd7da0331..5e41a15b62f 100644 --- a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.resource; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.ResourceHolder; import io.druid.java.util.common.logger.Logger; @@ -36,19 +37,20 @@ public class GroupByQueryResource implements Closeable { private static final Logger log = new Logger(GroupByQueryResource.class); - private final ResourceHolder> mergeBuffersHolder; + private final List> mergeBufferHolders; private final Deque mergeBuffers; public GroupByQueryResource() { - this.mergeBuffersHolder = null; + this.mergeBufferHolders = null; this.mergeBuffers = new ArrayDeque<>(); } - public GroupByQueryResource(ResourceHolder> mergeBuffersHolder) + public GroupByQueryResource(List> mergeBufferHolders) { - this.mergeBuffersHolder = mergeBuffersHolder; - this.mergeBuffers = new ArrayDeque<>(mergeBuffersHolder.get()); + this.mergeBufferHolders = mergeBufferHolders; + this.mergeBuffers = new ArrayDeque<>(mergeBufferHolders.size()); + mergeBufferHolders.forEach(holder -> mergeBuffers.add(holder.get())); } /** @@ -81,11 +83,11 @@ public class GroupByQueryResource implements Closeable @Override public void close() { - if (mergeBuffersHolder != null) { - if (mergeBuffers.size() != mergeBuffersHolder.get().size()) { - 
log.warn("%d resources are not returned yet", mergeBuffersHolder.get().size() - mergeBuffers.size()); + if (mergeBufferHolders != null) { + if (mergeBuffers.size() != mergeBufferHolders.size()) { + log.warn("%d resources are not returned yet", mergeBufferHolders.size() - mergeBuffers.size()); } - mergeBuffersHolder.close(); + mergeBufferHolders.forEach(ReferenceCountingResourceHolder::close); } } } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 24bbfe7af01..4cfae560655 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import io.druid.collections.BlockingPool; import io.druid.collections.NonBlockingPool; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; @@ -136,18 +136,18 @@ public class GroupByStrategyV2 implements GroupByStrategy if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( "Query needs " + requiredMergeBufferNum + " merge buffers, but only " - + mergeBufferPool.maxSize() + " merge buffers are configured" + + mergeBufferPool.maxSize() + " merge buffers were configured" ); } else if (requiredMergeBufferNum == 0) { return new GroupByQueryResource(); } else { - final ResourceHolder> mergeBufferHolders; + final List> mergeBufferHolders; if (QueryContexts.hasTimeout(query)) { mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, QueryContexts.getTimeout(query)); } else { mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum); } - if 
(mergeBufferHolders == null) { + if (mergeBufferHolders.isEmpty()) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); } else { return new GroupByQueryResource(mergeBufferHolders); @@ -338,7 +338,6 @@ public class GroupByStrategyV2 implements GroupByStrategy queryWatcher, queryRunners, processingConfig.getNumThreads(), - bufferPool, mergeBufferPool, processingConfig.intermediateComputeSizeBytes(), spillMapper, diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 23f15b7e320..4e80ed6baf4 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -82,9 +82,9 @@ public class GroupByQueryMergeBufferTest } @Override - public ReferenceCountingResourceHolder> takeBatch(final int maxElements, final long timeout) + public List> takeBatch(final int maxElements, final long timeout) { - final ReferenceCountingResourceHolder> holder = super.takeBatch(maxElements, timeout); + final List> holder = super.takeBatch(maxElements, timeout); final int poolSize = getPoolSize(); if (minRemainBufferNum > poolSize) { minRemainBufferNum = poolSize; diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 74d5b2f5d67..55c05ddec98 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -184,7 +184,8 @@ public class GroupByQueryRunnerFailureTest public void testNotEnoughMergeBuffersOnQueryable() { expectedException.expect(QueryInterruptedException.class); - expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + 
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + expectedException.expectMessage("Cannot acquire enough merge buffers"); final GroupByQuery query = GroupByQuery .builder() @@ -268,8 +269,15 @@ public class GroupByQueryRunnerFailureTest .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); - try (ReferenceCountingResourceHolder> holder = mergeBufferPool.takeBatch(1, 10)) { + List> holder = null; + try { + holder = mergeBufferPool.takeBatch(1, 10); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } + finally { + if (holder != null) { + holder.forEach(ReferenceCountingResourceHolder::close); + } + } } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 42c0c2f0085..e35f8b7f2bb 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -171,8 +171,9 @@ public class GroupByQueryRunnerTest @Override public int getNumMergeBuffers() { - // There are some tests that need to allocate two buffers (simulating two levels of merging) - return 2; + // Some tests need two buffers for testing nested groupBy (simulating two levels of merging). + // Some tests need more buffers for parallel combine (testMergedPostAggHavingSpec). 
+ return 4; } @Override diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 0d0137b2e5f..ac6f450e899 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -24,9 +24,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -66,7 +65,6 @@ public class ConcurrentGrouperTest private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8); private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(256); private static final KeySerdeFactory KEY_SERDE_FACTORY = new TestKeySerdeFactory(); - private static final Supplier> COMBINE_BUFFER_SUPPLIER = new TestBufferSupplier(); private static final ColumnSelectorFactory NULL_FACTORY = new TestColumnSelectorFactory(); @Rule @@ -113,7 +111,7 @@ public class ConcurrentGrouperTest { final ConcurrentGrouper grouper = new ConcurrentGrouper<>( bufferSupplier, - COMBINE_BUFFER_SUPPLIER, + TEST_RESOURCE_HOLDER, KEY_SERDE_FACTORY, KEY_SERDE_FACTORY, NULL_FACTORY, @@ -160,7 +158,7 @@ public class ConcurrentGrouperTest final List> actual = Lists.newArrayList(iterator); iterator.close(); - Assert.assertTrue(!TEST_RESOURCE_HOLDER.taken || TEST_RESOURCE_HOLDER.closed); + Assert.assertTrue(TEST_RESOURCE_HOLDER.taken); final 
List> expected = new ArrayList<>(); for (long i = 0; i < numRows; i++) { @@ -172,28 +170,20 @@ public class ConcurrentGrouperTest grouper.close(); } - static class TestResourceHolder implements ResourceHolder + static class TestResourceHolder extends ReferenceCountingResourceHolder { private boolean taken; - private boolean closed; - private ByteBuffer buffer; TestResourceHolder(int bufferSize) { - buffer = ByteBuffer.allocate(bufferSize); + super(ByteBuffer.allocate(bufferSize), () -> {}); } @Override public ByteBuffer get() { taken = true; - return buffer; - } - - @Override - public void close() - { - closed = true; + return super.get(); } } @@ -292,21 +282,6 @@ public class ConcurrentGrouperTest } } - private static class TestBufferSupplier implements Supplier> - { - private final AtomicBoolean called = new AtomicBoolean(false); - - @Override - public ResourceHolder get() - { - if (called.compareAndSet(false, true)) { - return TEST_RESOURCE_HOLDER; - } else { - throw new IAE("should be called once"); - } - } - } - private static class TestColumnSelectorFactory implements ColumnSelectorFactory { @Override diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java index 84e6df7b98c..795133d4119 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java @@ -19,10 +19,7 @@ package io.druid.query.groupby.epinephelinae; -import com.google.common.base.Supplier; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.ResourceHolder; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; @@ -36,12 +33,10 @@ import org.junit.Assert; 
import org.junit.Test; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; public class ParallelCombinerTest { @@ -50,22 +45,6 @@ public class ParallelCombinerTest private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(512); private static final KeySerdeFactory KEY_SERDE_FACTORY = new TestKeySerdeFactory(); - private static final Supplier> COMBINE_BUFFER_SUPPLIER = - new Supplier>() - { - private final AtomicBoolean called = new AtomicBoolean(false); - - @Override - public ResourceHolder get() - { - if (called.compareAndSet(false, true)) { - return TEST_RESOURCE_HOLDER; - } else { - throw new IAE("should be called once"); - } - } - }; - private static final class TestIterator implements CloseableIterator> { private final Iterator> innerIterator; @@ -112,7 +91,7 @@ public class ParallelCombinerTest public void testCombine() throws IOException { final ParallelCombiner combiner = new ParallelCombiner<>( - COMBINE_BUFFER_SUPPLIER, + TEST_RESOURCE_HOLDER, new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()}, KEY_SERDE_FACTORY, MoreExecutors.listeningDecorator(SERVICE),