diff --git a/.travis.yml b/.travis.yml index af43cbe0ecf..7334528d4dd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -391,7 +391,7 @@ jobs: jdk: openjdk11 env: - MAVEN_PROJECTS='core,indexing-hadoop,indexing-service,processing' - script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m -T1C + script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m - name: "Build and test on ARM64 CPU architecture (2)" stage: Tests - phase 2 @@ -402,7 +402,7 @@ jobs: jdk: openjdk11 env: - MAVEN_PROJECTS='core,sql,server,services' - script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m -T1C + script: ${MVN} test -B -pl ${MAVEN_PROJECTS} -Ddruid.console.skip=true -DargLine=-Xmx3000m - name: "web console end-to-end test" before_install: *setup_generate_license diff --git a/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index a32236db20d..1021974b1b4 100644 --- a/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -85,39 +85,6 @@ public class DefaultBlockingPool implements BlockingPool ); } - @Nullable - private T pollObject() - { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - return objects.isEmpty() ? null : objects.pop(); - } - finally { - lock.unlock(); - } - } - - @Nullable - private T pollObject(long timeoutMs) throws InterruptedException - { - long nanos = TIME_UNIT.toNanos(timeoutMs); - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (objects.isEmpty()) { - if (nanos <= 0) { - return null; - } - nanos = notEnough.awaitNanos(nanos); - } - return objects.pop(); - } - finally { - lock.unlock(); - } - } - @Override public List> takeBatch(final int elementNum, final long timeoutMs) { diff --git a/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java b/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java index 61a9211e8bf..29c46dc69a6 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java @@ -19,6 +19,8 @@ package org.apache.druid.java.util.common; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.JvmUtils; import java.lang.invoke.MethodHandle; @@ -28,11 +30,15 @@ import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; /** + * */ public class ByteBufferUtils { + private static final Logger log = new Logger(ByteBufferUtils.class); + // the following MethodHandle lookup code is adapted from Apache Kafka // https://github.com/apache/kafka/blob/e554dc518eaaa0747899e708160275f95c4e525f/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java @@ -138,6 +144,52 @@ public class ByteBufferUtils return unmapper.bindTo(UnsafeUtils.theUnsafe()); } + /** + * Same as {@link ByteBuffer#allocateDirect(int)}, but returns a closeable {@link ResourceHolder} that + * frees the buffer upon close. + * + * Direct (off-heap) buffers are an alternative to on-heap buffers that allow memory to be managed + * outside the purview of the garbage collector. It's most useful when allocating big chunks of memory, + * like processing buffers. + * + * Holders cannot be closed more than once. Attempting to close a holder twice will earn you an + * {@link IllegalStateException}. + */ + public static ResourceHolder allocateDirect(final int size) + { + class DirectByteBufferHolder implements ResourceHolder + { + private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile ByteBuffer buf = ByteBuffer.allocateDirect(size); + + @Override + public ByteBuffer get() + { + final ByteBuffer theBuf = buf; + + if (theBuf == null) { + throw new ISE("Closed"); + } else { + return theBuf; + } + } + + @Override + public void close() + { + if (closed.compareAndSet(false, true)) { + final ByteBuffer theBuf = buf; + buf = null; + free(theBuf); + } else { + throw new ISE("Already closed"); + } + } + } + + return new DirectByteBufferHolder(); + } + /** * Releases memory held by the given direct ByteBuffer * diff --git a/core/src/main/java/org/apache/druid/math/expr/ExprEval.java b/core/src/main/java/org/apache/druid/math/expr/ExprEval.java index 3152fb26d5a..ae4b5168995 100644 --- a/core/src/main/java/org/apache/druid/math/expr/ExprEval.java +++ b/core/src/main/java/org/apache/druid/math/expr/ExprEval.java @@ -45,9 +45,30 @@ public abstract class ExprEval /** * Deserialize an expression stored in a bytebuffer, e.g. for an agg. * - * This should be refactored to be consolidated with some of the standard type handling of aggregators probably + * This method is not thread-safe with respect to the provided {@link ByteBuffer}, because the position of the + * buffer may be changed transiently during execution of this method. However, it will be restored to its original + * position prior to the method completing. Therefore, if the provided buffer is being used by a single thread, then + * this method does not change the position of the buffer. + * + * The {@code canRetainBufferReference} parameter determines + * + * @param buffer source buffer + * @param offset position to start reading from + * @param maxSize maximum number of bytes from "offset" that may be required. This is used as advice, + * but is not strictly enforced in all cases. It is possible that type strategies may + * attempt reads past this limit. + * @param type data type to read + * @param canRetainBufferReference whether the returned {@link ExprEval} may retain a reference to the provided + * {@link ByteBuffer}. Certain types are deserialized more efficiently if allowed + * to retain references to the provided buffer. */ - public static ExprEval deserialize(ByteBuffer buffer, int offset, ExpressionType type) + public static ExprEval deserialize( + final ByteBuffer buffer, + final int offset, + final int maxSize, + final ExpressionType type, + final boolean canRetainBufferReference + ) { switch (type.getType()) { case LONG: @@ -61,7 +82,19 @@ public abstract class ExprEval } return of(TypeStrategies.readNotNullNullableDouble(buffer, offset)); default: - return ofType(type, type.getNullableStrategy().read(buffer, offset)); + final NullableTypeStrategy strategy = type.getNullableStrategy(); + + if (!canRetainBufferReference && strategy.readRetainsBufferReference()) { + final ByteBuffer dataCopyBuffer = ByteBuffer.allocate(maxSize); + final ByteBuffer mutationBuffer = buffer.duplicate(); + mutationBuffer.limit(offset + maxSize); + mutationBuffer.position(offset); + dataCopyBuffer.put(mutationBuffer); + dataCopyBuffer.rewind(); + return ofType(type, strategy.read(dataCopyBuffer, 0)); + } else { + return ofType(type, strategy.read(buffer, offset)); + } } } diff --git a/core/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java b/core/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java index 998e6a92dc4..f7ed0298cc5 100644 --- a/core/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java +++ b/core/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java @@ -102,6 +102,15 @@ public final class NullableTypeStrategy implements Comparator } } + /** + * Whether the {@link #read} methods return an object that may retain a reference to the provided {@link ByteBuffer}. + * If a reference is sometimes retained, this method returns true. It returns false if, and only if, a reference + * is *never* retained. + */ + public boolean readRetainsBufferReference() + { + return delegate.readRetainsBufferReference(); + } public int write(ByteBuffer buffer, int offset, @Nullable T value, int maxSizeBytes) { diff --git a/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java b/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java index c7b212fd00e..38d72046f11 100644 --- a/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java +++ b/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java @@ -250,6 +250,12 @@ public class TypeStrategies return buffer.getLong(); } + @Override + public boolean readRetainsBufferReference() + { + return false; + } + @Override public int write(ByteBuffer buffer, Long value, int maxSizeBytes) { @@ -291,6 +297,12 @@ public class TypeStrategies return buffer.getFloat(); } + @Override + public boolean readRetainsBufferReference() + { + return false; + } + @Override public int write(ByteBuffer buffer, Float value, int maxSizeBytes) { @@ -332,6 +344,12 @@ public class TypeStrategies return buffer.getDouble(); } + @Override + public boolean readRetainsBufferReference() + { + return false; + } + @Override public int write(ByteBuffer buffer, Double value, int maxSizeBytes) { @@ -379,6 +397,12 @@ public class TypeStrategies return StringUtils.fromUtf8(blob); } + @Override + public boolean readRetainsBufferReference() + { + return false; + } + @Override public int write(ByteBuffer buffer, String value, int maxSizeBytes) { @@ -447,6 +471,12 @@ public class TypeStrategies return array; } + @Override + public boolean readRetainsBufferReference() + { + return elementStrategy.readRetainsBufferReference(); + } + @Override public int write(ByteBuffer buffer, Object[] value, int maxSizeBytes) { diff --git a/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java index 26a284a9ee9..4ac575469e7 100644 --- a/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java +++ b/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java @@ -73,13 +73,18 @@ public interface TypeStrategy extends Comparator * Read a non-null value from the {@link ByteBuffer} at the current {@link ByteBuffer#position()}. This will move * the underlying position by the size of the value read. * - * The contract of this method is that any value returned from this method MUST be completely detached from the - * underlying {@link ByteBuffer}, since it might outlive the memory location being allocated to hold the object. - * In other words, if an object is memory mapped, it must be copied on heap, or relocated to another memory location - * that is owned by the caller with {@link #write}. + * The value returned from this method may retain a reference to the provided {@link ByteBuffer}. If it does, then + * {@link #readRetainsBufferReference()} returns true. */ T read(ByteBuffer buffer); + /** + * Whether the {@link #read} methods return an object that may retain a reference to the provided {@link ByteBuffer}. + * If a reference is sometimes retained, this method returns true. It returns false if, and only if, a reference + * is *never* retained. + */ + boolean readRetainsBufferReference(); + /** * Write a non-null value to the {@link ByteBuffer} at position {@link ByteBuffer#position()}. This will move the * underlying position by the size of the value written. diff --git a/core/src/test/java/org/apache/druid/common/config/NullHandlingTest.java b/core/src/test/java/org/apache/druid/common/config/NullHandlingTest.java index 1419c617e84..3e1a1a87c3d 100644 --- a/core/src/test/java/org/apache/druid/common/config/NullHandlingTest.java +++ b/core/src/test/java/org/apache/druid/common/config/NullHandlingTest.java @@ -94,11 +94,15 @@ public class NullHandlingTest @Test public void test_ignoreNullsStrings() { - NullHandling.initializeForTestsWithValues(false, true); - Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality()); - - NullHandling.initializeForTestsWithValues(true, false); - Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality()); + try { + NullHandling.initializeForTestsWithValues(false, true); + Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality()); + NullHandling.initializeForTestsWithValues(true, false); + Assert.assertFalse(NullHandling.ignoreNullsForStringCardinality()); + } + finally { + NullHandling.initializeForTests(); + } } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/ByteBufferUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/ByteBufferUtilsTest.java index 74622396bbd..91a37dba427 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/ByteBufferUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/ByteBufferUtilsTest.java @@ -20,7 +20,8 @@ package org.apache.druid.java.util.common; import com.google.common.io.Files; -import junit.framework.Assert; +import org.apache.druid.collections.ResourceHolder; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -38,6 +39,18 @@ public class ByteBufferUtilsTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test + public void testAllocateDirect() + { + final int sz = 10; + + try (final ResourceHolder holder = ByteBufferUtils.allocateDirect(sz)) { + final ByteBuffer buf = holder.get(); + Assert.assertTrue(buf.isDirect()); + Assert.assertEquals(sz, buf.remaining()); + } + } + @Test public void testUnmapDoesntCrashJVM() throws Exception { diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index 4f80740f67f..754e7237c76 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.druid.java.util.common; +import org.apache.druid.collections.ResourceHolder; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -125,12 +126,14 @@ public class StringUtilsTest @Test public void fromUtf8ByteBufferDirect() { - ByteBuffer bytes = ByteBuffer.allocateDirect(4); - bytes.put(new byte[]{'a', 'b', 'c', 'd'}); - bytes.rewind(); - Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes, 4)); - bytes.rewind(); - Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes)); + try (final ResourceHolder bufferHolder = ByteBufferUtils.allocateDirect(4)) { + final ByteBuffer bytes = bufferHolder.get(); + bytes.put(new byte[]{'a', 'b', 'c', 'd'}); + bytes.rewind(); + Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes, 4)); + bytes.rewind(); + Assert.assertEquals("abcd", StringUtils.fromUtf8(bytes)); + } } @SuppressWarnings("MalformedFormatString") diff --git a/core/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java b/core/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java index d65588bc362..6e57e1cd74b 100644 --- a/core/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java +++ b/core/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java @@ -368,13 +368,27 @@ public class ExprEvalTest extends InitializedNullHandlingTest ExprEval.serialize(buffer, position, expected.type(), expected, maxSizeBytes); if (expected.type().isArray()) { Assert.assertArrayEquals( + "deserialized value with buffer references allowed", expected.asArray(), - ExprEval.deserialize(buffer, position, expected.type()).asArray() + ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), true).asArray() + ); + + Assert.assertArrayEquals( + "deserialized value with buffer references not allowed", + expected.asArray(), + ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), false).asArray() ); } else { Assert.assertEquals( + "deserialized value with buffer references allowed", expected.value(), - ExprEval.deserialize(buffer, position, expected.type()).value() + ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), true).value() + ); + + Assert.assertEquals( + "deserialized value with buffer references not allowed", + expected.value(), + ExprEval.deserialize(buffer, position, MAX_SIZE_BYTES, expected.type(), false).value() ); } } diff --git a/core/src/test/java/org/apache/druid/math/expr/ParserTest.java b/core/src/test/java/org/apache/druid/math/expr/ParserTest.java index 51ae5da7532..b997a53f91f 100644 --- a/core/src/test/java/org/apache/druid/math/expr/ParserTest.java +++ b/core/src/test/java/org/apache/druid/math/expr/ParserTest.java @@ -309,24 +309,29 @@ public class ParserTest extends InitializedNullHandlingTest public void testLiteralExplicitTypedArrays() { ExpressionProcessing.initializeForTests(true); - validateConstantExpression("ARRAY[1.0, 2.0, null, 3.0]", new Object[]{1.0, 2.0, null, 3.0}); - validateConstantExpression("ARRAY[1, 2, null, 3]", new Object[]{1L, 2L, null, 3L}); - validateConstantExpression("ARRAY['1', '2', null, '3.0']", new Object[]{"1", "2", null, "3.0"}); - // mixed type tests - validateConstantExpression("ARRAY[3, null, 4, 2.345]", new Object[]{3.0, null, 4.0, 2.345}); - validateConstantExpression("ARRAY[1.0, null, 2000.0]", new Object[]{1L, null, 2000L}); + try { + validateConstantExpression("ARRAY[1.0, 2.0, null, 3.0]", new Object[]{1.0, 2.0, null, 3.0}); + validateConstantExpression("ARRAY[1, 2, null, 3]", new Object[]{1L, 2L, null, 3L}); + validateConstantExpression("ARRAY['1', '2', null, '3.0']", new Object[]{"1", "2", null, "3.0"}); - // explicit typed string arrays should accept any literal and convert - validateConstantExpression("ARRAY['1', null, 2000, 1.1]", new Object[]{"1", null, "2000", "1.1"}); - validateConstantExpression("ARRAY['1', null, 2000, 1.1]", new Object[]{1L, null, 2000L, 1L}); - validateConstantExpression("ARRAY['1', null, 2000, 1.1]", new Object[]{1.0, null, 2000.0, 1.1}); + // mixed type tests + validateConstantExpression("ARRAY[3, null, 4, 2.345]", new Object[]{3.0, null, 4.0, 2.345}); + validateConstantExpression("ARRAY[1.0, null, 2000.0]", new Object[]{1L, null, 2000L}); - // the gramar isn't cool enough yet to parse populated nested-arrays or complex arrays..., but empty ones can - // be defined... - validateConstantExpression("ARRAY>[]", new Object[]{}); - validateConstantExpression("ARRAY>[]", new Object[]{}); - ExpressionProcessing.initializeForTests(null); + // explicit typed string arrays should accept any literal and convert + validateConstantExpression("ARRAY['1', null, 2000, 1.1]", new Object[]{"1", null, "2000", "1.1"}); + validateConstantExpression("ARRAY['1', null, 2000, 1.1]", new Object[]{1L, null, 2000L, 1L}); + validateConstantExpression("ARRAY['1', null, 2000, 1.1]", new Object[]{1.0, null, 2000.0, 1.1}); + + // the gramar isn't cool enough yet to parse populated nested-arrays or complex arrays..., but empty ones can + // be defined... + validateConstantExpression("ARRAY>[]", new Object[]{}); + validateConstantExpression("ARRAY>[]", new Object[]{}); + } + finally { + ExpressionProcessing.initializeForTests(null); + } } @Test diff --git a/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java b/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java index 861562b5de7..0e006904cad 100644 --- a/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java +++ b/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java @@ -90,6 +90,12 @@ public class TypeStrategiesTest return null; } + @Override + public boolean readRetainsBufferReference() + { + return false; + } + @Override public int write(ByteBuffer buffer, String value, int maxSizeBytes) { @@ -658,6 +664,12 @@ public class TypeStrategiesTest return new NullableLongPair(lhs, rhs); } + @Override + public boolean readRetainsBufferReference() + { + return false; + } + @Override public int write(ByteBuffer buffer, NullableLongPair value, int maxSizeBytes) { diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java index 3da07ff6240..8328600561f 100644 --- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunnerTestHelper; @@ -35,6 +34,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.segment.IncrementalIndexSegment; @@ -63,11 +63,11 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest { final GroupByQueryConfig config = new GroupByQueryConfig(); config.setMaxIntermediateRows(10000); - final Pair factoryCloserPair = GroupByQueryRunnerTest.makeQueryRunnerFactory( - config + this.resourceCloser = Closer.create(); + this.factory = GroupByQueryRunnerTest.makeQueryRunnerFactory( + config, + this.resourceCloser.register(TestGroupByBuffers.createDefault()) ); - factory = factoryCloserPair.lhs; - resourceCloser = factoryCloserPair.rhs; } @After diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java index 3e941d232b5..35e52d4052e 100644 --- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java @@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.histogram; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunner; @@ -33,12 +32,15 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.segment.TestHelper; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,13 +56,31 @@ import java.util.List; public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlingTest { private static final Closer RESOURCE_CLOSER = Closer.create(); + private static TestGroupByBuffers BUFFER_POOLS = null; private final QueryRunner runner; private final GroupByQueryRunnerFactory factory; + @BeforeClass + public static void setUpClass() + { + if (BUFFER_POOLS == null) { + BUFFER_POOLS = TestGroupByBuffers.createDefault(); + } + } + + @AfterClass + public static void tearDownClass() + { + BUFFER_POOLS.close(); + BUFFER_POOLS = null; + } + @Parameterized.Parameters(name = "{0}") public static Iterable constructorFeeder() { + setUpClass(); + final GroupByQueryConfig v1Config = new GroupByQueryConfig() { @Override @@ -121,11 +141,7 @@ public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlin ); for (GroupByQueryConfig config : configs) { - final Pair factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory( - config - ); - final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs; - RESOURCE_CLOSER.register(factoryAndCloser.rhs); + final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners(factory)) { final String testName = StringUtils.format( "config=%s, runner=%s", diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramGroupByQueryTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramGroupByQueryTest.java index 793dcc3b0be..78a520fbb94 100644 --- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramGroupByQueryTest.java +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramGroupByQueryTest.java @@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.histogram; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunner; @@ -33,12 +32,15 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.segment.TestHelper; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,13 +56,31 @@ import java.util.List; public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandlingTest { private static final Closer RESOURCE_CLOSER = Closer.create(); + private static TestGroupByBuffers BUFFER_POOLS = null; private final QueryRunner runner; private final GroupByQueryRunnerFactory factory; + @BeforeClass + public static void setUpClass() + { + if (BUFFER_POOLS == null) { + BUFFER_POOLS = TestGroupByBuffers.createDefault(); + } + } + + @AfterClass + public static void tearDownClass() + { + BUFFER_POOLS.close(); + BUFFER_POOLS = null; + } + @Parameterized.Parameters(name = "{0}") public static Iterable constructorFeeder() { + setUpClass(); + final GroupByQueryConfig v1Config = new GroupByQueryConfig() { @Override @@ -121,11 +141,7 @@ public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandli ); for (GroupByQueryConfig config : configs) { - final Pair factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory( - config - ); - final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs; - RESOURCE_CLOSER.register(factoryAndCloser.rhs); + final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners(factory)) { final String testName = StringUtils.format( "config=%s, runner=%s", diff --git a/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java b/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java index 36cb5f077f9..d46faf44724 100644 --- a/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java +++ b/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java @@ -624,7 +624,7 @@ public class HyperLogLogCollectorTest int valsToCheckIndex = 0; HyperLogLogCollector collector = HyperLogLogCollector.makeCollector( - ByteBuffer.allocateDirect( + ByteBuffer.allocate( HyperLogLogCollector.getLatestNumBytesForDenseStorage() ) ); diff --git a/pom.xml b/pom.xml index 8f114306954..be3e2e88156 100644 --- a/pom.xml +++ b/pom.xml @@ -1502,6 +1502,8 @@ @{jacocoArgLine} -Xmx1500m -XX:MaxDirectMemorySize=512m + -XX:+ExitOnOutOfMemoryError + -XX:+HeapDumpOnOutOfMemoryError -Duser.language=en -Duser.GroupByQueryRunnerTest.javacountry=US -Dfile.encoding=UTF-8 @@ -1514,6 +1516,7 @@ false true + @@ -1734,6 +1737,8 @@ ${jdk.surefire.argLine} -Xmx768m -Duser.language=en -Duser.country=US -Dfile.encoding=UTF-8 + -XX:+ExitOnOutOfMemoryError + -XX:+HeapDumpOnOutOfMemoryError -Duser.timezone=UTC -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Daws.region=us-east-1 diff --git a/processing/pom.xml b/processing/pom.xml index cd41197eee5..fdcdaaffff3 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -288,6 +288,8 @@ ${jdk.surefire.argLine} -Xmx512m -XX:MaxDirectMemorySize=2500m + -XX:+ExitOnOutOfMemoryError + -XX:+HeapDumpOnOutOfMemoryError -Duser.language=en -Duser.GroupByQueryRunnerTest.javacountry=US -Dfile.encoding=UTF-8 @@ -315,6 +317,8 @@ ${jdk.surefire.argLine} -server -Xms3G -Xmx3G -Djub.consumers=CONSOLE,H2 -Djub.db.file=benchmarks/benchmarks + -XX:+ExitOnOutOfMemoryError + -XX:+HeapDumpOnOutOfMemoryError org.apache.druid.collections.test.annotation.Benchmark org.apache.druid.collections.test.annotation.Dummy diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java index 44b2cad9b3e..8fcb7f3bc08 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java @@ -93,9 +93,9 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory private final Supplier finalizeExpression; private final HumanReadableBytes maxSizeBytes; - private final Supplier compareBindings; - private final Supplier combineBindings; - private final Supplier finalizeBindings; + private final ThreadLocal compareBindings; + private final ThreadLocal combineBindings; + private final ThreadLocal finalizeBindings; private final Supplier finalizeInspector; @JsonCreator @@ -166,7 +166,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory ImmutableMap.of(FINALIZE_IDENTIFIER, this.initialCombineValue.get().type()) ) ); - this.compareBindings = Suppliers.memoize( + this.compareBindings = ThreadLocal.withInitial( () -> new SettableObjectBinding(2).withInspector( InputBindings.inspectorFromTypeMap( ImmutableMap.of( @@ -176,7 +176,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory ) ) ); - this.combineBindings = Suppliers.memoize( + this.combineBindings = ThreadLocal.withInitial( () -> new SettableObjectBinding(2).withInspector( InputBindings.inspectorFromTypeMap( ImmutableMap.of( @@ -186,7 +186,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory ) ) ); - this.finalizeBindings = Suppliers.memoize( + this.finalizeBindings = ThreadLocal.withInitial( () -> new SettableObjectBinding(1).withInspector(finalizeInspector.get()) ); this.finalizeExpression = Parser.lazyParse(finalizeExpressionString, macroTable); @@ -302,7 +302,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory FactorizePlan thePlan = new FactorizePlan(metricFactory); return new ExpressionLambdaAggregator( thePlan, - maxSizeBytes.getBytesInInt() + getMaxIntermediateSize() ); } @@ -312,7 +312,7 @@ public class ExpressionLambdaAggregatorFactory extends AggregatorFactory FactorizePlan thePlan = new FactorizePlan(metricFactory); return new ExpressionLambdaBufferAggregator( thePlan, - maxSizeBytes.getBytesInInt() + getMaxIntermediateSize() ); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaBufferAggregator.java index 7cbcc5c5948..5dc9a88d963 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaBufferAggregator.java @@ -75,7 +75,7 @@ public class ExpressionLambdaBufferAggregator implements BufferAggregator } } } - ExprEval acc = ExprEval.deserialize(buf, position, outputType); + ExprEval acc = ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true); bindings.setAccumulator(acc); ExprEval newAcc = lambda.eval(bindings); ExprEval.serialize(buf, position, outputType, newAcc, maxSizeBytes); @@ -90,25 +90,25 @@ public class ExpressionLambdaBufferAggregator implements BufferAggregator if (isNullUnlessAggregated && (buf.get(position) & NOT_AGGREGATED_BIT) != 0) { return null; } - return ExprEval.deserialize(buf, position, outputType).value(); + return ExprEval.deserialize(buf, position, maxSizeBytes, outputType, false).value(); } @Override public float getFloat(ByteBuffer buf, int position) { - return (float) ExprEval.deserialize(buf, position, outputType).asDouble(); + return (float) ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true).asDouble(); } @Override public double getDouble(ByteBuffer buf, int position) { - return ExprEval.deserialize(buf, position, outputType).asDouble(); + return ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true).asDouble(); } @Override public long getLong(ByteBuffer buf, int position) { - return ExprEval.deserialize(buf, position, outputType).asLong(); + return ExprEval.deserialize(buf, position, maxSizeBytes, outputType, true).asLong(); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java index a34682f6532..91ba3803fe6 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java @@ -58,6 +58,13 @@ public class ObjectStrategyComplexTypeStrategy implements TypeStrategy return objectStrategy.fromByteBuffer(dupe, complexLength); } + @Override + public boolean readRetainsBufferReference() + { + // Can't guarantee that ObjectStrategy *doesn't* retain a reference. + return true; + } + @Override public int write(ByteBuffer buffer, T value, int maxSizeBytes) { diff --git a/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstConsecutiveRunsTest.java b/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstConsecutiveRunsTest.java index c9b24893a2f..db45fe7ca4f 100644 --- a/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstConsecutiveRunsTest.java +++ b/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstConsecutiveRunsTest.java @@ -21,9 +21,11 @@ package org.apache.druid.collections.bitmap; import org.apache.druid.extendedset.intset.ConciseSet; import org.apache.druid.extendedset.intset.ImmutableConciseSet; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.roaringbitmap.buffer.MutableRoaringBitmap; +import java.io.IOException; import java.util.BitSet; public class BitmapOperationAgainstConsecutiveRunsTest extends BitmapOperationTestBase @@ -66,10 +68,16 @@ public class BitmapOperationAgainstConsecutiveRunsTest extends BitmapOperationTe ROARING[i] = r; IMMUTABLE_ROARING[i] = makeImmutableRoaring(r); OFF_HEAP_ROARING[i] = makeOffheapRoaring(r); - GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i]); - GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i]); + GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i].get()); + GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i].get()); } unionCount = expectedUnion.cardinality(); printSizeStats(DENSITY, "Random Alternating Bitmap"); } + + @AfterClass + public static void tearDownClass() throws IOException + { + reset(); + } } diff --git a/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstUniformDistributionTest.java b/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstUniformDistributionTest.java index 08460db5ca5..8120939641b 100644 --- a/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstUniformDistributionTest.java +++ b/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationAgainstUniformDistributionTest.java @@ -21,9 +21,11 @@ package org.apache.druid.collections.bitmap; import org.apache.druid.extendedset.intset.ConciseSet; import org.apache.druid.extendedset.intset.ImmutableConciseSet; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.roaringbitmap.buffer.MutableRoaringBitmap; +import java.io.IOException; import java.util.BitSet; public class BitmapOperationAgainstUniformDistributionTest extends BitmapOperationTestBase @@ -61,11 +63,17 @@ public class BitmapOperationAgainstUniformDistributionTest extends BitmapOperati ROARING[i] = r; IMMUTABLE_ROARING[i] = makeImmutableRoaring(r); OFF_HEAP_ROARING[i] = makeOffheapRoaring(r); - GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i]); - GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i]); + GENERIC_CONCISE[i] = new WrappedImmutableConciseBitmap(OFF_HEAP_CONCISE[i].get()); + GENERIC_ROARING[i] = new WrappedImmutableRoaringBitmap(OFF_HEAP_ROARING[i].get()); } unionCount = expectedUnion.cardinality(); minIntersection = knownTrue.length; printSizeStats(DENSITY, "Uniform Bitmap"); } + + @AfterClass + public static void tearDownClass() throws IOException + { + reset(); + } } diff --git a/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationTestBase.java b/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationTestBase.java index fa054691058..d521f2ef4ec 100644 --- a/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationTestBase.java +++ b/processing/src/test/java/org/apache/druid/collections/bitmap/BitmapOperationTestBase.java @@ -19,8 +19,11 @@ package org.apache.druid.collections.bitmap; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.config.NullHandling; import org.apache.druid.extendedset.intset.ImmutableConciseSet; +import org.apache.druid.java.util.common.ByteBufferUtils; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Test; import org.roaringbitmap.buffer.BufferFastAggregation; @@ -40,10 +43,10 @@ public abstract class BitmapOperationTestBase public static final int BITMAP_LENGTH = 500_000; public static final int NUM_BITMAPS = 1000; static final ImmutableConciseSet[] CONCISE = new ImmutableConciseSet[NUM_BITMAPS]; - static final ImmutableConciseSet[] OFF_HEAP_CONCISE = new ImmutableConciseSet[NUM_BITMAPS]; + static final ResourceHolder[] OFF_HEAP_CONCISE = new ResourceHolder[NUM_BITMAPS]; static final ImmutableRoaringBitmap[] ROARING = new ImmutableRoaringBitmap[NUM_BITMAPS]; static final ImmutableRoaringBitmap[] IMMUTABLE_ROARING = new ImmutableRoaringBitmap[NUM_BITMAPS]; - static final ImmutableRoaringBitmap[] OFF_HEAP_ROARING = new ImmutableRoaringBitmap[NUM_BITMAPS]; + static final ResourceHolder[] OFF_HEAP_ROARING = new ResourceHolder[NUM_BITMAPS]; static final ImmutableBitmap[] GENERIC_CONCISE = new ImmutableBitmap[NUM_BITMAPS]; static final ImmutableBitmap[] GENERIC_ROARING = new ImmutableBitmap[NUM_BITMAPS]; static final ConciseBitmapFactory CONCISE_FACTORY = new ConciseBitmapFactory(); @@ -60,14 +63,31 @@ public abstract class BitmapOperationTestBase NullHandling.initializeForTests(); } - protected static ImmutableConciseSet makeOffheapConcise(ImmutableConciseSet concise) + protected static ResourceHolder makeOffheapConcise(ImmutableConciseSet concise) { final byte[] bytes = concise.toBytes(); totalConciseBytes += bytes.length; conciseCount++; - final ByteBuffer buf = ByteBuffer.allocateDirect(bytes.length).put(bytes); + + final ResourceHolder bufHolder = ByteBufferUtils.allocateDirect(bytes.length); + final ByteBuffer buf = bufHolder.get().put(bytes); buf.rewind(); - return new ImmutableConciseSet(buf.asIntBuffer()); + final ImmutableConciseSet bitmap = new ImmutableConciseSet(buf.asIntBuffer()); + + return new ResourceHolder() + { + @Override + public ImmutableConciseSet get() + { + return bitmap; + } + + @Override + public void close() + { + bufHolder.close(); + } + }; } protected static ImmutableRoaringBitmap writeImmutable(MutableRoaringBitmap r, ByteBuffer buf) throws IOException @@ -81,8 +101,19 @@ public abstract class BitmapOperationTestBase return new ImmutableRoaringBitmap(buf.asReadOnlyBuffer()); } - protected static void reset() + protected static void reset() throws IOException { + CloseableUtils.closeAll(Arrays.asList(OFF_HEAP_CONCISE)); + CloseableUtils.closeAll(Arrays.asList(OFF_HEAP_ROARING)); + + Arrays.fill(CONCISE, null); + Arrays.fill(ROARING, null); + Arrays.fill(IMMUTABLE_ROARING, null); + Arrays.fill(GENERIC_CONCISE, null); + Arrays.fill(GENERIC_ROARING, null); + Arrays.fill(OFF_HEAP_CONCISE, null); + Arrays.fill(OFF_HEAP_ROARING, null); + conciseCount = 0; roaringCount = 0; totalConciseBytes = 0; @@ -111,13 +142,29 @@ public abstract class BitmapOperationTestBase System.out.flush(); } - protected static ImmutableRoaringBitmap makeOffheapRoaring(MutableRoaringBitmap r) throws IOException + protected static ResourceHolder makeOffheapRoaring(MutableRoaringBitmap r) throws IOException { final int size = r.serializedSizeInBytes(); - final ByteBuffer buf = ByteBuffer.allocateDirect(size); + final ResourceHolder bufHolder = ByteBufferUtils.allocateDirect(size); + final ByteBuffer buf = bufHolder.get(); totalRoaringBytes += size; roaringCount++; - return writeImmutable(r, buf); + final ImmutableRoaringBitmap bitmap = writeImmutable(r, buf); + + return new ResourceHolder() + { + @Override + public ImmutableRoaringBitmap get() + { + return bitmap; + } + + @Override + public void close() + { + bufHolder.close(); + } + }; } protected static ImmutableRoaringBitmap makeImmutableRoaring(MutableRoaringBitmap r) throws IOException @@ -136,7 +183,9 @@ public abstract class BitmapOperationTestBase @Test public void testOffheapConciseUnion() { - ImmutableConciseSet union = ImmutableConciseSet.union(OFF_HEAP_CONCISE); + ImmutableConciseSet union = ImmutableConciseSet.union( + Arrays.stream(OFF_HEAP_CONCISE).map(ResourceHolder::get).iterator() + ); Assert.assertEquals(unionCount, union.size()); } @@ -171,7 +220,9 @@ public abstract class BitmapOperationTestBase @Test public void testOffheapRoaringUnion() { - ImmutableRoaringBitmap union = BufferFastAggregation.horizontal_or(Arrays.asList(OFF_HEAP_ROARING).iterator()); + ImmutableRoaringBitmap union = BufferFastAggregation.naive_or( + Arrays.stream(OFF_HEAP_ROARING).map(ResourceHolder::get).iterator() + ); Assert.assertEquals(unionCount, union.getCardinality()); } diff --git a/processing/src/test/java/org/apache/druid/collections/bitmap/WrappedBitSetBitmapBitSetTest.java b/processing/src/test/java/org/apache/druid/collections/bitmap/WrappedBitSetBitmapBitSetTest.java index 36e6be43cac..6095e77ff88 100644 --- a/processing/src/test/java/org/apache/druid/collections/bitmap/WrappedBitSetBitmapBitSetTest.java +++ b/processing/src/test/java/org/apache/druid/collections/bitmap/WrappedBitSetBitmapBitSetTest.java @@ -21,6 +21,8 @@ package org.apache.druid.collections.bitmap; import com.google.common.collect.Sets; import org.apache.druid.collections.IntSetTestUtility; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.java.util.common.ByteBufferUtils; import org.junit.Assert; import org.junit.Test; import org.roaringbitmap.IntIterator; @@ -67,13 +69,15 @@ public class WrappedBitSetBitmapBitSetTest @Test public void testOffHeap() { - ByteBuffer buffer = ByteBuffer.allocateDirect(Long.SIZE * 100 / 8).order(ByteOrder.LITTLE_ENDIAN); - BitSet testSet = BitSet.valueOf(buffer); - testSet.set(1); - WrappedImmutableBitSetBitmap bitMap = new WrappedImmutableBitSetBitmap(testSet); - Assert.assertTrue(bitMap.get(1)); - testSet.set(2); - Assert.assertTrue(bitMap.get(2)); + try (final ResourceHolder bufferHolder = ByteBufferUtils.allocateDirect(Long.SIZE * 100 / 8)) { + final ByteBuffer buffer = bufferHolder.get().order(ByteOrder.LITTLE_ENDIAN); + BitSet testSet = BitSet.valueOf(buffer); + testSet.set(1); + WrappedImmutableBitSetBitmap bitMap = new WrappedImmutableBitSetBitmap(testSet); + Assert.assertTrue(bitMap.get(1)); + testSet.set(2); + Assert.assertTrue(bitMap.get(2)); + } } @Test diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java new file mode 100644 index 00000000000..e841397c0e1 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.collect.Iterables; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.NonBlockingPool; +import org.apache.druid.collections.ReferenceCountingResourceHolder; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.java.util.common.ByteBufferUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.utils.CloseableUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +/** + * A buffer pool that throws away buffers when they are "returned" to the pool. Useful for tests that need to make + * many pools and use them one at a time. + * + * This pool implements {@link BlockingPool}, but never blocks. It returns immediately if resources are available; + * otherwise it returns an empty list immediately. This is also useful for tests, because it allows "timeouts" to + * happen immediately and therefore speeds up tests. + */ +public class TestBufferPool implements NonBlockingPool, BlockingPool +{ + private final Supplier> generator; + private final int maxCount; + + @GuardedBy("this") + private long numOutstanding; + + private TestBufferPool(final Supplier> generator, final int maxCount) + { + this.generator = generator; + this.maxCount = maxCount; + } + + public static TestBufferPool onHeap(final int bufferSize, final int maxCount) + { + return new TestBufferPool( + () -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> {}), + maxCount + ); + } + + public static TestBufferPool offHeap(final int bufferSize, final int maxCount) + { + return new TestBufferPool( + () -> ByteBufferUtils.allocateDirect(bufferSize), + maxCount + ); + } + + + @Override + public int maxSize() + { + return maxCount; + } + + @Override + public ResourceHolder take() + { + final List> holders = takeBatch(1); + + if (holders.isEmpty()) { + throw new ISE("Too many objects outstanding"); + } else { + return Iterables.getOnlyElement(holders); + } + } + + @Override + public List> takeBatch(int elementNum, long timeoutMs) + { + return takeBatch(elementNum); + } + + @Override + public List> takeBatch(int elementNum) + { + synchronized (this) { + if (numOutstanding + elementNum <= maxCount) { + final List> retVal = new ArrayList<>(); + + try { + for (int i = 0; i < elementNum; i++) { + final ResourceHolder holder = generator.get(); + final ByteBuffer o = holder.get(); + retVal.add(new ReferenceCountingResourceHolder<>(o, () -> { + synchronized (this) { + numOutstanding--; + holder.close(); + } + })); + numOutstanding++; + } + } + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(retVal)); + } + + return retVal; + } else { + return Collections.emptyList(); + } + } + } + + public long getOutstandingObjectCount() + { + synchronized (this) { + return numOutstanding; + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 2ba78a3dcdf..37c31ec1323 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -38,7 +38,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; @@ -59,6 +58,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.query.scan.ScanQueryConfig; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanQueryQueryToolChest; @@ -152,13 +152,14 @@ public class AggregationTestHelper implements Closeable TemporaryFolder tempFolder ) { + final Closer closer = Closer.create(); final ObjectMapper mapper = TestHelper.makeJsonMapper(); - final Pair factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory( + final TestGroupByBuffers groupByBuffers = closer.register(TestGroupByBuffers.createDefault()); + final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory( mapper, - config + config, + groupByBuffers ); - final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs; - final Closer closer = factoryAndCloser.rhs; IndexIO indexIO = new IndexIO( mapper, diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/HistogramAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/HistogramAggregatorTest.java index 6d3261b45d5..9651b168178 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/HistogramAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/HistogramAggregatorTest.java @@ -114,7 +114,7 @@ public class HistogramAggregatorTest ); HistogramBufferAggregator agg = new HistogramBufferAggregator(selector, breaks); - ByteBuffer buf = ByteBuffer.allocateDirect(factory.getMaxIntermediateSizeWithNulls()); + ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls()); int position = 0; agg.init(buf, position); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/JavaScriptAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/JavaScriptAggregatorTest.java index 808e64dcf21..2275b915f52 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/JavaScriptAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/JavaScriptAggregatorTest.java @@ -159,7 +159,7 @@ public class JavaScriptAggregatorTest ) ); - ByteBuffer buf = ByteBuffer.allocateDirect(32); + ByteBuffer buf = ByteBuffer.allocate(32); final int position = 4; agg.init(buf, position); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java index 5d875862442..24f4c4eca0b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java @@ -49,6 +49,7 @@ import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.DimensionSelectorUtils; import org.apache.druid.segment.IdLookup; import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Test; @@ -59,7 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class CardinalityAggregatorTest +public class CardinalityAggregatorTest extends InitializedNullHandlingTest { public static class TestDimensionSelector extends AbstractDimensionSelector { @@ -381,7 +382,6 @@ public class CardinalityAggregatorTest @Test public void testAggregateRows() { - NullHandling.initializeForTestsWithValues(null, null); CardinalityAggregator agg = new CardinalityAggregator( dimInfoList, true @@ -397,7 +397,6 @@ public class CardinalityAggregatorTest @Test public void testAggregateValues() { - NullHandling.initializeForTestsWithValues(null, null); CardinalityAggregator agg = new CardinalityAggregator( dimInfoList, false @@ -421,7 +420,6 @@ public class CardinalityAggregatorTest @Test public void testBufferAggregateRows() { - NullHandling.initializeForTestsWithValues(null, null); CardinalityBufferAggregator agg = new CardinalityBufferAggregator( dimInfoList.toArray(new ColumnSelectorPlus[0]), true @@ -444,7 +442,6 @@ public class CardinalityAggregatorTest @Test public void testBufferAggregateValues() { - NullHandling.initializeForTestsWithValues(null, null); CardinalityBufferAggregator agg = new CardinalityBufferAggregator( dimInfoList.toArray(new ColumnSelectorPlus[0]), false @@ -474,7 +471,6 @@ public class CardinalityAggregatorTest @Test public void testCombineRows() { - NullHandling.initializeForTestsWithValues(null, null); List selector1 = Collections.singletonList(dim1); List selector2 = Collections.singletonList(dim2); List> dimInfo1 = Collections.singletonList( @@ -520,7 +516,6 @@ public class CardinalityAggregatorTest @Test public void testCombineValues() { - NullHandling.initializeForTestsWithValues(null, null); List selector1 = Collections.singletonList(dim1); List selector2 = Collections.singletonList(dim2); @@ -573,7 +568,6 @@ public class CardinalityAggregatorTest @Test public void testAggregateRowsWithExtraction() { - NullHandling.initializeForTestsWithValues(null, null); CardinalityAggregator agg = new CardinalityAggregator( dimInfoListWithExtraction, true @@ -596,7 +590,6 @@ public class CardinalityAggregatorTest @Test public void testAggregateValuesWithExtraction() { - NullHandling.initializeForTestsWithValues(null, null); CardinalityAggregator agg = new CardinalityAggregator( dimInfoListWithExtraction, false @@ -669,92 +662,107 @@ public class CardinalityAggregatorTest public void testAggregateRowsIgnoreNulls() { NullHandling.initializeForTestsWithValues(null, true); - CardinalityAggregator agg = new CardinalityAggregator( - dimInfoList, - true - ); + try { + CardinalityAggregator agg = new CardinalityAggregator( + dimInfoList, + true + ); - for (int i = 0; i < VALUES1.size(); ++i) { - aggregate(selectorList, agg); + for (int i = 0; i < VALUES1.size(); ++i) { + aggregate(selectorList, agg); + } + Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05); + Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get())); + } + finally { + NullHandling.initializeForTests(); } - Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05); - Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get())); } @Test public void testAggregateValuesIgnoreNulls() { NullHandling.initializeForTestsWithValues(null, true); - CardinalityAggregator agg = new CardinalityAggregator( - dimInfoList, - false - ); + try { + CardinalityAggregator agg = new CardinalityAggregator( + dimInfoList, + false + ); - for (int i = 0; i < VALUES1.size(); ++i) { - aggregate(selectorList, agg); + for (int i = 0; i < VALUES1.size(); ++i) { + aggregate(selectorList, agg); + } + //setting is not applied when druid.generic.useDefaultValueForNull=false + Assert.assertEquals( + NullHandling.replaceWithDefault() ? 6.0 : 6.0, + (Double) valueAggregatorFactory.finalizeComputation(agg.get()), + 0.05 + ); + Assert.assertEquals( + NullHandling.replaceWithDefault() ? 6L : 6L, + rowAggregatorFactoryRounded.finalizeComputation(agg.get()) + ); + } + finally { + NullHandling.initializeForTests(); } - //setting is not applied when druid.generic.useDefaultValueForNull=false - Assert.assertEquals( - NullHandling.replaceWithDefault() ? 6.0 : 6.0, - (Double) valueAggregatorFactory.finalizeComputation(agg.get()), - 0.05 - ); - Assert.assertEquals( - NullHandling.replaceWithDefault() ? 6L : 6L, - rowAggregatorFactoryRounded.finalizeComputation(agg.get()) - ); } @Test public void testCombineValuesIgnoreNulls() { NullHandling.initializeForTestsWithValues(null, true); - List selector1 = Collections.singletonList(dim1); - List selector2 = Collections.singletonList(dim2); + try { + List selector1 = Collections.singletonList(dim1); + List selector2 = Collections.singletonList(dim2); - List> dimInfo1 = Collections.singletonList( - new ColumnSelectorPlus<>( - dimSpec1.getDimension(), - dimSpec1.getOutputName(), - new StringCardinalityAggregatorColumnSelectorStrategy(), dim1 - ) - ); - List> dimInfo2 = Collections.singletonList( - new ColumnSelectorPlus<>( - dimSpec1.getDimension(), - dimSpec1.getOutputName(), - new StringCardinalityAggregatorColumnSelectorStrategy(), dim2 - ) - ); + List> dimInfo1 = Collections.singletonList( + new ColumnSelectorPlus<>( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim1 + ) + ); + List> dimInfo2 = Collections.singletonList( + new ColumnSelectorPlus<>( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim2 + ) + ); - CardinalityAggregator agg1 = new CardinalityAggregator(dimInfo1, false); - CardinalityAggregator agg2 = new CardinalityAggregator(dimInfo2, false); + CardinalityAggregator agg1 = new CardinalityAggregator(dimInfo1, false); + CardinalityAggregator agg2 = new CardinalityAggregator(dimInfo2, false); - for (int i = 0; i < VALUES1.size(); ++i) { - aggregate(selector1, agg1); + for (int i = 0; i < VALUES1.size(); ++i) { + aggregate(selector1, agg1); + } + for (int i = 0; i < VALUES2.size(); ++i) { + aggregate(selector2, agg2); + } + Assert.assertEquals( + NullHandling.replaceWithDefault() ? 3.0 : 3.0, + (Double) valueAggregatorFactory.finalizeComputation(agg1.get()), + 0.05 + ); + Assert.assertEquals( + NullHandling.replaceWithDefault() ? 6.0 : 6.0, + (Double) valueAggregatorFactory.finalizeComputation(agg2.get()), + 0.05 + ); + Assert.assertEquals( + NullHandling.replaceWithDefault() ? 6.0 : 6.0, + (Double) rowAggregatorFactory.finalizeComputation( + rowAggregatorFactory.combine( + agg1.get(), + agg2.get() + ) + ), + 0.05 + ); } - for (int i = 0; i < VALUES2.size(); ++i) { - aggregate(selector2, agg2); + finally { + NullHandling.initializeForTests(); } - Assert.assertEquals( - NullHandling.replaceWithDefault() ? 3.0 : 3.0, - (Double) valueAggregatorFactory.finalizeComputation(agg1.get()), - 0.05 - ); - Assert.assertEquals( - NullHandling.replaceWithDefault() ? 6.0 : 6.0, - (Double) valueAggregatorFactory.finalizeComputation(agg2.get()), - 0.05 - ); - Assert.assertEquals( - NullHandling.replaceWithDefault() ? 6.0 : 6.0, - (Double) rowAggregatorFactory.finalizeComputation( - rowAggregatorFactory.combine( - agg1.get(), - agg2.get() - ) - ), - 0.05 - ); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index 888044d223a..8e5d7011ed9 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -27,8 +27,6 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.collections.CloseableDefaultBlockingPool; -import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -42,7 +40,6 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.DruidProcessingConfig; @@ -53,6 +50,7 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -83,7 +81,6 @@ import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -91,7 +88,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullHandlingTest @@ -267,27 +263,18 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH { executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]"); - final CloseableStupidPool bufferPool = new CloseableStupidPool<>( - "GroupByBenchmark-computeBufferPool", - new OffheapBufferGenerator("compute", 10_000_000), - 0, - Integer.MAX_VALUE - ); + final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); // limit of 2 is required since we simulate both historical merge and broker merge in the same process - final CloseableDefaultBlockingPool mergePool = new CloseableDefaultBlockingPool<>( - new OffheapBufferGenerator("merge", 10_000_000), - 2 - ); + final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2); // limit of 2 is required since we simulate both historical merge and broker merge in the same process - final CloseableDefaultBlockingPool tooSmallMergePool = new CloseableDefaultBlockingPool<>( - new OffheapBufferGenerator("merge", 255), - 2 - ); + final TestBufferPool tooSmallMergePool = TestBufferPool.onHeap(255, 2); - resourceCloser.register(bufferPool); - resourceCloser.register(mergePool); - resourceCloser.register(tooSmallMergePool); + resourceCloser.register(() -> { + // Verify that all objects have been returned to the pools. + Assert.assertEquals(0, mergePool.getOutstandingObjectCount()); + Assert.assertEquals(0, tooSmallMergePool.getOutstandingObjectCount()); + }); final GroupByQueryConfig config = new GroupByQueryConfig() { @@ -626,34 +613,6 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH return runners; } - private static class OffheapBufferGenerator implements Supplier - { - private static final Logger log = new Logger(OffheapBufferGenerator.class); - - private final String description; - private final int computationBufferSize; - private final AtomicLong count = new AtomicLong(0); - - public OffheapBufferGenerator(String description, int computationBufferSize) - { - this.description = description; - this.computationBufferSize = computationBufferSize; - } - - @Override - public ByteBuffer get() - { - log.info( - "Allocating new %s buffer[%,d] of size[%,d]", - description, - count.getAndIncrement(), - computationBufferSize - ); - - return ByteBuffer.allocateDirect(computationBufferSize); - } - } - public static > QueryRunner makeQueryRunner( QueryRunnerFactory factory, SegmentId segmentId, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java index 5533677a3a3..4c362a53c80 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java @@ -26,8 +26,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.collections.CloseableDefaultBlockingPool; -import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionSchema; @@ -43,7 +41,6 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.DruidProcessingConfig; @@ -54,6 +51,7 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.context.ResponseContext; @@ -92,7 +90,6 @@ import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -100,7 +97,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; public class GroupByLimitPushDownMultiNodeMergeTest @@ -539,27 +535,17 @@ public class GroupByLimitPushDownMultiNodeMergeTest { executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]"); - final CloseableStupidPool bufferPool = new CloseableStupidPool<>( - "GroupByBenchmark-computeBufferPool", - new OffheapBufferGenerator("compute", 10_000_000), - 0, - Integer.MAX_VALUE - ); - + final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); + final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2); // limit of 2 is required since we simulate both historical merge and broker merge in the same process - final CloseableDefaultBlockingPool mergePool = new CloseableDefaultBlockingPool<>( - new OffheapBufferGenerator("merge", 10_000_000), - 2 - ); - // limit of 2 is required since we simulate both historical merge and broker merge in the same process - final CloseableDefaultBlockingPool mergePool2 = new CloseableDefaultBlockingPool<>( - new OffheapBufferGenerator("merge", 10_000_000), - 2 - ); + final TestBufferPool mergePool2 = TestBufferPool.offHeap(10_000_000, 2); - resourceCloser.register(bufferPool); - resourceCloser.register(mergePool); - resourceCloser.register(mergePool2); + resourceCloser.register(() -> { + // Verify that all objects have been returned to the pools. + Assert.assertEquals(0, bufferPool.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePool.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePool2.getOutstandingObjectCount()); + }); final GroupByQueryConfig config = new GroupByQueryConfig() { @@ -1036,34 +1022,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest return runners; } - private static class OffheapBufferGenerator implements Supplier - { - private static final Logger log = new Logger(OffheapBufferGenerator.class); - - private final String description; - private final int computationBufferSize; - private final AtomicLong count = new AtomicLong(0); - - public OffheapBufferGenerator(String description, int computationBufferSize) - { - this.description = description; - this.computationBufferSize = computationBufferSize; - } - - @Override - public ByteBuffer get() - { - log.info( - "Allocating new %s buffer[%,d] of size[%,d]", - description, - count.getAndIncrement(), - computationBufferSize - ); - - return ByteBuffer.allocateDirect(computationBufferSize); - } - } - public static > QueryRunner makeQueryRunner( QueryRunnerFactory factory, SegmentId segmentId, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java index 90e9058a90c..58c4abed174 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java @@ -25,8 +25,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.collections.CloseableDefaultBlockingPool; -import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -39,7 +37,6 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.DruidProcessingConfig; @@ -50,6 +47,7 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -79,7 +77,6 @@ import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -87,7 +84,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; public class GroupByMultiSegmentTest { @@ -205,21 +201,17 @@ public class GroupByMultiSegmentTest { executorService = Execs.multiThreaded(2, "GroupByThreadPool[%d]"); - final CloseableStupidPool bufferPool = new CloseableStupidPool<>( - "GroupByBenchmark-computeBufferPool", - new OffheapBufferGenerator("compute", 10_000_000), - 0, - Integer.MAX_VALUE - ); + final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); // limit of 2 is required since we simulate both historical merge and broker merge in the same process - final CloseableDefaultBlockingPool mergePool = new CloseableDefaultBlockingPool<>( - new OffheapBufferGenerator("merge", 10_000_000), - 2 - ); + final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2); + + resourceCloser.register(() -> { + // Verify that all objects have been returned to the pools. + Assert.assertEquals(0, bufferPool.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePool.getOutstandingObjectCount()); + }); - resourceCloser.register(bufferPool); - resourceCloser.register(mergePool); final GroupByQueryConfig config = new GroupByQueryConfig() { @Override @@ -363,34 +355,6 @@ public class GroupByMultiSegmentTest return runners; } - private static class OffheapBufferGenerator implements Supplier - { - private static final Logger log = new Logger(OffheapBufferGenerator.class); - - private final String description; - private final int computationBufferSize; - private final AtomicLong count = new AtomicLong(0); - - public OffheapBufferGenerator(String description, int computationBufferSize) - { - this.description = description; - this.computationBufferSize = computationBufferSize; - } - - @Override - public ByteBuffer get() - { - log.info( - "Allocating new %s buffer[%,d] of size[%,d]", - description, - count.getAndIncrement(), - computationBufferSize - ); - - return ByteBuffer.allocateDirect(computationBufferSize); - } - } - public static > QueryRunner makeQueryRunner( QueryRunnerFactory factory, SegmentId segmentId, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java index 3e9a630a172..00d638af693 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -149,11 +149,11 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest private static final CloseableStupidPool BUFFER_POOL = new CloseableStupidPool<>( "GroupByQueryEngine-bufferPool", - () -> ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes()) + () -> ByteBuffer.allocate(PROCESSING_CONFIG.intermediateComputeSizeBytes()) ); private static final TestBlockingPool MERGE_BUFFER_POOL = new TestBlockingPool( - () -> ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes()), + () -> ByteBuffer.allocate(PROCESSING_CONFIG.intermediateComputeSizeBytes()), PROCESSING_CONFIG.getNumMergeBuffers() ); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index 5a8180f193e..3892a2018a0 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -24,7 +24,6 @@ import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.MergeSequence; import org.apache.druid.java.util.common.guava.Sequence; @@ -67,17 +66,15 @@ public class GroupByQueryRunnerFactoryTest @Before public void setup() { - final Pair factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory( - new GroupByQueryConfig() - ); - - factory = factoryAndCloser.lhs; - resourceCloser = factoryAndCloser.rhs; + this.resourceCloser = Closer.create(); + final TestGroupByBuffers buffers = resourceCloser.register(TestGroupByBuffers.createDefault()); + this.factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig(), buffers); } @After public void teardown() throws IOException { + factory = null; resourceCloser.close(); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 2dfe08e6ce3..acb52cc057b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -119,24 +119,10 @@ public class GroupByQueryRunnerFailureTest private static final CloseableStupidPool BUFFER_POOL = new CloseableStupidPool<>( "GroupByQueryEngine-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()); - } - } + () -> ByteBuffer.allocate(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()) ); private static final CloseableDefaultBlockingPool MERGE_BUFFER_POOL = new CloseableDefaultBlockingPool<>( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()); - } - }, + () -> ByteBuffer.allocate(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()), DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers() ); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 23cc2cdc2b4..1511750bcae 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -30,8 +30,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; -import org.apache.druid.collections.CloseableDefaultBlockingPool; -import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.Rows; @@ -39,7 +37,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.concurrent.Execs; @@ -49,7 +46,6 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.guava.MergeSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.math.expr.ExprMacroTable; @@ -149,6 +145,7 @@ import org.joda.time.DateTimeZone; import org.joda.time.Period; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -157,8 +154,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -176,7 +171,7 @@ import java.util.concurrent.Executors; public class GroupByQueryRunnerTest extends InitializedNullHandlingTest { public static final ObjectMapper DEFAULT_MAPPER = TestHelper.makeSmileMapper(); - private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() + public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() { @Override public String getFormatString() @@ -205,7 +200,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest } }; - private static final Closer RESOURCE_CLOSER = Closer.create(); + private static TestGroupByBuffers BUFFER_POOLS = null; private final QueryRunner runner; private final QueryRunner originalRunner; @@ -365,87 +360,74 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest ); } - public static Pair makeQueryRunnerFactory( - final GroupByQueryConfig config + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools ) { - return makeQueryRunnerFactory(DEFAULT_MAPPER, config, DEFAULT_PROCESSING_CONFIG); + return makeQueryRunnerFactory(DEFAULT_MAPPER, config, bufferPools, DEFAULT_PROCESSING_CONFIG); } - public static Pair makeQueryRunnerFactory( - final ObjectMapper mapper, - final GroupByQueryConfig config - ) - { - return makeQueryRunnerFactory(mapper, config, DEFAULT_PROCESSING_CONFIG); - } - - public static Pair makeQueryRunnerFactory( + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( final ObjectMapper mapper, final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools + ) + { + return makeQueryRunnerFactory(mapper, config, bufferPools, DEFAULT_PROCESSING_CONFIG); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools, final DruidProcessingConfig processingConfig ) { + if (bufferPools.getBufferSize() != processingConfig.intermediateComputeSizeBytes()) { + throw new ISE( + "Provided buffer size [%,d] does not match configured size [%,d]", + bufferPools.getBufferSize(), + processingConfig.intermediateComputeSizeBytes() + ); + } + if (bufferPools.getNumMergeBuffers() != processingConfig.getNumMergeBuffers()) { + throw new ISE( + "Provided merge buffer count [%,d] does not match configured count [%,d]", + bufferPools.getNumMergeBuffers(), + processingConfig.getNumMergeBuffers() + ); + } final Supplier configSupplier = Suppliers.ofInstance(config); - final CloseableStupidPool bufferPool = new CloseableStupidPool<>( - "GroupByQueryEngine-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes()); - } - } - ); - final CloseableDefaultBlockingPool mergeBufferPool = new CloseableDefaultBlockingPool<>( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes()); - } - }, - processingConfig.getNumMergeBuffers() - ); final GroupByStrategySelector strategySelector = new GroupByStrategySelector( configSupplier, new GroupByStrategyV1( configSupplier, - new GroupByQueryEngine(configSupplier, bufferPool), + new GroupByQueryEngine(configSupplier, bufferPools.getProcessingPool()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ), new GroupByStrategyV2( processingConfig, configSupplier, - bufferPool, - mergeBufferPool, + bufferPools.getProcessingPool(), + bufferPools.getMergePool(), mapper, QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector); - final Closer closer = Closer.create(); - closer.register(() -> { - // Verify that all objects have been returned to the pool. - Assert.assertEquals(bufferPool.poolSize(), bufferPool.objectsCreatedCount()); - bufferPool.close(); - }); - closer.register(mergeBufferPool); - return Pair.of(new GroupByQueryRunnerFactory(strategySelector, toolChest), closer); + return new GroupByQueryRunnerFactory(strategySelector, toolChest); } @Parameterized.Parameters(name = "{0}") public static Collection constructorFeeder() { NullHandling.initializeForTests(); + setUpClass(); final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : testConfigs()) { - final Pair factoryAndCloser = makeQueryRunnerFactory(config); - final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs; - RESOURCE_CLOSER.register(factoryAndCloser.rhs); + final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners(factory)) { for (boolean vectorize : ImmutableList.of(false, true)) { final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize); @@ -463,10 +445,19 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest return constructors; } - @AfterClass - public static void teardown() throws IOException + @BeforeClass + public static void setUpClass() { - RESOURCE_CLOSER.close(); + if (BUFFER_POOLS == null) { + BUFFER_POOLS = TestGroupByBuffers.createDefault(); + } + } + + @AfterClass + public static void tearDownClass() + { + BUFFER_POOLS.close(); + BUFFER_POOLS = null; } public GroupByQueryRunnerTest( @@ -4546,7 +4537,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setLimitSpec( new DefaultLimitSpec( Collections.singletonList( - new OrderByColumnSpec(QueryRunnerTestHelper.MARKET_DIMENSION, OrderByColumnSpec.Direction.DESCENDING) + new OrderByColumnSpec( + QueryRunnerTestHelper.MARKET_DIMENSION, + OrderByColumnSpec.Direction.DESCENDING + ) ), 3 ) @@ -5040,9 +5034,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setInterval("2011-01-25/2011-01-28") .setDimensions(new DefaultDimensionSpec("quality", "alias")) - .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new DoubleSumAggregatorFactory("index", "index"), QueryRunnerTestHelper.INDEX_LONG_MIN, - QueryRunnerTestHelper.INDEX_LONG_MAX, QueryRunnerTestHelper.INDEX_DOUBLE_MIN, QueryRunnerTestHelper.INDEX_DOUBLE_MAX, - QueryRunnerTestHelper.INDEX_FLOAT_MIN, QueryRunnerTestHelper.INDEX_FLOAT_MAX) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, + new DoubleSumAggregatorFactory("index", "index"), + QueryRunnerTestHelper.INDEX_LONG_MIN, + QueryRunnerTestHelper.INDEX_LONG_MAX, + QueryRunnerTestHelper.INDEX_DOUBLE_MIN, + QueryRunnerTestHelper.INDEX_DOUBLE_MAX, + QueryRunnerTestHelper.INDEX_FLOAT_MIN, + QueryRunnerTestHelper.INDEX_FLOAT_MAX + ) .setGranularity(Granularities.ALL) .setHavingSpec(new GreaterThanHavingSpec("index", 310L)) .setLimitSpec( @@ -5055,26 +5055,111 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest GroupByQuery fullQuery = builder.build(); List expectedResults = Arrays.asList( - makeRow(fullQuery, "2011-01-25", "alias", "business", "rows", 3L, "index", 312.38165283203125, - QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 101L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 105L, - QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 101.624789D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 105.873942D, - QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 101.62479F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 105.87394F), - makeRow(fullQuery, "2011-01-25", "alias", "news", "rows", 3L, "index", 312.7834167480469, - QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 105L, - QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 102.907866D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 105.266058D, - QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 102.90787F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 105.26606F), - makeRow(fullQuery, "2011-01-25", "alias", "technology", "rows", 3L, "index", 324.6412353515625, - QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 102L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 116L, - QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 102.044542D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 116.979005D, - QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 102.04454F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 116.979004F), - makeRow(fullQuery, "2011-01-25", "alias", "travel", "rows", 3L, "index", 393.36322021484375, - QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 149L, - QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 122.077247D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 149.125271D, - QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 122.07725F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 149.12527F), - makeRow(fullQuery, "2011-01-25", "alias", "health", "rows", 3L, "index", 511.2996826171875, - QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 159L, QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 180L, - QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 159.988606D, QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 180.575246D, - QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 159.9886F, QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 180.57524F) + makeRow(fullQuery, + "2011-01-25", + "alias", + "business", + "rows", + 3L, + "index", + 312.38165283203125, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 101L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 105L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 101.624789D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 105.873942D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 101.62479F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 105.87394F + ), + makeRow(fullQuery, + "2011-01-25", + "alias", + "news", + "rows", + 3L, + "index", + 312.7834167480469, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 102L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 105L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 102.907866D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 105.266058D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 102.90787F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 105.26606F + ), + makeRow(fullQuery, + "2011-01-25", + "alias", + "technology", + "rows", + 3L, + "index", + 324.6412353515625, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 102L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 116L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 102.044542D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 116.979005D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 102.04454F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 116.979004F + ), + makeRow(fullQuery, + "2011-01-25", + "alias", + "travel", + "rows", + 3L, + "index", + 393.36322021484375, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 122L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 149L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 122.077247D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 149.125271D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 122.07725F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 149.12527F + ), + makeRow(fullQuery, + "2011-01-25", + "alias", + "health", + "rows", + 3L, + "index", + 511.2996826171875, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 159L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 180L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 159.988606D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 180.575246D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 159.9886F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 180.57524F + ) ); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, fullQuery); @@ -5198,25 +5283,77 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"), QueryRunnerTestHelper.INDEX_LONG_MIN, QueryRunnerTestHelper.INDEX_LONG_MAX, QueryRunnerTestHelper.INDEX_DOUBLE_MIN, QueryRunnerTestHelper.INDEX_DOUBLE_MAX, - QueryRunnerTestHelper.INDEX_FLOAT_MIN, QueryRunnerTestHelper.INDEX_FLOAT_MAX) + QueryRunnerTestHelper.INDEX_FLOAT_MIN, QueryRunnerTestHelper.INDEX_FLOAT_MAX + ) .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) .setHavingSpec(havingSpec); final GroupByQuery fullQuery = builder.build(); List expectedResults = Arrays.asList( - makeRow(fullQuery, "2011-04-01", "alias", "business", "rows", 2L, "idx", 217L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 105L, - QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 112L, QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 105.735462D, - QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 112.987027D, QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 105.73546F, - QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 112.98703F), - makeRow(fullQuery, "2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 107L, - QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 1193L, QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 107.047773D, - QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 1193.556278D, QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 107.047775F, - QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 1193.5563F), - makeRow(fullQuery, "2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L, QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, 122L, - QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, 1321L, QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, 122.141707D, - QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, 1321.375057D, QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, 122.14171F, - QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, 1321.375F) + makeRow(fullQuery, + "2011-04-01", + "alias", + "business", + "rows", + 2L, + "idx", + 217L, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 105L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 112L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 105.735462D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 112.987027D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 105.73546F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 112.98703F + ), + makeRow(fullQuery, + "2011-04-01", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 4420L, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 107L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 1193L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 107.047773D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 1193.556278D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 107.047775F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 1193.5563F + ), + makeRow(fullQuery, + "2011-04-01", + "alias", + "premium", + "rows", + 6L, + "idx", + 4416L, + QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC, + 122L, + QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC, + 1321L, + QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC, + 122.141707D, + QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC, + 1321.375057D, + QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC, + 122.14171F, + QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC, + 1321.375F + ) ); TestHelper.assertExpectedObjects( @@ -7287,7 +7424,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) - .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ColumnType.STRING, TestExprMacroTable.INSTANCE)) + .setVirtualColumns(new ExpressionVirtualColumn( + "alias", + "quality", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + )) .setDimensions(Lists.newArrayList( new DefaultDimensionSpec("market", "market2"), new DefaultDimensionSpec("alias", "alias2") @@ -7400,7 +7542,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) - .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ColumnType.STRING, TestExprMacroTable.INSTANCE)) + .setVirtualColumns(new ExpressionVirtualColumn( + "alias", + "quality", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + )) .setDimensions(Lists.newArrayList( new DefaultDimensionSpec("quality", "quality2"), new DefaultDimensionSpec("market", "market2"), @@ -7756,7 +7903,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .builder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) - .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality", ColumnType.STRING, TestExprMacroTable.INSTANCE)) + .setVirtualColumns(new ExpressionVirtualColumn( + "alias", + "quality", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + )) .setDimensions(Lists.newArrayList( new DefaultDimensionSpec("quality", "quality"), new DefaultDimensionSpec("market", "market"), @@ -8211,7 +8363,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest } - @Test public void testGroupByWithSubtotalsSpecWithOrderLimitForcePushdown() { @@ -11228,18 +11379,23 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest return; } GroupByQuery.Builder builder = makeQueryBuilder() - .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) - .setInterval("2011-04-02/2011-04-04") - .setDimensions(new ExtractionDimensionSpec("quality", "qualityLen", ColumnType.LONG, StrlenExtractionFn.instance())) - .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) - .setLimitSpec( - new DefaultLimitSpec( - Collections.emptyList(), - 20 - ) + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(new ExtractionDimensionSpec( + "quality", + "qualityLen", + ColumnType.LONG, + StrlenExtractionFn.instance() + )) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setLimitSpec( + new DefaultLimitSpec( + Collections.emptyList(), + 20 ) - .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true)) - .setGranularity(Granularities.ALL); + ) + .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true)) + .setGranularity(Granularities.ALL); final GroupByQuery allGranQuery = builder.build(); @@ -11247,14 +11403,14 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest (queryPlus, responseContext) -> { // simulate two daily segments final QueryPlus queryPlus1 = queryPlus.withQuery( - queryPlus.getQuery().withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) - ) + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); final QueryPlus queryPlus2 = queryPlus.withQuery( - queryPlus.getQuery().withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) - ) + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -11272,19 +11428,19 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest ); Map context = new HashMap<>(); List allGranExpectedResults = Arrays.asList( - makeRow(allGranQuery, "2011-04-02", "qualityLen", 4L, "rows", 2L), - makeRow(allGranQuery, "2011-04-02", "qualityLen", 6L, "rows", 4L), - makeRow(allGranQuery, "2011-04-02", "qualityLen", 7L, "rows", 6L), - makeRow(allGranQuery, "2011-04-02", "qualityLen", 8L, "rows", 2L), - makeRow(allGranQuery, "2011-04-02", "qualityLen", 9L, "rows", 6L), - makeRow(allGranQuery, "2011-04-02", "qualityLen", 10L, "rows", 4L), - makeRow(allGranQuery, "2011-04-02", "qualityLen", 13L, "rows", 2L) + makeRow(allGranQuery, "2011-04-02", "qualityLen", 4L, "rows", 2L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 6L, "rows", 4L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 7L, "rows", 6L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 8L, "rows", 2L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 9L, "rows", 6L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 10L, "rows", 4L), + makeRow(allGranQuery, "2011-04-02", "qualityLen", 13L, "rows", 2L) ); TestHelper.assertExpectedObjects( - allGranExpectedResults, - mergedRunner.run(QueryPlus.wrap(allGranQuery)), - "merged" + allGranExpectedResults, + mergedRunner.run(QueryPlus.wrap(allGranQuery)), + "merged" ); } @@ -12057,7 +12213,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) .setGranularity(QueryRunnerTestHelper.ALL_GRAN) .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, false)) - .setLimitSpec(new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("nullable", OrderByColumnSpec.Direction.ASCENDING)), 5)) + .setLimitSpec(new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec( + "nullable", + OrderByColumnSpec.Direction.ASCENDING + )), 5)) .build(); List expectedResults; @@ -12151,7 +12310,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setDimensions( new DefaultDimensionSpec("v", "v", ColumnType.LONG) ) - .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("twosum", null, "1 + two", TestExprMacroTable.INSTANCE)) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("twosum", null, "1 + two", TestExprMacroTable.INSTANCE) + ) .setGranularity(QueryRunnerTestHelper.ALL_GRAN) .setLimit(5) .build(); @@ -12536,7 +12698,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest // array types don't work with group by v1 if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) { expectedException.expect(IllegalStateException.class); - expectedException.expectMessage("Unable to handle type[ARRAY] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]"); + expectedException.expectMessage( + "Unable to handle type[ARRAY] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]"); } GroupByQuery query = makeQueryBuilder() @@ -12607,7 +12770,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 135.88510131835938d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12619,7 +12782,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 118.57034, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12631,7 +12794,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 158.747224, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12643,7 +12806,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 120.134704, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12655,7 +12818,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 2871.8866900000003d, "array_agg_distinct", - new String[] {"spot", "total_market", "upfront"} + new String[]{"spot", "total_market", "upfront"} ), makeRow( query, @@ -12667,7 +12830,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 121.58358d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12679,7 +12842,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 2900.798647d, "array_agg_distinct", - new String[] {"spot", "total_market", "upfront"} + new String[]{"spot", "total_market", "upfront"} ), makeRow( query, @@ -12691,7 +12854,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 78.622547d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12703,7 +12866,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 119.922742d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( @@ -12716,7 +12879,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 147.42593d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12728,7 +12891,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 112.987027d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12740,7 +12903,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 166.016049d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12752,7 +12915,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 113.446008d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12764,7 +12927,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 2448.830613d, "array_agg_distinct", - new String[] {"spot", "total_market", "upfront"} + new String[]{"spot", "total_market", "upfront"} ), makeRow( query, @@ -12776,7 +12939,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 114.290141d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12788,7 +12951,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 2506.415148d, "array_agg_distinct", - new String[] {"spot", "total_market", "upfront"} + new String[]{"spot", "total_market", "upfront"} ), makeRow( query, @@ -12800,7 +12963,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 97.387433d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ), makeRow( query, @@ -12812,7 +12975,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "idx", 126.411364d, "array_agg_distinct", - new String[] {"spot"} + new String[]{"spot"} ) ); @@ -12829,7 +12992,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest // array types don't work with group by v1 if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) { expectedException.expect(IllegalStateException.class); - expectedException.expectMessage("Unable to handle type[ARRAY] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]"); + expectedException.expectMessage( + "Unable to handle type[ARRAY] for AggregatorFactory[class org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory]"); } GroupByQuery query = makeQueryBuilder() @@ -12864,7 +13028,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "automotive", "array_agg_distinct", - new String[] {"a", "preferred"} + new String[]{"a", "preferred"} ), makeRow( query, @@ -12872,7 +13036,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "business", "array_agg_distinct", - new String[] {"b", "preferred"} + new String[]{"b", "preferred"} ), makeRow( query, @@ -12880,7 +13044,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "entertainment", "array_agg_distinct", - new String[] {"e", "preferred"} + new String[]{"e", "preferred"} ), makeRow( query, @@ -12888,7 +13052,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "health", "array_agg_distinct", - new String[] {"h", "preferred"} + new String[]{"h", "preferred"} ), makeRow( query, @@ -12896,7 +13060,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "mezzanine", "array_agg_distinct", - new String[] {"m", "preferred"} + new String[]{"m", "preferred"} ), makeRow( query, @@ -12904,7 +13068,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "news", "array_agg_distinct", - new String[] {"n", "preferred"} + new String[]{"n", "preferred"} ), makeRow( query, @@ -12912,7 +13076,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "premium", "array_agg_distinct", - new String[] {"p", "preferred"} + new String[]{"p", "preferred"} ), makeRow( query, @@ -12920,7 +13084,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "technology", "array_agg_distinct", - new String[] {"preferred", "t"} + new String[]{"preferred", "t"} ), makeRow( query, @@ -12928,7 +13092,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "travel", "array_agg_distinct", - new String[] {"preferred", "t"} + new String[]{"preferred", "t"} ), makeRow( @@ -12937,7 +13101,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "automotive", "array_agg_distinct", - new String[] {"a", "preferred"} + new String[]{"a", "preferred"} ), makeRow( query, @@ -12945,7 +13109,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "business", "array_agg_distinct", - new String[] {"b", "preferred"} + new String[]{"b", "preferred"} ), makeRow( query, @@ -12953,7 +13117,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "entertainment", "array_agg_distinct", - new String[] {"e", "preferred"} + new String[]{"e", "preferred"} ), makeRow( query, @@ -12961,7 +13125,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "health", "array_agg_distinct", - new String[] {"h", "preferred"} + new String[]{"h", "preferred"} ), makeRow( query, @@ -12969,7 +13133,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "mezzanine", "array_agg_distinct", - new String[] {"m", "preferred"} + new String[]{"m", "preferred"} ), makeRow( query, @@ -12977,7 +13141,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "news", "array_agg_distinct", - new String[] {"n", "preferred"} + new String[]{"n", "preferred"} ), makeRow( query, @@ -12985,7 +13149,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "premium", "array_agg_distinct", - new String[] {"p", "preferred"} + new String[]{"p", "preferred"} ), makeRow( query, @@ -12993,7 +13157,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "technology", "array_agg_distinct", - new String[] {"preferred", "t"} + new String[]{"preferred", "t"} ), makeRow( query, @@ -13001,7 +13165,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest "alias", "travel", "array_agg_distinct", - new String[] {"preferred", "t"} + new String[]{"preferred", "t"} ) ); @@ -13208,10 +13372,10 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest private Map makeContext() { return ImmutableMap.builder() - .put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false") - .put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false") - .put("vectorSize", 16) // Small vector size to ensure we use more than one. - .build(); + .put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false") + .put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false") + .put("vectorSize", 16) // Small vector size to ensure we use more than one. + .build(); } private void cannotVectorize() diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 609a5346f89..0877cccbb49 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -25,14 +25,12 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.HumanReadableBytes; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.QueryPlus; @@ -56,11 +54,11 @@ import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.joda.time.DateTime; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -74,25 +72,29 @@ import java.util.Map; @RunWith(Parameterized.class) public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest { - private static final Closer RESOURCE_CLOSER = Closer.create(); + private static TestGroupByBuffers BUFFER_POOLS = null; + + @BeforeClass + public static void setUpClass() + { + BUFFER_POOLS = TestGroupByBuffers.createDefault(); + } @AfterClass - public static void teardown() throws IOException + public static void tearDownClass() { - RESOURCE_CLOSER.close(); + BUFFER_POOLS.close(); + BUFFER_POOLS = null; } @SuppressWarnings("unchecked") @Parameterized.Parameters(name = "{0}, vectorize = {1}") public static Iterable constructorFeeder() { + setUpClass(); GroupByQueryConfig config = new GroupByQueryConfig(); config.setMaxIntermediateRows(10000); - final Pair factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory( - config - ); - final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs; - RESOURCE_CLOSER.register(factoryAndCloser.rhs); + final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS); final List constructors = new ArrayList<>(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java index 1f140d4118a..81554b26725 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java @@ -26,10 +26,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.collections.BlockingPool; -import org.apache.druid.collections.DefaultBlockingPool; -import org.apache.druid.collections.NonBlockingPool; -import org.apache.druid.collections.StupidPool; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -42,7 +38,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentQueryRunner; @@ -54,6 +50,7 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.MetricManipulatorFns; @@ -79,6 +76,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; import org.junit.After; import org.junit.Assert; @@ -86,7 +84,6 @@ import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -94,10 +91,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -public class NestedQueryPushDownTest +public class NestedQueryPushDownTest extends InitializedNullHandlingTest { private static final IndexIO INDEX_IO; private static final IndexMergerV9 INDEX_MERGER_V9; @@ -108,6 +104,7 @@ public class NestedQueryPushDownTest private List incrementalIndices = new ArrayList<>(); private List groupByIndices = new ArrayList<>(); private ExecutorService executorService; + private Closer closer; static { JSON_MAPPER = new DefaultObjectMapper(); @@ -147,6 +144,7 @@ public class NestedQueryPushDownTest @Before public void setup() throws Exception { + closer = Closer.create(); tmpDir = FileUtils.createTempDir(); InputRow row; @@ -249,23 +247,15 @@ public class NestedQueryPushDownTest { executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]"); - NonBlockingPool bufferPool = new StupidPool<>( - "GroupByBenchmark-computeBufferPool", - new OffheapBufferGenerator("compute", 10_000_000), - 0, - Integer.MAX_VALUE - ); - - // limit of 3 is required since we simulate running historical running nested query and broker doing the final merge - BlockingPool mergePool = new DefaultBlockingPool<>( - new OffheapBufferGenerator("merge", 10_000_000), - 10 - ); - // limit of 3 is required since we simulate running historical running nested query and broker doing the final merge - BlockingPool mergePool2 = new DefaultBlockingPool<>( - new OffheapBufferGenerator("merge", 10_000_000), - 10 - ); + TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); + TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 10); + TestBufferPool mergePool2 = TestBufferPool.offHeap(10_000_000, 10); + closer.register(() -> { + // Verify that all objects have been returned to the pool. + Assert.assertEquals(0, bufferPool.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePool.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePool2.getOutstandingObjectCount()); + }); final GroupByQueryConfig config = new GroupByQueryConfig() { @@ -356,6 +346,9 @@ public class NestedQueryPushDownTest @After public void tearDown() throws Exception { + closer.close(); + closer = null; + for (IncrementalIndex incrementalIndex : incrementalIndices) { incrementalIndex.close(); } @@ -866,34 +859,6 @@ public class NestedQueryPushDownTest return runners; } - private static class OffheapBufferGenerator implements Supplier - { - private static final Logger log = new Logger(OffheapBufferGenerator.class); - - private final String description; - private final int computationBufferSize; - private final AtomicLong count = new AtomicLong(0); - - public OffheapBufferGenerator(String description, int computationBufferSize) - { - this.description = description; - this.computationBufferSize = computationBufferSize; - } - - @Override - public ByteBuffer get() - { - log.info( - "Allocating new %s buffer[%,d] of size[%,d]", - description, - count.getAndIncrement(), - computationBufferSize - ); - - return ByteBuffer.allocateDirect(computationBufferSize); - } - } - private static > QueryRunner makeQueryRunnerForSegment( QueryRunnerFactory factory, SegmentId segmentId, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java b/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java new file mode 100644 index 00000000000..f83160b1951 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.google.common.base.Preconditions; +import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.TestBufferPool; +import org.junit.Assert; + +import javax.annotation.Nullable; +import java.io.Closeable; + +public class TestGroupByBuffers implements Closeable +{ + private final int bufferSize; + private final int numMergeBuffers; + + @Nullable + private TestBufferPool processingPool; + + @Nullable + private TestBufferPool mergePool; + + public TestGroupByBuffers(final int bufferSize, final int numMergeBuffers) + { + this.bufferSize = bufferSize; + this.numMergeBuffers = numMergeBuffers; + this.processingPool = TestBufferPool.offHeap(bufferSize, Integer.MAX_VALUE); + this.mergePool = TestBufferPool.offHeap(bufferSize, numMergeBuffers); + } + + public static TestGroupByBuffers createFromProcessingConfig(final DruidProcessingConfig config) + { + return new TestGroupByBuffers(config.intermediateComputeSizeBytes(), config.getNumMergeBuffers()); + } + + public static TestGroupByBuffers createDefault() + { + return createFromProcessingConfig(GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG); + } + + public int getBufferSize() + { + return bufferSize; + } + + public int getNumMergeBuffers() + { + return numMergeBuffers; + } + + public TestBufferPool getProcessingPool() + { + return Preconditions.checkNotNull(processingPool, "processingPool"); + } + + public TestBufferPool getMergePool() + { + return Preconditions.checkNotNull(mergePool, "mergePool"); + } + + @Override + public void close() + { + if (processingPool != null) { + Assert.assertEquals(0, processingPool.getOutstandingObjectCount()); + processingPool = null; + } + + if (mergePool != null) { + Assert.assertEquals(0, mergePool.getOutstandingObjectCount()); + mergePool = null; + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java index 97b4b093ee2..bf4e1c6ada4 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java @@ -22,12 +22,15 @@ package org.apache.druid.query.groupby.epinephelinae; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.collections.ResourceHolder; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.MapBasedRow; +import org.apache.druid.java.util.common.ByteBufferUtils; import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.segment.CloserRule; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -38,7 +41,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; -public class BufferHashGrouperTest +public class BufferHashGrouperTest extends InitializedNullHandlingTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -97,28 +100,30 @@ public class BufferHashGrouperTest public void testGrowing() { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - final Grouper grouper = makeGrouper(columnSelectorFactory, 10000, 2, 0.75f); - final int expectedMaxSize = NullHandling.replaceWithDefault() ? 219 : 210; + try (final ResourceHolder> grouperHolder = makeGrouper(columnSelectorFactory, 10000, 2, 0.75f)) { + final Grouper grouper = grouperHolder.get(); + final int expectedMaxSize = NullHandling.replaceWithDefault() ? 219 : 210; - columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); - for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + } + Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); + + // Aggregate slightly different row + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L))); + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + } + Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); + + final List> expected = new ArrayList<>(); + for (int i = 0; i < expectedMaxSize; i++) { + expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L})); + } + + GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true)); } - Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); - - // Aggregate slightly different row - columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L))); - for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); - } - Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); - - final List> expected = new ArrayList<>(); - for (int i = 0; i < expectedMaxSize; i++) { - expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L})); - } - - GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true)); } @Test @@ -129,14 +134,16 @@ public class BufferHashGrouperTest if (NullHandling.replaceWithDefault()) { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); // the buffer size below is chosen to test integer overflow in ByteBufferHashTable.adjustTableWhenFull(). - final Grouper grouper = makeGrouper(columnSelectorFactory, 1_900_000_000, 2, 0.3f); - final int expectedMaxSize = 15323979; + try (final ResourceHolder> holder = makeGrouper(columnSelectorFactory, 1_900_000_000, 2, 0.3f)) { + final Grouper grouper = holder.get(); + final int expectedMaxSize = 15323979; - columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); - for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + } + Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); } - Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); } } @@ -144,41 +151,45 @@ public class BufferHashGrouperTest public void testNoGrowing() { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - final Grouper grouper = makeGrouper(columnSelectorFactory, 10000, Integer.MAX_VALUE, 0.75f); - final int expectedMaxSize = NullHandling.replaceWithDefault() ? 267 : 258; + try (final ResourceHolder> grouperHolder = + makeGrouper(columnSelectorFactory, 10000, Integer.MAX_VALUE, 0.75f)) { + final Grouper grouper = grouperHolder.get(); + final int expectedMaxSize = NullHandling.replaceWithDefault() ? 267 : 258; - columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); - for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + } + Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); + + // Aggregate slightly different row + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L))); + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); + } + Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); + + final List> expected = new ArrayList<>(); + for (int i = 0; i < expectedMaxSize; i++) { + expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L})); + } + + GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true)); } - Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); - - // Aggregate slightly different row - columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L))); - for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk()); - } - Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk()); - - final List> expected = new ArrayList<>(); - for (int i = 0; i < expectedMaxSize; i++) { - expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L})); - } - - GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true)); } - private BufferHashGrouper makeGrouper( + private ResourceHolder> makeGrouper( TestColumnSelectorFactory columnSelectorFactory, int bufferSize, int initialBuckets, float maxLoadFactor ) { - final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize); + // Use off-heap allocation since one of our tests has a 1.9GB buffer. Heap size may be insufficient. + final ResourceHolder bufferHolder = ByteBufferUtils.allocateDirect(bufferSize); final BufferHashGrouper grouper = new BufferHashGrouper<>( - Suppliers.ofInstance(buffer), + bufferHolder::get, GrouperTestUtil.intKeySerde(), AggregatorAdapters.factorizeBuffered( columnSelectorFactory, @@ -192,7 +203,23 @@ public class BufferHashGrouperTest initialBuckets, true ); + grouper.init(); - return grouper; + + return new ResourceHolder>() + { + @Override + public BufferHashGrouper get() + { + return grouper; + } + + @Override + public void close() + { + grouper.close(); + bufferHolder.close(); + } + }; } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index f0d87c9e36c..5cd0e0802e3 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -1166,6 +1166,14 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest Object exValue = ex.getValue(); if (exValue instanceof Double || exValue instanceof Float) { final double expectedDoubleValue = ((Number) exValue).doubleValue(); + Assert.assertNotNull( + StringUtils.format( + "invalid null value for %s (expected %f)", + ex.getKey(), + expectedDoubleValue + ), + actVal + ); Assert.assertEquals( "invalid value for " + ex.getKey(), expectedDoubleValue, diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java index 1397db6ea50..44a97d01e67 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java @@ -21,6 +21,8 @@ package org.apache.druid.segment.data; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.java.util.common.ByteBufferUtils; import org.apache.druid.java.util.common.io.Closer; import org.junit.After; import org.junit.Assert; @@ -112,11 +114,14 @@ public class CompressionStrategyTest { ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer); ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut); - ByteBuffer output = ByteBuffer.allocateDirect(originalData.length); - compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output); - byte[] checkArray = new byte[DATA_SIZER]; - output.get(checkArray); - Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray); + + try (final ResourceHolder holder = ByteBufferUtils.allocateDirect(originalData.length)) { + final ByteBuffer output = holder.get(); + compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output); + byte[] checkArray = new byte[DATA_SIZER]; + output.get(checkArray); + Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray); + } } @Test(timeout = 60_000L) diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 6e4a9b2ef61..4773581b1b1 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -80,8 +80,8 @@ import java.util.concurrent.ForkJoinPool; public class CachingClusteredClientFunctionalityTest { private static final ObjectMapper OBJECT_MAPPER = CachingClusteredClientTestUtils.createObjectMapper(); - private static final Pair WAREHOUSE_AND_CLOSER = CachingClusteredClientTestUtils - .createWarehouse(OBJECT_MAPPER); + private static final Pair WAREHOUSE_AND_CLOSER = + CachingClusteredClientTestUtils.createWarehouse(); private static final QueryToolChestWarehouse WAREHOUSE = WAREHOUSE_AND_CLOSER.lhs; private static final Closer RESOURCE_CLOSER = WAREHOUSE_AND_CLOSER.rhs; diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 6262ce1947c..e9e295f6e18 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -262,8 +262,8 @@ public class CachingClusteredClientTest private static final DateTimeZone TIMEZONE = DateTimes.inferTzFromString("America/Los_Angeles"); private static final Granularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE); private static final String TOP_DIM = "a_dim"; - private static final Pair WAREHOUSE_AND_CLOSER = CachingClusteredClientTestUtils - .createWarehouse(JSON_MAPPER); + private static final Pair WAREHOUSE_AND_CLOSER = + CachingClusteredClientTestUtils.createWarehouse(); private static final QueryToolChestWarehouse WAREHOUSE = WAREHOUSE_AND_CLOSER.lhs; private static final Closer RESOURCE_CLOSER = WAREHOUSE_AND_CLOSER.rhs; diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTestUtils.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTestUtils.java index 5a945069741..f5fdf24ebe6 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTestUtils.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTestUtils.java @@ -33,6 +33,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; +import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.query.search.SearchQuery; import org.apache.druid.query.search.SearchQueryConfig; import org.apache.druid.query.search.SearchQueryQueryToolChest; @@ -50,15 +51,13 @@ public final class CachingClusteredClientTestUtils * Returns a new {@link QueryToolChestWarehouse} for unit tests and a resourceCloser which should be closed at the end * of the test. */ - public static Pair createWarehouse( - ObjectMapper objectMapper - ) + public static Pair createWarehouse() { - final Pair factoryCloserPair = GroupByQueryRunnerTest.makeQueryRunnerFactory( - new GroupByQueryConfig() + final Closer resourceCloser = Closer.create(); + final GroupByQueryRunnerFactory groupByQueryRunnerFactory = GroupByQueryRunnerTest.makeQueryRunnerFactory( + new GroupByQueryConfig(), + resourceCloser.register(TestGroupByBuffers.createDefault()) ); - final GroupByQueryRunnerFactory factory = factoryCloserPair.lhs; - final Closer resourceCloser = factoryCloserPair.rhs; return Pair.of( new MapQueryToolChestWarehouse( ImmutableMap., QueryToolChest>builder() @@ -76,7 +75,7 @@ public final class CachingClusteredClientTestUtils ) .put( GroupByQuery.class, - factory.getToolchest() + groupByQueryRunnerFactory.getToolchest() ) .put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest()) .build() diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 0dfe0d53620..3c27b9ad3ff 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -22,8 +22,6 @@ package org.apache.druid.server; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.client.cache.CacheConfig; -import org.apache.druid.collections.CloseableStupidPool; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DataSource; @@ -40,10 +38,12 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.RetryQueryRunnerConfig; +import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; +import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.metadata.SegmentMetadataQueryConfig; @@ -77,9 +77,9 @@ import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.junit.Assert; import javax.annotation.Nullable; -import java.nio.ByteBuffer; import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -276,14 +276,16 @@ public class QueryStackTests final Supplier minTopNThresholdSupplier ) { - final CloseableStupidPool stupidPool = new CloseableStupidPool<>( - "TopNQueryRunnerFactory-bufferPool", - () -> ByteBuffer.allocate(COMPUTE_BUFFER_SIZE) - ); + final TestBufferPool testBufferPool = TestBufferPool.offHeap(COMPUTE_BUFFER_SIZE, Integer.MAX_VALUE); + closer.register(() -> { + // Verify that all objects have been returned to the pool. + Assert.assertEquals(0, testBufferPool.getOutstandingObjectCount()); + }); - closer.register(stupidPool); + final TestGroupByBuffers groupByBuffers = + closer.register(TestGroupByBuffers.createFromProcessingConfig(processingConfig)); - final Pair factoryCloserPair = + final GroupByQueryRunnerFactory groupByQueryRunnerFactory = GroupByQueryRunnerTest.makeQueryRunnerFactory( GroupByQueryRunnerTest.DEFAULT_MAPPER, new GroupByQueryConfig() @@ -294,12 +296,10 @@ public class QueryStackTests return GroupByStrategySelector.STRATEGY_V2; } }, + groupByBuffers, processingConfig ); - final GroupByQueryRunnerFactory groupByQueryRunnerFactory = factoryCloserPair.lhs; - closer.register(factoryCloserPair.rhs); - final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate( ImmutableMap., QueryRunnerFactory>builder() .put( @@ -333,7 +333,7 @@ public class QueryStackTests .put( TopNQuery.class, new TopNQueryRunnerFactory( - stupidPool, + testBufferPool, new TopNQueryQueryToolChest(new TopNQueryConfig() { @Override