From 7200dce112d8f69f3f59d16b2b8290d69459aa5e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 23 Feb 2017 05:49:37 +0900 Subject: [PATCH] Atomic merge buffer acquisition for groupBys (#3939) * Atomic merge buffer acquisition for groupBys * documentation * documentation * address comments * address comments * fix test failure * Addressed comments - Add InsufficientResourcesException - Renamed GroupByQueryBrokerResource to GroupByQueryResource * addressed comments * Add takeBatch() to BlockingPool --- .../GroupByTypeInterfaceBenchmark.java | 2 - .../benchmark/query/GroupByBenchmark.java | 2 - .../io/druid/collections/BlockingPool.java | 247 ++++++++++-- .../druid/common/guava/CombiningSequence.java | 2 +- .../druid/collections/BlockingPoolTest.java | 372 ++++++++++++++++++ .../common/guava/CombiningSequenceTest.java | 2 +- docs/content/querying/groupbyquery.md | 6 - docs/content/querying/sql.md | 6 - .../query/InsufficientResourcesException.java | 31 ++ .../groupby/GroupByQueryQueryToolChest.java | 46 ++- .../GroupByMergingQueryRunnerV2.java | 4 +- .../epinephelinae/GroupByRowProcessor.java | 46 +-- .../resource/GroupByQueryResource.java | 91 +++++ .../groupby/strategy/GroupByStrategy.java | 9 + .../groupby/strategy/GroupByStrategyV1.java | 9 +- .../groupby/strategy/GroupByStrategyV2.java | 65 ++- .../groupby/GroupByQueryMergeBufferTest.java | 31 +- .../GroupByQueryRunnerFailureTest.java | 147 ++++++- .../query/groupby/GroupByQueryRunnerTest.java | 12 +- .../druid/sql/calcite/util/CalciteTests.java | 5 +- 20 files changed, 1020 insertions(+), 115 deletions(-) create mode 100644 common/src/test/java/io/druid/collections/BlockingPoolTest.java create mode 100644 processing/src/main/java/io/druid/query/InsufficientResourcesException.java create mode 100644 processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 6780e2404b7..314b486d448 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -425,9 +425,7 @@ public class GroupByTypeInterfaceBenchmark factory = new GroupByQueryRunnerFactory( strategySelector, new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() ) ); diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 03d5b62b456..6725977c10f 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -468,9 +468,7 @@ public class GroupByBenchmark factory = new GroupByQueryRunnerFactory( strategySelector, new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() ) ); diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index 81cedc4e1da..9a883da35cf 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -22,34 +22,55 @@ package io.druid.collections; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; - -import io.druid.java.util.common.logger.Logger; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import io.druid.java.util.common.ISE; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.ArrayDeque; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations. */ public class BlockingPool { - private static final Logger log = new Logger(BlockingPool.class); + private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; - private final BlockingQueue objects; + private final ArrayDeque objects; + private final ReentrantLock lock; + private final Condition notEnough; + private final int maxSize; public BlockingPool( Supplier generator, int limit ) { - this.objects = limit > 0 ? new ArrayBlockingQueue(limit) : null; + this.objects = new ArrayDeque<>(limit); + this.maxSize = limit; for (int i = 0; i < limit; i++) { objects.add(generator.get()); } + + this.lock = new ReentrantLock(); + this.notEnough = lock.newCondition(); + } + + public int maxSize() + { + return maxSize; + } + + @VisibleForTesting + public int getPoolSize() + { + return objects.size(); } /** @@ -58,31 +79,207 @@ public class BlockingPool * @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout. * * @return a resource, or null if the timeout was reached - * - * @throws InterruptedException if interrupted while waiting for a resource to become available */ - public ReferenceCountingResourceHolder take(final long timeout) throws InterruptedException + public ReferenceCountingResourceHolder take(final long timeout) { - Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take."); - final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take(); - return theObject == null ? null : new ReferenceCountingResourceHolder<>( - theObject, - new Closeable() - { - @Override - public void close() throws IOException + checkInitialized(); + final T theObject; + try { + if (timeout > -1) { + theObject = timeout > 0 ? poll(timeout) : poll(); + } else { + theObject = take(); + } + return theObject == null ? null : new ReferenceCountingResourceHolder<>( + theObject, + new Closeable() { - if (!objects.offer(theObject)) { - log.error("WTF?! Queue offer failed, uh oh..."); + @Override + public void close() throws IOException + { + offer(theObject); } } - } - ); + ); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } } - @VisibleForTesting - protected int getQueueSize() + private T poll() { - return objects.size(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return objects.isEmpty() ? null : objects.pop(); + } finally { + lock.unlock(); + } + } + + private T poll(long timeout) throws InterruptedException + { + long nanos = TIME_UNIT.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.isEmpty()) { + if (nanos <= 0) { + return null; + } + nanos = notEnough.awaitNanos(nanos); + } + return objects.pop(); + } finally { + lock.unlock(); + } + } + + private T take() throws InterruptedException + { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.isEmpty()) { + notEnough.await(); + } + return objects.pop(); + } finally { + lock.unlock(); + } + } + + /** + * Take a resource from the pool. + * + * @param elementNum number of resources to take + * @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout. + * + * @return a resource, or null if the timeout was reached + */ + public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeout) + { + checkInitialized(); + final List objects; + try { + if (timeout > -1) { + objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum); + } else { + objects = takeBatch(elementNum); + } + return objects == null ? null : new ReferenceCountingResourceHolder<>( + objects, + new Closeable() + { + @Override + public void close() throws IOException + { + offerBatch(objects); + } + } + ); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private List pollBatch(int elementNum) throws InterruptedException + { + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + if (objects.size() < elementNum) { + return null; + } else { + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } + } finally { + lock.unlock(); + } + } + + private List pollBatch(int elementNum, long timeout) throws InterruptedException + { + long nanos = TIME_UNIT.toNanos(timeout); + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.size() < elementNum) { + if (nanos <= 0) { + return null; + } + nanos = notEnough.awaitNanos(nanos); + } + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } finally { + lock.unlock(); + } + } + + private List takeBatch(int elementNum) throws InterruptedException + { + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.size() < elementNum) { + notEnough.await(); + } + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } finally { + lock.unlock(); + } + } + + private void checkInitialized() + { + Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take."); + } + + private void offer(T theObject) + { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (objects.size() < maxSize) { + objects.push(theObject); + notEnough.signal(); + } else { + throw new ISE("Cannot exceed pre-configured maximum size"); + } + } finally { + lock.unlock(); + } + } + + private void offerBatch(List offers) + { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (objects.size() + offers.size() <= maxSize) { + for (T offer : offers) { + objects.push(offer); + } + notEnough.signal(); + } else { + throw new ISE("Cannot exceed pre-configured maximum size"); + } + } finally { + lock.unlock(); + } } } diff --git a/common/src/main/java/io/druid/common/guava/CombiningSequence.java b/common/src/main/java/io/druid/common/guava/CombiningSequence.java index e3ccc40453a..6abe7ce0203 100644 --- a/common/src/main/java/io/druid/common/guava/CombiningSequence.java +++ b/common/src/main/java/io/druid/common/guava/CombiningSequence.java @@ -46,7 +46,7 @@ public class CombiningSequence implements Sequence private final Ordering ordering; private final BinaryFn mergeFn; - public CombiningSequence( + private CombiningSequence( Sequence baseSequence, Ordering ordering, BinaryFn mergeFn diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java new file mode 100644 index 00000000000..73a3d86c307 --- /dev/null +++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java @@ -0,0 +1,372 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +import com.google.common.base.Suppliers; +import com.google.common.collect.Lists; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class BlockingPoolTest +{ + private static final ExecutorService SERVICE = Executors.newFixedThreadPool(2); + + private static final BlockingPool POOL = new BlockingPool<>(Suppliers.ofInstance(1), 10); + private static final BlockingPool EMPTY_POOL = new BlockingPool<>(Suppliers.ofInstance(1), 0); + + @AfterClass + public static void teardown() + { + SERVICE.shutdown(); + } + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testTakeFromEmptyPool() + { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); + EMPTY_POOL.take(0); + } + + @Test + public void testDrainFromEmptyPool() + { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); + EMPTY_POOL.takeBatch(1, 0); + } + + @Test(timeout = 1000) + public void testTake() + { + final ReferenceCountingResourceHolder holder = POOL.take(100); + assertNotNull(holder); + assertEquals(9, POOL.getPoolSize()); + holder.close(); + assertEquals(10, POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testTakeTimeout() + { + final ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 100L); + final ReferenceCountingResourceHolder holder = POOL.take(100); + assertNull(holder); + batchHolder.close(); + } + + @Test(timeout = 1000) + public void testTakeBatch() + { + final ReferenceCountingResourceHolder> holder = POOL.takeBatch(6, 100L); + assertNotNull(holder); + assertEquals(6, holder.get().size()); + assertEquals(4, POOL.getPoolSize()); + holder.close(); + assertEquals(10, POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException + { + ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 10); + assertNotNull(batchHolder); + assertEquals(10, batchHolder.get().size()); + assertEquals(0, POOL.getPoolSize()); + + final Future>> future = SERVICE.submit( + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(8, 100); + } + } + ); + Thread.sleep(20); + batchHolder.close(); + + batchHolder = future.get(); + assertNotNull(batchHolder); + assertEquals(8, batchHolder.get().size()); + assertEquals(2, POOL.getPoolSize()); + + batchHolder.close(); + assertEquals(10, POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testTakeBatchTooManyObjects() + { + final ReferenceCountingResourceHolder> holder = POOL.takeBatch(100, 100L); + assertNull(holder); + } + + @Test(timeout = 1000) + public void testConcurrentTake() throws ExecutionException, InterruptedException + { + final int limit1 = POOL.maxSize() / 2; + final int limit2 = POOL.maxSize() - limit1 + 1; + + final Future>> f1 = SERVICE.submit( + new Callable>>() + { + @Override + public List> call() throws Exception + { + List> result = Lists.newArrayList(); + for (int i = 0; i < limit1; i++) { + result.add(POOL.take(10)); + } + return result; + } + } + ); + final Future>> f2 = SERVICE.submit( + new Callable>>() + { + @Override + public List> call() throws Exception + { + List> result = Lists.newArrayList(); + for (int i = 0; i < limit2; i++) { + result.add(POOL.take(10)); + } + return result; + } + } + ); + + final List> r1 = f1.get(); + final List> r2 = f2.get(); + + assertEquals(0, POOL.getPoolSize()); + assertTrue(r1.contains(null) || r2.contains(null)); + + int nonNullCount = 0; + for (ReferenceCountingResourceHolder holder : r1) { + if (holder != null) { + nonNullCount++; + } + } + + for (ReferenceCountingResourceHolder holder : r2) { + if (holder != null) { + nonNullCount++; + } + } + assertEquals(POOL.maxSize(), nonNullCount); + + final Future future1 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + for (ReferenceCountingResourceHolder holder : r1) { + if (holder != null) { + holder.close(); + } + } + } + }); + final Future future2 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + for (ReferenceCountingResourceHolder holder : r2) { + if (holder != null) { + holder.close(); + } + } + } + }); + + future1.get(); + future2.get(); + + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException + { + final int batch1 = POOL.maxSize() / 2; + final Callable>> c1 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch1, 10); + } + }; + + final int batch2 = POOL.maxSize() - batch1 + 1; + final Callable>> c2 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch2, 10); + } + }; + + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); + + final ReferenceCountingResourceHolder> r1 = f1.get(); + final ReferenceCountingResourceHolder> r2 = f2.get(); + + if (r1 != null) { + assertNull(r2); + assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize()); + assertEquals(batch1, r1.get().size()); + r1.close(); + } else { + assertNotNull(r2); + assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize()); + assertEquals(batch2, r2.get().size()); + r2.close(); + } + + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testConcurrentBatchClose() throws ExecutionException, InterruptedException + { + final int batch1 = POOL.maxSize() / 2; + final Callable>> c1 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch1, 10); + } + }; + + final int batch2 = POOL.maxSize() - batch1; + final Callable>> c2 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch2, 10); + } + }; + + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); + + final ReferenceCountingResourceHolder> r1 = f1.get(); + final ReferenceCountingResourceHolder> r2 = f2.get(); + + assertNotNull(r1); + assertNotNull(r2); + assertEquals(batch1, r1.get().size()); + assertEquals(batch2, r2.get().size()); + assertEquals(0, POOL.getPoolSize()); + + final Future future1 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + r1.close(); + } + }); + final Future future2 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + r2.close(); + } + }); + + future1.get(); + future2.get(); + + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException + { + final ReferenceCountingResourceHolder> r1 = POOL.takeBatch(1, 10); + + final Callable>> c2 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(10, 100); + } + }; + + final Future>> f2 = SERVICE.submit(c2); + final Future f1 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + // ignore + } + r1.close(); + } + }); + + final ReferenceCountingResourceHolder> r2 = f2.get(); + f1.get(); + assertNotNull(r2); + assertEquals(10, r2.get().size()); + assertEquals(0, POOL.getPoolSize()); + + r2.close(); + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } +} diff --git a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java index 040bac1739c..b1f933e0df3 100644 --- a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java +++ b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java @@ -228,7 +228,7 @@ public class CombiningSequenceTest }; Sequence> seq = Sequences.limit( - new CombiningSequence<>( + CombiningSequence.create( Sequences.withBaggage(Sequences.simple(pairs), closeable), Ordering.natural().onResultOf(Pair.lhsFn()), new BinaryFn, Pair, Pair>() diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index fceaf2dbd87..5f5421c3e8a 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -156,12 +156,6 @@ indexing mechanism, and runs the outer query on these materialized results. "v2" inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both strategy perform the outer query on the broker in a single-threaded fashion. -Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy. -This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply -nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the -merge buffers are limited in number and are acquired one-by-one instead of a complete set. At this time, we recommend -that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy. - #### Server configuration When using the "v1" strategy, the following runtime properties apply: diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index f2e4404b909..c662b99d852 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -141,12 +141,6 @@ exact distinct count using a nested groupBy. SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source) ``` -Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy. -This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply -nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the -merge buffers are limited in number and are acquired one-by-one instead of a complete set. At this time, we recommend -that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy. - #### Semi-joins Semi-join subqueries involving `WHERE ... IN (SELECT ...)`, like the following, are executed with a special process. diff --git a/processing/src/main/java/io/druid/query/InsufficientResourcesException.java b/processing/src/main/java/io/druid/query/InsufficientResourcesException.java new file mode 100644 index 00000000000..f431428d1d6 --- /dev/null +++ b/processing/src/main/java/io/druid/query/InsufficientResourcesException.java @@ -0,0 +1,31 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +/** + * This exception is thrown when the requested operation cannot be completed due to a lack of available resources. + */ +public class InsufficientResourcesException extends RuntimeException +{ + public InsufficientResourcesException(String message) + { + super(message); + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 2058b829526..1255b1756e0 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Predicate; -import com.google.common.base.Supplier; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -32,14 +31,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; -import io.druid.guice.annotations.Global; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.MappedSequence; import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; @@ -57,11 +55,12 @@ import io.druid.query.cache.CacheKeyBuilder; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; +import io.druid.query.groupby.resource.GroupByQueryResource; +import io.druid.query.groupby.strategy.GroupByStrategy; import io.druid.query.groupby.strategy.GroupByStrategySelector; import org.joda.time.DateTime; import javax.annotation.Nullable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -84,22 +83,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest configSupplier; private final GroupByStrategySelector strategySelector; - private final StupidPool bufferPool; private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator; @Inject public GroupByQueryQueryToolChest( - Supplier configSupplier, GroupByStrategySelector strategySelector, - @Global StupidPool bufferPool, IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator ) { - this.configSupplier = configSupplier; this.strategySelector = strategySelector; - this.bufferPool = bufferPool; this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator; } @@ -116,7 +109,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeGroupByResults( + private Sequence initAndMergeGroupByResults( final GroupByQuery query, QueryRunner runner, Map context ) + { + final GroupByStrategy groupByStrategy = strategySelector.strategize(query); + final GroupByQueryResource resource = groupByStrategy.prepareResource(query, false); + + return Sequences.withBaggage( + mergeGroupByResults( + groupByStrategy, + query, + resource, + runner, + context + ), + resource + ); + } + + private Sequence mergeGroupByResults( + GroupByStrategy groupByStrategy, + final GroupByQuery query, + GroupByQueryResource resource, + QueryRunner runner, + Map context + ) { // If there's a subquery, merge subquery results and then apply the aggregator @@ -161,6 +177,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest subqueryResult = mergeGroupByResults( + groupByStrategy, subquery.withOverriddenContext( ImmutableMap.of( //setting sort to false avoids unnecessary sorting while merging results. we only need to sort @@ -169,6 +186,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest // This will potentially block if there are no merge buffers left in the pool. final long timeout = timeoutAt - System.currentTimeMillis(); if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new QueryInterruptedException(new TimeoutException()); + throw new TimeoutException(); } resources.add(mergeBufferHolder); } - catch (InterruptedException e) { + catch (Exception e) { throw new QueryInterruptedException(e); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 1736837d3e6..4e7148c87b7 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -23,10 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import io.druid.collections.BlockingPool; -import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.collections.ResourceHolder; import io.druid.common.guava.SettableSupplier; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.Row; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Accumulator; @@ -35,8 +33,6 @@ import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.FilteredSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; -import io.druid.query.QueryInterruptedException; import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.filter.Filter; @@ -45,6 +41,7 @@ import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.column.ValueType; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.filter.Filters; @@ -58,7 +55,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeoutException; public class GroupByRowProcessor { @@ -67,7 +63,7 @@ public class GroupByRowProcessor final Sequence rows, final Map rowSignature, final GroupByQueryConfig config, - final BlockingPool mergeBufferPool, + final GroupByQueryResource resource, final ObjectMapper spillMapper, final String processingTmpDir ) @@ -86,8 +82,6 @@ public class GroupByRowProcessor String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeout = queryTimeout == null ? JodaUtils.MAX_INSTANT : queryTimeout.longValue(); final List queryIntervals = query.getIntervals(); final Filter filter = Filters.convertToCNFFromQueryContext( query, @@ -133,7 +127,9 @@ public class GroupByRowProcessor @Override public CloseableGrouperIterator make() { - final List closeOnFailure = Lists.newArrayList(); + // This contains all closeable objects which are closed when the returned iterator iterates all the elements, + // or an exceptions is thrown. The objects are closed in their reverse order. + final List closeOnExit = Lists.newArrayList(); try { final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( @@ -141,9 +137,7 @@ public class GroupByRowProcessor querySpecificConfig.getMaxOnDiskStorage() ); - closeOnFailure.add(temporaryStorage); - - final SettableSupplier> bufferHolderSupplier = new SettableSupplier<>(); + closeOnExit.add(temporaryStorage); Pair, Accumulator, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( query, @@ -155,19 +149,9 @@ public class GroupByRowProcessor @Override public ByteBuffer get() { - final ReferenceCountingResourceHolder mergeBufferHolder; - try { - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new QueryInterruptedException(new TimeoutException()); - } - bufferHolderSupplier.set(mergeBufferHolder); - closeOnFailure.add(mergeBufferHolder); - - return mergeBufferHolder.get(); - } - catch (InterruptedException e) { - throw new QueryInterruptedException(e); - } + final ResourceHolder mergeBufferHolder = resource.getMergeBuffer(); + closeOnExit.add(mergeBufferHolder); + return mergeBufferHolder.get(); } }, -1, @@ -177,7 +161,7 @@ public class GroupByRowProcessor ); final Grouper grouper = pair.lhs; final Accumulator, Row> accumulator = pair.rhs; - closeOnFailure.add(grouper); + closeOnExit.add(grouper); final Grouper retVal = filteredSequence.accumulate( grouper, @@ -195,16 +179,16 @@ public class GroupByRowProcessor @Override public void close() throws IOException { - grouper.close(); - CloseQuietly.close(bufferHolderSupplier.get()); - CloseQuietly.close(temporaryStorage); + for (Closeable closeable : Lists.reverse(closeOnExit)) { + CloseQuietly.close(closeable); + } } } ); } catch (Throwable e) { // Exception caught while setting up the iterator; release resources. - for (Closeable closeable : Lists.reverse(closeOnFailure)) { + for (Closeable closeable : Lists.reverse(closeOnExit)) { CloseQuietly.close(closeable); } throw e; diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java new file mode 100644 index 00000000000..fa993af9c30 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java @@ -0,0 +1,91 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.resource; + +import com.metamx.common.logger.Logger; +import io.druid.collections.ResourceHolder; + +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; + +/** + * This class contains resources required for a groupBy query execution. + * Currently, it contains only merge buffers, but any additional resources can be added in the future. + */ +public class GroupByQueryResource implements Closeable +{ + private static final Logger log = new Logger(GroupByQueryResource.class); + + private final ResourceHolder> mergeBuffersHolder; + private final Deque mergeBuffers; + + public GroupByQueryResource() + { + this.mergeBuffersHolder = null; + this.mergeBuffers = new ArrayDeque<>(); + } + + public GroupByQueryResource(ResourceHolder> mergeBuffersHolder) + { + this.mergeBuffersHolder = mergeBuffersHolder; + this.mergeBuffers = new ArrayDeque<>(mergeBuffersHolder.get()); + } + + /** + * Get a merge buffer from the pre-acquired resources. + * + * @return a resource holder containing a merge buffer + * + * @throws IllegalStateException if this resource is initialized with empty merge buffers, or + * there isn't any available merge buffers + */ + public ResourceHolder getMergeBuffer() + { + final ByteBuffer buffer = mergeBuffers.pop(); + return new ResourceHolder() + { + @Override + public ByteBuffer get() + { + return buffer; + } + + @Override + public void close() + { + mergeBuffers.add(buffer); + } + }; + } + + @Override + public void close() + { + if (mergeBuffersHolder != null) { + if (mergeBuffers.size() != mergeBuffersHolder.get().size()) { + log.warn("%d resources are not returned yet", mergeBuffersHolder.get().size() - mergeBuffers.size()); + } + mergeBuffersHolder.close(); + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index 4a84f665ffb..7513fd0286b 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -25,6 +25,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.StorageAdapter; import java.util.Map; @@ -32,6 +33,13 @@ import java.util.concurrent.ExecutorService; public interface GroupByStrategy { + /** + * Initializes resources required for a broker to process the given query. + * + * @param query a groupBy query to be processed + * @return broker resource + */ + GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners); /** * Indicates this strategy is cacheable or not. @@ -52,6 +60,7 @@ public interface GroupByStrategy Sequence processSubqueryResult( GroupByQuery subquery, GroupByQuery query, + GroupByQueryResource resource, Sequence subqueryResult ); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index cc85d3a5a09..abd6fcc32e6 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -46,6 +46,7 @@ import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.GroupByQueryQueryToolChest; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.StorageAdapter; import io.druid.segment.incremental.IncrementalIndex; @@ -77,6 +78,12 @@ public class GroupByStrategyV1 implements GroupByStrategy this.bufferPool = bufferPool; } + @Override + public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners) + { + return new GroupByQueryResource(); + } + @Override public boolean isCacheable(boolean willMergeRunners) { @@ -130,7 +137,7 @@ public class GroupByStrategyV1 implements GroupByStrategy @Override public Sequence processSubqueryResult( - GroupByQuery subquery, GroupByQuery query, Sequence subqueryResult + GroupByQuery subquery, GroupByQuery query, GroupByQueryResource resource, Sequence subqueryResult ) { final Set aggs = Sets.newHashSet(); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 08ff15b29fc..c36bb6e57ca 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -29,7 +29,9 @@ import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import io.druid.collections.BlockingPool; +import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; +import io.druid.common.utils.JodaUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularities; @@ -40,10 +42,15 @@ import io.druid.guice.annotations.Smile; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; +import io.druid.query.DataSource; import io.druid.query.DruidProcessingConfig; +import io.druid.query.InsufficientResourcesException; import io.druid.query.Query; +import io.druid.query.QueryContextKeys; +import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.PostAggregator; import io.druid.query.groupby.GroupByQuery; @@ -53,10 +60,12 @@ import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import io.druid.query.groupby.epinephelinae.GroupByRowProcessor; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.StorageAdapter; import org.joda.time.DateTime; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; public class GroupByStrategyV2 implements GroupByStrategy @@ -64,6 +73,9 @@ public class GroupByStrategyV2 implements GroupByStrategy public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp"; public static final String CTX_KEY_OUTERMOST = "groupByOutermost"; + // see countRequiredMergeBufferNum() for explanation + private static final int MAX_MERGE_BUFFER_NUM = 2; + private final DruidProcessingConfig processingConfig; private final Supplier configSupplier; private final StupidPool bufferPool; @@ -112,6 +124,52 @@ public class GroupByStrategyV2 implements GroupByStrategy } } + @Override + public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners) + { + if (!willMergeRunners) { + final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1); + + if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { + throw new ResourceLimitExceededException( + "Query needs " + requiredMergeBufferNum + " merge buffers, but only " + + mergeBufferPool.maxSize() + " merge buffers are configured" + ); + } else if (requiredMergeBufferNum == 0) { + return new GroupByQueryResource(); + } else { + final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); + final ResourceHolder> mergeBufferHolders = mergeBufferPool.takeBatch( + requiredMergeBufferNum, timeout.longValue() + ); + if (mergeBufferHolders == null) { + throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); + } else { + return new GroupByQueryResource(mergeBufferHolders); + } + } + } else { + return new GroupByQueryResource(); + } + } + + private static int countRequiredMergeBufferNum(Query query, int foundNum) + { + // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one. + // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1. + // If the broker processes an outer groupBy which reads input from an inner groupBy, + // it requires two merge buffers for inner and outer groupBys to keep the intermediate result of inner groupBy + // until the outer groupBy processing completes. + // This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2. + + final DataSource dataSource = query.getDataSource(); + if (foundNum == MAX_MERGE_BUFFER_NUM + 1 || !(dataSource instanceof QueryDataSource)) { + return foundNum - 1; + } else { + return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), foundNum + 1); + } + } + @Override public boolean isCacheable(boolean willMergeRunners) { @@ -208,7 +266,10 @@ public class GroupByStrategyV2 implements GroupByStrategy @Override public Sequence processSubqueryResult( - GroupByQuery subquery, GroupByQuery query, Sequence subqueryResult + GroupByQuery subquery, + GroupByQuery query, + GroupByQueryResource resource, + Sequence subqueryResult ) { final Sequence results = GroupByRowProcessor.process( @@ -216,7 +277,7 @@ public class GroupByStrategyV2 implements GroupByStrategy subqueryResult, GroupByQueryHelper.rowSignatureFor(subquery), configSupplier.get(), - mergeBufferPool, + resource, spillMapper, processingConfig.getTmpDir() ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 5d04ab840fa..2cfe1034930 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -71,12 +71,23 @@ public class GroupByQueryMergeBufferTest } @Override - public ReferenceCountingResourceHolder take(final long timeout) throws InterruptedException + public ReferenceCountingResourceHolder take(final long timeout) { final ReferenceCountingResourceHolder holder = super.take(timeout); - final int queueSize = getQueueSize(); - if (minRemainBufferNum > queueSize) { - minRemainBufferNum = queueSize; + final int poolSize = getPoolSize(); + if (minRemainBufferNum > poolSize) { + minRemainBufferNum = poolSize; + } + return holder; + } + + @Override + public ReferenceCountingResourceHolder> takeBatch(final int maxElements, final long timeout) + { + final ReferenceCountingResourceHolder> holder = super.takeBatch(maxElements, timeout); + final int poolSize = getPoolSize(); + if (minRemainBufferNum > poolSize) { + minRemainBufferNum = poolSize; } return holder; } @@ -155,9 +166,7 @@ public class GroupByQueryMergeBufferTest ) ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ); return new GroupByQueryRunnerFactory( @@ -227,6 +236,7 @@ public class GroupByQueryMergeBufferTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); assertEquals(2, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, mergeBufferPool.getPoolSize()); } @Test @@ -254,6 +264,7 @@ public class GroupByQueryMergeBufferTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); assertEquals(1, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, mergeBufferPool.getPoolSize()); } @Test @@ -291,7 +302,9 @@ public class GroupByQueryMergeBufferTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - assertEquals(1, mergeBufferPool.getMinRemainBufferNum()); + // This should be 0 because the broker needs 2 buffers and the queryable node needs one. + assertEquals(0, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, mergeBufferPool.getPoolSize()); } @Test @@ -341,6 +354,8 @@ public class GroupByQueryMergeBufferTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - assertEquals(1, mergeBufferPool.getMinRemainBufferNum()); + // This should be 0 because the broker needs 2 buffers and the queryable node needs one. + assertEquals(0, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, mergeBufferPool.getPoolSize()); } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 2489318d3a8..fee40bf28c6 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -19,22 +19,33 @@ package io.druid.query.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; +import io.druid.collections.BlockingPool; +import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularities; import io.druid.query.DruidProcessingConfig; +import io.druid.query.InsufficientResourcesException; import io.druid.query.QueryContextKeys; import io.druid.query.QueryDataSource; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.strategy.GroupByStrategySelector; +import io.druid.query.groupby.strategy.GroupByStrategyV1; +import io.druid.query.groupby.strategy.GroupByStrategyV2; import org.bouncycastle.util.Integers; import org.hamcrest.CoreMatchers; import org.junit.Rule; @@ -45,6 +56,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.concurrent.TimeoutException; @@ -52,7 +64,7 @@ import java.util.concurrent.TimeoutException; @RunWith(Parameterized.class) public class GroupByQueryRunnerFailureTest { - public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() + private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() { @Override @@ -83,7 +95,63 @@ public class GroupByQueryRunnerFailureTest @Rule public ExpectedException expectedException = ExpectedException.none(); - private static final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory( + private static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config + ) + { + final Supplier configSupplier = Suppliers.ofInstance(config); + final StupidPool bufferPool = new StupidPool<>( + "GroupByQueryEngine-bufferPool", + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()); + } + } + ); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + configSupplier, + new GroupByStrategyV1( + configSupplier, + new GroupByQueryEngine(configSupplier, bufferPool), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + bufferPool + ), + new GroupByStrategyV2( + DEFAULT_PROCESSING_CONFIG, + configSupplier, + bufferPool, + mergeBufferPool, + mapper, + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ); + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( + strategySelector, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + return new GroupByQueryRunnerFactory( + strategySelector, + toolChest + ); + } + + private final static BlockingPool mergeBufferPool = new BlockingPool<>( + new Supplier() + { + @Override + public ByteBuffer get () + { + return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()); + } + }, + DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers() + ); + + private static final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory( GroupByQueryRunnerTest.DEFAULT_MAPPER, new GroupByQueryConfig() { @@ -91,8 +159,7 @@ public class GroupByQueryRunnerFailureTest { return "v2"; } - }, - DEFAULT_PROCESSING_CONFIG + } ); private QueryRunner runner; @@ -109,11 +176,11 @@ public class GroupByQueryRunnerFailureTest public GroupByQueryRunnerFailureTest(QueryRunner runner) { - this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.>of(runner)); + this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner)); } @Test(timeout = 10000) - public void testLackOfMergeBuffers() throws IOException + public void testNotEnoughMergeBuffersOnQueryable() throws IOException { expectedException.expect(QueryInterruptedException.class); expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); @@ -139,4 +206,72 @@ public class GroupByQueryRunnerFailureTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } + + @Test(timeout = 10000) + public void testResourceLimitExceededOnBroker() + { + expectedException.expect(ResourceLimitExceededException.class); + + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setGranularity(QueryGranularities.ALL) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", null) + )) + .setAggregatorSpecs(Lists.newArrayList(QueryRunnerTestHelper.rowsCount)) + .build() + ) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setGranularity(QueryGranularities.ALL) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs(Lists.newArrayList(QueryRunnerTestHelper.rowsCount)) + .build() + ) + ) + .setGranularity(QueryGranularities.ALL) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) + .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .build(); + + GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + } + + @Test(timeout = 10000, expected = InsufficientResourcesException.class) + public void testInsufficientResourcesOnBroker() throws IOException + { + final ReferenceCountingResourceHolder> holder = mergeBufferPool.takeBatch(1, 10); + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setGranularity(QueryGranularities.ALL) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs(Lists.newArrayList(QueryRunnerTestHelper.rowsCount)) + .build() + ) + ) + .setGranularity(QueryGranularities.ALL) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) + .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .build(); + + try { + GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + } finally { + holder.close(); + } + } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 22b231693ef..ca7c1f42c3a 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -142,6 +142,8 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.junit.Assert.assertEquals; + @RunWith(Parameterized.class) public class GroupByQueryRunnerTest { @@ -347,9 +349,7 @@ public class GroupByQueryRunnerTest ) ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ); return new GroupByQueryRunnerFactory( @@ -1793,7 +1793,7 @@ public class GroupByQueryRunnerTest List expectedResults = ImmutableList.of(); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - Assert.assertEquals(expectedResults, results); + assertEquals(expectedResults, results); } @Test @@ -3140,7 +3140,7 @@ public class GroupByQueryRunnerTest final Object next1 = resultsIter.next(); Object expectedNext1 = expectedResultsIter.next(); - Assert.assertEquals("order-limit", expectedNext1, next1); + assertEquals("order-limit", expectedNext1, next1); final Object next2 = resultsIter.next(); Object expectedNext2 = expectedResultsIter.next(); @@ -6908,7 +6908,7 @@ public class GroupByQueryRunnerTest .setGranularity(QueryRunnerTestHelper.dayGran) .build(); - Assert.assertEquals( + assertEquals( Functions.>identity(), query.getLimitSpec().build( query.getDimensions(), @@ -7172,7 +7172,7 @@ public class GroupByQueryRunnerTest .setGranularity(QueryRunnerTestHelper.dayGran) .build(); - Assert.assertEquals( + assertEquals( Functions.>identity(), query.getLimitSpec().build( query.getDimensions(), diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 4d416e762b0..a89cdef5d21 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -171,8 +171,9 @@ public class CalciteTests @Override public int getNumMergeBuffers() { - // Need 2 buffers for CalciteQueryTest.testDoubleNestedGroupby. - return 2; + // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby. + // Two buffers for the broker and one for the queryable + return 3; } } )