mirror of https://github.com/apache/druid.git
ParallelCombiner: Fix buffer leak on exception in "combine". (#5630)
Once a buffer is acquired, we need to make sure to release it if an exception is thrown before the closeable iterator is created.
This commit is contained in:
parent
e6efd75a3d
commit
72d6dcda4f
|
@ -136,44 +136,55 @@ public class ParallelCombiner<KeyType>
|
|||
)
|
||||
{
|
||||
// CombineBuffer is initialized when this method is called and closed after the result iterator is done
|
||||
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
|
||||
final ByteBuffer combineBuffer = combineBufferHolder.get();
|
||||
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
|
||||
combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
|
||||
combiningFactories
|
||||
);
|
||||
// We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size
|
||||
// required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the
|
||||
// required number of buffers maximizing the parallelism.
|
||||
final Pair<Integer, Integer> degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers(
|
||||
combineBuffer,
|
||||
minimumRequiredBufferCapacity,
|
||||
concurrencyHint,
|
||||
sortedIterators.size()
|
||||
);
|
||||
|
||||
final int leafCombineDegree = degreeAndNumBuffers.lhs;
|
||||
final int numBuffers = degreeAndNumBuffers.rhs;
|
||||
final int sliceSize = combineBuffer.capacity() / numBuffers;
|
||||
|
||||
final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);
|
||||
|
||||
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
|
||||
sortedIterators,
|
||||
bufferSupplier,
|
||||
combiningFactories,
|
||||
leafCombineDegree,
|
||||
mergedDictionary
|
||||
);
|
||||
|
||||
final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
|
||||
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
|
||||
|
||||
final Closer closer = Closer.create();
|
||||
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
|
||||
closer.register(combineBufferHolder);
|
||||
closer.register(() -> checkCombineFutures(combineFutures));
|
||||
|
||||
return CloseableIterators.wrap(combineIterator, closer);
|
||||
try {
|
||||
final ByteBuffer combineBuffer = combineBufferHolder.get();
|
||||
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
|
||||
combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
|
||||
combiningFactories
|
||||
);
|
||||
// We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size
|
||||
// required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the
|
||||
// required number of buffers maximizing the parallelism.
|
||||
final Pair<Integer, Integer> degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers(
|
||||
combineBuffer,
|
||||
minimumRequiredBufferCapacity,
|
||||
concurrencyHint,
|
||||
sortedIterators.size()
|
||||
);
|
||||
|
||||
final int leafCombineDegree = degreeAndNumBuffers.lhs;
|
||||
final int numBuffers = degreeAndNumBuffers.rhs;
|
||||
final int sliceSize = combineBuffer.capacity() / numBuffers;
|
||||
|
||||
final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);
|
||||
|
||||
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
|
||||
sortedIterators,
|
||||
bufferSupplier,
|
||||
combiningFactories,
|
||||
leafCombineDegree,
|
||||
mergedDictionary
|
||||
);
|
||||
|
||||
final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
|
||||
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
|
||||
closer.register(() -> checkCombineFutures(combineFutures));
|
||||
|
||||
return CloseableIterators.wrap(combineIterator, closer);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
try {
|
||||
closer.close();
|
||||
}
|
||||
catch (Throwable t2) {
|
||||
t.addSuppressed(t2);
|
||||
}
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkCombineFutures(List<Future> combineFutures)
|
||||
|
@ -289,11 +300,11 @@ public class ParallelCombiner<KeyType>
|
|||
* Recursively build a combining tree in a bottom-up manner. Each node of the tree is a task that combines input
|
||||
* iterators asynchronously.
|
||||
*
|
||||
* @param childIterators all iterators of the child level
|
||||
* @param bufferSupplier combining buffer supplier
|
||||
* @param combiningFactories array of combining aggregator factories
|
||||
* @param combineDegree combining degree for the current level
|
||||
* @param dictionary merged dictionary
|
||||
* @param childIterators all iterators of the child level
|
||||
* @param bufferSupplier combining buffer supplier
|
||||
* @param combiningFactories array of combining aggregator factories
|
||||
* @param combineDegree combining degree for the current level
|
||||
* @param dictionary merged dictionary
|
||||
*
|
||||
* @return a pair of a list of iterators of the current level in the combining tree and a list of futures of all
|
||||
* executed combining tasks
|
||||
|
|
Loading…
Reference in New Issue