Free ByteBuffers in tests and fix some bugs. (#12521)

* Ensure ByteBuffers allocated in tests get freed.

Many tests had problems where a direct ByteBuffer would be allocated
and then not freed. This is bad because it causes flaky tests.

To fix this:

1) Add ByteBufferUtils.allocateDirect(size), which returns a ResourceHolder.
   This makes it easy to free the direct buffer. Currently, it's only used
   in tests, because production code seems OK.

2) Update all usages of ByteBuffer.allocateDirect (off-heap) in tests either
   to ByteBuffer.allocate (on-heap, which are garbaged collected), or to
   ByteBufferUtils.allocateDirect (wherever it seemed like there was a good
   reason for the buffer to be off-heap). Make sure to close all direct
   holders when done.

* Changes based on CI results.

* A different approach.

* Roll back BitmapOperationTest stuff.

* Try additional surefire memory.

* Revert "Roll back BitmapOperationTest stuff."

This reverts commit 49f846d9e3d0904df6c685d403766c07531b15e5.

* Add TestBufferPool.

* Revert Xmx change in tests.

* Better behaved NestedQueryPushDownTest. Exit tests on OOME.

* Fix TestBufferPool.

* Remove T1C from ARM tests.

* Somewhat safer.

* Fix tests.

* Fix style stuff.

* Additional debugging.

* Reset null / expr configs better.

* ExpressionLambdaAggregatorFactory thread-safety.

* Alter forkNode to try to get better info when a JVM crashes.

* Fix buffer retention in ExpressionLambdaAggregatorFactory.

* Remove unused import.
This commit is contained in:
Gian Merlino 2022-05-19 07:42:29 -07:00 committed by GitHub
parent c877d8a981
commit 4631cff2a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 1233 additions and 694 deletions

View File

@ -391,7 +391,7 @@ jobs:
jdk: openjdk11
env:
- MAVEN_PROJECTS='core,indexing-hadoop,indexing-service,processing'
script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m -T1C
script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m
- name: "Build and test on ARM64 CPU architecture (2)"
stage: Tests - phase 2
@ -402,7 +402,7 @@ jobs:
jdk: openjdk11
env:
- MAVEN_PROJECTS='core,sql,server,services'
script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m -T1C
script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m
- name: "web console end-to-end test"
before_install: *setup_generate_license

View File

@ -85,39 +85,6 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
);
}
@Nullable
private T pollObject()
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
return objects.isEmpty() ? null : objects.pop();
}
finally {
lock.unlock();
}
}
@Nullable
private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
return objects.pop();
}
finally {
lock.unlock();
}
}
@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum, final long timeoutMs)
{

View File

@ -19,6 +19,8 @@
package org.apache.druid.java.util.common;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.JvmUtils;
import java.lang.invoke.MethodHandle;
@ -28,11 +30,15 @@ import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
public class ByteBufferUtils
{
private static final Logger log = new Logger(ByteBufferUtils.class);
// the following MethodHandle lookup code is adapted from Apache Kafka
// https://github.com/apache/kafka/blob/e554dc518eaaa0747899e708160275f95c4e525f/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
@ -138,6 +144,52 @@ public class ByteBufferUtils
return unmapper.bindTo(UnsafeUtils.theUnsafe());
}
/**
* Same as {@link ByteBuffer#allocateDirect(int)}, but returns a closeable {@link ResourceHolder} that
* frees the buffer upon close.
*
* Direct (off-heap) buffers are an alternative to on-heap buffers that allow memory to be managed
* outside the purview of the garbage collector. It's most useful when allocating big chunks of memory,
* like processing buffers.
*
* Holders cannot be closed more than once. Attempting to close a holder twice will earn you an
* {@link IllegalStateException}.
*/
public static ResourceHolder<ByteBuffer> allocateDirect(final int size)
{
class DirectByteBufferHolder implements ResourceHolder<ByteBuffer>
{
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile ByteBuffer buf = ByteBuffer.allocateDirect(size);
@Override
public ByteBuffer get()
{
final ByteBuffer theBuf = buf;
if (theBuf == null) {
throw new ISE("Closed");
} else {
return theBuf;
}
}
@Override
public void close()
{
if (closed.compareAndSet(false, true)) {
final ByteBuffer theBuf = buf;
buf = null;
free(theBuf);
} else {
throw new ISE("Already closed");
}
}
}
return new DirectByteBufferHolder();
}
/**
* Releases memory held by the given direct ByteBuffer
*

View File

@ -45,9 +45,30 @@ public abstract class ExprEval<T>
/**
* Deserialize an expression stored in a bytebuffer, e.g. for an agg.
*
* This should be refactored to be consolidated with some of the standard type handling of aggregators probably
* This method is not thread-safe with respect to the provided {@link ByteBuffer}, because the position of the
* buffer may be changed transiently during execution of this method. However, it will be restored to its original
* position prior to the method completing. Therefore, if the provided buffer is being used by a single thread, then
* this method does not change the position of the buffer.
*
* The {@code canRetainBufferReference} parameter determines
*
* @param buffer source buffer
* @param offset position to start reading from
* @param maxSize maximum number of bytes from "offset" that may be required. This is used as advice,
* but is not strictly enforced in all cases. It is possible that type strategies may
* attempt reads past this limit.
* @param type data type to read
* @param canRetainBufferReference whether the returned {@link ExprEval} may retain a reference to the provided
* {@link ByteBuffer}. Certain types are deserialized more efficiently if allowed
* to retain references to the provided buffer.
*/
public static ExprEval deserialize(ByteBuffer buffer, int offset, ExpressionType type)
public static ExprEval deserialize(
final ByteBuffer buffer,
final int offset,
final int maxSize,
final ExpressionType type,
final boolean canRetainBufferReference
)
{
switch (type.getType()) {
case LONG:
@ -61,7 +82,19 @@ public abstract class ExprEval<T>
}
return of(TypeStrategies.readNotNullNullableDouble(buffer, offset));
default:
return ofType(type, type.getNullableStrategy().read(buffer, offset));
final NullableTypeStrategy<Object> strategy = type.getNullableStrategy();
if (!canRetainBufferReference && strategy.readRetainsBufferReference()) {
final ByteBuffer dataCopyBuffer = ByteBuffer.allocate(maxSize);
final ByteBuffer mutationBuffer = buffer.duplicate();
mutationBuffer.limit(offset + maxSize);
mutationBuffer.position(offset);
dataCopyBuffer.put(mutationBuffer);
dataCopyBuffer.rewind();
return ofType(type, strategy.read(dataCopyBuffer, 0));
} else {
return ofType(type, strategy.read(buffer, offset));
}
}
}

View File

