Close all aggregators when closing onHeapIncrementalIndex (#3926)

* Close all aggregators when closing onHeapIncrementalIndex

* Aggregators are now handled as Closeables, remove unnecessary mock in test

* Fix variable shadowing
This commit is contained in:
Pierre 2017-02-13 23:01:27 +00:00 committed by Gian Merlino
parent 1f263fe50b
commit 9ab9feced6
3 changed files with 58 additions and 1 deletions

View File

@ -19,6 +19,8 @@
package io.druid.query.aggregation;
import java.io.Closeable;
/**
* An Aggregator is an object that can aggregate metrics. Its aggregation-related methods (namely, aggregate() and get())
* do not take any arguments as the assumption is that the Aggregator was given something in its constructor that
@ -32,7 +34,8 @@ package io.druid.query.aggregation;
*
* This interface is old and going away. It is being replaced by BufferAggregator
*/
public interface Aggregator {
public interface Aggregator extends Closeable
{
void aggregate();
void reset();
Object get();

View File

@ -20,7 +20,9 @@
package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularity;
@ -37,6 +39,7 @@ import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -257,6 +260,23 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
rowContainer.set(null);
}
private void closeAggregators()
{
Closer closer = Closer.create();
for (Aggregator[] aggs : aggregators.values()) {
for (Aggregator agg : aggs) {
closer.register(agg);
}
}
try {
closer.close();
}
catch (IOException e) {
Throwables.propagate(e);
}
}
protected Aggregator[] concurrentGet(int offset)
{
// All get operations should be fine
@ -327,6 +347,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
public void close()
{
super.close();
closeAggregators();
aggregators.clear();
facts.clear();
if (selectors != null) {

View File

@ -23,8 +23,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregator;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@ -97,4 +100,34 @@ public class OnheapIncrementalIndexTest
Assert.assertEquals(0, checkFailedCount.get());
}
@Test
public void testOnHeapIncrementalIndexClose() throws Exception
{
// Prepare the mocks & set close() call count expectation to 1
Aggregator mockedAggregator = EasyMock.createMock(LongMaxAggregator.class);
mockedAggregator.close();
EasyMock.expectLastCall().times(1);
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
0,
QueryGranularities.MINUTE,
new AggregatorFactory[]{new LongMaxAggregatorFactory("max", "max")},
MAX_ROWS
);
index.add(new MapBasedInputRow(
0,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", 1, "max", 1)
));
// override the aggregators with the mocks
index.concurrentGet(0)[0] = mockedAggregator;
// close the indexer and validate the expectations
EasyMock.replay(mockedAggregator);
index.close();
EasyMock.verify(mockedAggregator);
}
}