From 72d6dcda4fb674261f14e21f143d39aaedb68dc9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 11 Apr 2018 20:39:39 -0400 Subject: [PATCH] ParallelCombiner: Fix buffer leak on exception in "combine". (#5630) Once a buffer is acquired, we need to make sure to release it if an exception is thrown before the closeable iterator is created. --- .../epinephelinae/ParallelCombiner.java | 91 +++++++++++-------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java index bc786490c47..e8db98d31da 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java @@ -136,44 +136,55 @@ public class ParallelCombiner ) { // CombineBuffer is initialized when this method is called and closed after the result iterator is done - final ResourceHolder combineBufferHolder = combineBufferSupplier.get(); - final ByteBuffer combineBuffer = combineBufferHolder.get(); - final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity( - combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary), - combiningFactories - ); - // We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size - // required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the - // required number of buffers maximizing the parallelism. - final Pair degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers( - combineBuffer, - minimumRequiredBufferCapacity, - concurrencyHint, - sortedIterators.size() - ); - - final int leafCombineDegree = degreeAndNumBuffers.lhs; - final int numBuffers = degreeAndNumBuffers.rhs; - final int sliceSize = combineBuffer.capacity() / numBuffers; - - final Supplier bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize); - - final Pair>>, List> combineIteratorAndFutures = buildCombineTree( - sortedIterators, - bufferSupplier, - combiningFactories, - leafCombineDegree, - mergedDictionary - ); - - final CloseableIterator> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs); - final List combineFutures = combineIteratorAndFutures.rhs; - final Closer closer = Closer.create(); + final ResourceHolder combineBufferHolder = combineBufferSupplier.get(); closer.register(combineBufferHolder); - closer.register(() -> checkCombineFutures(combineFutures)); - return CloseableIterators.wrap(combineIterator, closer); + try { + final ByteBuffer combineBuffer = combineBufferHolder.get(); + final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity( + combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary), + combiningFactories + ); + // We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size + // required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the + // required number of buffers maximizing the parallelism. + final Pair degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers( + combineBuffer, + minimumRequiredBufferCapacity, + concurrencyHint, + sortedIterators.size() + ); + + final int leafCombineDegree = degreeAndNumBuffers.lhs; + final int numBuffers = degreeAndNumBuffers.rhs; + final int sliceSize = combineBuffer.capacity() / numBuffers; + + final Supplier bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize); + + final Pair>>, List> combineIteratorAndFutures = buildCombineTree( + sortedIterators, + bufferSupplier, + combiningFactories, + leafCombineDegree, + mergedDictionary + ); + + final CloseableIterator> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs); + final List combineFutures = combineIteratorAndFutures.rhs; + closer.register(() -> checkCombineFutures(combineFutures)); + + return CloseableIterators.wrap(combineIterator, closer); + } + catch (Throwable t) { + try { + closer.close(); + } + catch (Throwable t2) { + t.addSuppressed(t2); + } + throw t; + } } private static void checkCombineFutures(List combineFutures) @@ -289,11 +300,11 @@ public class ParallelCombiner * Recursively build a combining tree in a bottom-up manner. Each node of the tree is a task that combines input * iterators asynchronously. * - * @param childIterators all iterators of the child level - * @param bufferSupplier combining buffer supplier - * @param combiningFactories array of combining aggregator factories - * @param combineDegree combining degree for the current level - * @param dictionary merged dictionary + * @param childIterators all iterators of the child level + * @param bufferSupplier combining buffer supplier + * @param combiningFactories array of combining aggregator factories + * @param combineDegree combining degree for the current level + * @param dictionary merged dictionary * * @return a pair of a list of iterators of the current level in the combining tree and a list of futures of all * executed combining tasks