Atomic merge buffer acquisition for groupBys (#3939)

* Atomic merge buffer acquisition for groupBys

* documentation

* documentation

* address comments

* address comments

* fix test failure

* Addressed comments

- Add InsufficientResourcesException
- Renamed GroupByQueryBrokerResource to GroupByQueryResource

* addressed comments

* Add takeBatch() to BlockingPool
This commit is contained in:
Jihoon Son 2017-02-23 05:49:37 +09:00 committed by Himanshu
parent e7d01b67b6
commit 7200dce112
20 changed files with 1020 additions and 115 deletions

View File

@ -425,9 +425,7 @@ public class GroupByTypeInterfaceBenchmark
factory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
)
);

View File

@ -468,9 +468,7 @@ public class GroupByBenchmark
factory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
)
);

View File

@ -22,34 +22,55 @@ package io.druid.collections;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.druid.java.util.common.logger.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.ISE;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
*/
public class BlockingPool<T>
{
private static final Logger log = new Logger(BlockingPool.class);
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
private final BlockingQueue<T> objects;
private final ArrayDeque<T> objects;
private final ReentrantLock lock;
private final Condition notEnough;
private final int maxSize;
public BlockingPool(
Supplier<T> generator,
int limit
)
{
this.objects = limit > 0 ? new ArrayBlockingQueue<T>(limit) : null;
this.objects = new ArrayDeque<>(limit);
this.maxSize = limit;
for (int i = 0; i < limit; i++) {
objects.add(generator.get());
}
this.lock = new ReentrantLock();
this.notEnough = lock.newCondition();
}
public int maxSize()
{
return maxSize;
}
@VisibleForTesting
public int getPoolSize()
{
return objects.size();
}
/**
@ -58,13 +79,17 @@ public class BlockingPool<T>
* @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout.
*
* @return a resource, or null if the timeout was reached
*
* @throws InterruptedException if interrupted while waiting for a resource to become available
*/
public ReferenceCountingResourceHolder<T> take(final long timeout) throws InterruptedException
public ReferenceCountingResourceHolder<T> take(final long timeout)
{
Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take.");
final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take();
checkInitialized();
final T theObject;
try {
if (timeout > -1) {
theObject = timeout > 0 ? poll(timeout) : poll();
} else {
theObject = take();
}
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
@ -72,17 +97,189 @@ public class BlockingPool<T>
@Override
public void close() throws IOException
{
if (!objects.offer(theObject)) {
log.error("WTF?! Queue offer failed, uh oh...");
}
offer(theObject);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
@VisibleForTesting
protected int getQueueSize()
private T poll()
{
return objects.size();
final ReentrantLock lock = this.lock;
lock.lock();
try {
return objects.isEmpty() ? null : objects.pop();
} finally {
lock.unlock();
}
}
private T poll(long timeout) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
return objects.pop();
} finally {
lock.unlock();
}
}
private T take() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
notEnough.await();
}
return objects.pop();
} finally {
lock.unlock();
}
}
/**
* Take a resource from the pool.
*
* @param elementNum number of resources to take
* @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout.
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeout)
{
checkInitialized();
final List<T> objects;
try {
if (timeout > -1) {
objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum);
} else {
objects = takeBatch(elementNum);
}
return objects == null ? null : new ReferenceCountingResourceHolder<>(
objects,
new Closeable()
{
@Override
public void close() throws IOException
{
offerBatch(objects);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
private List<T> pollBatch(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (objects.size() < elementNum) {
return null;
} else {
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
}
} finally {
lock.unlock();
}
}
private List<T> pollBatch(int elementNum, long timeout) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}
private List<T> takeBatch(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
notEnough.await();
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}
private void checkInitialized()
{
Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take.");
}
private void offer(T theObject)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() < maxSize) {
objects.push(theObject);
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}
private void offerBatch(List<T> offers)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() + offers.size() <= maxSize) {
for (T offer : offers) {
objects.push(offer);
}
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}
}

View File

@ -46,7 +46,7 @@ public class CombiningSequence<T> implements Sequence<T>
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;
public CombiningSequence(
private CombiningSequence(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn

View File

@ -0,0 +1,372 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.collections;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class BlockingPoolTest
{
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(2);
private static final BlockingPool<Integer> POOL = new BlockingPool<>(Suppliers.ofInstance(1), 10);
private static final BlockingPool<Integer> EMPTY_POOL = new BlockingPool<>(Suppliers.ofInstance(1), 0);
@AfterClass
public static void teardown()
{
SERVICE.shutdown();
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testTakeFromEmptyPool()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
EMPTY_POOL.take(0);
}
@Test
public void testDrainFromEmptyPool()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
EMPTY_POOL.takeBatch(1, 0);
}
@Test(timeout = 1000)
public void testTake()
{
final ReferenceCountingResourceHolder<Integer> holder = POOL.take(100);
assertNotNull(holder);
assertEquals(9, POOL.getPoolSize());
holder.close();
assertEquals(10, POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testTakeTimeout()
{
final ReferenceCountingResourceHolder<List<Integer>> batchHolder = POOL.takeBatch(10, 100L);
final ReferenceCountingResourceHolder<Integer> holder = POOL.take(100);
assertNull(holder);
batchHolder.close();
}
@Test(timeout = 1000)
public void testTakeBatch()
{
final ReferenceCountingResourceHolder<List<Integer>> holder = POOL.takeBatch(6, 100L);
assertNotNull(holder);
assertEquals(6, holder.get().size());
assertEquals(4, POOL.getPoolSize());
holder.close();
assertEquals(10, POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException
{
ReferenceCountingResourceHolder<List<Integer>> batchHolder = POOL.takeBatch(10, 10);
assertNotNull(batchHolder);
assertEquals(10, batchHolder.get().size());
assertEquals(0, POOL.getPoolSize());
final Future<ReferenceCountingResourceHolder<List<Integer>>> future = SERVICE.submit(
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call() throws Exception
{
return POOL.takeBatch(8, 100);
}
}
);
Thread.sleep(20);
batchHolder.close();
batchHolder = future.get();
assertNotNull(batchHolder);
assertEquals(8, batchHolder.get().size());
assertEquals(2, POOL.getPoolSize());
batchHolder.close();
assertEquals(10, POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testTakeBatchTooManyObjects()
{
final ReferenceCountingResourceHolder<List<Integer>> holder = POOL.takeBatch(100, 100L);
assertNull(holder);
}
@Test(timeout = 1000)
public void testConcurrentTake() throws ExecutionException, InterruptedException
{
final int limit1 = POOL.maxSize() / 2;
final int limit2 = POOL.maxSize() - limit1 + 1;
final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(
new Callable<List<ReferenceCountingResourceHolder<Integer>>>()
{
@Override
public List<ReferenceCountingResourceHolder<Integer>> call() throws Exception
{
List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList();
for (int i = 0; i < limit1; i++) {
result.add(POOL.take(10));
}
return result;
}
}
);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(
new Callable<List<ReferenceCountingResourceHolder<Integer>>>()
{
@Override
public List<ReferenceCountingResourceHolder<Integer>> call() throws Exception
{
List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList();
for (int i = 0; i < limit2; i++) {
result.add(POOL.take(10));
}
return result;
}
}
);
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
assertEquals(0, POOL.getPoolSize());
assertTrue(r1.contains(null) || r2.contains(null));
int nonNullCount = 0;
for (ReferenceCountingResourceHolder<Integer> holder : r1) {
if (holder != null) {
nonNullCount++;
}
}
for (ReferenceCountingResourceHolder<Integer> holder : r2) {
if (holder != null) {
nonNullCount++;
}
}
assertEquals(POOL.maxSize(), nonNullCount);
final Future future1 = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
for (ReferenceCountingResourceHolder<Integer> holder : r1) {
if (holder != null) {
holder.close();
}
}
}
});
final Future future2 = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
for (ReferenceCountingResourceHolder<Integer> holder : r2) {
if (holder != null) {
holder.close();
}
}
}
});
future1.get();
future2.get();
assertEquals(POOL.maxSize(), POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException
{
final int batch1 = POOL.maxSize() / 2;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c1 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call() throws Exception
{
return POOL.takeBatch(batch1, 10);
}
};
final int batch2 = POOL.maxSize() - batch1 + 1;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c2 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call() throws Exception
{
return POOL.takeBatch(batch2, 10);
}
};
final Future<ReferenceCountingResourceHolder<List<Integer>>> f1 = SERVICE.submit(c1);
final Future<ReferenceCountingResourceHolder<List<Integer>>> f2 = SERVICE.submit(c2);
final ReferenceCountingResourceHolder<List<Integer>> r1 = f1.get();
final ReferenceCountingResourceHolder<List<Integer>> r2 = f2.get();
if (r1 != null) {
assertNull(r2);
assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize());
assertEquals(batch1, r1.get().size());
r1.close();
} else {
assertNotNull(r2);
assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize());
assertEquals(batch2, r2.get().size());
r2.close();
}
assertEquals(POOL.maxSize(), POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testConcurrentBatchClose() throws ExecutionException, InterruptedException
{
final int batch1 = POOL.maxSize() / 2;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c1 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call() throws Exception
{
return POOL.takeBatch(batch1, 10);
}
};
final int batch2 = POOL.maxSize() - batch1;
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c2 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call() throws Exception
{
return POOL.takeBatch(batch2, 10);
}
};
final Future<ReferenceCountingResourceHolder<List<Integer>>> f1 = SERVICE.submit(c1);
final Future<ReferenceCountingResourceHolder<List<Integer>>> f2 = SERVICE.submit(c2);
final ReferenceCountingResourceHolder<List<Integer>> r1 = f1.get();
final ReferenceCountingResourceHolder<List<Integer>> r2 = f2.get();
assertNotNull(r1);
assertNotNull(r2);
assertEquals(batch1, r1.get().size());
assertEquals(batch2, r2.get().size());
assertEquals(0, POOL.getPoolSize());
final Future future1 = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
r1.close();
}
});
final Future future2 = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
r2.close();
}
});
future1.get();
future2.get();
assertEquals(POOL.maxSize(), POOL.getPoolSize());
}
@Test(timeout = 1000)
public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException
{
final ReferenceCountingResourceHolder<List<Integer>> r1 = POOL.takeBatch(1, 10);
final Callable<ReferenceCountingResourceHolder<List<Integer>>> c2 =
new Callable<ReferenceCountingResourceHolder<List<Integer>>>()
{
@Override
public ReferenceCountingResourceHolder<List<Integer>> call() throws Exception
{
return POOL.takeBatch(10, 100);
}
};
final Future<ReferenceCountingResourceHolder<List<Integer>>> f2 = SERVICE.submit(c2);
final Future f1 = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
try {
Thread.sleep(50);
}
catch (InterruptedException e) {
// ignore
}
r1.close();
}
});
final ReferenceCountingResourceHolder<List<Integer>> r2 = f2.get();
f1.get();
assertNotNull(r2);
assertEquals(10, r2.get().size());
assertEquals(0, POOL.getPoolSize());
r2.close();
assertEquals(POOL.maxSize(), POOL.getPoolSize());
}
}

View File

@ -228,7 +228,7 @@ public class CombiningSequenceTest
};
Sequence<Pair<Integer, Integer>> seq = Sequences.limit(
new CombiningSequence<>(
CombiningSequence.create(
Sequences.withBaggage(Sequences.simple(pairs), closeable),
Ordering.natural().onResultOf(Pair.<Integer, Integer>lhsFn()),
new BinaryFn<Pair<Integer, Integer>, Pair<Integer, Integer>, Pair<Integer, Integer>>()

View File

@ -156,12 +156,6 @@ indexing mechanism, and runs the outer query on these materialized results. "v2"
inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both
strategy perform the outer query on the broker in a single-threaded fashion.
Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply
nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the
merge buffers are limited in number and are acquired one-by-one instead of a complete set. At this time, we recommend
that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy.
#### Server configuration
When using the "v1" strategy, the following runtime properties apply:

View File

@ -141,12 +141,6 @@ exact distinct count using a nested groupBy.
SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source)
```
Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply
nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the
merge buffers are limited in number and are acquired one-by-one instead of a complete set. At this time, we recommend
that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy.
#### Semi-joins
Semi-join subqueries involving `WHERE ... IN (SELECT ...)`, like the following, are executed with a special process.

View File

@ -0,0 +1,31 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
/**
* This exception is thrown when the requested operation cannot be completed due to a lack of available resources.
*/
public class InsufficientResourcesException extends RuntimeException
{
public InsufficientResourcesException(String message)
{
super(message);
}
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@ -32,14 +31,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.MappedSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
@ -57,11 +55,12 @@ import io.druid.query.cache.CacheKeyBuilder;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.query.groupby.strategy.GroupByStrategy;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -84,22 +83,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
};
public static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private final Supplier<GroupByQueryConfig> configSupplier;
private final GroupByStrategySelector strategySelector;
private final StupidPool<ByteBuffer> bufferPool;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
@Inject
public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier,
GroupByStrategySelector strategySelector,
@Global StupidPool<ByteBuffer> bufferPool,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{
this.configSupplier = configSupplier;
this.strategySelector = strategySelector;
this.bufferPool = bufferPool;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
}
@ -116,7 +109,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
if (query.getContextBoolean(GROUP_BY_MERGE_KEY, true)) {
return mergeGroupByResults(
return initAndMergeGroupByResults(
(GroupByQuery) query,
runner,
responseContext
@ -127,11 +120,34 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
};
}
private Sequence<Row> mergeGroupByResults(
private Sequence<Row> initAndMergeGroupByResults(
final GroupByQuery query,
QueryRunner<Row> runner,
Map<String, Object> context
)
{
final GroupByStrategy groupByStrategy = strategySelector.strategize(query);
final GroupByQueryResource resource = groupByStrategy.prepareResource(query, false);
return Sequences.withBaggage(
mergeGroupByResults(
groupByStrategy,
query,
resource,
runner,
context
),
resource
);
}
private Sequence<Row> mergeGroupByResults(
GroupByStrategy groupByStrategy,
final GroupByQuery query,
GroupByQueryResource resource,
QueryRunner<Row> runner,
Map<String, Object> context
)
{
// If there's a subquery, merge subquery results and then apply the aggregator
@ -161,6 +177,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
final Sequence<Row> subqueryResult = mergeGroupByResults(
groupByStrategy,
subquery.withOverriddenContext(
ImmutableMap.<String, Object>of(
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
@ -169,6 +186,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
false
)
),
resource,
runner,
context
);
@ -186,9 +204,9 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
finalizingResults = subqueryResult;
}
return strategySelector.strategize(query).processSubqueryResult(subquery, query, finalizingResults);
return groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults);
} else {
return strategySelector.strategize(query).mergeResults(runner, query, context);
return groupByStrategy.mergeResults(runner, query, context);
}
}

