Merge pull request #1206 from metamx/group-by-NPE-fix

fix race in groupByParallelQueryRunner
This commit is contained in:
Fangjin Yang 2015-03-17 15:35:48 -07:00
commit b0e33ac879
4 changed files with 76 additions and 7 deletions

View File

@ -44,6 +44,7 @@ import io.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -84,7 +85,7 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
configSupplier.get(),
bufferPool
);
final Pair<List, Accumulator<List, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = query.getContextBySegment(false);
final int priority = query.getContextPriority(0);

View File

@ -36,7 +36,10 @@ import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class GroupByQueryHelper
{
@ -130,18 +133,19 @@ public class GroupByQueryHelper
return new Pair<>(index, accumulator);
}
public static <T> Pair<List, Accumulator<List, T>> createBySegmentAccumulatorPair()
public static <T> Pair<Queue, Accumulator<Queue, T>> createBySegmentAccumulatorPair()
{
List init = Lists.newArrayList();
Accumulator<List, T> accumulator = new Accumulator<List, T>()
// In parallel query runner multiple threads add to this queue concurrently
Queue init = new ConcurrentLinkedQueue<>();
Accumulator<Queue, T> accumulator = new Accumulator<Queue, T>()
{
@Override
public List accumulate(List accumulated, T in)
public Queue accumulate(Queue accumulated, T in)
{
if(in == null){
throw new ISE("Cannot have null result");
}
accumulated.add(in);
accumulated.offer(in);
return accumulated;
}
};

View File

@ -49,6 +49,7 @@ import io.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -115,7 +116,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
config.get(),
computationBufferPool
);
final Pair<List, Accumulator<List, Row>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final Pair<Queue, Accumulator<Queue, Row>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final int priority = query.getContextPriority(0);
final boolean bySegment = query.getContextBySegment(false);

View File

@ -22,6 +22,7 @@ import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -33,9 +34,14 @@ import io.druid.data.input.Row;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.BySegmentResultValue;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -88,6 +94,8 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerTest
@ -2337,4 +2345,59 @@ public class GroupByQueryRunnerTest
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testBySegmentResults()
{
int segmentCount = 32;
Result<BySegmentResultValue> singleSegmentResult = new Result<BySegmentResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new BySegmentResultValueClass(
Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-01",
"alias",
"mezzanine",
"rows",
6L,
"idx",
4420L
)
), "testSegment", new Interval("2011-04-02T00:00:00.000Z/2011-04-04T00:00:00.000Z")
)
);
List<Result> bySegmentResults = Lists.newArrayList();
for (int i = 0; i < segmentCount; i++) {
bySegmentResults.add(singleSegmentResult);
}
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setDimFilter(new SelectorDimFilter("quality", "mezzanine"))
.setContext(ImmutableMap.<String, Object>of("bySegment", true));
final GroupByQuery fullQuery = builder.build();
QueryToolChest toolChest = factory.getToolchest();
List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
for (int i = 0; i < segmentCount; i++) {
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(Executors.newCachedThreadPool(), singleSegmentRunners)),
toolChest
);
TestHelper.assertExpectedObjects(bySegmentResults, theRunner.run(fullQuery, Maps.newHashMap()), "");
exec.shutdownNow();
}
}