@ -102,6 +102,15 @@ public final class NullableTypeStrategy<T> implements Comparator<T>
}
}
/**
* Whether the {@link #read} methods return an object that may retain a reference to the provided {@link ByteBuffer}.
* If a reference is sometimes retained, this method returns true. It returns false if, and only if, a reference
* is *never* retained.
*/
public boolean readRetainsBufferReference()
{
return delegate.readRetainsBufferReference();
}
public int write(ByteBuffer buffer, int offset, @Nullable T value, int maxSizeBytes)
{

View File

@ -250,6 +250,12 @@ public class TypeStrategies
return buffer.getLong();
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
@Override
public int write(ByteBuffer buffer, Long value, int maxSizeBytes)
{
@ -291,6 +297,12 @@ public class TypeStrategies
return buffer.getFloat();
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
@Override
public int write(ByteBuffer buffer, Float value, int maxSizeBytes)
{
@ -332,6 +344,12 @@ public class TypeStrategies
return buffer.getDouble();
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
@Override
public int write(ByteBuffer buffer, Double value, int maxSizeBytes)
{
@ -379,6 +397,12 @@ public class TypeStrategies
return StringUtils.fromUtf8(blob);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
@Override
public int write(ByteBuffer buffer, String value, int maxSizeBytes)
{
@ -447,6 +471,12 @@ public class TypeStrategies
return array;
}
@Override
public boolean readRetainsBufferReference()
{
return elementStrategy.readRetainsBufferReference();
}
@Override
public int write(ByteBuffer buffer, Object[] value, int maxSizeBytes)
{

View File

@ -73,13 +73,18 @@ public interface TypeStrategy<T> extends Comparator<T>
* Read a non-null value from the {@link ByteBuffer} at the current {@link ByteBuffer#position()}. This will move
* the underlying position by the size of the value read.
*
* The contract of this method is that any value returned from this method MUST be completely detached from the
* underlying {@link ByteBuffer}, since it might outlive the memory location being allocated to hold the object.
* In other words, if an object is memory mapped, it must be copied on heap, or relocated to another memory location
* that is owned by the caller with {@link #write}.
* The value returned from this method may retain a reference to the provided {@link ByteBuffer}. If it does, then
* {@link #readRetainsBufferReference()} returns true.
*/
T read(ByteBuffer buffer);
/**
* Whether the {@link #read} methods return an object that may retain a reference to the provided {@link ByteBuffer}.
* If a reference is sometimes retained, this method returns true. It returns false if, and only if, a reference
* is *never* retained.
*/
boolean readRetainsBufferReference();
/**
* Write a non-null value to the {@link ByteBuffer} at position {@link ByteBuffer#position()}. This will move the
* underlying position by the size of the value written.

View File

@ -94,11 +94,15 @@ public class NullHandlingTest
@Test
public void test_ignoreNullsStrings()
{
NullHandling.initializeForTestsWithValues(false, true);
Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality());
NullHandling.initializeForTestsWithValues(true, false);
Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality());
try {
NullHandling.initializeForTestsWithValues(false, true);
Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality());
NullHandling.initializeForTestsWithValues(true, false);
Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality());
}
finally {
NullHandling.initializeForTests();
}
}
}

View File

@ -20,7 +20,8 @@
package org.apache.druid.java.util.common;
import com.google.common.io.Files;
import junit.framework.Assert;
import org.apache.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -38,6 +39,18 @@ public class ByteBufferUtilsTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testAllocateDirect()
{
final int sz = 10;
try (final ResourceHolder<ByteBuffer> holder = ByteBufferUtils.allocateDirect(sz)) {
final ByteBuffer buf = holder.get();
Assert.assertTrue(buf.isDirect());
Assert.assertEquals(sz, buf.remaining());
}
}
@Test
public void testUnmapDoesntCrashJVM() throws Exception
{

View File

@ -19,6 +19,7 @@
package org.apache.druid.java.util.common;
import org.apache.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -125,12 +126,14 @@ public class StringUtilsTest
@Test
public void fromUtf8ByteBufferDirect()
{
ByteBuffer bytes = ByteBuffer.allocateDirect(4);
bytes.put(new byte[]{'a', 'b', 'c', 'd'});
bytes.rewind();
Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes, 4));
bytes.rewind();
Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes));
try (final ResourceHolder<ByteBuffer> bufferHolder = ByteBufferUtils.allocateDirect(4)) {
final ByteBuffer bytes = bufferHolder.get();
bytes.put(new byte[]{'a', 'b', 'c', 'd'});
bytes.rewind();
Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes, 4));
bytes.rewind();
Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes));
}
}
@SuppressWarnings("MalformedFormatString")

View File

@ -368,13 +368,27 @@ public class ExprEvalTest extends InitializedNullHandlingTest
ExprEval.serialize(buffer, position, expected.type(), expected, maxSizeBytes);
if (expected.type().isArray()) {
Assert.assertArrayEquals(
"deserialized value with buffer references allowed",
expected.asArray(),
ExprEval.deserialize(buffer, position, expected.type()).asArray()
ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), true).asArray()
);
Assert.assertArrayEquals(
"deserialized value with buffer references not allowed",
expected.asArray(),
ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), false).asArray()
);
} else {
Assert.assertEquals(
"deserialized value with buffer references allowed",
expected.value(),
ExprEval.deserialize(buffer, position, expected.type()).value()
ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), true).value()
);
Assert.assertEquals(
"deserialized value with buffer references not allowed",
expected.value(),
ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), false).value()
);
}
}

View File

@ -309,24 +309,29 @@ public class ParserTest extends InitializedNullHandlingTest
public void testLiteralExplicitTypedArrays()
{
ExpressionProcessing.initializeForTests(true);
validateConstantExpression("ARRAY<DOUBLE>[1.0, 2.0, null, 3.0]", new Object[]{1.0, 2.0, null, 3.0});
validateConstantExpression("ARRAY<LONG>[1, 2, null, 3]", new Object[]{1L, 2L, null, 3L});
validateConstantExpression("ARRAY<STRING>['1', '2', null, '3.0']", new Object[]{"1", "2", null, "3.0"});
// mixed type tests
validateConstantExpression("ARRAY<DOUBLE>[3, null, 4, 2.345]", new Object[]{3.0, null, 4.0, 2.345});
validateConstantExpression("ARRAY<LONG>[1.0, null, 2000.0]", new Object[]{1L, null, 2000L});
try {
validateConstantExpression("ARRAY<DOUBLE>[1.0, 2.0, null, 3.0]", new Object[]{1.0, 2.0, null, 3.0});
validateConstantExpression("ARRAY<LONG>[1, 2, null, 3]", new Object[]{1L, 2L, null, 3L});
validateConstantExpression("ARRAY<STRING>['1', '2', null, '3.0']", new Object[]{"1", "2", null, "3.0"});
// explicit typed string arrays should accept any literal and convert
validateConstantExpression("ARRAY<STRING>['1', null, 2000, 1.1]", new Object[]{"1", null, "2000", "1.1"});
validateConstantExpression("ARRAY<LONG>['1', null, 2000, 1.1]", new Object[]{1L, null, 2000L, 1L});
validateConstantExpression("ARRAY<DOUBLE>['1', null, 2000, 1.1]", new Object[]{1.0, null, 2000.0, 1.1});
// mixed type tests
validateConstantExpression("ARRAY<DOUBLE>[3, null, 4, 2.345]", new Object[]{3.0, null, 4.0, 2.345});
validateConstantExpression("ARRAY<LONG>[1.0, null, 2000.0]", new Object[]{1L, null, 2000L});
// the gramar isn't cool enough yet to parse populated nested-arrays or complex arrays..., but empty ones can
// be defined...
validateConstantExpression("ARRAY<COMPLEX<nullableLongPair>>[]", new Object[]{});
validateConstantExpression("ARRAY<ARRAY<LONG>>[]", new Object[]{});
ExpressionProcessing.initializeForTests(null);
// explicit typed string arrays should accept any literal and convert
validateConstantExpression("ARRAY<STRING>['1', null, 2000, 1.1]", new Object[]{"1", null, "2000", "1.1"});
validateConstantExpression("ARRAY<LONG>['1', null, 2000, 1.1]", new Object[]{1L, null, 2000L, 1L});
validateConstantExpression("ARRAY<DOUBLE>['1', null, 2000, 1.1]", new Object[]{1.0, null, 2000.0, 1.1});
// the gramar isn't cool enough yet to parse populated nested-arrays or complex arrays..., but empty ones can
// be defined...
validateConstantExpression("ARRAY<COMPLEX<nullableLongPair>>[]", new Object[]{});
validateConstantExpression("ARRAY<ARRAY<LONG>>[]", new Object[]{});
}
finally {
ExpressionProcessing.initializeForTests(null);
}
}
@Test

View File

@ -90,6 +90,12 @@ public class TypeStrategiesTest
return null;
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
@Override
public int write(ByteBuffer buffer, String value, int maxSizeBytes)
{
@ -658,6 +664,12 @@ public class TypeStrategiesTest
return new NullableLongPair(lhs, rhs);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
@Override
public int write(ByteBuffer buffer, NullableLongPair value, int maxSizeBytes)
{

View File

@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunnerTestHelper;
@ -35,6 +34,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.segment.IncrementalIndexSegment;
@ -63,11 +63,11 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest
{
final GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = GroupByQueryRunnerTest.makeQueryRunnerFactory(
config
this.resourceCloser = Closer.create();
this.factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
config,
this.resourceCloser.register(TestGroupByBuffers.createDefault())
);
factory = factoryCloserPair.lhs;
resourceCloser = factoryCloserPair.rhs;
}
@After

View File

@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunner;
@ -33,12 +32,15 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -54,13 +56,31 @@ import java.util.List;
public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlingTest
{
private static final Closer RESOURCE_CLOSER = Closer.create();
private static TestGroupByBuffers BUFFER_POOLS = null;
private final QueryRunner<Row> runner;
private final GroupByQueryRunnerFactory factory;
@BeforeClass
public static void setUpClass()
{
if (BUFFER_POOLS == null) {
BUFFER_POOLS = TestGroupByBuffers.createDefault();
}
}
@AfterClass
public static void tearDownClass()
{
BUFFER_POOLS.close();
BUFFER_POOLS = null;
}
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
setUpClass();
final GroupByQueryConfig v1Config = new GroupByQueryConfig()
{
@Override
@ -121,11 +141,7 @@ public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlin
);
for (GroupByQueryConfig config : configs) {
final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
config
);
final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
RESOURCE_CLOSER.register(factoryAndCloser.rhs);
final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS);
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
final String testName = StringUtils.format(
"config=%s, runner=%s",

View File

@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunner;
@ -33,12 +32,15 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -54,13 +56,31 @@ import java.util.List;
public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandlingTest
{
private static final Closer RESOURCE_CLOSER = Closer.create();
private static TestGroupByBuffers BUFFER_POOLS = null;
private final QueryRunner<Row> runner;
private final GroupByQueryRunnerFactory factory;
@BeforeClass
public static void setUpClass()
{
if (BUFFER_POOLS == null) {
BUFFER_POOLS = TestGroupByBuffers.createDefault();
}
}
@AfterClass
public static void tearDownClass()
{
BUFFER_POOLS.close();
BUFFER_POOLS = null;
}
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
setUpClass();
final GroupByQueryConfig v1Config = new GroupByQueryConfig()
{
@Override
@ -121,11 +141,7 @@ public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandli
);
for (GroupByQueryConfig config : configs) {
final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
config
);
final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
RESOURCE_CLOSER.register(factoryAndCloser.rhs);
final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS);
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
final String testName = StringUtils.format(
"config=%s, runner=%s",

View File

@ -624,7 +624,7 @@ public class HyperLogLogCollectorTest
int valsToCheckIndex = 0;
HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(
ByteBuffer.allocateDirect(
ByteBuffer.allocate(
HyperLogLogCollector.getLatestNumBytesForDenseStorage()
)
);

View File

@ -1502,6 +1502,8 @@
@{jacocoArgLine}
-Xmx1500m
-XX:MaxDirectMemorySize=512m
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-Duser.language=en
-Duser.GroupByQueryRunnerTest.javacountry=US
-Dfile.encoding=UTF-8
@ -1514,6 +1516,7 @@
<trimStackTrace>false</trimStackTrace>
<!-- our tests are very verbose, let's keep the volume down -->
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkNode implementation="org.apache.maven.plugin.surefire.extensions.SurefireForkNodeFactory"/>
</configuration>
</plugin>
<plugin>
@ -1734,6 +1737,8 @@
<argLine>
${jdk.surefire.argLine}
-Xmx768m -Duser.language=en -Duser.country=US -Dfile.encoding=UTF-8
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-Duser.timezone=UTC -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Daws.region=us-east-1 <!-- required for s3-related unit tests -->
<!--@TODO After fixing https://github.com/apache/druid/issues/4964 remove this parameter-->

View File

@ -288,6 +288,8 @@
${jdk.surefire.argLine}
-Xmx512m
-XX:MaxDirectMemorySize=2500m
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-Duser.language=en
-Duser.GroupByQueryRunnerTest.javacountry=US
-Dfile.encoding=UTF-8
@ -315,6 +317,8 @@
<argLine>
${jdk.surefire.argLine}
-server -Xms3G -Xmx3G -Djub.consumers=CONSOLE,H2 -Djub.db.file=benchmarks/benchmarks
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
</argLine>
<groups>org.apache.druid.collections.test.annotation.Benchmark</groups>
<excludedGroups>org.apache.druid.collections.test.annotation.Dummy</excludedGroups>

View File

@ -93,9 +93,9 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory
private final Supplier<Expr> finalizeExpression;
private final HumanReadableBytes maxSizeBytes;
private final Supplier<SettableObjectBinding> compareBindings;
private final Supplier<SettableObjectBinding> combineBindings;
private final Supplier<SettableObjectBinding> finalizeBindings;
private final ThreadLocal<SettableObjectBinding> compareBindings;
private final ThreadLocal<SettableObjectBinding> combineBindings;
private final ThreadLocal<SettableObjectBinding> finalizeBindings;
private final Supplier<Expr.InputBindingInspector> finalizeInspector;
@JsonCreator
@ -166,7 +166,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory
ImmutableMap.of(FINALIZE_IDENTIFIER, this.initialCombineValue.get().type())
)
);
this.compareBindings = Suppliers.memoize(
this.compareBindings = ThreadLocal.withInitial(
() -> new SettableObjectBinding(2).withInspector(
InputBindings.inspectorFromTypeMap(
ImmutableMap.of(
@ -176,7 +176,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory
)
)
);
this.combineBindings = Suppliers.memoize(
this.combineBindings = ThreadLocal.withInitial(
() -> new SettableObjectBinding(2).withInspector(
InputBindings.inspectorFromTypeMap(
ImmutableMap.of(
@ -186,7 +186,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory
)
)
);
this.finalizeBindings = Suppliers.memoize(
this.finalizeBindings = ThreadLocal.withInitial(
() -> new SettableObjectBinding(1).withInspector(finalizeInspector.get())
);
this.finalizeExpression = Parser.lazyParse(finalizeExpressionString, macroTable);
@ -302,7 +302,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory
FactorizePlan thePlan = new FactorizePlan(metricFactory);
return new ExpressionLambdaAggregator(
thePlan,
maxSizeBytes.getBytesInInt()
getMaxIntermediateSize()
);
}
@ -312,7 +312,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory
FactorizePlan thePlan = new FactorizePlan(metricFactory);
return new ExpressionLambdaBufferAggregator(
thePlan,
maxSizeBytes.getBytesInInt()
getMaxIntermediateSize()
);
}

View File

@ -75,7 +75,7 @@ public class ExpressionLambdaBufferAggregator implements BufferAggregator
}
}
}
ExprEval<?> acc = ExprEval.deserialize(buf, position, outputType);
ExprEval<?> acc = ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true);
bindings.setAccumulator(acc);
ExprEval<?> newAcc = lambda.eval(bindings);
ExprEval.serialize(buf, position, outputType, newAcc, maxSizeBytes);
@ -90,25 +90,25 @@ public class ExpressionLambdaBufferAggregator implements BufferAggregator
if (isNullUnlessAggregated && (buf.get(position) & NOT_AGGREGATED_BIT) != 0) {
return null;
}
return ExprEval.deserialize(buf, position, outputType).value();
return ExprEval.deserialize(buf, position, maxSizeBytes, outputType, false).value();
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) ExprEval.deserialize(buf, position, outputType).asDouble();
return (float) ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true).asDouble();
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return ExprEval.deserialize(buf, position, outputType).asDouble();
return ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true).asDouble();
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return ExprEval.deserialize(buf, position, outputType).asLong();
return ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true).asLong();
}
@Override

View File

@ -58,6 +58,13 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
return objectStrategy.fromByteBuffer(dupe, complexLength);
}
@Override
public boolean readRetainsBufferReference()
{
// Can't guarantee that ObjectStrategy *doesn't* retain a reference.
return true;
}
@Override
public int write(ByteBuffer buffer, T value, int maxSizeBytes)
{

View File

@ -21,9 +21,11 @@ package org.apache.druid.collections.bitmap;
import org.apache.druid.extendedset.intset.ConciseSet;
import org.apache.druid.extendedset.intset.ImmutableConciseSet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import java.io.IOException;
import java.util.BitSet;
public class BitmapOperationAgainstConsecutiveRunsTest extends BitmapOperationTestBase
@ -66,10 +68,16 @@ public class BitmapOperationAgainstConsecutiveRunsTest extends BitmapOperationTe
ROARING[i] = r;
IMMUTABLE_ROARING[i] = makeImmutableRoaring(r);
OFF_HEAP_ROARING[i] = makeOffheapRoaring(r);
GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i]);
GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i]);
GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i].get());
GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i].get());
}
unionCount = expectedUnion.cardinality();
printSizeStats(DENSITY, "Random Alternating Bitmap");
}
@AfterClass
public static void tearDownClass() throws IOException
{
reset();
}
}

View File

@ -21,9 +21,11 @@ package org.apache.druid.collections.bitmap;
import org.apache.druid.extendedset.intset.ConciseSet;
import org.apache.druid.extendedset.intset.ImmutableConciseSet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import java.io.IOException;
import java.util.BitSet;
public class BitmapOperationAgainstUniformDistributionTest extends BitmapOperationTestBase
@ -61,11 +63,17 @@ public class BitmapOperationAgainstUniformDistributionTest extends BitmapOperati
ROARING[i] = r;
IMMUTABLE_ROARING[i] = makeImmutableRoaring(r);
OFF_HEAP_ROARING[i] = makeOffheapRoaring(r);
GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i]);
GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i]);
GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i].get());
GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i].get());
}
unionCount = expectedUnion.cardinality();
minIntersection = knownTrue.length;
printSizeStats(DENSITY, "Uniform Bitmap");
}
@AfterClass
public static void tearDownClass() throws IOException
{
reset();
}
}

View File

@ -19,8 +19,11 @@
package org.apache.druid.collections.bitmap;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.extendedset.intset.ImmutableConciseSet;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.utils.CloseableUtils;
import org.junit.Assert;
import org.junit.Test;
import org.roaringbitmap.buffer.BufferFastAggregation;
@ -40,10 +43,10 @@ public abstract class BitmapOperationTestBase
public static final int BITMAP_LENGTH = 500_000;
public static final int NUM_BITMAPS = 1000;
static final ImmutableConciseSet[] CONCISE = new ImmutableConciseSet[NUM_BITMAPS];
static final ImmutableConciseSet[] OFF_HEAP_CONCISE = new ImmutableConciseSet[NUM_BITMAPS];
static final ResourceHolder<ImmutableConciseSet>[] OFF_HEAP_CONCISE = new ResourceHolder[NUM_BITMAPS];
static final ImmutableRoaringBitmap[] ROARING = new ImmutableRoaringBitmap[NUM_BITMAPS];
static final ImmutableRoaringBitmap[] IMMUTABLE_ROARING = new ImmutableRoaringBitmap[NUM_BITMAPS];
static final ImmutableRoaringBitmap[] OFF_HEAP_ROARING = new ImmutableRoaringBitmap[NUM_BITMAPS];
static final ResourceHolder<ImmutableRoaringBitmap>[] OFF_HEAP_ROARING = new ResourceHolder[NUM_BITMAPS];
static final ImmutableBitmap[] GENERIC_CONCISE = new ImmutableBitmap[NUM_BITMAPS];
static final ImmutableBitmap[] GENERIC_ROARING = new ImmutableBitmap[NUM_BITMAPS];
static final ConciseBitmapFactory CONCISE_FACTORY = new ConciseBitmapFactory();
@ -60,14 +63,31 @@ public abstract class BitmapOperationTestBase
NullHandling.initializeForTests();
}
protected static ImmutableConciseSet makeOffheapConcise(ImmutableConciseSet concise)
protected static ResourceHolder<ImmutableConciseSet> makeOffheapConcise(ImmutableConciseSet concise)
{
final byte[] bytes = concise.toBytes();
totalConciseBytes += bytes.length;
conciseCount++;
final ByteBuffer buf = ByteBuffer.allocateDirect(bytes.length).put(bytes);
final ResourceHolder<ByteBuffer> bufHolder = ByteBufferUtils.allocateDirect(bytes.length);
final ByteBuffer buf = bufHolder.get().put(bytes);
buf.rewind();
return new ImmutableConciseSet(buf.asIntBuffer());
final ImmutableConciseSet bitmap = new ImmutableConciseSet(buf.asIntBuffer());
return new ResourceHolder<ImmutableConciseSet>()
{
@Override
public ImmutableConciseSet get()
{
return bitmap;
}
@Override
public void close()
{
bufHolder.close();
}
};
}
protected static ImmutableRoaringBitmap writeImmutable(MutableRoaringBitmap r, ByteBuffer buf) throws IOException
@ -81,8 +101,19 @@ public abstract class BitmapOperationTestBase
return new ImmutableRoaringBitmap(buf.asReadOnlyBuffer());
}
protected static void reset()
protected static void reset() throws IOException
{
CloseableUtils.closeAll(Arrays.asList(OFF_HEAP_CONCISE));
CloseableUtils.closeAll(Arrays.asList(OFF_HEAP_ROARING));
Arrays.fill(CONCISE, null);
Arrays.fill(ROARING, null);
Arrays.fill(IMMUTABLE_ROARING, null);
Arrays.fill(GENERIC_CONCISE, null);
Arrays.fill(GENERIC_ROARING, null);
Arrays.fill(OFF_HEAP_CONCISE, null);
Arrays.fill(OFF_HEAP_ROARING, null);
conciseCount = 0;
roaringCount = 0;
totalConciseBytes = 0;
@ -111,13 +142,29 @@ public abstract class BitmapOperationTestBase
System.out.flush();
}
protected static ImmutableRoaringBitmap makeOffheapRoaring(MutableRoaringBitmap r) throws IOException
protected static ResourceHolder<ImmutableRoaringBitmap> makeOffheapRoaring(MutableRoaringBitmap r) throws IOException
{
final int size = r.serializedSizeInBytes();
final ByteBuffer buf = ByteBuffer.allocateDirect(size);
final ResourceHolder<ByteBuffer> bufHolder = ByteBufferUtils.allocateDirect(size);
final ByteBuffer buf = bufHolder.get();
totalRoaringBytes += size;
roaringCount++;
return writeImmutable(r, buf);
final ImmutableRoaringBitmap bitmap = writeImmutable(r, buf);
return new ResourceHolder<ImmutableRoaringBitmap>()
{
@Override
public ImmutableRoaringBitmap get()
{
return bitmap;
}
@Override
public void close()
{
bufHolder.close();
}
};
}
protected static ImmutableRoaringBitmap makeImmutableRoaring(MutableRoaringBitmap r) throws IOException
@ -136,7 +183,9 @@ public abstract class BitmapOperationTestBase
@Test
public void testOffheapConciseUnion()
{
ImmutableConciseSet union = ImmutableConciseSet.union(OFF_HEAP_CONCISE);
ImmutableConciseSet union = ImmutableConciseSet.union(
Arrays.stream(OFF_HEAP_CONCISE).map(ResourceHolder::get).iterator()
);
Assert.assertEquals(unionCount, union.size());
}
@ -171,7 +220,9 @@ public abstract class BitmapOperationTestBase
@Test
public void testOffheapRoaringUnion()
{
ImmutableRoaringBitmap union = BufferFastAggregation.horizontal_or(Arrays.asList(OFF_HEAP_ROARING).iterator());
ImmutableRoaringBitmap union = BufferFastAggregation.naive_or(
Arrays.stream(OFF_HEAP_ROARING).map(ResourceHolder::get).iterator()
);
Assert.assertEquals(unionCount, union.getCardinality());
}

View File

@ -21,6 +21,8 @@ package org.apache.druid.collections.bitmap;
import com.google.common.collect.Sets;
import org.apache.druid.collections.IntSetTestUtility;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.junit.Assert;
import org.junit.Test;
import org.roaringbitmap.IntIterator;
@ -67,13 +69,15 @@ public class WrappedBitSetBitmapBitSetTest
@Test
public void testOffHeap()
{
ByteBuffer buffer = ByteBuffer.allocateDirect(Long.SIZE * 100 / 8).order(ByteOrder.LITTLE_ENDIAN);
BitSet testSet = BitSet.valueOf(buffer);
testSet.set(1);
WrappedImmutableBitSetBitmap bitMap = new WrappedImmutableBitSetBitmap(testSet);
Assert.assertTrue(bitMap.get(1));
testSet.set(2);
Assert.assertTrue(bitMap.get(2));
try (final ResourceHolder<ByteBuffer> bufferHolder = ByteBufferUtils.allocateDirect(Long.SIZE * 100 / 8)) {
final ByteBuffer buffer = bufferHolder.get().order(ByteOrder.LITTLE_ENDIAN);
BitSet testSet = BitSet.valueOf(buffer);
testSet.set(1);
WrappedImmutableBitSetBitmap bitMap = new WrappedImmutableBitSetBitmap(testSet);
Assert.assertTrue(bitMap.get(1));
testSet.set(2);
Assert.assertTrue(bitMap.get(2));
}
}
@Test

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.CloseableUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
/**
* A buffer pool that throws away buffers when they are "returned" to the pool. Useful for tests that need to make
* many pools and use them one at a time.
*
* This pool implements {@link BlockingPool}, but never blocks. It returns immediately if resources are available;
* otherwise it returns an empty list immediately. This is also useful for tests, because it allows "timeouts" to
* happen immediately and therefore speeds up tests.
*/
public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool<ByteBuffer>
{
private final Supplier<ResourceHolder<ByteBuffer>> generator;
private final int maxCount;
@GuardedBy("this")
private long numOutstanding;
private TestBufferPool(final Supplier<ResourceHolder<ByteBuffer>> generator, final int maxCount)
{
this.generator = generator;
this.maxCount = maxCount;
}
public static TestBufferPool onHeap(final int bufferSize, final int maxCount)
{
return new TestBufferPool(
() -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> {}),
maxCount
);
}
public static TestBufferPool offHeap(final int bufferSize, final int maxCount)
{
return new TestBufferPool(
() -> ByteBufferUtils.allocateDirect(bufferSize),
maxCount
);
}
@Override
public int maxSize()
{
return maxCount;
}
@Override
public ResourceHolder<ByteBuffer> take()
{
final List<ReferenceCountingResourceHolder<ByteBuffer>> holders = takeBatch(1);
if (holders.isEmpty()) {
throw new ISE("Too many objects outstanding");
} else {
return Iterables.getOnlyElement(holders);
}
}
@Override
public List<ReferenceCountingResourceHolder<ByteBuffer>> takeBatch(int elementNum, long timeoutMs)
{
return takeBatch(elementNum);
}
@Override
public List<ReferenceCountingResourceHolder<ByteBuffer>> takeBatch(int elementNum)
{
synchronized (this) {
if (numOutstanding + elementNum <= maxCount) {
final List<ReferenceCountingResourceHolder<ByteBuffer>> retVal = new ArrayList<>();
try {
for (int i = 0; i < elementNum; i++) {
final ResourceHolder<ByteBuffer> holder = generator.get();
final ByteBuffer o = holder.get();
retVal.add(new ReferenceCountingResourceHolder<>(o, () -> {
synchronized (this) {
numOutstanding--;
holder.close();
}
}));
numOutstanding++;
}
}
catch (Throwable e) {
throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(retVal));
}
return retVal;
} else {
return Collections.emptyList();
}
}
}
public long getOutstandingObjectCount()
{
synchronized (this) {
return numOutstanding;
}
}
}

View File

@ -38,7 +38,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
@ -59,6 +58,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
@ -152,13 +152,14 @@ public class AggregationTestHelper implements Closeable
TemporaryFolder tempFolder
)
{
final Closer closer = Closer.create();
final ObjectMapper mapper = TestHelper.makeJsonMapper();
final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
final TestGroupByBuffers groupByBuffers = closer.register(TestGroupByBuffers.createDefault());
final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
mapper,
config
config,
groupByBuffers
);
final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
final Closer closer = factoryAndCloser.rhs;
IndexIO indexIO = new IndexIO(
mapper,

View File

@ -114,7 +114,7 @@ public class HistogramAggregatorTest
);
HistogramBufferAggregator agg = new HistogramBufferAggregator(selector, breaks);
ByteBuffer buf = ByteBuffer.allocateDirect(factory.getMaxIntermediateSizeWithNulls());
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls());
int position = 0;
agg.init(buf, position);

View File

@ -159,7 +159,7 @@ public class JavaScriptAggregatorTest
)
);
ByteBuffer buf = ByteBuffer.allocateDirect(32);
ByteBuffer buf = ByteBuffer.allocate(32);
final int position = 4;
agg.init(buf, position);

View File

@ -49,6 +49,7 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.DimensionSelectorUtils;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
@ -59,7 +60,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CardinalityAggregatorTest
public class CardinalityAggregatorTest extends InitializedNullHandlingTest
{
public static class TestDimensionSelector extends AbstractDimensionSelector
{
@ -381,7 +382,6 @@ public class CardinalityAggregatorTest
@Test
public void testAggregateRows()
{
NullHandling.initializeForTestsWithValues(null, null);
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoList,
true
@ -397,7 +397,6 @@ public class CardinalityAggregatorTest
@Test
public void testAggregateValues()
{
NullHandling.initializeForTestsWithValues(null, null);
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoList,
false
@ -421,7 +420,6 @@ public class CardinalityAggregatorTest
@Test
public void testBufferAggregateRows()
{
NullHandling.initializeForTestsWithValues(null, null);
CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
dimInfoList.toArray(new ColumnSelectorPlus[0]),
true
@ -444,7 +442,6 @@ public class CardinalityAggregatorTest
@Test
public void testBufferAggregateValues()
{
NullHandling.initializeForTestsWithValues(null, null);
CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
dimInfoList.toArray(new ColumnSelectorPlus[0]),
false
@ -474,7 +471,6 @@ public class CardinalityAggregatorTest
@Test
public void testCombineRows()
{
NullHandling.initializeForTestsWithValues(null, null);
List<DimensionSelector> selector1 = Collections.singletonList(dim1);
List<DimensionSelector> selector2 = Collections.singletonList(dim2);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo1 = Collections.singletonList(
@ -520,7 +516,6 @@ public class CardinalityAggregatorTest
@Test
public void testCombineValues()
{
NullHandling.initializeForTestsWithValues(null, null);
List<DimensionSelector> selector1 = Collections.singletonList(dim1);
List<DimensionSelector> selector2 = Collections.singletonList(dim2);
@ -573,7 +568,6 @@ public class CardinalityAggregatorTest
@Test
public void testAggregateRowsWithExtraction()
{
NullHandling.initializeForTestsWithValues(null, null);
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoListWithExtraction,
true
@ -596,7 +590,6 @@ public class CardinalityAggregatorTest
@Test
public void testAggregateValuesWithExtraction()
{
NullHandling.initializeForTestsWithValues(null, null);
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoListWithExtraction,
false
@ -669,92 +662,107 @@ public class CardinalityAggregatorTest
public void testAggregateRowsIgnoreNulls()
{
NullHandling.initializeForTestsWithValues(null, true);
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoList,
true
);
try {
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoList,
true
);
for (int i = 0; i < VALUES1.size(); ++i) {
aggregate(selectorList, agg);
for (int i = 0; i < VALUES1.size(); ++i) {
aggregate(selectorList, agg);
}
Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05);
Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get()));
}
finally {
NullHandling.initializeForTests();
}
Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05);
Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get()));
}
@Test
public void testAggregateValuesIgnoreNulls()
{
NullHandling.initializeForTestsWithValues(null, true);
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoList,
false
);
try {
CardinalityAggregator agg = new CardinalityAggregator(
dimInfoList,
false
);
for (int i = 0; i < VALUES1.size(); ++i) {
aggregate(selectorList, agg);
for (int i = 0; i < VALUES1.size(); ++i) {
aggregate(selectorList, agg);
}
//setting is not applied when druid.generic.useDefaultValueForNull=false
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6.0 : 6.0,
(Double) valueAggregatorFactory.finalizeComputation(agg.get()),
0.05
);
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6L : 6L,
rowAggregatorFactoryRounded.finalizeComputation(agg.get())
);
}
finally {
NullHandling.initializeForTests();
}
//setting is not applied when druid.generic.useDefaultValueForNull=false
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6.0 : 6.0,
(Double) valueAggregatorFactory.finalizeComputation(agg.get()),
0.05
);
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6L : 6L,
rowAggregatorFactoryRounded.finalizeComputation(agg.get())
);
}
@Test
public void testCombineValuesIgnoreNulls()
{
NullHandling.initializeForTestsWithValues(null, true);
List<DimensionSelector> selector1 = Collections.singletonList(dim1);
List<DimensionSelector> selector2 = Collections.singletonList(dim2);
try {
List<DimensionSelector> selector1 = Collections.singletonList(dim1);
List<DimensionSelector> selector2 = Collections.singletonList(dim2);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo1 = Collections.singletonList(
new ColumnSelectorPlus<>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim1
)
);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo2 = Collections.singletonList(
new ColumnSelectorPlus<>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim2
)
);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo1 = Collections.singletonList(
new ColumnSelectorPlus<>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim1
)
);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo2 = Collections.singletonList(
new ColumnSelectorPlus<>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim2
)
);
CardinalityAggregator agg1 = new CardinalityAggregator(dimInfo1, false);
CardinalityAggregator agg2 = new CardinalityAggregator(dimInfo2, false);
CardinalityAggregator agg1 = new CardinalityAggregator(dimInfo1, false);
CardinalityAggregator agg2 = new CardinalityAggregator(dimInfo2, false);
for (int i = 0; i < VALUES1.size(); ++i) {
aggregate(selector1, agg1);
for (int i = 0; i < VALUES1.size(); ++i) {
aggregate(selector1, agg1);
}
for (int i = 0; i < VALUES2.size(); ++i) {
aggregate(selector2, agg2);
}
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 3.0 : 3.0,
(Double) valueAggregatorFactory.finalizeComputation(agg1.get()),
0.05
);
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6.0 : 6.0,
(Double) valueAggregatorFactory.finalizeComputation(agg2.get()),
0.05
);
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6.0 : 6.0,
(Double) rowAggregatorFactory.finalizeComputation(
rowAggregatorFactory.combine(
agg1.get(),
agg2.get()
)
),
0.05
);
}
for (int i = 0; i < VALUES2.size(); ++i) {
aggregate(selector2, agg2);
finally {
NullHandling.initializeForTests();
}
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 3.0 : 3.0,
(Double) valueAggregatorFactory.finalizeComputation(agg1.get()),
0.05
);
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6.0 : 6.0,
(Double) valueAggregatorFactory.finalizeComputation(agg2.get()),
0.05
);
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 6.0 : 6.0,
(Double) rowAggregatorFactory.finalizeComputation(
rowAggregatorFactory.combine(
agg1.get(),
agg2.get()
)
),
0.05
);
}
}

