mirror of https://github.com/apache/druid.git
Close aggregators in HashVectorGrouper.close() (#10452)
* Close aggregators in HashVectorGrouper.close() * reuse grouper * Add missing dependency
This commit is contained in:
parent
207ef310f2
commit
1deed9fbcd
|
@ -244,6 +244,11 @@
|
|||
<artifactId>log4j-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -265,7 +265,7 @@ public class HashVectorGrouper implements VectorGrouper
|
|||
@Override
|
||||
public void close()
|
||||
{
|
||||
// Nothing to do.
|
||||
aggregators.close();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -19,12 +19,14 @@
|
|||
|
||||
package org.apache.druid.query.groupby.epinephelinae.vector;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Suppliers;
|
||||
import org.apache.datasketches.memory.Memory;
|
||||
import org.apache.datasketches.memory.WritableMemory;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.guava.BaseSequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.aggregation.AggregatorAdapters;
|
||||
|
@ -216,7 +218,8 @@ public class VectorGroupByEngine
|
|||
);
|
||||
}
|
||||
|
||||
private static class VectorGroupByEngineIterator implements CloseableIterator<ResultRow>
|
||||
@VisibleForTesting
|
||||
static class VectorGroupByEngineIterator implements CloseableIterator<ResultRow>
|
||||
{
|
||||
private final GroupByQuery query;
|
||||
private final GroupByQueryConfig querySpecificConfig;
|
||||
|
@ -278,7 +281,7 @@ public class VectorGroupByEngine
|
|||
@Override
|
||||
public ResultRow next()
|
||||
{
|
||||
if (delegate == null || !delegate.hasNext()) {
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
|
@ -310,22 +313,19 @@ public class VectorGroupByEngine
|
|||
}
|
||||
|
||||
@Override
|
||||
public void remove()
|
||||
public void close() throws IOException
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
cursor.close();
|
||||
|
||||
Closer closer = Closer.create();
|
||||
closer.register(vectorGrouper);
|
||||
if (delegate != null) {
|
||||
delegate.close();
|
||||
closer.register(delegate);
|
||||
}
|
||||
closer.register(cursor);
|
||||
closer.close();
|
||||
}
|
||||
|
||||
private VectorGrouper makeGrouper()
|
||||
@VisibleForTesting
|
||||
VectorGrouper makeGrouper()
|
||||
{
|
||||
final VectorGrouper grouper;
|
||||
|
||||
|
@ -463,7 +463,7 @@ public class VectorGroupByEngine
|
|||
|
||||
return resultRow;
|
||||
},
|
||||
vectorGrouper
|
||||
() -> {} // Grouper will be closed when VectorGroupByEngineIterator is closed.
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.groupby.epinephelinae;
|
||||
|
||||
import com.google.common.base.Suppliers;
|
||||
import org.apache.druid.query.aggregation.AggregatorAdapters;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class HashVectorGrouperTest
|
||||
{
|
||||
@Test
|
||||
public void testCloseAggregatorAdaptorsShouldBeClosed()
|
||||
{
|
||||
final ByteBuffer buffer = ByteBuffer.wrap(new byte[4096]);
|
||||
final AggregatorAdapters aggregatorAdapters = Mockito.mock(AggregatorAdapters.class);
|
||||
final HashVectorGrouper grouper = new HashVectorGrouper(
|
||||
Suppliers.ofInstance(buffer),
|
||||
1024,
|
||||
aggregatorAdapters,
|
||||
Integer.MAX_VALUE,
|
||||
0.f,
|
||||
0
|
||||
);
|
||||
grouper.initVectorized(512);
|
||||
grouper.close();
|
||||
Mockito.verify(aggregatorAdapters, Mockito.times(1)).close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.groupby.epinephelinae.vector;
|
||||
|
||||
import org.apache.commons.lang3.mutable.MutableObject;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.QueryRunnerTestHelper;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.query.groupby.GroupByQuery;
|
||||
import org.apache.druid.query.groupby.GroupByQueryConfig;
|
||||
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
|
||||
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.TestIndex;
|
||||
import org.apache.druid.segment.filter.Filters;
|
||||
import org.apache.druid.segment.vector.VectorCursor;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class VectorGroupByEngineIteratorTest extends InitializedNullHandlingTest
|
||||
{
|
||||
@Test
|
||||
public void testCreateOneGrouperAndCloseItWhenClose() throws IOException
|
||||
{
|
||||
final Interval interval = TestIndex.DATA_INTERVAL;
|
||||
final AggregatorFactory factory = new DoubleSumAggregatorFactory("index", "index");
|
||||
final GroupByQuery query = GroupByQuery
|
||||
.builder()
|
||||
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
|
||||
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
|
||||
.setInterval(interval)
|
||||
.setDimensions(new DefaultDimensionSpec("market", null, null))
|
||||
.setAggregatorSpecs(factory)
|
||||
.build();
|
||||
final StorageAdapter storageAdapter = new QueryableIndexStorageAdapter(TestIndex.getMMappedTestIndex());
|
||||
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[4096]);
|
||||
final VectorCursor cursor = storageAdapter.makeVectorCursor(
|
||||
Filters.toFilter(query.getDimFilter()),
|
||||
interval,
|
||||
query.getVirtualColumns(),
|
||||
false,
|
||||
QueryContexts.getVectorSize(query),
|
||||
null
|
||||
);
|
||||
final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
|
||||
dimensionSpec ->
|
||||
DimensionHandlerUtils.makeVectorProcessor(
|
||||
dimensionSpec,
|
||||
GroupByVectorColumnProcessorFactory.instance(),
|
||||
cursor.getColumnSelectorFactory()
|
||||
)
|
||||
).collect(Collectors.toList());
|
||||
final MutableObject<VectorGrouper> grouperCaptor = new MutableObject<>();
|
||||
final VectorGroupByEngineIterator iterator = new VectorGroupByEngineIterator(
|
||||
query,
|
||||
new GroupByQueryConfig(),
|
||||
storageAdapter,
|
||||
cursor,
|
||||
interval,
|
||||
dimensions,
|
||||
byteBuffer,
|
||||
null
|
||||
)
|
||||
{
|
||||
@Override
|
||||
VectorGrouper makeGrouper()
|
||||
{
|
||||
grouperCaptor.setValue(Mockito.spy(super.makeGrouper()));
|
||||
return grouperCaptor.getValue();
|
||||
}
|
||||
};
|
||||
iterator.close();
|
||||
Mockito.verify(grouperCaptor.getValue()).close();
|
||||
}
|
||||
}
|
|
@ -134,8 +134,8 @@ public class TestIndex
|
|||
|
||||
public static final String[] DOUBLE_METRICS = new String[]{"index", "indexMin", "indexMaxPlusTen"};
|
||||
public static final String[] FLOAT_METRICS = new String[]{"indexFloat", "indexMinFloat", "indexMaxFloat"};
|
||||
public static final Interval DATA_INTERVAL = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
|
||||
private static final Logger log = new Logger(TestIndex.class);
|
||||
private static final Interval DATA_INTERVAL = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
|
||||
private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create(
|
||||
Collections.singletonList(
|
||||
new ExpressionVirtualColumn("expr", "index + 10", ValueType.FLOAT, TestExprMacroTable.INSTANCE)
|
||||
|
|
Loading…
Reference in New Issue