View File

@ -169,11 +169,11 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
// This will potentially block if there are no merge buffers left in the pool.
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new QueryInterruptedException(new TimeoutException());
throw new TimeoutException();
}
resources.add(mergeBufferHolder);
}
catch (InterruptedException e) {
catch (Exception e) {
throw new QueryInterruptedException(e);
}

View File

@ -23,10 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.ResourceHolder;
import io.druid.common.guava.SettableSupplier;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Row;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Accumulator;
@ -35,8 +33,6 @@ import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.FilteredSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryInterruptedException;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.filter.Filter;
@ -45,6 +41,7 @@ import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.segment.column.ValueType;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.filter.Filters;
@ -58,7 +55,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class GroupByRowProcessor
{
@ -67,7 +63,7 @@ public class GroupByRowProcessor
final Sequence<Row> rows,
final Map<String, ValueType> rowSignature,
final GroupByQueryConfig config,
final BlockingPool<ByteBuffer> mergeBufferPool,
final GroupByQueryResource resource,
final ObjectMapper spillMapper,
final String processingTmpDir
)
@ -86,8 +82,6 @@ public class GroupByRowProcessor
String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
);
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeout = queryTimeout == null ? JodaUtils.MAX_INSTANT : queryTimeout.longValue();
final List<Interval> queryIntervals = query.getIntervals();
final Filter filter = Filters.convertToCNFFromQueryContext(
query,
@ -133,7 +127,9 @@ public class GroupByRowProcessor
@Override
public CloseableGrouperIterator<RowBasedKey, Row> make()
{
final List<Closeable> closeOnFailure = Lists.newArrayList();
// This contains all closeable objects which are closed when the returned iterator iterates all the elements,
// or an exceptions is thrown. The objects are closed in their reverse order.
final List<Closeable> closeOnExit = Lists.newArrayList();
try {
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
@ -141,9 +137,7 @@ public class GroupByRowProcessor
querySpecificConfig.getMaxOnDiskStorage()
);
closeOnFailure.add(temporaryStorage);
final SettableSupplier<ReferenceCountingResourceHolder<ByteBuffer>> bufferHolderSupplier = new SettableSupplier<>();
closeOnExit.add(temporaryStorage);
Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
@ -155,20 +149,10 @@ public class GroupByRowProcessor
@Override
public ByteBuffer get()
{
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new QueryInterruptedException(new TimeoutException());
}
bufferHolderSupplier.set(mergeBufferHolder);
closeOnFailure.add(mergeBufferHolder);
final ResourceHolder<ByteBuffer> mergeBufferHolder = resource.getMergeBuffer();
closeOnExit.add(mergeBufferHolder);
return mergeBufferHolder.get();
}
catch (InterruptedException e) {
throw new QueryInterruptedException(e);
}
}
},
-1,
temporaryStorage,
@ -177,7 +161,7 @@ public class GroupByRowProcessor
);
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
closeOnFailure.add(grouper);
closeOnExit.add(grouper);
final Grouper<RowBasedKey> retVal = filteredSequence.accumulate(
grouper,
@ -195,16 +179,16 @@ public class GroupByRowProcessor
@Override
public void close() throws IOException
{
grouper.close();
CloseQuietly.close(bufferHolderSupplier.get());
CloseQuietly.close(temporaryStorage);
for (Closeable closeable : Lists.reverse(closeOnExit)) {
CloseQuietly.close(closeable);
}
}
}
);
}
catch (Throwable e) {
// Exception caught while setting up the iterator; release resources.
for (Closeable closeable : Lists.reverse(closeOnFailure)) {
for (Closeable closeable : Lists.reverse(closeOnExit)) {
CloseQuietly.close(closeable);
}
throw e;

View File

@ -0,0 +1,91 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.resource;
import com.metamx.common.logger.Logger;
import io.druid.collections.ResourceHolder;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
/**
* This class contains resources required for a groupBy query execution.
* Currently, it contains only merge buffers, but any additional resources can be added in the future.
*/
public class GroupByQueryResource implements Closeable
{
private static final Logger log = new Logger(GroupByQueryResource.class);
private final ResourceHolder<List<ByteBuffer>> mergeBuffersHolder;
private final Deque<ByteBuffer> mergeBuffers;
public GroupByQueryResource()
{
this.mergeBuffersHolder = null;
this.mergeBuffers = new ArrayDeque<>();
}
public GroupByQueryResource(ResourceHolder<List<ByteBuffer>> mergeBuffersHolder)
{
this.mergeBuffersHolder = mergeBuffersHolder;
this.mergeBuffers = new ArrayDeque<>(mergeBuffersHolder.get());
}
/**
* Get a merge buffer from the pre-acquired resources.
*
* @return a resource holder containing a merge buffer
*
* @throws IllegalStateException if this resource is initialized with empty merge buffers, or
* there isn't any available merge buffers
*/
public ResourceHolder<ByteBuffer> getMergeBuffer()
{
final ByteBuffer buffer = mergeBuffers.pop();
return new ResourceHolder<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return buffer;
}
@Override
public void close()
{
mergeBuffers.add(buffer);
}
};
}
@Override
public void close()
{
if (mergeBuffersHolder != null) {
if (mergeBuffers.size() != mergeBuffersHolder.get().size()) {
log.warn("%d resources are not returned yet", mergeBuffersHolder.get().size() - mergeBuffers.size());
}
mergeBuffersHolder.close();
}
}
}