View File

@ -27,8 +27,6 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -42,7 +40,6 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
@ -53,6 +50,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TestBufferPool;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -83,7 +81,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -91,7 +88,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullHandlingTest
@ -267,27 +263,18 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH
{
executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]");
final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
Integer.MAX_VALUE
);
final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
final CloseableDefaultBlockingPool<ByteBuffer> mergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
final CloseableDefaultBlockingPool<ByteBuffer> tooSmallMergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 255),
2
);
final TestBufferPool tooSmallMergePool = TestBufferPool.onHeap(255, 2);
resourceCloser.register(bufferPool);
resourceCloser.register(mergePool);
resourceCloser.register(tooSmallMergePool);
resourceCloser.register(() -> {
// Verify that all objects have been returned to the pools.
Assert.assertEquals(0, mergePool.getOutstandingObjectCount());
Assert.assertEquals(0, tooSmallMergePool.getOutstandingObjectCount());
});
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@ -626,34 +613,6 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH
return runners;
}
private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferGenerator.class);
private final String description;
private final int computationBufferSize;
private final AtomicLong count = new AtomicLong(0);
public OffheapBufferGenerator(String description, int computationBufferSize)
{
this.description = description;
this.computationBufferSize = computationBufferSize;
}
@Override
public ByteBuffer get()
{
log.info(
"Allocating new %s buffer[%,d] of size[%,d]",
description,
count.getAndIncrement(),
computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
SegmentId segmentId,

View File

@ -26,8 +26,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
@ -43,7 +41,6 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
@ -54,6 +51,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TestBufferPool;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
@ -92,7 +90,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -100,7 +97,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class GroupByLimitPushDownMultiNodeMergeTest
@ -539,27 +535,17 @@ public class GroupByLimitPushDownMultiNodeMergeTest
{
executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]");
final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
Integer.MAX_VALUE
);
final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
final CloseableDefaultBlockingPool<ByteBuffer> mergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
final CloseableDefaultBlockingPool<ByteBuffer> mergePool2 = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
final TestBufferPool mergePool2 = TestBufferPool.offHeap(10_000_000, 2);
resourceCloser.register(bufferPool);
resourceCloser.register(mergePool);
resourceCloser.register(mergePool2);
resourceCloser.register(() -> {
// Verify that all objects have been returned to the pools.
Assert.assertEquals(0, bufferPool.getOutstandingObjectCount());
Assert.assertEquals(0, mergePool.getOutstandingObjectCount());
Assert.assertEquals(0, mergePool2.getOutstandingObjectCount());
});
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@ -1036,34 +1022,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest
return runners;
}
private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferGenerator.class);
private final String description;
private final int computationBufferSize;
private final AtomicLong count = new AtomicLong(0);
public OffheapBufferGenerator(String description, int computationBufferSize)
{
this.description = description;
this.computationBufferSize = computationBufferSize;
}
@Override
public ByteBuffer get()
{
log.info(
"Allocating new %s buffer[%,d] of size[%,d]",
description,
count.getAndIncrement(),
computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
SegmentId segmentId,

View File

@ -25,8 +25,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -39,7 +37,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
@ -50,6 +47,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TestBufferPool;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -79,7 +77,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -87,7 +84,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
public class GroupByMultiSegmentTest
{
@ -205,21 +201,17 @@ public class GroupByMultiSegmentTest
{
executorService = Execs.multiThreaded(2, "GroupByThreadPool[%d]");
final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
Integer.MAX_VALUE
);
final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
final CloseableDefaultBlockingPool<ByteBuffer> mergePool = new CloseableDefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
2
);
final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2);
resourceCloser.register(() -> {
// Verify that all objects have been returned to the pools.
Assert.assertEquals(0, bufferPool.getOutstandingObjectCount());
Assert.assertEquals(0, mergePool.getOutstandingObjectCount());
});
resourceCloser.register(bufferPool);
resourceCloser.register(mergePool);
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@Override
@ -363,34 +355,6 @@ public class GroupByMultiSegmentTest
return runners;
}
private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferGenerator.class);
private final String description;
private final int computationBufferSize;
private final AtomicLong count = new AtomicLong(0);
public OffheapBufferGenerator(String description, int computationBufferSize)
{
this.description = description;
this.computationBufferSize = computationBufferSize;
}
@Override
public ByteBuffer get()
{
log.info(
"Allocating new %s buffer[%,d] of size[%,d]",
description,
count.getAndIncrement(),
computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
SegmentId segmentId,

View File

@ -149,11 +149,11 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
private static final CloseableStupidPool<ByteBuffer> BUFFER_POOL = new CloseableStupidPool<>(
"GroupByQueryEngine-bufferPool",
() -> ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes())
() -> ByteBuffer.allocate(PROCESSING_CONFIG.intermediateComputeSizeBytes())
);
private static final TestBlockingPool MERGE_BUFFER_POOL = new TestBlockingPool(
() -> ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes()),
() -> ByteBuffer.allocate(PROCESSING_CONFIG.intermediateComputeSizeBytes()),
PROCESSING_CONFIG.getNumMergeBuffers()
);

