Associate groupBy v2 resources with the Sequence lifecycle. (#3296)

This fixes a potential issue where groupBy resources could be allocated to
create a Sequence, but then the Sequence is never used, and thus the resources
are never freed.

Also simplifies how groupBy handles config overrides (this made the new
unit test easier to write).
This commit is contained in:
Gian Merlino 2016-07-27 18:44:19 -07:00 committed by Fangjin Yang
parent 546e4f79b0
commit 2553997200
13 changed files with 293 additions and 251 deletions

View File

@ -203,3 +203,4 @@ When using the "v2" strategy, the following query context parameters apply:
|--------|-----------|
|`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
|`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|

View File

@ -56,8 +56,6 @@ import java.util.concurrent.TimeoutException;
public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
{
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final Logger log = new Logger(GroupByMergedQueryRunner.class);
private final Iterable<QueryRunner<T>> queryables;
private final ListeningExecutorService exec;
@ -84,15 +82,11 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> responseContext)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final boolean isSingleThreaded = query.getContextValue(
CTX_KEY_IS_SINGLE_THREADED,
configSupplier.get().isSingleThreaded()
);
final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded();
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
configSupplier.get(),
querySpecificConfig,
bufferPool
);
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();

View File

@ -26,6 +26,14 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector;
*/
public class GroupByQueryConfig
{
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets";
private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V1;
@ -107,4 +115,32 @@ public class GroupByQueryConfig
{
return maxOnDiskStorage;
}
public GroupByQueryConfig withOverrides(final GroupByQuery query)
{
final GroupByQueryConfig newConfig = new GroupByQueryConfig();
newConfig.defaultStrategy = query.getContextValue(CTX_KEY_STRATEGY, getDefaultStrategy());
newConfig.singleThreaded = query.getContextBoolean(CTX_KEY_IS_SINGLE_THREADED, isSingleThreaded());
newConfig.maxIntermediateRows = Math.min(
query.getContextValue(CTX_KEY_MAX_INTERMEDIATE_ROWS, getMaxIntermediateRows()),
getMaxIntermediateRows()
);
newConfig.maxResults = Math.min(
query.getContextValue(CTX_KEY_MAX_RESULTS, getMaxResults()),
getMaxResults()
);
newConfig.bufferGrouperInitialBuckets = query.getContextValue(
CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS,
getBufferGrouperInitialBuckets()
);
newConfig.bufferGrouperMaxSize = Math.min(
query.getContextValue(CTX_KEY_BUFFER_GROUPER_MAX_SIZE, getBufferGrouperMaxSize()),
getBufferGrouperMaxSize()
);
newConfig.maxOnDiskStorage = Math.min(
((Number)query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(),
getMaxOnDiskStorage()
);
return newConfig;
}
}

View File

@ -68,8 +68,6 @@ import java.util.NoSuchElementException;
*/
public class GroupByQueryEngine
{
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private final Supplier<GroupByQueryConfig> config;
private final StupidPool<ByteBuffer> intermediateResultsBufferPool;
@ -310,16 +308,12 @@ public class GroupByQueryEngine
public RowIterator(GroupByQuery query, final Cursor cursor, ByteBuffer metricsBuffer, GroupByQueryConfig config)
{
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
this.query = query;
this.cursor = cursor;
this.metricsBuffer = metricsBuffer;
this.maxIntermediateRows = Math.min(
query.getContextValue(
CTX_KEY_MAX_INTERMEDIATE_ROWS,
config.getMaxIntermediateRows()
), config.getMaxIntermediateRows()
);
this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows();
unprocessedKeys = null;
delegate = Iterators.emptyIterator();

View File

@ -45,7 +45,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class GroupByQueryHelper
{
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
public final static String CTX_KEY_SORT_RESULTS = "sortResults";
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
@ -54,6 +53,7 @@ public class GroupByQueryHelper
StupidPool<ByteBuffer> bufferPool
)
{
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
@ -97,7 +97,7 @@ public class GroupByQueryHelper
false,
true,
sortResults,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
querySpecificConfig.getMaxResults(),
bufferPool
);
} else {
@ -110,7 +110,7 @@ public class GroupByQueryHelper
false,
true,
sortResults,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults())
querySpecificConfig.getMaxResults()
);
}

View File

