Address comments

This commit is contained in:
Jihoon Son 2017-09-07 21:35:50 +09:00
parent 3f14db44e5
commit dee9633b59
5 changed files with 88 additions and 75 deletions

View File

@ -19,7 +19,6 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Throwables;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
@ -74,7 +73,7 @@ public class CloseableGrouperIterator<KeyType, T> implements CloseableIterator<T
closer.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
}

View File

@ -40,6 +40,7 @@ import io.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@ -331,12 +332,11 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
final List<String> dictionary = grouper.getDictionary();
for (String key : dictionary) {
if (!mergedDictionary.contains(key)) {
if (mergedDictionary.add(key)) {
totalDictionarySize += RowBasedGrouperHelper.estimateStringKeySize(key);
if (totalDictionarySize > maxDictionarySizeForCombiner) {
return null;
}
mergedDictionary.add(key);
}
}
}
@ -361,9 +361,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private AggregatorFactory[] getCombiningFactories(AggregatorFactory[] aggregatorFactories)
{
final AggregatorFactory[] combiningFactories = new AggregatorFactory[aggregatorFactories.length];
for (int i = 0; i < aggregatorFactories.length; i++) {
combiningFactories[i] = aggregatorFactories[i].getCombiningFactory();
}
Arrays.setAll(combiningFactories, i -> aggregatorFactories[i].getCombiningFactory());
return combiningFactories;
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
@ -51,7 +52,6 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CancellationException;
@ -88,7 +88,7 @@ public class ParallelCombiner<KeyType>
this.combiningFactories = combiningFactories;
this.combineKeySerdeFactory = combineKeySerdeFactory;
this.executor = executor;
this.keyObjComparator = combineKeySerdeFactory.objectComparator(sortHasNonGroupingFields);;
this.keyObjComparator = combineKeySerdeFactory.objectComparator(sortHasNonGroupingFields);
this.concurrencyHint = concurrencyHint;
this.priority = priority;
}
@ -130,7 +130,7 @@ public class ParallelCombiner<KeyType>
final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);
final Pair<CloseableIterator<Entry<KeyType>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
sortedIterators,
bufferSupplier,
combiningFactories,
@ -138,7 +138,7 @@ public class ParallelCombiner<KeyType>
mergedDictionary
);
final CloseableIterator<Entry<KeyType>> combineIterator = combineIteratorAndFutures.lhs;
final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
final Closer closer = Closer.create();
@ -192,12 +192,12 @@ public class ParallelCombiner<KeyType>
/**
* Find a minimum size of the buffer slice and corresponding combining degree and number of slices. Note that each
* node in the combining tree is executed by different threads. This method assumes that higher degree of parallelism
* can exploit better performance and find such a shape of the combining tree.
* node in the combining tree is executed by a different thread. This method assumes that using more threads yields
* better performance, and searches for a shape of the combining tree that uses as many of them as possible.
*
* @param combineBuffer entire buffer used for combining tree
* @param requiredMinimumBufferCapacity minimum buffer capacity for {@link StreamingMergeSortedGrouper}
* @param concurrencyHint available degree of parallelism
* @param numAvailableThreads number of available threads
* @param numLeafNodes number of leaf nodes of combining tree
*
* @return a pair of degree and number of buffers if found.
@ -205,13 +205,13 @@ public class ParallelCombiner<KeyType>
private static Pair<Integer, Integer> findCombineDegreeAndNumBuffers(
ByteBuffer combineBuffer,
int requiredMinimumBufferCapacity,
int concurrencyHint,
int numAvailableThreads,
int numLeafNodes
)
{
for (int degree = MINIMUM_COMBINE_DEGREE; degree <= numLeafNodes; degree++) {
final int requiredBufferNum = computeRequiredBufferNum(numLeafNodes, degree);
if (requiredBufferNum <= concurrencyHint) {
if (requiredBufferNum <= numAvailableThreads) {
final int expectedSliceSize = combineBuffer.capacity() / requiredBufferNum;
if (expectedSliceSize >= requiredMinimumBufferCapacity) {
return Pair.of(degree, requiredBufferNum);
@ -223,90 +223,101 @@ public class ParallelCombiner<KeyType>
}
/**
* Recursively compute the number of required buffers for a combining tree in a top-down manner. Since each node of
* Recursively compute the number of required buffers for a combining tree in a bottom-up manner. Since each node of
* the combining tree represents a combining task and each combining task requires one buffer, the number of required
* buffers is the number of nodes of the combining tree.
*
* @param numLeafNodes number of leaf nodes
* @param numChildNodes number of child nodes
* @param combineDegree combine degree
*
* @return minimum number of buffers required for combining tree
*
* @see {@link #buildCombineTree(List, Supplier, AggregatorFactory[], int, List)}
* @see #buildCombineTree(List, Supplier, AggregatorFactory[], int, List)
*/
private static int computeRequiredBufferNum(int numLeafNodes, int combineDegree)
private static int computeRequiredBufferNum(int numChildNodes, int combineDegree)
{
if (numLeafNodes > combineDegree) {
final int numLeafNodesPerChild = (numLeafNodes + combineDegree - 1) / combineDegree; // ceiling
int sum = 1; // count for the current node
for (int i = 0; i < combineDegree; i++) {
// further compute for child nodes
sum += computeRequiredBufferNum(
Math.min(numLeafNodesPerChild, numLeafNodes - i * numLeafNodesPerChild),
combineDegree
);
}
// numChildrenForLastNode is used to determine whether the last node is needed for the current level.
// Please see buildCombineTree() for more details.
final int numChildrenForLastNode = numChildNodes % combineDegree;
final int numCurLevelNodes = numChildNodes / combineDegree + (numChildrenForLastNode > 1 ? 1 : 0);
final int numChildOfParentNodes = numCurLevelNodes + (numChildrenForLastNode == 1 ? 1 : 0);
return sum;
if (numChildOfParentNodes == 1) {
return numCurLevelNodes;
} else {
return 1;
return numCurLevelNodes +
computeRequiredBufferNum(numChildOfParentNodes, combineDegree);
}
}
/**
* Recursively build a combining tree in a top-down manner. Each node of the tree is a task that combines input
* Recursively build a combining tree in a bottom-up manner. Each node of the tree is a task that combines input
* iterators asynchronously.
*
* @param sortedIterators sorted iterators
* @param bufferSupplier combining buffer supplier
* @param combiningFactories array of combining aggregator factories
* @param combineDegree combining degree
* @param dictionary merged dictionary
* @param childIterators all iterators of the child level
* @param bufferSupplier combining buffer supplier
* @param combiningFactories array of combining aggregator factories
* @param combineDegree combining degree
* @param dictionary merged dictionary
*
* @return a pair of an iterator of the root of the combining tree and a list of futures of all executed combining
* tasks
* @return a pair of a list of iterators of the current level in the combining tree and a list of futures of all
* executed combining tasks
*/
private Pair<CloseableIterator<Entry<KeyType>>, List<Future>> buildCombineTree(
List<? extends CloseableIterator<Entry<KeyType>>> sortedIterators,
private Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> buildCombineTree(
List<? extends CloseableIterator<Entry<KeyType>>> childIterators,
Supplier<ByteBuffer> bufferSupplier,
AggregatorFactory[] combiningFactories,
int combineDegree,
List<String> dictionary
)
{
final int numIterators = sortedIterators.size();
if (numIterators > combineDegree) {
final List<CloseableIterator<Entry<KeyType>>> childIterators = new ArrayList<>(combineDegree);
final List<Future> combineFutures = new ArrayList<>(combineDegree + 1);
final int numChildLevelIterators = childIterators.size();
final List<CloseableIterator<Entry<KeyType>>> childIteratorsOfNextLevel = new ArrayList<>();
final List<Future> combineFutures = new ArrayList<>();
final int iteratorsPerChild = (numIterators + combineDegree - 1) / combineDegree; // ceiling
for (int i = 0; i < combineDegree; i++) {
final Pair<CloseableIterator<Entry<KeyType>>, List<Future>> childIteratorAndFutures = buildCombineTree(
sortedIterators.subList(i * iteratorsPerChild, Math.min(numIterators, (i + 1) * iteratorsPerChild)),
bufferSupplier,
// The algorithm below creates the combining nodes of the current level. For each group of children, it first checks
// whether only one child is left to be combined. If so, no intermediate combining node is needed for that child;
// instead, it is connected directly to a node of the parent level. Here is an example of the generated tree when
// numLeafNodes = 6 and combineDegree = 2.
//            o
//           / \
//          o   \
//         / \   \
//        o   o   o
//       / \ / \ / \
//      o  o o  o o  o
int i;
for (i = 0; i < numChildLevelIterators; i += combineDegree) {
if (i < numChildLevelIterators - 1) {
final List<? extends CloseableIterator<Entry<KeyType>>> subIterators = childIterators.subList(
i,
Math.min(i + combineDegree, numChildLevelIterators)
);
final Pair<CloseableIterator<Entry<KeyType>>, Future> iteratorAndFuture = runCombiner(
subIterators,
bufferSupplier.get(),
combiningFactories,
combineDegree,
dictionary
);
childIterators.add(childIteratorAndFutures.lhs);
combineFutures.addAll(childIteratorAndFutures.rhs);
childIteratorsOfNextLevel.add(iteratorAndFuture.lhs);
combineFutures.add(iteratorAndFuture.rhs);
} else {
// If there remains one child, it can be directly connected to a node of the parent level.
childIteratorsOfNextLevel.add(childIterators.get(i));
}
final Pair<CloseableIterator<Entry<KeyType>>, Future> iteratorAndFuture = runCombiner(
childIterators,
bufferSupplier.get(),
combiningFactories,
dictionary
);
combineFutures.add(iteratorAndFuture.rhs);
return new Pair<>(iteratorAndFuture.lhs, combineFutures);
}
if (childIteratorsOfNextLevel.size() == 1) {
// This is the root
return Pair.of(childIteratorsOfNextLevel, combineFutures);
} else {
final Pair<CloseableIterator<Entry<KeyType>>, Future> iteratorAndFuture = runCombiner(
sortedIterators,
bufferSupplier.get(),
combiningFactories,
dictionary
);
return new Pair<>(iteratorAndFuture.lhs, Collections.singletonList(iteratorAndFuture.rhs));
// Build the parent level iterators
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> parentIteratorsAndFutures =
buildCombineTree(childIteratorsOfNextLevel, bufferSupplier, combiningFactories, combineDegree, dictionary);
combineFutures.addAll(parentIteratorsAndFutures.rhs);
return Pair.of(parentIteratorsAndFutures.lhs, combineFutures);
}
}
@ -325,6 +336,7 @@ public class ParallelCombiner<KeyType>
settableColumnSelectorFactory,
combiningFactories
);
grouper.init();
final ListenableFuture future = executor.submit(
new AbstractPrioritizedCallable<Void>(priority)
@ -332,8 +344,6 @@ public class ParallelCombiner<KeyType>
@Override
public Void call() throws Exception
{
grouper.init();
try (
CloseableIterator<Entry<KeyType>> mergedIterator = CloseableIterators.mergeSorted(
iterators,

View File

@ -22,9 +22,10 @@ package io.druid.query.groupby.epinephelinae;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
@ -296,6 +297,10 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
*/
public CloseableIterator<Entry<KeyType>> iterator()
{
if (!initialized) {
throw new ISE("Grouper should be initialized first");
}
return new CloseableIterator<Entry<KeyType>>()
{
{

View File

@ -124,15 +124,16 @@ public class ParallelCombinerTest
baseIterator.add(new Entry<>(i, new Object[]{i * 10}));
}
final List<TestIterator> iterators = new ArrayList<>(8);
for (int i = 0; i < 8; i++) {
final int leafNum = 8;
final List<TestIterator> iterators = new ArrayList<>(leafNum);
for (int i = 0; i < leafNum; i++) {
iterators.add(new TestIterator(baseIterator.iterator()));
}
try (final CloseableIterator<Entry<Long>> iterator = combiner.combine(iterators, new ArrayList<>())) {
long expectedKey = 0;
while (iterator.hasNext()) {
Assert.assertEquals(new Entry<>(expectedKey, new Object[]{expectedKey++ * 80}), iterator.next());
Assert.assertEquals(new Entry<>(expectedKey, new Object[]{expectedKey++ * leafNum * 10}), iterator.next());
}
}