diff --git a/processing/pom.xml b/processing/pom.xml
index 209518746a0..e4b737d0127 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -244,6 +244,11 @@
log4j-api
test
+
+ org.apache.commons
+ commons-lang3
+ test
+
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
index b05ccb5f358..dae166125a2 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
@@ -265,7 +265,7 @@ public class HashVectorGrouper implements VectorGrouper
@Override
public void close()
{
- // Nothing to do.
+ aggregators.close();
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index 794f44c4989..f516ea5b63b 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -19,12 +19,14 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
@@ -216,7 +218,8 @@ public class VectorGroupByEngine
);
}
- private static class VectorGroupByEngineIterator implements CloseableIterator
+ @VisibleForTesting
+ static class VectorGroupByEngineIterator implements CloseableIterator
{
private final GroupByQuery query;
private final GroupByQueryConfig querySpecificConfig;
@@ -278,7 +281,7 @@ public class VectorGroupByEngine
@Override
public ResultRow next()
{
- if (delegate == null || !delegate.hasNext()) {
+ if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -310,22 +313,19 @@ public class VectorGroupByEngine
}
@Override
- public void remove()
+ public void close() throws IOException
{
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close()
- {
- cursor.close();
-
+ Closer closer = Closer.create();
+ closer.register(vectorGrouper);
if (delegate != null) {
- delegate.close();
+ closer.register(delegate);
}
+ closer.register(cursor);
+ closer.close();
}
- private VectorGrouper makeGrouper()
+ @VisibleForTesting
+ VectorGrouper makeGrouper()
{
final VectorGrouper grouper;
@@ -463,7 +463,7 @@ public class VectorGroupByEngine
return resultRow;
},
- vectorGrouper
+ () -> {} // Grouper will be closed when VectorGroupByEngineIterator is closed.
);
}
}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
new file mode 100644
index 00000000000..46cd043289b
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import com.google.common.base.Suppliers;
+import org.apache.druid.query.aggregation.AggregatorAdapters;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+
+public class HashVectorGrouperTest
+{
+ @Test
+ public void testCloseAggregatorAdaptorsShouldBeClosed()
+ {
+ final ByteBuffer buffer = ByteBuffer.wrap(new byte[4096]);
+ final AggregatorAdapters aggregatorAdapters = Mockito.mock(AggregatorAdapters.class);
+ final HashVectorGrouper grouper = new HashVectorGrouper(
+ Suppliers.ofInstance(buffer),
+ 1024,
+ aggregatorAdapters,
+ Integer.MAX_VALUE,
+ 0.f,
+ 0
+ );
+ grouper.initVectorized(512);
+ grouper.close();
+ Mockito.verify(aggregatorAdapters, Mockito.times(1)).close();
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java
new file mode 100644
index 00000000000..627c2bdc886
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
+import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.Interval;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class VectorGroupByEngineIteratorTest extends InitializedNullHandlingTest
+{
+ @Test
+ public void testCreateOneGrouperAndCloseItWhenClose() throws IOException
+ {
+ final Interval interval = TestIndex.DATA_INTERVAL;
+ final AggregatorFactory factory = new DoubleSumAggregatorFactory("index", "index");
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+ .setInterval(interval)
+ .setDimensions(new DefaultDimensionSpec("market", null, null))
+ .setAggregatorSpecs(factory)
+ .build();
+ final StorageAdapter storageAdapter = new QueryableIndexStorageAdapter(TestIndex.getMMappedTestIndex());
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[4096]);
+ final VectorCursor cursor = storageAdapter.makeVectorCursor(
+ Filters.toFilter(query.getDimFilter()),
+ interval,
+ query.getVirtualColumns(),
+ false,
+ QueryContexts.getVectorSize(query),
+ null
+ );
+ final List dimensions = query.getDimensions().stream().map(
+ dimensionSpec ->
+ DimensionHandlerUtils.makeVectorProcessor(
+ dimensionSpec,
+ GroupByVectorColumnProcessorFactory.instance(),
+ cursor.getColumnSelectorFactory()
+ )
+ ).collect(Collectors.toList());
+ final MutableObject grouperCaptor = new MutableObject<>();
+ final VectorGroupByEngineIterator iterator = new VectorGroupByEngineIterator(
+ query,
+ new GroupByQueryConfig(),
+ storageAdapter,
+ cursor,
+ interval,
+ dimensions,
+ byteBuffer,
+ null
+ )
+ {
+ @Override
+ VectorGrouper makeGrouper()
+ {
+ grouperCaptor.setValue(Mockito.spy(super.makeGrouper()));
+ return grouperCaptor.getValue();
+ }
+ };
+ iterator.close();
+ Mockito.verify(grouperCaptor.getValue()).close();
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
index f7baecc3acb..f53f5d9ede6 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
@@ -134,8 +134,8 @@ public class TestIndex
public static final String[] DOUBLE_METRICS = new String[]{"index", "indexMin", "indexMaxPlusTen"};
public static final String[] FLOAT_METRICS = new String[]{"indexFloat", "indexMinFloat", "indexMaxFloat"};
+ public static final Interval DATA_INTERVAL = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
private static final Logger log = new Logger(TestIndex.class);
- private static final Interval DATA_INTERVAL = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create(
Collections.singletonList(
new ExpressionVirtualColumn("expr", "index + 10", ValueType.FLOAT, TestExprMacroTable.INSTANCE)