Add ParallelCombinerTest

This commit is contained in:
Jihoon Son 2017-09-02 20:02:19 +09:00
parent f31f6d203b
commit 3cdce754f2
4 changed files with 171 additions and 26 deletions

View File

@ -139,8 +139,8 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
getCombiningFactories(aggregatorFactories),
combineKeySerdeFactory,
executor,
keyObjComparator,
concurrencyHint
concurrencyHint,
sortHasNonGroupingFields
);
this.maxDictionarySizeForCombiner = combineKeySerdeFactory.getMaxDictionarySize();
}

View File

@ -76,15 +76,15 @@ public class ParallelCombiner<KeyType>
AggregatorFactory[] combiningFactories,
KeySerdeFactory<KeyType> combineKeySerdeFactory,
ListeningExecutorService executor,
Comparator<Entry<KeyType>> keyObjComparator,
int concurrencyHint
int concurrencyHint,
boolean sortHasNonGroupingFields
)
{
this.combineBufferSupplier = combineBufferSupplier;
this.combiningFactories = combiningFactories;
this.combineKeySerdeFactory = combineKeySerdeFactory;
this.executor = executor;
this.keyObjComparator = keyObjComparator;
this.keyObjComparator = combineKeySerdeFactory.objectComparator(sortHasNonGroupingFields);;
this.concurrencyHint = concurrencyHint;
}
@ -98,11 +98,11 @@ public class ParallelCombiner<KeyType>
* @return an iterator of the root grouper of the combining tree
*/
public CloseableIterator<Entry<KeyType>> combine(
List<CloseableIterator<Entry<KeyType>>> sortedIterators,
List<? extends CloseableIterator<Entry<KeyType>>> sortedIterators,
List<String> mergedDictionary
)
{
// CombineBuffer is initialized when this method is called
// CombineBuffer is initialized when this method is called and closed after the result iterator is done
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
final ByteBuffer combineBuffer = combineBufferHolder.get();
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
@ -258,7 +258,7 @@ public class ParallelCombiner<KeyType>
* tasks
*/
private Pair<CloseableIterator<Entry<KeyType>>, List<Future>> buildCombineTree(
List<CloseableIterator<Entry<KeyType>>> sortedIterators,
List<? extends CloseableIterator<Entry<KeyType>>> sortedIterators,
Supplier<ByteBuffer> bufferSupplier,
AggregatorFactory[] combiningFactories,
int combineDegree,
@ -302,7 +302,7 @@ public class ParallelCombiner<KeyType>
}
private Pair<CloseableIterator<Entry<KeyType>>, Future> runCombiner(
List<CloseableIterator<Entry<KeyType>>> iterators,
List<? extends CloseableIterator<Entry<KeyType>>> iterators,
ByteBuffer combineBuffer,
AggregatorFactory[] combiningFactories,
List<String> dictionary

View File

@ -26,8 +26,8 @@ import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.ResourceHolder;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
@ -61,19 +61,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ConcurrentGrouperTest
{
private static final ExecutorService service = Executors.newFixedThreadPool(8);
private static final TestResourceHolder testResourceHolder = new TestResourceHolder();
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(256);
@AfterClass
public static void teardown()
{
service.shutdown();
SERVICE.shutdown();
}
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final Supplier<ByteBuffer> bufferSupplier = new Supplier<ByteBuffer>()
private static final Supplier<ByteBuffer> BUFFER_SUPPLIER = new Supplier<ByteBuffer>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
private ByteBuffer buffer;
@ -89,7 +89,7 @@ public class ConcurrentGrouperTest
}
};
private static final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier = new Supplier<ResourceHolder<ByteBuffer>>()
private static final Supplier<ResourceHolder<ByteBuffer>> COMBINE_BUFFER_SUPPLIER = new Supplier<ResourceHolder<ByteBuffer>>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
@ -97,17 +97,22 @@ public class ConcurrentGrouperTest
public ResourceHolder<ByteBuffer> get()
{
if (called.compareAndSet(false, true)) {
return testResourceHolder;
return TEST_RESOURCE_HOLDER;
} else {
throw new IAE("should be called once");
}
}
};
private static class TestResourceHolder implements ResourceHolder<ByteBuffer>
static class TestResourceHolder implements ResourceHolder<ByteBuffer>
{
private boolean closed;
private ByteBuffer buffer = ByteBuffer.allocate(256);
private ByteBuffer buffer;
TestResourceHolder(int bufferSize)
{
buffer = ByteBuffer.allocate(bufferSize);
}
@Override
public ByteBuffer get()
@ -122,7 +127,7 @@ public class ConcurrentGrouperTest
}
}
private static final KeySerdeFactory<Long> keySerdeFactory = new KeySerdeFactory<Long>()
static final KeySerdeFactory<Long> KEY_SERDE_FACTORY = new KeySerdeFactory<Long>()
{
@Override
public long getMaxDictionarySize()
@ -260,10 +265,10 @@ public class ConcurrentGrouperTest
public void testAggregate() throws InterruptedException, ExecutionException, IOException
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
bufferSupplier,
combineBufferSupplier,
keySerdeFactory,
keySerdeFactory,
BUFFER_SUPPLIER,
COMBINE_BUFFER_SUPPLIER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
null_factory,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
24,
@ -274,7 +279,7 @@ public class ConcurrentGrouperTest
8,
null,
false,
MoreExecutors.listeningDecorator(service),
MoreExecutors.listeningDecorator(SERVICE),
0,
false,
0
@ -286,7 +291,7 @@ public class ConcurrentGrouperTest
Future<?>[] futures = new Future[8];
for (int i = 0; i < 8; i++) {
futures[i] = service.submit(new Runnable()
futures[i] = SERVICE.submit(new Runnable()
{
@Override
public void run()
@ -306,7 +311,7 @@ public class ConcurrentGrouperTest
final List<Entry<Long>> actual = Lists.newArrayList(iterator);
iterator.close();
Assert.assertTrue(testResourceHolder.closed);
Assert.assertTrue(TEST_RESOURCE_HOLDER.closed);
final List<Entry<Long>> expected = new ArrayList<>();
for (long i = 0; i < numRows; i++) {

View File

@ -0,0 +1,140 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.ResourceHolder;
import io.druid.concurrent.Execs;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.groupby.epinephelinae.ConcurrentGrouperTest.TestResourceHolder;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
public class ParallelCombinerTest
{
private static final int THREAD_NUM = 8;
private static final ExecutorService SERVICE = Execs.multiThreaded(THREAD_NUM, "parallel-combiner-test-%d");
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(512);
private static final Supplier<ResourceHolder<ByteBuffer>> COMBINE_BUFFER_SUPPLIER =
new Supplier<ResourceHolder<ByteBuffer>>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
@Override
public ResourceHolder<ByteBuffer> get()
{
if (called.compareAndSet(false, true)) {
return TEST_RESOURCE_HOLDER;
} else {
throw new IAE("should be called once");
}
}
};
private static final class TestIterator implements CloseableIterator<Entry<Long>>
{
private final Iterator<Entry<Long>> innerIterator;
private boolean closed;
TestIterator(Iterator<Entry<Long>> innerIterator)
{
this.innerIterator = innerIterator;
}
@Override
public boolean hasNext()
{
return innerIterator.hasNext();
}
@Override
public Entry<Long> next()
{
return innerIterator.next();
}
public boolean isClosed()
{
return closed;
}
@Override
public void close() throws IOException
{
if (!closed) {
closed = true;
}
}
}
@AfterClass
public static void teardown()
{
SERVICE.shutdown();
}
@Test
public void testCombine() throws IOException
{
final ParallelCombiner<Long> combiner = new ParallelCombiner<>(
COMBINE_BUFFER_SUPPLIER,
new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()},
ConcurrentGrouperTest.KEY_SERDE_FACTORY,
MoreExecutors.listeningDecorator(SERVICE),
THREAD_NUM,
false
);
final int numRows = 1000;
final List<Entry<Long>> baseIterator = new ArrayList<>(numRows);
for (long i = 0; i < numRows; i++) {
baseIterator.add(new Entry<>(i, new Object[]{i * 10}));
}
final List<TestIterator> iterators = new ArrayList<>(8);
for (int i = 0; i < 8; i++) {
iterators.add(new TestIterator(baseIterator.iterator()));
}
try (final CloseableIterator<Entry<Long>> iterator = combiner.combine(iterators, new ArrayList<>())) {
long expectedKey = 0;
while (iterator.hasNext()) {
Assert.assertEquals(new Entry<>(expectedKey, new Object[]{expectedKey++ * 80}), iterator.next());
}
}
iterators.forEach(it -> Assert.assertTrue(it.isClosed()));
}
}