View File

@ -24,7 +24,6 @@ import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
@ -67,17 +66,15 @@ public class GroupByQueryRunnerFactoryTest
@Before
public void setup()
{
final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
new GroupByQueryConfig()
);
factory = factoryAndCloser.lhs;
resourceCloser = factoryAndCloser.rhs;
this.resourceCloser = Closer.create();
final TestGroupByBuffers buffers = resourceCloser.register(TestGroupByBuffers.createDefault());
this.factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig(), buffers);
}
@After
public void teardown() throws IOException
{
factory = null;
resourceCloser.close();
}

View File

@ -119,24 +119,10 @@ public class GroupByQueryRunnerFailureTest
private static final CloseableStupidPool<ByteBuffer> BUFFER_POOL = new CloseableStupidPool<>(
"GroupByQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes());
}
}
() -> ByteBuffer.allocate(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes())
);
private static final CloseableDefaultBlockingPool<ByteBuffer> MERGE_BUFFER_POOL = new CloseableDefaultBlockingPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes());
}
},
() -> ByteBuffer.allocate(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()),
DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers()
);

View File

@ -30,8 +30,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.Rows;
@ -39,7 +37,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -49,7 +46,6 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.math.expr.ExprMacroTable;
@ -149,6 +145,7 @@ import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@ -157,8 +154,6 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -176,7 +171,7 @@ import java.util.concurrent.Executors;
public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
{
public static final ObjectMapper DEFAULT_MAPPER = TestHelper.makeSmileMapper();
private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
{
@Override
public String getFormatString()
@ -205,7 +200,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
};
private static final Closer RESOURCE_CLOSER = Closer.create();
private static TestGroupByBuffers BUFFER_POOLS = null;
private final QueryRunner<ResultRow> runner;
private final QueryRunner<ResultRow> originalRunner;
@ -365,87 +360,74 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
);
}
public static Pair<GroupByQueryRunnerFactory, Closer> makeQueryRunnerFactory(
final GroupByQueryConfig config
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools
)
{
return makeQueryRunnerFactory(DEFAULT_MAPPER, config, DEFAULT_PROCESSING_CONFIG);
return makeQueryRunnerFactory(DEFAULT_MAPPER, config, bufferPools, DEFAULT_PROCESSING_CONFIG);
}
public static Pair<GroupByQueryRunnerFactory, Closer> makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config
)
{
return makeQueryRunnerFactory(mapper, config, DEFAULT_PROCESSING_CONFIG);
}
public static Pair<GroupByQueryRunnerFactory, Closer> makeQueryRunnerFactory(
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools
)
{
return makeQueryRunnerFactory(mapper, config, bufferPools, DEFAULT_PROCESSING_CONFIG);
}
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools,
final DruidProcessingConfig processingConfig
)
{
if (bufferPools.getBufferSize() != processingConfig.intermediateComputeSizeBytes()) {
throw new ISE(
"Provided buffer size [%,d] does not match configured size [%,d]",
bufferPools.getBufferSize(),
processingConfig.intermediateComputeSizeBytes()
);
}
if (bufferPools.getNumMergeBuffers() != processingConfig.getNumMergeBuffers()) {
throw new ISE(
"Provided merge buffer count [%,d] does not match configured count [%,d]",
bufferPools.getNumMergeBuffers(),
processingConfig.getNumMergeBuffers()
);
}
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final CloseableStupidPool<ByteBuffer> bufferPool = new CloseableStupidPool<>(
"GroupByQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
}
}
);
final CloseableDefaultBlockingPool<ByteBuffer> mergeBufferPool = new CloseableDefaultBlockingPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
}
},
processingConfig.getNumMergeBuffers()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
new GroupByQueryEngine(configSupplier, bufferPools.getProcessingPool()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
processingConfig,
configSupplier,
bufferPool,
mergeBufferPool,
bufferPools.getProcessingPool(),
bufferPools.getMergePool(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector);
final Closer closer = Closer.create();
closer.register(() -> {
// Verify that all objects have been returned to the pool.
Assert.assertEquals(bufferPool.poolSize(), bufferPool.objectsCreatedCount());
bufferPool.close();
});
closer.register(mergeBufferPool);
return Pair.of(new GroupByQueryRunnerFactory(strategySelector, toolChest), closer);
return new GroupByQueryRunnerFactory(strategySelector, toolChest);
}
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
NullHandling.initializeForTests();
setUpClass();
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : testConfigs()) {
final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = makeQueryRunnerFactory(config);
final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
RESOURCE_CLOSER.register(factoryAndCloser.rhs);
final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS);
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
for (boolean vectorize : ImmutableList.of(false, true)) {
final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize);
@ -463,10 +445,19 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
return constructors;
}
@AfterClass
public static void teardown() throws IOException
@BeforeClass
public static void setUpClass()
{
RESOURCE_CLOSER.close();
if (BUFFER_POOLS == null) {
BUFFER_POOLS = TestGroupByBuffers.createDefault();
}
}
@AfterClass
public static void tearDownClass()
{
BUFFER_POOLS.close();
BUFFER_POOLS = null;
}
public GroupByQueryRunnerTest(
@ -4546,7 +4537,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setLimitSpec(
new DefaultLimitSpec(
Collections.singletonList(
new OrderByColumnSpec(QueryRunnerTestHelper.MARKET_DIMENSION, OrderByColumnSpec.Direction.DESCENDING)
new OrderByColumnSpec(
QueryRunnerTestHelper.MARKET_DIMENSION,
OrderByColumnSpec.Direction.DESCENDING
)
),
3
)
@ -5040,9 +5034,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval("2011-01-25/2011-01-28")
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new DoubleSumAggregatorFactory("index", "index"), QueryRunnerTestHelper.INDEX_LONG_MIN,
QueryRunnerTestHelper.INDEX_LONG_MAX, QueryRunnerTestHelper.INDEX_DOUBLE_MIN, QueryRunnerTestHelper.INDEX_DOUBLE_MAX,
QueryRunnerTestHelper.INDEX_FLOAT_MIN, QueryRunnerTestHelper.INDEX_FLOAT_MAX)
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT,
new DoubleSumAggregatorFactory("index", "index"),
QueryRunnerTestHelper.INDEX_LONG_MIN,
QueryRunnerTestHelper.INDEX_LONG_MAX,
QueryRunnerTestHelper.INDEX_DOUBLE_MIN,
QueryRunnerTestHelper.INDEX_DOUBLE_MAX,
QueryRunnerTestHelper.INDEX_FLOAT_MIN,
QueryRunnerTestHelper.INDEX_FLOAT_MAX
)
.setGranularity(Granularities.ALL)
.setHavingSpec(new GreaterThanHavingSpec("index", 310L))
.setLimitSpec(
@ -5055,26 +5055,111 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
GroupByQuery fullQuery = builder.build();
List<ResultRow> expectedResults = Arrays.asList(
makeRow(fullQuery, "2011-01-25", "alias", "business", "rows", 3L, "index", 312.38165283203125,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 101L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 105L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 101.624789D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 105.873942D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 101.62479F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 105.87394F),
makeRow(fullQuery, "2011-01-25", "alias", "news", "rows", 3L, "index", 312.7834167480469,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 105L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 102.907866D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 105.266058D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 102.90787F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 105.26606F),
makeRow(fullQuery, "2011-01-25", "alias", "technology", "rows", 3L, "index", 324.6412353515625,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 116L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 102.044542D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 116.979005D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 102.04454F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 116.979004F),
makeRow(fullQuery, "2011-01-25", "alias", "travel", "rows", 3L, "index", 393.36322021484375,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 149L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 122.077247D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 149.125271D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 122.07725F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 149.12527F),
makeRow(fullQuery, "2011-01-25", "alias", "health", "rows", 3L, "index", 511.2996826171875,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 159L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 180L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 159.988606D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 180.575246D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 159.9886F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 180.57524F)
makeRow(fullQuery,
"2011-01-25",
"alias",
"business",
"rows",
3L,
"index",
312.38165283203125,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
101L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
105L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
101.624789D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
105.873942D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
101.62479F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
105.87394F
),
makeRow(fullQuery,
"2011-01-25",
"alias",
"news",
"rows",
3L,
"index",
312.7834167480469,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
102L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
105L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
102.907866D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
105.266058D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
102.90787F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
105.26606F
),
makeRow(fullQuery,
"2011-01-25",
"alias",
"technology",
"rows",
3L,
"index",
324.6412353515625,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
102L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
116L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
102.044542D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
116.979005D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
102.04454F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
116.979004F
),
makeRow(fullQuery,
"2011-01-25",
"alias",
"travel",
"rows",
3L,
"index",
393.36322021484375,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
122L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
149L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
122.077247D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
149.125271D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
122.07725F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
149.12527F
),
makeRow(fullQuery,
"2011-01-25",
"alias",
"health",
"rows",
3L,
"index",
511.2996826171875,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
159L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
180L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
159.988606D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
180.575246D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
159.9886F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
180.57524F
)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, fullQuery);
@ -5198,25 +5283,77 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"),
QueryRunnerTestHelper.INDEX_LONG_MIN, QueryRunnerTestHelper.INDEX_LONG_MAX,
QueryRunnerTestHelper.INDEX_DOUBLE_MIN, QueryRunnerTestHelper.INDEX_DOUBLE_MAX,
QueryRunnerTestHelper.INDEX_FLOAT_MIN, QueryRunnerTestHelper.INDEX_FLOAT_MAX)
QueryRunnerTestHelper.INDEX_FLOAT_MIN, QueryRunnerTestHelper.INDEX_FLOAT_MAX
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setHavingSpec(havingSpec);
final GroupByQuery fullQuery = builder.build();
List<ResultRow> expectedResults = Arrays.asList(
makeRow(fullQuery, "2011-04-01", "alias", "business", "rows", 2L, "idx", 217L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 105L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 112L, QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 105.735462D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 112.987027D, QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 105.73546F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 112.98703F),
makeRow(fullQuery, "2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 107L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 1193L, QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 107.047773D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 1193.556278D, QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 107.047775F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 1193.5563F),
makeRow(fullQuery, "2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 1321L, QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 122.141707D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 1321.375057D, QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 122.14171F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 1321.375F)
makeRow(fullQuery,
"2011-04-01",
"alias",
"business",
"rows",
2L,
"idx",
217L,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
105L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
112L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
105.735462D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
112.987027D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
105.73546F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
112.98703F
),
makeRow(fullQuery,
"2011-04-01",
"alias",
"mezzanine",
"rows",
6L,
"idx",
4420L,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
107L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
1193L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
107.047773D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
1193.556278D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
107.047775F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
1193.5563F
),
makeRow(fullQuery,
"2011-04-01",
"alias",
"premium",
"rows",
6L,
"idx",
4416L,
QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC,
122L,
QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC,
1321L,
QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC,
122.141707D,
QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC,
1321.375057D,
QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC,
122.14171F,
QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC,
1321.375F
)
);
TestHelper.assertExpectedObjects(
@ -7287,7 +7424,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ColumnType.STRING, TestExprMacroTable.INSTANCE))
.setVirtualColumns(new ExpressionVirtualColumn(
"alias",
"quality",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
))
.setDimensions(Lists.newArrayList(
new DefaultDimensionSpec("market", "market2"),
new DefaultDimensionSpec("alias", "alias2")
@ -7400,7 +7542,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ColumnType.STRING, TestExprMacroTable.INSTANCE))
.setVirtualColumns(new ExpressionVirtualColumn(
"alias",
"quality",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
))
.setDimensions(Lists.newArrayList(
new DefaultDimensionSpec("quality", "quality2"),
new DefaultDimensionSpec("market", "market2"),
@ -7756,7 +7903,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ColumnType.STRING, TestExprMacroTable.INSTANCE))
.setVirtualColumns(new ExpressionVirtualColumn(
"alias",
"quality",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
))
.setDimensions(Lists.newArrayList(
new DefaultDimensionSpec("quality", "quality"),
new DefaultDimensionSpec("market", "market"),
@ -8211,7 +8363,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
@Test
public void testGroupByWithSubtotalsSpecWithOrderLimitForcePushdown()
{
@ -11228,18 +11379,23 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
return;
}
GroupByQuery.Builder builder = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(new ExtractionDimensionSpec("quality", "qualityLen", ColumnType.LONG, StrlenExtractionFn.instance()))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.setLimitSpec(
new DefaultLimitSpec(
Collections.emptyList(),
20
)
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(new ExtractionDimensionSpec(
"quality",
"qualityLen",
ColumnType.LONG,
StrlenExtractionFn.instance()
))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.setLimitSpec(
new DefaultLimitSpec(
Collections.emptyList(),
20
)
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
.setGranularity(Granularities.ALL);
)
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
.setGranularity(Granularities.ALL);
final GroupByQuery allGranQuery = builder.build();
@ -11247,14 +11403,14 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
(queryPlus, responseContext) -> {
// simulate two daily segments
final QueryPlus<ResultRow> queryPlus1 = queryPlus.withQuery(
queryPlus.getQuery().withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03")))
)
queryPlus.getQuery().withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03")))
)
);
final QueryPlus<ResultRow> queryPlus2 = queryPlus.withQuery(
queryPlus.getQuery().withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04")))
)
queryPlus.getQuery().withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04")))
)
);
return factory.getToolchest().mergeResults(
@ -11272,19 +11428,19 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
);
Map<String, Object> context = new HashMap<>();
List<ResultRow> allGranExpectedResults = Arrays.asList(
makeRow(allGranQuery, "2011-04-02", "qualityLen", 4L, "rows", 2L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 6L, "rows", 4L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 7L, "rows", 6L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 8L, "rows", 2L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 9L, "rows", 6L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 10L, "rows", 4L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 13L, "rows", 2L)
makeRow(allGranQuery, "2011-04-02", "qualityLen", 4L, "rows", 2L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 6L, "rows", 4L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 7L, "rows", 6L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 8L, "rows", 2L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 9L, "rows", 6L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 10L, "rows", 4L),
makeRow(allGranQuery, "2011-04-02", "qualityLen", 13L, "rows", 2L)
);
TestHelper.assertExpectedObjects(
allGranExpectedResults,
mergedRunner.run(QueryPlus.wrap(allGranQuery)),
"merged"
allGranExpectedResults,
mergedRunner.run(QueryPlus.wrap(allGranQuery)),
"merged"
);
}
@ -12057,7 +12213,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, false))
.setLimitSpec(new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("nullable", OrderByColumnSpec.Direction.ASCENDING)), 5))
.setLimitSpec(new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec(
"nullable",
OrderByColumnSpec.Direction.ASCENDING
)), 5))
.build();
List<ResultRow> expectedResults;
@ -12151,7 +12310,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setDimensions(
new DefaultDimensionSpec("v", "v", ColumnType.LONG)
)
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("twosum", null, "1 + two", TestExprMacroTable.INSTANCE))
.setAggregatorSpecs(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("twosum", null, "1 + two", TestExprMacroTable.INSTANCE)
)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.setLimit(5)
.build();
@ -12536,7 +12698,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
// array types don't work with group by v1
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Unable to handle type[ARRAY<STRING>] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]");
expectedException.expectMessage(
"Unable to handle type[ARRAY<STRING>] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]");
}
GroupByQuery query = makeQueryBuilder()
@ -12607,7 +12770,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
135.88510131835938d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12619,7 +12782,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
118.57034,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12631,7 +12794,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
158.747224,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12643,7 +12806,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
120.134704,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12655,7 +12818,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
2871.8866900000003d,
"array_agg_distinct",
new String[] {"spot", "total_market", "upfront"}
new String[]{"spot", "total_market", "upfront"}
),
makeRow(
query,
@ -12667,7 +12830,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
121.58358d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12679,7 +12842,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
2900.798647d,
"array_agg_distinct",
new String[] {"spot", "total_market", "upfront"}
new String[]{"spot", "total_market", "upfront"}
),
makeRow(
query,
@ -12691,7 +12854,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
78.622547d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12703,7 +12866,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
119.922742d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
@ -12716,7 +12879,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
147.42593d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12728,7 +12891,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
112.987027d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12740,7 +12903,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
166.016049d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12752,7 +12915,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
113.446008d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12764,7 +12927,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
2448.830613d,
"array_agg_distinct",
new String[] {"spot", "total_market", "upfront"}
new String[]{"spot", "total_market", "upfront"}
),
makeRow(
query,
@ -12776,7 +12939,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
114.290141d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12788,7 +12951,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
2506.415148d,
"array_agg_distinct",
new String[] {"spot", "total_market", "upfront"}
new String[]{"spot", "total_market", "upfront"}
),
makeRow(
query,
@ -12800,7 +12963,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
97.387433d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
),
makeRow(
query,
@ -12812,7 +12975,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"idx",
126.411364d,
"array_agg_distinct",
new String[] {"spot"}
new String[]{"spot"}
)
);
@ -12829,7 +12992,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
// array types don't work with group by v1
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Unable to handle type[ARRAY<STRING>] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]");
expectedException.expectMessage(
"Unable to handle type[ARRAY<STRING>] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]");
}
GroupByQuery query = makeQueryBuilder()
@ -12864,7 +13028,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"automotive",
"array_agg_distinct",
new String[] {"a", "preferred"}
new String[]{"a", "preferred"}
),
makeRow(
query,
@ -12872,7 +13036,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"business",
"array_agg_distinct",
new String[] {"b", "preferred"}
new String[]{"b", "preferred"}
),
makeRow(
query,
@ -12880,7 +13044,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"entertainment",
"array_agg_distinct",
new String[] {"e", "preferred"}
new String[]{"e", "preferred"}
),
makeRow(
query,
@ -12888,7 +13052,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"health",
"array_agg_distinct",
new String[] {"h", "preferred"}
new String[]{"h", "preferred"}
),
makeRow(
query,
@ -12896,7 +13060,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"mezzanine",
"array_agg_distinct",
new String[] {"m", "preferred"}
new String[]{"m", "preferred"}
),
makeRow(
query,
@ -12904,7 +13068,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"news",
"array_agg_distinct",
new String[] {"n", "preferred"}
new String[]{"n", "preferred"}
),
makeRow(
query,
@ -12912,7 +13076,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"premium",
"array_agg_distinct",
new String[] {"p", "preferred"}
new String[]{"p", "preferred"}
),
makeRow(
query,
@ -12920,7 +13084,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"technology",
"array_agg_distinct",
new String[] {"preferred", "t"}
new String[]{"preferred", "t"}
),
makeRow(
query,
@ -12928,7 +13092,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"travel",
"array_agg_distinct",
new String[] {"preferred", "t"}
new String[]{"preferred", "t"}
),
makeRow(
@ -12937,7 +13101,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"automotive",
"array_agg_distinct",
new String[] {"a", "preferred"}
new String[]{"a", "preferred"}
),
makeRow(
query,
@ -12945,7 +13109,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"business",
"array_agg_distinct",
new String[] {"b", "preferred"}
new String[]{"b", "preferred"}
),
makeRow(
query,
@ -12953,7 +13117,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"entertainment",
"array_agg_distinct",
new String[] {"e", "preferred"}
new String[]{"e", "preferred"}
),
makeRow(
query,
@ -12961,7 +13125,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"health",
"array_agg_distinct",
new String[] {"h", "preferred"}
new String[]{"h", "preferred"}
),
makeRow(
query,
@ -12969,7 +13133,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"mezzanine",
"array_agg_distinct",
new String[] {"m", "preferred"}
new String[]{"m", "preferred"}
),
makeRow(
query,
@ -12977,7 +13141,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"news",
"array_agg_distinct",
new String[] {"n", "preferred"}
new String[]{"n", "preferred"}
),
makeRow(
query,
@ -12985,7 +13149,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"premium",
"array_agg_distinct",
new String[] {"p", "preferred"}
new String[]{"p", "preferred"}
),
makeRow(
query,
@ -12993,7 +13157,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"technology",
"array_agg_distinct",
new String[] {"preferred", "t"}
new String[]{"preferred", "t"}
),
makeRow(
query,
@ -13001,7 +13165,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
"alias",
"travel",
"array_agg_distinct",
new String[] {"preferred", "t"}
new String[]{"preferred", "t"}
)
);
@ -13208,10 +13372,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
private Map<String, Object> makeContext()
{
return ImmutableMap.<String, Object>builder()
.put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false")
.put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false")
.put("vectorSize", 16) // Small vector size to ensure we use more than one.
.build();
.put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false")
.put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false")
.put("vectorSize", 16) // Small vector size to ensure we use more than one.
.build();
}
private void cannotVectorize()

