GroupBy: Validation of output names, and a gross hack for v1 subqueries. (#3686)

v1 subqueries try to use aggregators to "transfer" values from the inner
results to an incremental index, but aggregators can't transfer all kinds of
values (strings are a common one). This is a workaround that selectively
ignores what the outer aggregators ask for and instead assumes that we know
best.
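
For illustration only (not code in this commit): the workaround boils down to dropping any required
"transfer" column whose output name is already claimed by a subquery dimension. A minimal,
self-contained sketch of that rule, using plain collections and hypothetical names in place of the
Druid types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the check added to GroupByStrategyV1: any "transfer"
// column whose name matches a subquery dimension is skipped, on the assumption
// that the outer aggregator really wants the dimension itself.
public class TransferColumnFilter
{
  public static List<String> columnsToTransfer(
      List<String> subqueryDimensionNames,
      List<String> requiredColumnNames
  )
  {
    final Set<String> dimensionNames = new HashSet<>(subqueryDimensionNames);
    final List<String> retained = new ArrayList<>();
    for (String columnName : requiredColumnNames) {
      if (!dimensionNames.contains(columnName)) {
        retained.add(columnName);
      }
    }
    return retained;
  }

  public static void main(String[] args)
  {
    // "quality" is produced by the subquery as a string dimension, so an outer
    // cardinality-style aggregator's request for it is ignored rather than
    // transferred through an aggregator; "index" still transfers normally.
    System.out.println(
        columnsToTransfer(Arrays.asList("quality"), Arrays.asList("quality", "index"))
    ); // prints [index]
  }
}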

These are in the same commit because the name validation changed the kinds of
errors that were thrown by v1 subqueries.
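
For example (a sketch, not code in this commit): with the new validation, a name collision now
fails fast with an IAE (a subclass of IllegalArgumentException) when the GroupByQuery object is
constructed, instead of surfacing later, and strategy-dependently, inside the subquery machinery.
The builder calls mirror the new test below; the datasource name and the setInterval overload are
assumptions:

import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;

public class OutputNameCollisionSketch
{
  public static void main(String[] args)
  {
    try {
      // The dimension's output name "alias" collides with the aggregator name
      // "alias"; both would land in the same namespace in the returned Rows.
      GroupByQuery.builder()
          .setDataSource("wikipedia")            // hypothetical datasource
          .setInterval("2011-04-01/2011-04-03")  // assumed overload; the test uses setQuerySegmentSpec
          .setDimensions(
              Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias"))
          )
          .setAggregatorSpecs(
              Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("alias", "index"))
          )
          .setGranularity(QueryGranularities.DAY)
          .build();                              // now throws here, at construction time
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());        // Duplicate output name[alias]
    }
  }
}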
Gian Merlino authored on 2016-11-28 23:05:03 -08:00 (committed by Nishant)
parent 9f7050e221
commit 6922d684bf
4 changed files with 98 additions and 69 deletions

File: GroupByQuery.java

@@ -28,9 +28,11 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 import io.druid.data.input.Row;
 import io.druid.granularity.QueryGranularity;
+import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -57,6 +59,7 @@ import org.joda.time.Interval;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 /**
  */
@@ -106,6 +109,11 @@ public class GroupByQuery extends BaseQuery<Row>
     Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
     Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);

+    // Verify no duplicate names between dimensions, aggregators, and postAggregators.
+    // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.
+    // We're not counting __time, even though that name is problematic. See: https://github.com/druid-io/druid/pull/3684
+    verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
+
     Function<Sequence<Row>, Sequence<Row>> postProcFn =
         this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
@@ -436,6 +444,32 @@
     );
   }

+  private static void verifyOutputNames(
+      List<DimensionSpec> dimensions,
+      List<AggregatorFactory> aggregators,
+      List<PostAggregator> postAggregators
+  )
+  {
+    final Set<String> outputNames = Sets.newHashSet();
+    for (DimensionSpec dimension : dimensions) {
+      if (!outputNames.add(dimension.getOutputName())) {
+        throw new IAE("Duplicate output name[%s]", dimension.getOutputName());
+      }
+    }
+
+    for (AggregatorFactory aggregator : aggregators) {
+      if (!outputNames.add(aggregator.getName())) {
+        throw new IAE("Duplicate output name[%s]", aggregator.getName());
+      }
+    }
+
+    for (PostAggregator postAggregator : postAggregators) {
+      if (!outputNames.add(postAggregator.getName())) {
+        throw new IAE("Duplicate output name[%s]", postAggregator.getName());
+      }
+    }
+  }
+
   public static class Builder
   {
     private DataSource dataSource;

File: GroupByQueryHelper.java

@@ -21,7 +21,6 @@ package io.druid.query.groupby;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import io.druid.collections.StupidPool;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.MapBasedRow;
@@ -37,7 +36,6 @@ import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.query.ResourceLimitExceededException;
 import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
@@ -48,7 +46,6 @@ import io.druid.segment.incremental.OnheapIncrementalIndex;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;

 public class GroupByQueryHelper
@@ -95,21 +92,11 @@
     final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);

-    // All groupBy dimensions are strings, for now, as long as they don't conflict with any non-dimensions.
-    // This should get cleaned up if/when https://github.com/druid-io/druid/pull/3686 makes name conflicts impossible.
-    final Set<String> otherNames = Sets.newHashSet();
-    for (AggregatorFactory agg : aggs) {
-      otherNames.add(agg.getName());
-    }
-    for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
-      otherNames.add(postAggregator.getName());
-    }
+    // All groupBy dimensions are strings, for now.
     final List<DimensionSchema> dimensionSchemas = Lists.newArrayList();
     for (DimensionSpec dimension : query.getDimensions()) {
-      if (!otherNames.contains(dimension.getOutputName())) {
       dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName()));
-      }
     }

     final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
         .withDimensionsSpec(new DimensionsSpec(dimensionSchemas, null, null))

File: GroupByStrategyV1.java

@@ -41,6 +41,7 @@ import io.druid.query.QueryRunner;
 import io.druid.query.QueryWatcher;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryConfig;
 import io.druid.query.groupby.GroupByQueryEngine;
@@ -135,8 +136,21 @@ public class GroupByStrategyV1 implements GroupByStrategy
     // multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple
     // aggregators of the same type referencing the same fieldName (and skip creating identical columns for the
     // subsequent ones) and return an error if the aggregator types are different.
+    final Set<String> dimensionNames = Sets.newHashSet();
+    for (DimensionSpec dimension : subquery.getDimensions()) {
+      dimensionNames.add(dimension.getOutputName());
+    }
     for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
       for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) {
+        if (dimensionNames.contains(transferAgg.getName())) {
+          // This transferAgg is already represented in the subquery's dimensions. Assume that the outer aggregator
+          // *probably* wants the dimension and just ignore it. This is a gross workaround for cases like having
+          // a cardinality aggregator in the outer query. It is necessary because what this block of code is trying to
+          // do is use aggregators to "transfer" values from the inner results to an incremental index, but aggregators
+          // can't transfer all kinds of values (strings are a common one). If you don't like it, use groupBy v2, which
+          // doesn't have this problem.
+          continue;
+        }
         if (Iterables.any(aggs, new Predicate<AggregatorFactory>()
         {
           @Override
File: GroupByQueryRunnerTest.java

@@ -36,7 +36,6 @@ import io.druid.data.input.Row;
 import io.druid.granularity.PeriodGranularity;
 import io.druid.granularity.QueryGranularities;
 import io.druid.jackson.DefaultObjectMapper;
-import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.guava.MergeSequence;
 import io.druid.java.util.common.guava.Sequence;
@@ -392,6 +391,27 @@
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }

+  @Test
+  public void testGroupByWithOutputNameCollisions()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Duplicate output name[alias]");
+
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("alias", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+  }
+
   @Test
   public void testGroupByNoAggregators()
   {
@@ -4978,24 +4998,12 @@
         .setGranularity(QueryRunnerTestHelper.allGran)
         .build();

-    // v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get
-    // aggregator for all fields to build the inner query result incremental index. In this case, quality is a string
-    // field but getRequiredColumn() returned a Cardinality aggregator for it, which has type hyperUnique.
-    // The "quality" column is interpreted as a dimension because it appears in the dimension list of the
-    // MapBasedInputRows from the subquery, but the COMPLEX type from the agg overrides the actual string type.
-    // COMPLEX is not currently supported as a dimension type, so IAE is thrown. Even if it were, the actual string
-    // values in the "quality" column could not be interpreted as hyperUniques.
-    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
-      expectedException.expect(IAE.class);
-      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
-    } else {
     List<Row> expectedResults = Arrays.asList(
         GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "car", QueryRunnerTestHelper.UNIQUES_9)
     );
     Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
     TestHelper.assertExpectedObjects(expectedResults, results, "");
-    }
   }

   @Test
   public void testSubqueryWithOuterCountAggregator()
@@ -5079,19 +5087,6 @@
         .setGranularity(QueryRunnerTestHelper.dayGran)
         .build();

-    // v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get
-    // aggregator for all fields to build the inner query result incremental index. In this case, market is a string
-    // field but getRequiredColumn() returned a Javascript aggregator for it, which has type float.
-    // The "market" column is interpreted as a dimension because it appears in the dimension list of the
-    // MapBasedInputRows from the subquery, but the float type from the agg overrides the actual string type.
-    // Float is not currently supported as a dimension type, so IAE is thrown. Even if it were, a ParseException
-    // would occur because the "market" column really contains non-numeric values.
-    // Additionally, the V1 strategy always uses "combining" aggregator factories (meant for merging) on the subquery,
-    // which does not work for this particular javascript agg.
-    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
-      expectedException.expect(IAE.class);
-      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
-    } else {
     List<Row> expectedResults = Arrays.asList(
         GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 139D),
         GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 122D),
@@ -5116,7 +5111,6 @@
     Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
     TestHelper.assertExpectedObjects(expectedResults, results, "");
-    }
   }

   @Test
   public void testSubqueryWithHyperUniques()