use AbstractPrioritizedCallable

This commit is contained in:
Jihoon Son 2017-09-02 20:28:10 +09:00
parent 2a619b946a
commit 3f14db44e5
3 changed files with 37 additions and 23 deletions

View File

@ -139,8 +139,9 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
getCombiningFactories(aggregatorFactories),
combineKeySerdeFactory,
executor,
sortHasNonGroupingFields,
concurrencyHint,
sortHasNonGroupingFields
priority
);
this.maxDictionarySizeForCombiner = combineKeySerdeFactory.getMaxDictionarySize();
}

View File

@ -31,6 +31,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
@ -71,14 +72,16 @@ public class ParallelCombiner<KeyType>
private final ListeningExecutorService executor;
private final Comparator<Entry<KeyType>> keyObjComparator;
private final int concurrencyHint;
private final int priority;
public ParallelCombiner(
Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
AggregatorFactory[] combiningFactories,
KeySerdeFactory<KeyType> combineKeySerdeFactory,
ListeningExecutorService executor,
boolean sortHasNonGroupingFields,
int concurrencyHint,
boolean sortHasNonGroupingFields
int priority
)
{
this.combineBufferSupplier = combineBufferSupplier;
@ -87,6 +90,7 @@ public class ParallelCombiner<KeyType>
this.executor = executor;
this.keyObjComparator = combineKeySerdeFactory.objectComparator(sortHasNonGroupingFields);;
this.concurrencyHint = concurrencyHint;
this.priority = priority;
}
/**
@ -322,29 +326,37 @@ public class ParallelCombiner<KeyType>
combiningFactories
);
final ListenableFuture future = executor.submit(() -> {
grouper.init();
final ListenableFuture future = executor.submit(
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Void call() throws Exception
{
grouper.init();
try (
CloseableIterator<Entry<KeyType>> mergedIterator = CloseableIterators.mergeSorted(
iterators,
keyObjComparator
)
) {
while (mergedIterator.hasNext()) {
final Entry<KeyType> next = mergedIterator.next();
try (
CloseableIterator<Entry<KeyType>> mergedIterator = CloseableIterators.mergeSorted(
iterators,
keyObjComparator
)
) {
while (mergedIterator.hasNext()) {
final Entry<KeyType> next = mergedIterator.next();
settableColumnSelectorFactory.set(next.values);
grouper.aggregate(next.key); // grouper always returns ok or throws an exception
settableColumnSelectorFactory.set(null);
settableColumnSelectorFactory.set(next.values);
grouper.aggregate(next.key); // grouper always returns ok or throws an exception
settableColumnSelectorFactory.set(null);
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
grouper.finish();
return null;
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
grouper.finish();
});
);
return new Pair<>(grouper.iterator(), future);
}

View File

@ -113,8 +113,9 @@ public class ParallelCombinerTest
new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()},
ConcurrentGrouperTest.KEY_SERDE_FACTORY,
MoreExecutors.listeningDecorator(SERVICE),
false,
THREAD_NUM,
false
0
);
final int numRows = 1000;