View File

@ -25,6 +25,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.segment.StorageAdapter;
import java.util.Map;
@ -32,6 +33,13 @@ import java.util.concurrent.ExecutorService;
public interface GroupByStrategy
{
/**
* Initializes resources required for a broker to process the given query.
*
* @param query a groupBy query to be processed
* @return broker resource
*/
GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners);
/**
* Indicates this strategy is cacheable or not.
@ -52,6 +60,7 @@ public interface GroupByStrategy
Sequence<Row> processSubqueryResult(
GroupByQuery subquery,
GroupByQuery query,
GroupByQueryResource resource,
Sequence<Row> subqueryResult
);

View File

@ -46,6 +46,7 @@ import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.StorageAdapter;
import io.druid.segment.incremental.IncrementalIndex;
@ -77,6 +78,12 @@ public class GroupByStrategyV1 implements GroupByStrategy
this.bufferPool = bufferPool;
}
@Override
public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners)
{
return new GroupByQueryResource();
}
@Override
public boolean isCacheable(boolean willMergeRunners)
{
@ -130,7 +137,7 @@ public class GroupByStrategyV1 implements GroupByStrategy
@Override
public Sequence<Row> processSubqueryResult(
GroupByQuery subquery, GroupByQuery query, Sequence<Row> subqueryResult
GroupByQuery subquery, GroupByQuery query, GroupByQueryResource resource, Sequence<Row> subqueryResult
)
{
final Set<AggregatorFactory> aggs = Sets.newHashSet();

View File

@ -29,7 +29,9 @@ import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import io.druid.collections.BlockingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
@ -40,10 +42,15 @@ import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.DataSource;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.InsufficientResourcesException;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.groupby.GroupByQuery;
@ -53,10 +60,12 @@ import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import io.druid.query.groupby.epinephelinae.GroupByRowProcessor;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
public class GroupByStrategyV2 implements GroupByStrategy
@ -64,6 +73,9 @@ public class GroupByStrategyV2 implements GroupByStrategy
public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
public static final String CTX_KEY_OUTERMOST = "groupByOutermost";
// see countRequiredMergeBufferNum() for explanation
private static final int MAX_MERGE_BUFFER_NUM = 2;
private final DruidProcessingConfig processingConfig;
private final Supplier<GroupByQueryConfig> configSupplier;
private final StupidPool<ByteBuffer> bufferPool;
@ -112,6 +124,52 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
@Override
public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners)
{
if (!willMergeRunners) {
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1);
if (requiredMergeBufferNum > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
"Query needs " + requiredMergeBufferNum + " merge buffers, but only "
+ mergeBufferPool.maxSize() + " merge buffers are configured"
);
} else if (requiredMergeBufferNum == 0) {
return new GroupByQueryResource();
} else {
final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT);
final ResourceHolder<List<ByteBuffer>> mergeBufferHolders = mergeBufferPool.takeBatch(
requiredMergeBufferNum, timeout.longValue()
);
if (mergeBufferHolders == null) {
throw new InsufficientResourcesException("Cannot acquire enough merge buffers");
} else {
return new GroupByQueryResource(mergeBufferHolders);
}
}
} else {
return new GroupByQueryResource();
}
}
private static int countRequiredMergeBufferNum(Query query, int foundNum)
{
// Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one.
// For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1.
// If the broker processes an outer groupBy which reads input from an inner groupBy,
// it requires two merge buffers for inner and outer groupBys to keep the intermediate result of inner groupBy
// until the outer groupBy processing completes.
// This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2.
final DataSource dataSource = query.getDataSource();
if (foundNum == MAX_MERGE_BUFFER_NUM + 1 || !(dataSource instanceof QueryDataSource)) {
return foundNum - 1;
} else {
return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), foundNum + 1);
}
}
@Override
public boolean isCacheable(boolean willMergeRunners)
{
@ -208,7 +266,10 @@ public class GroupByStrategyV2 implements GroupByStrategy
@Override
public Sequence<Row> processSubqueryResult(
GroupByQuery subquery, GroupByQuery query, Sequence<Row> subqueryResult
GroupByQuery subquery,
GroupByQuery query,
GroupByQueryResource resource,
Sequence<Row> subqueryResult
)
{
final Sequence<Row> results = GroupByRowProcessor.process(
@ -216,7 +277,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
subqueryResult,
GroupByQueryHelper.rowSignatureFor(subquery),
configSupplier.get(),
mergeBufferPool,
resource,
spillMapper,
processingConfig.getTmpDir()
);

View File

@ -71,12 +71,23 @@ public class GroupByQueryMergeBufferTest
}
@Override
public ReferenceCountingResourceHolder<ByteBuffer> take(final long timeout) throws InterruptedException
public ReferenceCountingResourceHolder<ByteBuffer> take(final long timeout)
{
final ReferenceCountingResourceHolder<ByteBuffer> holder = super.take(timeout);
final int queueSize = getQueueSize();
if (minRemainBufferNum > queueSize) {
minRemainBufferNum = queueSize;
final int poolSize = getPoolSize();
if (minRemainBufferNum > poolSize) {
minRemainBufferNum = poolSize;
}
return holder;
}
@Override
public ReferenceCountingResourceHolder<List<ByteBuffer>> takeBatch(final int maxElements, final long timeout)
{
final ReferenceCountingResourceHolder<List<ByteBuffer>> holder = super.takeBatch(maxElements, timeout);
final int poolSize = getPoolSize();
if (minRemainBufferNum > poolSize) {
minRemainBufferNum = poolSize;
}
return holder;
}
@ -155,9 +166,7 @@ public class GroupByQueryMergeBufferTest
)
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(
@ -227,6 +236,7 @@ public class GroupByQueryMergeBufferTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(2, mergeBufferPool.getMinRemainBufferNum());
assertEquals(3, mergeBufferPool.getPoolSize());
}
@Test
@ -254,6 +264,7 @@ public class GroupByQueryMergeBufferTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(1, mergeBufferPool.getMinRemainBufferNum());
assertEquals(3, mergeBufferPool.getPoolSize());
}
@Test
@ -291,7 +302,9 @@ public class GroupByQueryMergeBufferTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(1, mergeBufferPool.getMinRemainBufferNum());
// This should be 0 because the broker needs 2 buffers and the queryable node needs one.
assertEquals(0, mergeBufferPool.getMinRemainBufferNum());
assertEquals(3, mergeBufferPool.getPoolSize());
}
@Test
@ -341,6 +354,8 @@ public class GroupByQueryMergeBufferTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(1, mergeBufferPool.getMinRemainBufferNum());
// This should be 0 because the broker needs 2 buffers and the queryable node needs one.
assertEquals(0, mergeBufferPool.getMinRemainBufferNum());
assertEquals(3, mergeBufferPool.getPoolSize());
}
}

View File

@ -19,22 +19,33 @@
package io.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.InsufficientResourcesException;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import org.bouncycastle.util.Integers;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
@ -45,6 +56,7 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
@ -52,7 +64,7 @@ import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerFailureTest
{
public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
{
@Override
@ -83,7 +95,63 @@ public class GroupByQueryRunnerFailureTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config
)
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes());
}
}
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
bufferPool
),
new GroupByStrategyV2(
DEFAULT_PROCESSING_CONFIG,
configSupplier,
bufferPool,
mergeBufferPool,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
strategySelector,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(
strategySelector,
toolChest
);
}
private final static BlockingPool<ByteBuffer> mergeBufferPool = new BlockingPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get ()
{
return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes());
}
},
DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers()
);
private static final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
@ -91,8 +159,7 @@ public class GroupByQueryRunnerFailureTest
{
return "v2";
}
},
DEFAULT_PROCESSING_CONFIG
}
);
private QueryRunner<Row> runner;
@ -109,11 +176,11 @@ public class GroupByQueryRunnerFailureTest
public GroupByQueryRunnerFailureTest(QueryRunner<Row> runner)
{
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner));
}
@Test(timeout = 10000)
public void testLackOfMergeBuffers() throws IOException
public void testNotEnoughMergeBuffersOnQueryable() throws IOException
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectCause(CoreMatchers.<Throwable>instanceOf(TimeoutException.class));
@ -139,4 +206,72 @@ public class GroupByQueryRunnerFailureTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
@Test(timeout = 10000)
public void testResourceLimitExceededOnBroker()
{
expectedException.expect(ResourceLimitExceededException.class);
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("quality", "alias"),
new DefaultDimensionSpec("market", null)
))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
)
.setGranularity(QueryGranularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
@Test(timeout = 10000, expected = InsufficientResourcesException.class)
public void testInsufficientResourcesOnBroker() throws IOException
{
final ReferenceCountingResourceHolder<List<ByteBuffer>> holder = mergeBufferPool.takeBatch(1, 10);
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
)
.setGranularity(QueryGranularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.build();
try {
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} finally {
holder.close();
}
}
}

View File

@ -142,6 +142,8 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerTest
{
@ -347,9 +349,7 @@ public class GroupByQueryRunnerTest
)
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(
@ -1793,7 +1793,7 @@ public class GroupByQueryRunnerTest
List<Row> expectedResults = ImmutableList.of();
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
Assert.assertEquals(expectedResults, results);
assertEquals(expectedResults, results);
}
@Test
@ -3140,7 +3140,7 @@ public class GroupByQueryRunnerTest
final Object next1 = resultsIter.next();
Object expectedNext1 = expectedResultsIter.next();
Assert.assertEquals("order-limit", expectedNext1, next1);
assertEquals("order-limit", expectedNext1, next1);
final Object next2 = resultsIter.next();
Object expectedNext2 = expectedResultsIter.next();
@ -6908,7 +6908,7 @@ public class GroupByQueryRunnerTest
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
Assert.assertEquals(
assertEquals(
Functions.<Sequence<Row>>identity(),
query.getLimitSpec().build(
query.getDimensions(),
@ -7172,7 +7172,7 @@ public class GroupByQueryRunnerTest
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
Assert.assertEquals(
assertEquals(
Functions.<Sequence<Row>>identity(),
query.getLimitSpec().build(
query.getDimensions(),

View File

@ -171,8 +171,9 @@ public class CalciteTests
@Override
public int getNumMergeBuffers()
{
// Need 2 buffers for CalciteQueryTest.testDoubleNestedGroupby.
return 2;
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
// Two buffers for the broker and one for the queryable
return 3;
}
}
)