Fix GroupBy type cast when ChainedExecutionQueryRunner merges results (#4488)

* Fix GroupBy type cast error when ChainedExecutionQueryRunner merges multiple runners

* Move conversion step to separate method

* Remove unnecessary comment

* Use compute to update map
This commit is contained in:
Jonathan Wei 2017-06-30 17:33:03 -07:00 committed by Gian Merlino
parent d757ef3e9b
commit 97a79f4478
2 changed files with 88 additions and 0 deletions

View File

@ -36,6 +36,7 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ColumnSelectorPlus; import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.ColumnSelectorStrategyFactory; import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy; import io.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy;
@ -360,6 +361,8 @@ outer:
); );
} }
convertRowTypesToOutputTypes(query.getDimensions(), theMap);
// Add aggregations. // Add aggregations.
for (int i = 0; i < entry.getValues().length; i++) { for (int i = 0; i < entry.getValues().length; i++) {
theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
@ -402,6 +405,34 @@ outer:
} }
} }
/**
 * Coerces each dimension value in {@code rowMap} to the output type declared by its
 * {@link DimensionSpec}, replacing the value in place under the spec's output name.
 *
 * <p>Missing or unconvertible values fall back to the type's default: {@code ""} for
 * STRING, {@code 0L} for LONG, {@code 0.f} for FLOAT.
 *
 * @param dimensionSpecs dimension specs whose output names/types drive the conversion
 * @param rowMap         row being built; mutated in place
 * @throws IAE if a spec declares an output type other than STRING, LONG, or FLOAT
 */
private static void convertRowTypesToOutputTypes(List<DimensionSpec> dimensionSpecs, Map<String, Object> rowMap)
{
  for (DimensionSpec dimSpec : dimensionSpecs) {
    final ValueType outputType = dimSpec.getOutputType();
    rowMap.compute(
        dimSpec.getOutputName(),
        (dimName, baseVal) -> {
          // Return the coerced value directly; never return null, so compute() never
          // removes the entry.
          if (outputType == ValueType.STRING) {
            return baseVal == null ? "" : baseVal.toString();
          } else if (outputType == ValueType.LONG) {
            final Long asLong = DimensionHandlerUtils.convertObjectToLong(baseVal);
            return asLong == null ? 0L : asLong;
          } else if (outputType == ValueType.FLOAT) {
            final Float asFloat = DimensionHandlerUtils.convertObjectToFloat(baseVal);
            return asFloat == null ? 0.f : asFloat;
          } else {
            throw new IAE("Unsupported type: " + outputType);
          }
        }
    );
  }
}
private static class GroupByEngineKeySerde implements Grouper.KeySerde<ByteBuffer> private static class GroupByEngineKeySerde implements Grouper.KeySerde<ByteBuffer>
{ {
private final int keySize; private final int keySize;

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.BlockingPool; import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool; import io.druid.collections.DefaultBlockingPool;
@ -49,6 +50,7 @@ import io.druid.java.util.common.parsers.ParseException;
import io.druid.js.JavaScriptConfig; import io.druid.js.JavaScriptConfig;
import io.druid.query.BySegmentResultValue; import io.druid.query.BySegmentResultValue;
import io.druid.query.BySegmentResultValueClass; import io.druid.query.BySegmentResultValueClass;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.DruidProcessingConfig; import io.druid.query.DruidProcessingConfig;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.FinalizeResultsQueryRunner;
@ -59,6 +61,7 @@ import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.ResourceLimitExceededException; import io.druid.query.ResourceLimitExceededException;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
@ -9130,4 +9133,58 @@ public class GroupByQueryRunnerTest
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
/**
 * Regression test for the GroupBy type-cast error when a ChainedExecutionQueryRunner
 * merges results from multiple underlying runners: dimension values must be converted
 * to their declared output types (here, a LONG extraction dimension) before merging.
 */
@Test
public void testTypeConversionWithMergingChainedExecutionRunner()
{
  // GroupBy v1 cannot produce non-STRING dimension output types, so it is expected to fail.
  if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
    expectedException.expect(UnsupportedOperationException.class);
    expectedException.expectMessage("GroupBy v1 only supports dimensions with an outputType of STRING.");
  }
  GroupByQuery query = GroupByQuery
      .builder()
      .setDataSource(QueryRunnerTestHelper.dataSource)
      .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
      .setDimensions(Lists.<DimensionSpec>newArrayList(
          new DefaultDimensionSpec("quality", "alias"),
          // LONG-typed extraction dimension exercises the row-type conversion path.
          new ExtractionDimensionSpec("quality", "qualityLen", ValueType.LONG, StrlenExtractionFn.instance())
      ))
      .setDimFilter(new SelectorDimFilter(
          "quality",
          "technology",
          null
      ))
      .setAggregatorSpecs(
          Arrays.asList(
              QueryRunnerTestHelper.rowsCount,
              new LongSumAggregatorFactory("idx", "index")
          )
      )
      .setGranularity(QueryRunnerTestHelper.dayGran)
      .build();
  List<Row> expectedResults = Arrays.asList(
      GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "qualityLen", 10L, "rows", 2L, "idx", 156L),
      GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "qualityLen", 10L, "rows", 2L, "idx", 194L)
  );
  // Two copies of the same runner force the chained runner to actually merge results.
  // Parameterized type (rather than the raw type) avoids an unchecked conversion.
  ChainedExecutionQueryRunner<Row> ceqr = new ChainedExecutionQueryRunner<>(
      MoreExecutors.sameThreadExecutor(),
      new QueryWatcher()
      {
        @Override
        public void registerQuery(Query query, ListenableFuture future)
        {
          // No-op: this test does not track or cancel in-flight queries.
        }
      },
      ImmutableList.<QueryRunner<Row>>of(runner, runner)
  );
  QueryRunner<Row> mergingRunner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(ceqr));
  Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, mergingRunner, query);
  TestHelper.assertExpectedObjects(expectedResults, results, "");
}
} }