View File

@ -25,14 +25,12 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryPlus;
@ -56,11 +54,11 @@ import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -74,25 +72,29 @@ import java.util.Map;
@RunWith(Parameterized.class)
public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
{
private static final Closer RESOURCE_CLOSER = Closer.create();
private static TestGroupByBuffers BUFFER_POOLS = null;
@BeforeClass
public static void setUpClass()
{
BUFFER_POOLS = TestGroupByBuffers.createDefault();
}
@AfterClass
public static void teardown() throws IOException
public static void tearDownClass()
{
RESOURCE_CLOSER.close();
BUFFER_POOLS.close();
BUFFER_POOLS = null;
}
@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "{0}, vectorize = {1}")
public static Iterable<Object[]> constructorFeeder()
{
setUpClass();
GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
config
);
final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
RESOURCE_CLOSER.register(factoryAndCloser.rhs);
final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS);
final List<Object[]> constructors = new ArrayList<>();

View File

@ -26,10 +26,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -42,7 +38,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
@ -54,6 +50,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TestBufferPool;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
@ -79,6 +76,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
import org.junit.Assert;
@ -86,7 +84,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -94,10 +91,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class NestedQueryPushDownTest
public class NestedQueryPushDownTest extends InitializedNullHandlingTest
{
private static final IndexIO INDEX_IO;
private static final IndexMergerV9 INDEX_MERGER_V9;
@ -108,6 +104,7 @@ public class NestedQueryPushDownTest
private List<IncrementalIndex> incrementalIndices = new ArrayList<>();
private List<QueryableIndex> groupByIndices = new ArrayList<>();
private ExecutorService executorService;
private Closer closer;
static {
JSON_MAPPER = new DefaultObjectMapper();
@ -147,6 +144,7 @@ public class NestedQueryPushDownTest
@Before
public void setup() throws Exception
{
closer = Closer.create();
tmpDir = FileUtils.createTempDir();
InputRow row;
@ -249,23 +247,15 @@ public class NestedQueryPushDownTest
{
executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]");
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 10_000_000),
0,
Integer.MAX_VALUE
);
// limit of 3 is required since we simulate running historical running nested query and broker doing the final merge
BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
10
);
// limit of 3 is required since we simulate running historical running nested query and broker doing the final merge
BlockingPool<ByteBuffer> mergePool2 = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 10_000_000),
10
);
TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 10);
TestBufferPool mergePool2 = TestBufferPool.offHeap(10_000_000, 10);
closer.register(() -> {
// Verify that all objects have been returned to the pool.
Assert.assertEquals(0, bufferPool.getOutstandingObjectCount());
Assert.assertEquals(0, mergePool.getOutstandingObjectCount());
Assert.assertEquals(0, mergePool2.getOutstandingObjectCount());
});
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@ -356,6 +346,9 @@ public class NestedQueryPushDownTest
@After
public void tearDown() throws Exception
{
closer.close();
closer = null;
for (IncrementalIndex incrementalIndex : incrementalIndices) {
incrementalIndex.close();
}
@ -866,34 +859,6 @@ public class NestedQueryPushDownTest
return runners;
}
private static class OffheapBufferGenerator implements Supplier<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferGenerator.class);
private final String description;
private final int computationBufferSize;
private final AtomicLong count = new AtomicLong(0);
public OffheapBufferGenerator(String description, int computationBufferSize)
{
this.description = description;
this.computationBufferSize = computationBufferSize;
}
@Override
public ByteBuffer get()
{
log.info(
"Allocating new %s buffer[%,d] of size[%,d]",
description,
count.getAndIncrement(),
computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
private static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunnerForSegment(
QueryRunnerFactory<T, QueryType> factory,
SegmentId segmentId,

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby;
import com.google.common.base.Preconditions;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.TestBufferPool;
import org.junit.Assert;
import javax.annotation.Nullable;
import java.io.Closeable;
public class TestGroupByBuffers implements Closeable
{
private final int bufferSize;
private final int numMergeBuffers;
@Nullable
private TestBufferPool processingPool;
@Nullable
private TestBufferPool mergePool;
public TestGroupByBuffers(final int bufferSize, final int numMergeBuffers)
{
this.bufferSize = bufferSize;
this.numMergeBuffers = numMergeBuffers;
this.processingPool = TestBufferPool.offHeap(bufferSize, Integer.MAX_VALUE);
this.mergePool = TestBufferPool.offHeap(bufferSize, numMergeBuffers);
}
public static TestGroupByBuffers createFromProcessingConfig(final DruidProcessingConfig config)
{
return new TestGroupByBuffers(config.intermediateComputeSizeBytes(), config.getNumMergeBuffers());
}
public static TestGroupByBuffers createDefault()
{
return createFromProcessingConfig(GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG);
}
public int getBufferSize()
{
return bufferSize;
}
public int getNumMergeBuffers()
{
return numMergeBuffers;
}
public TestBufferPool getProcessingPool()
{
return Preconditions.checkNotNull(processingPool, "processingPool");
}
public TestBufferPool getMergePool()
{
return Preconditions.checkNotNull(mergePool, "mergePool");
}
@Override
public void close()
{
if (processingPool != null) {
Assert.assertEquals(0, processingPool.getOutstandingObjectCount());
processingPool = null;
}
if (mergePool != null) {
Assert.assertEquals(0, mergePool.getOutstandingObjectCount());
mergePool = null;
}
}
}

View File

@ -22,12 +22,15 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.CloserRule;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -38,7 +41,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
public class BufferHashGrouperTest
public class BufferHashGrouperTest extends InitializedNullHandlingTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -97,28 +100,30 @@ public class BufferHashGrouperTest
public void testGrowing()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 10000, 2, 0.75f);
final int expectedMaxSize = NullHandling.replaceWithDefault() ? 219 : 210;
try (final ResourceHolder<Grouper<IntKey>> grouperHolder = makeGrouper(columnSelectorFactory, 10000, 2, 0.75f)) {
final Grouper<IntKey> grouper = grouperHolder.get();
final int expectedMaxSize = NullHandling.replaceWithDefault() ? 219 : 210;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
// Aggregate slightly different row
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
final List<Grouper.Entry<IntKey>> expected = new ArrayList<>();
for (int i = 0; i < expectedMaxSize; i++) {
expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L}));
}
GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true));
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
// Aggregate slightly different row
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
final List<Grouper.Entry<IntKey>> expected = new ArrayList<>();
for (int i = 0; i < expectedMaxSize; i++) {
expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L}));
}
GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true));
}
@Test
@ -129,14 +134,16 @@ public class BufferHashGrouperTest
if (NullHandling.replaceWithDefault()) {
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
// the buffer size below is chosen to test integer overflow in ByteBufferHashTable.adjustTableWhenFull().
final Grouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 1_900_000_000, 2, 0.3f);
final int expectedMaxSize = 15323979;
try (final ResourceHolder<Grouper<IntKey>> holder = makeGrouper(columnSelectorFactory, 1_900_000_000, 2, 0.3f)) {
final Grouper<IntKey> grouper = holder.get();
final int expectedMaxSize = 15323979;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
}
}
@ -144,41 +151,45 @@ public class BufferHashGrouperTest
public void testNoGrowing()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 10000, Integer.MAX_VALUE, 0.75f);
final int expectedMaxSize = NullHandling.replaceWithDefault() ? 267 : 258;
try (final ResourceHolder<Grouper<IntKey>> grouperHolder =
makeGrouper(columnSelectorFactory, 10000, Integer.MAX_VALUE, 0.75f)) {
final Grouper<IntKey> grouper = grouperHolder.get();
final int expectedMaxSize = NullHandling.replaceWithDefault() ? 267 : 258;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
// Aggregate slightly different row
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
final List<Grouper.Entry<IntKey>> expected = new ArrayList<>();
for (int i = 0; i < expectedMaxSize; i++) {
expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L}));
}
GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true));
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
// Aggregate slightly different row
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
final List<Grouper.Entry<IntKey>> expected = new ArrayList<>();
for (int i = 0; i < expectedMaxSize; i++) {
expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L}));
}
GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true));
}
private BufferHashGrouper<IntKey> makeGrouper(
private ResourceHolder<Grouper<IntKey>> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
int initialBuckets,
float maxLoadFactor
)
{
final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
// Use off-heap allocation since one of our tests has a 1.9GB buffer. Heap size may be insufficient.
final ResourceHolder<ByteBuffer> bufferHolder = ByteBufferUtils.allocateDirect(bufferSize);
final BufferHashGrouper<IntKey> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(buffer),
bufferHolder::get,
GrouperTestUtil.intKeySerde(),
AggregatorAdapters.factorizeBuffered(
columnSelectorFactory,
@ -192,7 +203,23 @@ public class BufferHashGrouperTest
initialBuckets,
true
);
grouper.init();
return grouper;
return new ResourceHolder<Grouper<IntKey>>()
{
@Override
public BufferHashGrouper<IntKey> get()
{
return grouper;
}
@Override
public void close()
{
grouper.close();
bufferHolder.close();
}
};
}
}