@ -60,6 +60,7 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Group
{
private static final Logger log = new Logger(BufferGrouper.class);
private static final int MIN_INITIAL_BUCKETS = 4;
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
private static final float MAX_LOAD_FACTOR = 0.75f;
private static final int HASH_SIZE = Ints.BYTES;
@ -104,7 +105,7 @@ public class BufferGrouper<KeyType extends Comparable<KeyType>> implements Group
this.aggregators = new BufferAggregator[aggregatorFactories.length];
this.aggregatorOffsets = new int[aggregatorFactories.length];
this.bufferGrouperMaxSize = bufferGrouperMaxSize;
this.initialBuckets = initialBuckets > 0 ? initialBuckets : DEFAULT_INITIAL_BUCKETS;
this.initialBuckets = initialBuckets > 0 ? Math.max(MIN_INITIAL_BUCKETS, initialBuckets) : DEFAULT_INITIAL_BUCKETS;
int offset = HASH_SIZE + keySize;
for (int i = 0; i < aggregatorFactories.length; i++) {

View File

@ -41,12 +41,12 @@ import com.metamx.common.ISE;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.logger.Logger;
import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.Releaser;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.query.AbstractPrioritizedCallable;
@ -119,6 +119,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
public Sequence<Row> run(final Query queryParam, final Map responseContext)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
// CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION is here because realtime servers use nested mergeRunners calls
// (one for the entire query and one for each sink). We only want the outer call to actually do merging with a
@ -142,61 +143,68 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
combiningAggregatorFactories[i] = query.getAggregatorSpecs().get(i).getCombiningFactory();
}
final GroupByMergingKeySerdeFactory keySerdeFactory = new GroupByMergingKeySerdeFactory(
query.getDimensions().size(),
config.getMaxMergingDictionarySize() / concurrencyHint
);
final GroupByMergingColumnSelectorFactory columnSelectorFactory = new GroupByMergingColumnSelectorFactory();
final File temporaryStorageDirectory = new File(
System.getProperty("java.io.tmpdir"),
String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
);
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
temporaryStorageDirectory,
config.getMaxOnDiskStorage()
);
// Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual
// query processing together.
final long startTime = System.currentTimeMillis();
final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = timeout == null ? -1L : startTime + timeout.longValue();
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = queryTimeout == null
? JodaUtils.MAX_INSTANT
: System.currentTimeMillis() + queryTimeout.longValue();
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
// This will potentially block if there are no merge buffers left in the pool.
mergeBufferHolder = mergeBufferPool.take(timeout != null && timeout.longValue() > 0 ? timeout.longValue() : -1);
}
catch (InterruptedException e) {
CloseQuietly.close(temporaryStorage);
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
final long processingStartTime = System.currentTimeMillis();
try {
return new ResourceClosingSequence<>(
new BaseSequence<>(
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<GroupByMergingKey, Row>>()
{
@Override
public CloseableGrouperIterator<GroupByMergingKey, Row> make()
{
final Grouper<GroupByMergingKey> grouper = new ConcurrentGrouper<>(
final List<Closeable> closeOnFailure = Lists.newArrayList();
try {
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
final LimitedTemporaryStorage temporaryStorage;
final Grouper<GroupByMergingKey> grouper;
temporaryStorage = new LimitedTemporaryStorage(
temporaryStorageDirectory,
querySpecificConfig.getMaxOnDiskStorage()
);
closeOnFailure.add(temporaryStorage);
try {
// This will potentially block if there are no merge buffers left in the pool.
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new QueryInterruptedException(new TimeoutException());
}
closeOnFailure.add(mergeBufferHolder);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
final GroupByMergingKeySerdeFactory keySerdeFactory = new GroupByMergingKeySerdeFactory(
query.getDimensions().size(),
querySpecificConfig.getMaxMergingDictionarySize() / concurrencyHint
);
final GroupByMergingColumnSelectorFactory columnSelectorFactory = new GroupByMergingColumnSelectorFactory();
grouper = new ConcurrentGrouper<>(
mergeBufferHolder.get(),
concurrencyHint,
temporaryStorage,
spillMapper,
config.getBufferGrouperMaxSize(),
GroupByStrategyV2.getBufferGrouperInitialBuckets(config, query),
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperInitialBuckets(),
keySerdeFactory,
columnSelectorFactory,
combiningAggregatorFactories
);
closeOnFailure.add(grouper);
final Accumulator<Grouper<GroupByMergingKey>, Row> accumulator = new Accumulator<Grouper<GroupByMergingKey>, Row>()
{
@ -245,7 +253,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
}
);
try {
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
@ -310,13 +317,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
)
);
waitForFutureCompletion(query, futures, timeoutAt - processingStartTime);
}
catch (Exception e) {
// Exception caught while creating or waiting for futures; release resources.
grouperHolder.close();
throw e;
}
waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis());
return new CloseableGrouperIterator<>(
grouper,
@ -353,10 +354,20 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
public void close() throws IOException
{
grouperHolder.close();
mergeBufferHolder.close();
CloseQuietly.close(temporaryStorage);
}
}
);
}
catch (Throwable e) {
// Exception caught while setting up the iterator; release resources.
for (Closeable closeable : Lists.reverse(closeOnFailure)) {
CloseQuietly.close(closeable);
}
throw e;
}
}
@Override
public void cleanup(CloseableGrouperIterator<GroupByMergingKey, Row> iterFromMake)
@ -364,25 +375,8 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
iterFromMake.close();
}
}
),
new Closeable()
{
@Override
public void close() throws IOException
{
mergeBufferHolder.close();
CloseQuietly.close(temporaryStorage);
}
}
);
}
catch (Exception e) {
// Exception caught while creating the sequence; release resources.
mergeBufferHolder.close();
CloseQuietly.close(temporaryStorage);
throw e;
}
}
private void waitForFutureCompletion(
GroupByQuery query,
@ -391,16 +385,16 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
)
{
try {
final List<Boolean> results;
if (queryWatcher != null) {
queryWatcher.registerQuery(query, future);
}
if (timeout <= 0) {
results = future.get();
} else {
results = future.get(timeout, TimeUnit.MILLISECONDS);
throw new TimeoutException();
}
final List<Boolean> results = future.get(timeout, TimeUnit.MILLISECONDS);
for (Boolean result : results) {
if (!result) {
future.cancel(true);

View File

@ -148,7 +148,7 @@ public class GroupByQueryEngineV2
private static class GroupByEngineIterator implements Iterator<Row>, Closeable
{
private final GroupByQuery query;
private final GroupByQueryConfig config;
private final GroupByQueryConfig querySpecificConfig;
private final Cursor cursor;
private final ByteBuffer buffer;
private final Grouper.KeySerde<ByteBuffer> keySerde;
@ -174,7 +174,7 @@ public class GroupByQueryEngineV2
final int dimCount = query.getDimensions().size();
this.query = query;
this.config = config;
this.querySpecificConfig = config.withOverrides(query);
this.cursor = cursor;
this.buffer = buffer;
this.keySerde = keySerde;
@ -213,8 +213,8 @@ public class GroupByQueryEngineV2
cursor,
query.getAggregatorSpecs()
.toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]),
config.getBufferGrouperMaxSize(),
GroupByStrategyV2.getBufferGrouperInitialBuckets(config, query)
querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperInitialBuckets()
);
outer:

View File

@ -22,6 +22,7 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
@ -36,7 +37,7 @@ import java.util.Iterator;
*
* @param <KeyType> type of the key that will be passed in
*/
public interface Grouper<KeyType extends Comparable<KeyType>>
public interface Grouper<KeyType extends Comparable<KeyType>> extends Closeable
{
/**
* Aggregate the current row with the provided key. Some implementations are thread-safe and
@ -67,6 +68,7 @@ public interface Grouper<KeyType extends Comparable<KeyType>>
/**
* Close the grouper and release associated resources.
*/
@Override
void close();
/**

View File

@ -27,7 +27,6 @@ import io.druid.query.groupby.GroupByQueryConfig;
public class GroupByStrategySelector
{
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String STRATEGY_V2 = "v2";
public static final String STRATEGY_V1 = "v1";
@ -49,7 +48,7 @@ public class GroupByStrategySelector
public GroupByStrategy strategize(GroupByQuery query)
{
final String strategyString = query.getContextValue(CTX_KEY_STRATEGY, config.getDefaultStrategy());
final String strategyString = config.withOverrides(query).getDefaultStrategy();
switch (strategyString) {
case STRATEGY_V2:

View File

@ -99,7 +99,7 @@ public class GroupByStrategyV1 implements GroupByStrategy
//no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would return
//merged results
GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false,
GroupByStrategySelector.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1
)
),
responseContext

View File

@ -142,7 +142,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
).withOverriddenContext(
ImmutableMap.<String, Object>of(
"finalize", false,
GroupByStrategySelector.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp
)
),
@ -203,9 +203,4 @@ public class GroupByStrategyV2 implements GroupByStrategy
{
return GroupByQueryEngineV2.process(query, storageAdapter, bufferPool, configSupplier.get());
}
public static int getBufferGrouperInitialBuckets(final GroupByQueryConfig config, final GroupByQuery query)
{
return query.getContextValue("bufferGrouperInitialBuckets", config.getBufferGrouperInitialBuckets());
}
}

View File

@ -156,7 +156,7 @@ public class GroupByQueryRunnerTest
return ByteBuffer.allocate(10 * 1024 * 1024);
}
},
4
2 // There are some tests that need to allocate two buffers (simulating two levels of merging)
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
@ -854,7 +854,7 @@ public class GroupByQueryRunnerTest
}
@Test
public void testGroupByMaxRowsLimitContextOverrid()
public void testGroupByMaxRowsLimitContextOverride()
{
GroupByQuery query = GroupByQuery
.builder()
@ -878,6 +878,32 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
@Test
public void testGroupByMaxOnDiskStorageContextOverride()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 0, "bufferGrouperMaxSize", 1))
.build();
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(ISE.class);
expectedException.expectMessage("Grouping resources exhausted");
}
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
@Test
public void testGroupByWithRebucketRename()
{