Use mergeBuffer instead of processingBuffer in parallelCombiner (#5634)

* Use mergeBuffer instead of processingBuffer in parallelCombiner

* Fix test

* address comments

* fix test

* Fix test

* Update comment

* address comments

* fix build

* Fix test failure
This commit is contained in:
Jihoon Son 2018-04-27 18:14:37 -07:00 committed by Gian Merlino
parent 9be000758d
commit 86746f82d8
17 changed files with 232 additions and 301 deletions

View File

@ -19,6 +19,7 @@
package io.druid.collections;
import javax.annotation.Nullable;
import java.util.List;
public interface BlockingPool<T>
@ -33,6 +34,7 @@ public interface BlockingPool<T>
*
* @return a resource, or null if the timeout was reached
*/
@Nullable
ReferenceCountingResourceHolder<T> take(long timeoutMs);
/**
@ -49,16 +51,16 @@ public interface BlockingPool<T>
* @param elementNum number of resources to take
* @param timeoutMs maximum time to wait for resources, in milliseconds.
*
* @return a resource, or null if the timeout was reached
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum, long timeoutMs);
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs);
/**
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
*
* @param elementNum number of resources to take
*
* @return a resource
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum);
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum);
}

View File

@ -22,16 +22,17 @@ package io.druid.collections;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.ISE;
import java.io.Closeable;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
@ -74,6 +75,7 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
}
@Override
@Nullable
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
@ -82,7 +84,7 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@ -94,25 +96,20 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
return wrapObject(takeObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@Nullable
private ReferenceCountingResourceHolder<T> wrapObject(T theObject)
{
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close()
{
offer(theObject);
}
}
() -> offer(theObject)
);
}
@Nullable
private T pollObject()
{
final ReentrantLock lock = this.lock;
@ -125,6 +122,7 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
}
}
@Nullable
private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
@ -160,53 +158,39 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
}
@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeoutMs)
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum, final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum));
final List<T> objects = timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum);
return objects.stream().map(this::wrapObject).collect(Collectors.toList());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum)
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum)
{
checkInitialized();
try {
return wrapObjects(takeObjects(elementNum));
return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
private ReferenceCountingResourceHolder<List<T>> wrapObjects(List<T> theObjects)
{
return theObjects == null ? null : new ReferenceCountingResourceHolder<>(
theObjects,
new Closeable()
{
@Override
public void close()
{
offerBatch(theObjects);
}
}
);
}
private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final List<T> list = new ArrayList<>(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (objects.size() < elementNum) {
return null;
return Collections.emptyList();
} else {
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
@ -222,13 +206,13 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final List<T> list = new ArrayList<>(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
if (nanos <= 0) {
return null;
return Collections.emptyList();
}
nanos = notEnough.awaitNanos(nanos);
}
@ -244,7 +228,7 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
private List<T> takeObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final List<T> list = new ArrayList<>(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
@ -282,23 +266,4 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
lock.unlock();
}
}
private void offerBatch(List<T> offers)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() + offers.size() <= maxSize) {
for (T offer : offers) {
objects.push(offer);
}
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
}
finally {
lock.unlock();
}
}
}

View File

@ -57,13 +57,13 @@ public final class DummyBlockingPool<T> implements BlockingPool<T>
}
@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum, long timeoutMs)
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs)
{
throw new UnsupportedOperationException();
}
@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum)
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum)
{
throw new UnsupportedOperationException();
}

View File

@ -52,7 +52,7 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
@SuppressWarnings("unused")
private final Cleaner cleaner;
ReferenceCountingResourceHolder(final T object, final Closeable closer)
public ReferenceCountingResourceHolder(final T object, final Closeable closer)
{
this.object = object;
this.closer = closer;
@ -64,6 +64,10 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
return new ReferenceCountingResourceHolder<>(object, object);
}
/**
* Returns the resource with an initial reference count of 1. More references can be added by
* calling {@link #increment()}.
*/
@Override
public T get()
{
@ -73,6 +77,13 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
return object;
}
/**
* Increments the reference count by 1 and returns a {@link Releaser}. The returned {@link Releaser} is used to
* decrement the reference count when the caller no longer needs the resource.
*
* {@link Releaser}s are not thread-safe. If multiple threads need references to the same holder, they should
* each acquire their own {@link Releaser}.
*/
public Releaser increment()
{
while (true) {
@ -103,6 +114,9 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
};
}
/**
* Decrements the reference count by 1. If it reaches to 0, then closes {@link #closer}.
*/
@Override
public void close()
{

View File

@ -83,58 +83,51 @@ public class BlockingPoolTest
@Test(timeout = 1000)
public void testTakeTimeout()
{
final ReferenceCountingResourceHolder<List<Integer>> batchHolder = POOL.takeBatch(10, 100L);
final List<ReferenceCountingResourceHolder<Integer>> batchHolder = POOL.takeBatch(10, 100L);
final ReferenceCountingResourceHolder<Integer> holder = POOL.take(100);
assertNull(holder);
batchHolder.close();
batchHolder.forEach(ReferenceCountingResourceHolder::close);
}
@Test(timeout = 1000)
public void testTakeBatch()
{
final ReferenceCountingResourceHolder<List<Integer>> holder = POOL.takeBatch(6, 100L);
final List<ReferenceCountingResourceHolder<Integer>> holder = POOL.takeBatch(6, 100L);
assertNotNull(holder);
assertEquals(6, holder.get().size());
assertEquals(6, holder.size());
assertEquals(4, POOL.getPoolSize());
holder.close();
holder.forEach(ReferenceCountingResourceHolder::close);
assertEquals(10, POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException
{
ReferenceCountingResourceHolder<List<Integer>> batchHolder = POOL.takeBatch(10, 10);
List<ReferenceCountingResourceHolder<Integer>> batchHolder = POOL.takeBatch(10, 10);
assertNotNull(batchHolder);
assertEquals(10, batchHolder.get().size());
assertEquals(10, batchHolder.size());
assertEquals(0, POOL.getPoolSize());
final Future<ReferenceCountingResourceHolder<List<Integer>>> future = SERVICE.submit(
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call()
{
return POOL.takeBatch(8, 100);
}
}
final Future<List<ReferenceCountingResourceHolder<Integer>>> future = SERVICE.submit(
() -> POOL.takeBatch(8, 100)
);
Thread.sleep(20);
batchHolder.close();
batchHolder.forEach(ReferenceCountingResourceHolder::close);
batchHolder = future.get();
assertNotNull(batchHolder);
assertEquals(8, batchHolder.get().size());
assertEquals(8, batchHolder.size());
assertEquals(2, POOL.getPoolSize());
batchHolder.close();
batchHolder.forEach(ReferenceCountingResourceHolder::close);
assertEquals(10, POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testTakeBatchTooManyObjects()
{
final ReferenceCountingResourceHolder<List<Integer>> holder = POOL.takeBatch(100, 100L);
assertNull(holder);
final List<ReferenceCountingResourceHolder<Integer>> holder = POOL.takeBatch(100, 100L);
assertTrue(holder.isEmpty());
}
@Test(timeout = 1000)
@ -227,43 +220,27 @@ public class BlockingPoolTest
public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException
{
final int batch1 = POOL.maxSize() / 2;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c1 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call()
{
return POOL.takeBatch(batch1, 10);
}
};
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> POOL.takeBatch(batch1, 10);
final int batch2 = POOL.maxSize() - batch1 + 1;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c2 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call()
{
return POOL.takeBatch(batch2, 10);
}
};
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(batch2, 10);
final Future<ReferenceCountingResourceHolder<List<Integer>>> f1 = SERVICE.submit(c1);
final Future<ReferenceCountingResourceHolder<List<Integer>>> f2 = SERVICE.submit(c2);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(c1);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2);
final ReferenceCountingResourceHolder<List<Integer>> r1 = f1.get();
final ReferenceCountingResourceHolder<List<Integer>> r2 = f2.get();
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
if (r1 != null) {
assertNull(r2);
assertTrue(r2.isEmpty());
assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize());
assertEquals(batch1, r1.get().size());
r1.close();
assertEquals(batch1, r1.size());
r1.forEach(ReferenceCountingResourceHolder::close);
} else {
assertNotNull(r2);
assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize());
assertEquals(batch2, r2.get().size());
r2.close();
assertEquals(batch2, r2.size());
r2.forEach(ReferenceCountingResourceHolder::close);
}
assertEquals(POOL.maxSize(), POOL.getPoolSize());
@ -273,37 +250,21 @@ public class BlockingPoolTest
public void testConcurrentBatchClose() throws ExecutionException, InterruptedException
{
final int batch1 = POOL.maxSize() / 2;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c1 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call()
{
return POOL.takeBatch(batch1, 10);
}
};
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> POOL.takeBatch(batch1, 10);
final int batch2 = POOL.maxSize() - batch1;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c2 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call()
{
return POOL.takeBatch(batch2, 10);
}
};
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(batch2, 10);
final Future<ReferenceCountingResourceHolder<List<Integer>>> f1 = SERVICE.submit(c1);
final Future<ReferenceCountingResourceHolder<List<Integer>>> f2 = SERVICE.submit(c2);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(c1);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2);
final ReferenceCountingResourceHolder<List<Integer>> r1 = f1.get();
final ReferenceCountingResourceHolder<List<Integer>> r2 = f2.get();
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
assertNotNull(r1);
assertNotNull(r2);
assertEquals(batch1, r1.get().size());
assertEquals(batch2, r2.get().size());
assertEquals(batch1, r1.size());
assertEquals(batch2, r2.size());
assertEquals(0, POOL.getPoolSize());
final Future future1 = SERVICE.submit(new Runnable()
@ -311,7 +272,7 @@ public class BlockingPoolTest
@Override
public void run()
{
r1.close();
r1.forEach(ReferenceCountingResourceHolder::close);
}
});
final Future future2 = SERVICE.submit(new Runnable()
@ -319,7 +280,7 @@ public class BlockingPoolTest
@Override
public void run()
{
r2.close();
r2.forEach(ReferenceCountingResourceHolder::close);
}
});
@ -332,19 +293,11 @@ public class BlockingPoolTest
@Test(timeout = 1000)
public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException
{
final ReferenceCountingResourceHolder<List<Integer>> r1 = POOL.takeBatch(1, 10);
final List<ReferenceCountingResourceHolder<Integer>> r1 = POOL.takeBatch(1, 10);
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c2 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call()
{
return POOL.takeBatch(10, 100);
}
};
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(10, 100);
final Future<ReferenceCountingResourceHolder<List<Integer>>> f2 = SERVICE.submit(c2);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2);
final Future f1 = SERVICE.submit(new Runnable()
{
@Override
@ -356,17 +309,17 @@ public class BlockingPoolTest
catch (InterruptedException e) {
// ignore
}
r1.close();
r1.forEach(ReferenceCountingResourceHolder::close);
}
});
final ReferenceCountingResourceHolder<List<Integer>> r2 = f2.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
f1.get();
assertNotNull(r2);
assertEquals(10, r2.get().size());
assertEquals(10, r2.size());
assertEquals(0, POOL.getPoolSize());
r2.close();
r2.forEach(ReferenceCountingResourceHolder::close);
assertEquals(POOL.maxSize(), POOL.getPoolSize());
}
}