View File

@ -1166,6 +1166,14 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
Object exValue = ex.getValue();
if (exValue instanceof Double || exValue instanceof Float) {
final double expectedDoubleValue = ((Number) exValue).doubleValue();
Assert.assertNotNull(
StringUtils.format(
"invalid null value for %s (expected %f)",
ex.getKey(),
expectedDoubleValue
),
actVal
);
Assert.assertEquals(
"invalid value for " + ex.getKey(),
expectedDoubleValue,

View File

@ -21,6 +21,8 @@ package org.apache.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.junit.After;
import org.junit.Assert;
@ -112,11 +114,14 @@ public class CompressionStrategyTest
{
ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer);
ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut);
ByteBuffer output = ByteBuffer.allocateDirect(originalData.length);
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
try (final ResourceHolder<ByteBuffer> holder = ByteBufferUtils.allocateDirect(originalData.length)) {
final ByteBuffer output = holder.get();
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
}
}
@Test(timeout = 60_000L)

View File

@ -80,8 +80,8 @@ import java.util.concurrent.ForkJoinPool;
public class CachingClusteredClientFunctionalityTest
{
private static final ObjectMapper OBJECT_MAPPER = CachingClusteredClientTestUtils.createObjectMapper();
private static final Pair<QueryToolChestWarehouse, Closer> WAREHOUSE_AND_CLOSER = CachingClusteredClientTestUtils
.createWarehouse(OBJECT_MAPPER);
private static final Pair<QueryToolChestWarehouse, Closer> WAREHOUSE_AND_CLOSER =
CachingClusteredClientTestUtils.createWarehouse();
private static final QueryToolChestWarehouse WAREHOUSE = WAREHOUSE_AND_CLOSER.lhs;
private static final Closer RESOURCE_CLOSER = WAREHOUSE_AND_CLOSER.rhs;

View File

@ -262,8 +262,8 @@ public class CachingClusteredClientTest
private static final DateTimeZone TIMEZONE = DateTimes.inferTzFromString("America/Los_Angeles");
private static final Granularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
private static final String TOP_DIM = "a_dim";
private static final Pair<QueryToolChestWarehouse, Closer> WAREHOUSE_AND_CLOSER = CachingClusteredClientTestUtils
.createWarehouse(JSON_MAPPER);
private static final Pair<QueryToolChestWarehouse, Closer> WAREHOUSE_AND_CLOSER =
CachingClusteredClientTestUtils.createWarehouse();
private static final QueryToolChestWarehouse WAREHOUSE = WAREHOUSE_AND_CLOSER.lhs;
private static final Closer RESOURCE_CLOSER = WAREHOUSE_AND_CLOSER.rhs;

View File

@ -33,6 +33,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchQueryConfig;
import org.apache.druid.query.search.SearchQueryQueryToolChest;
@ -50,15 +51,13 @@ public final class CachingClusteredClientTestUtils
* Returns a new {@link QueryToolChestWarehouse} for unit tests and a resourceCloser which should be closed at the end
* of the test.
*/
public static Pair<QueryToolChestWarehouse, Closer> createWarehouse(
ObjectMapper objectMapper
)
public static Pair<QueryToolChestWarehouse, Closer> createWarehouse()
{
final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = GroupByQueryRunnerTest.makeQueryRunnerFactory(
new GroupByQueryConfig()
final Closer resourceCloser = Closer.create();
final GroupByQueryRunnerFactory groupByQueryRunnerFactory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
new GroupByQueryConfig(),
resourceCloser.register(TestGroupByBuffers.createDefault())
);
final GroupByQueryRunnerFactory factory = factoryCloserPair.lhs;
final Closer resourceCloser = factoryCloserPair.rhs;
return Pair.of(
new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
@ -76,7 +75,7 @@ public final class CachingClusteredClientTestUtils
)
.put(
GroupByQuery.class,
factory.getToolchest()
groupByQueryRunnerFactory.getToolchest()
)
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
.build()

View File

@ -22,8 +22,6 @@ package org.apache.druid.server;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
@ -40,10 +38,12 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.TestBufferPool;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
@ -77,9 +77,9 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.junit.Assert;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@ -276,14 +276,16 @@ public class QueryStackTests
final Supplier<Integer> minTopNThresholdSupplier
)
{
final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(COMPUTE_BUFFER_SIZE)
);
final TestBufferPool testBufferPool = TestBufferPool.offHeap(COMPUTE_BUFFER_SIZE, Integer.MAX_VALUE);
closer.register(() -> {
// Verify that all objects have been returned to the pool.
Assert.assertEquals(0, testBufferPool.getOutstandingObjectCount());
});
closer.register(stupidPool);
final TestGroupByBuffers groupByBuffers =
closer.register(TestGroupByBuffers.createFromProcessingConfig(processingConfig));
final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair =
final GroupByQueryRunnerFactory groupByQueryRunnerFactory =
GroupByQueryRunnerTest.makeQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
@ -294,12 +296,10 @@ public class QueryStackTests
return GroupByStrategySelector.STRATEGY_V2;
}
},
groupByBuffers,
processingConfig
);
final GroupByQueryRunnerFactory groupByQueryRunnerFactory = factoryCloserPair.lhs;
closer.register(factoryCloserPair.rhs);
final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
@ -333,7 +333,7 @@ public class QueryStackTests
.put(
TopNQuery.class,
new TopNQueryRunnerFactory(
stupidPool,
testBufferPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()
{
@Override