View File

@ -204,13 +204,30 @@ The default number of initial buckets is 1024 and the default max load factor of
##### Parallel combine
Once a historical finishes aggregation using the hash table, it sorts aggregates and merge them before sending to the broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging aggregates which is an http thread to send data to brokers.
Once a historical finishes aggregation using the hash table, it sorts the aggregated results and merges them before sending to the
broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads
(configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging
aggregates which is an http thread to send data to brokers.
This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared between all submitted queries and they are _not interruptible_. It means, if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query is finished. GroupBy queries usually take longer time than timeseries or topN queries, they should release processing threads as soon as possible.
This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared
between all submitted queries and they are _not interruptible_. It means, if a heavy query takes all available
processing threads, all other queries might be blocked until the heavy query is finished. GroupBy queries usually take
longer time than timeseries or topN queries, they should release processing threads as soon as possible.
However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).
However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck
of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well.
This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in
[Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when
data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).
Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the degree of intermeidate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each
intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge
aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they
need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the
degree of intermediate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
Please note that each historical needs two merge buffers to process a groupBy v2 query with parallel combine: one for
computing intermediate aggregates from each segment and another for combining intermediate aggregates in parallel.
#### Alternatives

View File

@ -27,7 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.parsers.CloseableIterator;
@ -96,7 +96,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
public ConcurrentGrouper(
final GroupByQueryConfig groupByQueryConfig,
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
@Nullable final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder,
final KeySerdeFactory<KeyType> keySerdeFactory,
final KeySerdeFactory<KeyType> combineKeySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
@ -114,7 +114,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
{
this(
bufferSupplier,
combineBufferSupplier,
combineBufferHolder,
keySerdeFactory,
combineKeySerdeFactory,
columnSelectorFactory,
@ -138,7 +138,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
ConcurrentGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
@Nullable final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder,
final KeySerdeFactory<KeyType> keySerdeFactory,
final KeySerdeFactory<KeyType> combineKeySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
@ -191,7 +191,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
if (numParallelCombineThreads > 1) {
this.parallelCombiner = new ParallelCombiner<>(
combineBufferSupplier,
Preconditions.checkNotNull(combineBufferHolder, "combineBufferHolder"),
getCombiningFactories(aggregatorFactories),
combineKeySerdeFactory,
executor,

View File

@ -22,7 +22,6 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -34,10 +33,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.BlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.Releaser;
import io.druid.collections.ResourceHolder;
import io.druid.data.input.Row;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
@ -82,7 +79,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
private final ListeningExecutorService exec;
private final QueryWatcher queryWatcher;
private final int concurrencyHint;
private final NonBlockingPool<ByteBuffer> processingBufferPool;
private final BlockingPool<ByteBuffer> mergeBufferPool;
private final ObjectMapper spillMapper;
private final String processingTmpDir;
@ -94,7 +90,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
QueryWatcher queryWatcher,
Iterable<QueryRunner<Row>> queryables,
int concurrencyHint,
NonBlockingPool<ByteBuffer> processingBufferPool,
BlockingPool<ByteBuffer> mergeBufferPool,
int mergeBufferSize,
ObjectMapper spillMapper,
@ -106,7 +101,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.concurrencyHint = concurrencyHint;
this.processingBufferPool = processingBufferPool;
this.mergeBufferPool = mergeBufferPool;
this.spillMapper = spillMapper;
this.processingTmpDir = processingTmpDir;
@ -159,22 +153,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
final boolean hasTimeout = QueryContexts.hasTimeout(query);
final long timeoutAt = System.currentTimeMillis() + queryTimeout;
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier = new Supplier<ResourceHolder<ByteBuffer>>()
{
private boolean initialized;
private ResourceHolder<ByteBuffer> buffer;
@Override
public ResourceHolder<ByteBuffer> get()
{
if (!initialized) {
buffer = processingBufferPool.take();
initialized = true;
}
return buffer;
}
};
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
{
@ -192,30 +170,29 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
resources.add(temporaryStorageHolder);
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
// This will potentially block if there are no merge buffers left in the pool.
if (hasTimeout) {
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new TimeoutException();
}
} else {
mergeBufferHolder = mergeBufferPool.take();
}
resources.add(mergeBufferHolder);
}
catch (Exception e) {
throw new QueryInterruptedException(e);
}
// If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining
final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1;
Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolders = getMergeBuffersHolder(
numMergeBuffers,
hasTimeout,
timeoutAt
);
resources.addAll(mergeBufferHolders);
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder = mergeBufferHolders.get(0);
final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder = numMergeBuffers == 2 ?
mergeBufferHolders.get(1) :
null;
Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair =
RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
false,
null,
config,
Suppliers.ofInstance(mergeBufferHolder.get()),
combineBufferSupplier,
combineBufferHolder,
concurrencyHint,
temporaryStorage,
spillMapper,
@ -256,7 +233,10 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
public AggregateResult call()
{
try (
// These variables are used to close releasers automatically.
@SuppressWarnings("unused")
Releaser bufferReleaser = mergeBufferHolder.increment();
@SuppressWarnings("unused")
Releaser grouperReleaser = grouperHolder.increment()
) {
final AggregateResult retVal = input.run(queryPlusForRunners, responseContext)
@ -332,6 +312,40 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
);
}
private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
int numBuffers,
boolean hasTimeout,
long timeoutAt
)
{
try {
if (numBuffers > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
"Query needs " + numBuffers + " merge buffers, but only "
+ mergeBufferPool.maxSize() + " merge buffers were configured. "
+ "Try raising druid.processing.numMergeBuffers."
);
}
final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolder;
// This will potentially block if there are no merge buffers left in the pool.
if (hasTimeout) {
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0) {
throw new TimeoutException();
}
if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
throw new TimeoutException("Cannot acquire enough merge buffers");
}
} else {
mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
}
return mergeBufferHolder;
}
catch (Exception e) {
throw new QueryInterruptedException(e);
}
}
private void waitForFutureCompletion(
GroupByQuery query,
ListenableFuture<List<AggregateResult>> future,

View File

@ -26,7 +26,8 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.Releaser;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
@ -84,7 +85,7 @@ public class ParallelCombiner<KeyType>
// details.
private static final int MINIMUM_LEAF_COMBINE_DEGREE = 2;
private final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier;
private final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder;
private final AggregatorFactory[] combiningFactories;
private final KeySerdeFactory<KeyType> combineKeySerdeFactory;
private final ListeningExecutorService executor;
@ -98,7 +99,7 @@ public class ParallelCombiner<KeyType>
private final int intermediateCombineDegree;
public ParallelCombiner(
Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder,
AggregatorFactory[] combiningFactories,
KeySerdeFactory<KeyType> combineKeySerdeFactory,
ListeningExecutorService executor,
@ -109,7 +110,7 @@ public class ParallelCombiner<KeyType>
int intermediateCombineDegree
)
{
this.combineBufferSupplier = combineBufferSupplier;
this.combineBufferHolder = combineBufferHolder;
this.combiningFactories = combiningFactories;
this.combineKeySerdeFactory = combineKeySerdeFactory;
this.executor = executor;
@ -137,9 +138,6 @@ public class ParallelCombiner<KeyType>
{
// CombineBuffer is initialized when this method is called and closed after the result iterator is done
final Closer closer = Closer.create();
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
closer.register(combineBufferHolder);
try {
final ByteBuffer combineBuffer = combineBufferHolder.get();
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
@ -172,6 +170,7 @@ public class ParallelCombiner<KeyType>
final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
closer.register(() -> checkCombineFutures(combineFutures));
return CloseableIterators.wrap(combineIterator, closer);
@ -278,7 +277,7 @@ public class ParallelCombiner<KeyType>
*
* @return minimum number of buffers required for combining tree
*
* @see #buildCombineTree(List, Supplier, AggregatorFactory[], int, List)
* @see #buildCombineTree
*/
private int computeRequiredBufferNum(int numChildNodes, int combineDegree)
{
@ -405,7 +404,10 @@ public class ParallelCombiner<KeyType>
CloseableIterator<Entry<KeyType>> mergedIterator = CloseableIterators.mergeSorted(
iterators,
keyObjComparator
)
);
// This variable is used to close releaser automatically.
@SuppressWarnings("unused")
final Releaser releaser = combineBufferHolder.increment()
) {
while (mergedIterator.hasNext()) {
final Entry<KeyType> next = mergedIterator.next();

View File

@ -30,7 +30,7 @@ import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.common.utils.IntArrayUtils;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
@ -137,7 +137,7 @@ public class RowBasedGrouperHelper
final Map<String, ValueType> rawInputRowSignature,
final GroupByQueryConfig config,
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
@Nullable final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder,
final int concurrencyHint,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
@ -216,7 +216,7 @@ public class RowBasedGrouperHelper
grouper = new ConcurrentGrouper<>(
querySpecificConfig,
bufferSupplier,
combineBufferSupplier,
combineBufferHolder,
keySerdeFactory,
combineKeySerdeFactory,
columnSelectorFactory,

View File

@ -19,6 +19,7 @@
package io.druid.query.groupby.resource;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.logger.Logger;
@ -36,19 +37,20 @@ public class GroupByQueryResource implements Closeable
{
private static final Logger log = new Logger(GroupByQueryResource.class);
private final ResourceHolder<List<ByteBuffer>> mergeBuffersHolder;
private final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolders;
private final Deque<ByteBuffer> mergeBuffers;
public GroupByQueryResource()
{
this.mergeBuffersHolder = null;
this.mergeBufferHolders = null;
this.mergeBuffers = new ArrayDeque<>();
}
public GroupByQueryResource(ResourceHolder<List<ByteBuffer>> mergeBuffersHolder)
public GroupByQueryResource(List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolders)
{
this.mergeBuffersHolder = mergeBuffersHolder;
this.mergeBuffers = new ArrayDeque<>(mergeBuffersHolder.get());
this.mergeBufferHolders = mergeBufferHolders;
this.mergeBuffers = new ArrayDeque<>(mergeBufferHolders.size());
mergeBufferHolders.forEach(holder -> mergeBuffers.add(holder.get()));
}
/**
@ -81,11 +83,11 @@ public class GroupByQueryResource implements Closeable
@Override
public void close()
{
if (mergeBuffersHolder != null) {
if (mergeBuffers.size() != mergeBuffersHolder.get().size()) {
log.warn("%d resources are not returned yet", mergeBuffersHolder.get().size() - mergeBuffers.size());
if (mergeBufferHolders != null) {
if (mergeBuffers.size() != mergeBufferHolders.size()) {
log.warn("%d resources are not returned yet", mergeBufferHolders.size() - mergeBuffers.size());
}
mergeBuffersHolder.close();
mergeBufferHolders.forEach(ReferenceCountingResourceHolder::close);
}
}
}

View File

@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import io.druid.collections.BlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.guice.annotations.Global;
@ -136,18 +136,18 @@ public class GroupByStrategyV2 implements GroupByStrategy
if (requiredMergeBufferNum > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
"Query needs " + requiredMergeBufferNum + " merge buffers, but only "
+ mergeBufferPool.maxSize() + " merge buffers are configured"
+ mergeBufferPool.maxSize() + " merge buffers were configured"
);
} else if (requiredMergeBufferNum == 0) {
return new GroupByQueryResource();
} else {
final ResourceHolder<List<ByteBuffer>> mergeBufferHolders;
final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolders;
if (QueryContexts.hasTimeout(query)) {
mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, QueryContexts.getTimeout(query));
} else {
mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum);
}
if (mergeBufferHolders == null) {
if (mergeBufferHolders.isEmpty()) {
throw new InsufficientResourcesException("Cannot acquire enough merge buffers");
} else {
return new GroupByQueryResource(mergeBufferHolders);
@ -338,7 +338,6 @@ public class GroupByStrategyV2 implements GroupByStrategy
queryWatcher,
queryRunners,
processingConfig.getNumThreads(),
bufferPool,
mergeBufferPool,
processingConfig.intermediateComputeSizeBytes(),
spillMapper,

View File

@ -82,9 +82,9 @@ public class GroupByQueryMergeBufferTest
}
@Override
public ReferenceCountingResourceHolder<List<ByteBuffer>> takeBatch(final int maxElements, final long timeout)
public List<ReferenceCountingResourceHolder<ByteBuffer>> takeBatch(final int maxElements, final long timeout)
{
final ReferenceCountingResourceHolder<List<ByteBuffer>> holder = super.takeBatch(maxElements, timeout);
final List<ReferenceCountingResourceHolder<ByteBuffer>> holder = super.takeBatch(maxElements, timeout);
final int poolSize = getPoolSize();
if (minRemainBufferNum > poolSize) {
minRemainBufferNum = poolSize;

View File

@ -184,7 +184,8 @@ public class GroupByQueryRunnerFailureTest
public void testNotEnoughMergeBuffersOnQueryable()
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectCause(CoreMatchers.<Throwable>instanceOf(TimeoutException.class));
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
expectedException.expectMessage("Cannot acquire enough merge buffers");
final GroupByQuery query = GroupByQuery
.builder()
@ -268,8 +269,15 @@ public class GroupByQueryRunnerFailureTest
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500))
.build();
try (ReferenceCountingResourceHolder<List<ByteBuffer>> holder = mergeBufferPool.takeBatch(1, 10)) {
List<ReferenceCountingResourceHolder<ByteBuffer>> holder = null;
try {
holder = mergeBufferPool.takeBatch(1, 10);
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
finally {
if (holder != null) {
holder.forEach(ReferenceCountingResourceHolder::close);
}
}
}
}

View File

@ -171,8 +171,9 @@ public class GroupByQueryRunnerTest
@Override
public int getNumMergeBuffers()
{
// There are some tests that need to allocate two buffers (simulating two levels of merging)
return 2;
// Some tests need two buffers for testing nested groupBy (simulating two levels of merging).
// Some tests need more buffers for parallel combine (testMergedPostAggHavingSpec).
return 4;
}
@Override

View File

@ -24,9 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.ResourceHolder;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -66,7 +65,6 @@ public class ConcurrentGrouperTest
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(256);
private static final KeySerdeFactory<Long> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final Supplier<ResourceHolder<ByteBuffer>> COMBINE_BUFFER_SUPPLIER = new TestBufferSupplier();
private static final ColumnSelectorFactory NULL_FACTORY = new TestColumnSelectorFactory();
@Rule
@ -113,7 +111,7 @@ public class ConcurrentGrouperTest
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
bufferSupplier,
COMBINE_BUFFER_SUPPLIER,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
@ -160,7 +158,7 @@ public class ConcurrentGrouperTest
final List<Entry<Long>> actual = Lists.newArrayList(iterator);
iterator.close();
Assert.assertTrue(!TEST_RESOURCE_HOLDER.taken || TEST_RESOURCE_HOLDER.closed);
Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
final List<Entry<Long>> expected = new ArrayList<>();
for (long i = 0; i < numRows; i++) {
@ -172,28 +170,20 @@ public class ConcurrentGrouperTest
grouper.close();
}
static class TestResourceHolder implements ResourceHolder<ByteBuffer>
static class TestResourceHolder extends ReferenceCountingResourceHolder<ByteBuffer>
{
private boolean taken;
private boolean closed;
private ByteBuffer buffer;
TestResourceHolder(int bufferSize)
{
buffer = ByteBuffer.allocate(bufferSize);
super(ByteBuffer.allocate(bufferSize), () -> {});
}
@Override
public ByteBuffer get()
{
taken = true;
return buffer;
}
@Override
public void close()
{
closed = true;
return super.get();
}
}
@ -292,21 +282,6 @@ public class ConcurrentGrouperTest
}
}
private static class TestBufferSupplier implements Supplier<ResourceHolder<ByteBuffer>>
{
private final AtomicBoolean called = new AtomicBoolean(false);
@Override
public ResourceHolder<ByteBuffer> get()
{
if (called.compareAndSet(false, true)) {
return TEST_RESOURCE_HOLDER;
} else {
throw new IAE("should be called once");
}
}
}
private static class TestColumnSelectorFactory implements ColumnSelectorFactory
{
@Override

View File

@ -19,10 +19,7 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
@ -36,12 +33,10 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
public class ParallelCombinerTest
{
@ -50,22 +45,6 @@ public class ParallelCombinerTest
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(512);
private static final KeySerdeFactory<Long> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final Supplier<ResourceHolder<ByteBuffer>> COMBINE_BUFFER_SUPPLIER =
new Supplier<ResourceHolder<ByteBuffer>>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
@Override
public ResourceHolder<ByteBuffer> get()
{
if (called.compareAndSet(false, true)) {
return TEST_RESOURCE_HOLDER;
} else {
throw new IAE("should be called once");
}
}
};
private static final class TestIterator implements CloseableIterator<Entry<Long>>
{
private final Iterator<Entry<Long>> innerIterator;
@ -112,7 +91,7 @@ public class ParallelCombinerTest
public void testCombine() throws IOException
{
final ParallelCombiner<Long> combiner = new ParallelCombiner<>(
COMBINE_BUFFER_SUPPLIER,
TEST_RESOURCE_HOLDER,
new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()},
KEY_SERDE_FACTORY,
MoreExecutors.listeningDecorator(SERVICE),