Fix ClassCastException in nested v2 groupBys with timeouts. (#3310)

Add tests for the CCE and for a bunch of other groupBy stuff.

Also avoids setting the interrupted flag when InterruptedExceptions
happen, since this might interfere with resource closing, no other
query does it, and is probably pointless anyway since the thread
is likely to be a jetty thread that we don't actually want to set
an interrupt flag on.

Also fixes toString on OrderByColumnSpec.
This commit is contained in:
Gian Merlino 2016-08-02 15:02:44 -07:00 committed by David Lim
parent 50d52a24fc
commit ae3e0015b6
4 changed files with 173 additions and 14 deletions

View File

@ -42,7 +42,6 @@ import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.Releaser; import io.druid.collections.Releaser;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.BaseQuery; import io.druid.query.BaseQuery;
import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.ChainedExecutionQueryRunner;
@ -167,8 +166,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
closeOnFailure.add(mergeBufferHolder); closeOnFailure.add(mergeBufferHolder);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); throw new QueryInterruptedException(e);
throw Throwables.propagate(e);
} }
Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(

View File

@ -21,7 +21,6 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
@ -29,7 +28,6 @@ import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FilteredSequence; import com.metamx.common.guava.FilteredSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.logger.Logger;
import io.druid.collections.BlockingPool; import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
@ -62,8 +60,6 @@ import java.util.concurrent.TimeoutException;
public class GroupByRowProcessor public class GroupByRowProcessor
{ {
private static final Logger log = new Logger(GroupByRowProcessor.class);
public static Sequence<Row> process( public static Sequence<Row> process(
final Query queryParam, final Query queryParam,
final Sequence<Row> rows, final Sequence<Row> rows,
@ -85,7 +81,8 @@ public class GroupByRowProcessor
String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId())
); );
final long timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeout = queryTimeout == null ? JodaUtils.MAX_INSTANT : queryTimeout.longValue();
final List<Interval> queryIntervals = query.getIntervals(); final List<Interval> queryIntervals = query.getIntervals();
final Filter filter = Filters.convertToCNFFromQueryContext( final Filter filter = Filters.convertToCNFFromQueryContext(
query, query,
@ -144,8 +141,7 @@ public class GroupByRowProcessor
closeOnFailure.add(mergeBufferHolder); closeOnFailure.add(mergeBufferHolder);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); throw new QueryInterruptedException(e);
throw Throwables.propagate(e);
} }
Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(

View File

@ -213,7 +213,7 @@ public class OrderByColumnSpec
{ {
return "OrderByColumnSpec{" + return "OrderByColumnSpec{" +
"dimension='" + dimension + '\'' + "dimension='" + dimension + '\'' +
", direction=" + direction + '\'' + ", direction='" + direction + '\'' +
", dimensionComparator='" + dimensionComparator + '\'' + ", dimensionComparator='" + dimensionComparator + '\'' +
'}'; '}';
} }

View File

@ -52,6 +52,7 @@ import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory; import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory;
@ -872,11 +873,79 @@ public class GroupByQueryRunnerTest
.setContext(ImmutableMap.<String, Object>of("maxResults", 1)) .setContext(ImmutableMap.<String, Object>of("maxResults", 1))
.build(); .build();
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { List<Row> expectedResults = null;
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
expectedException.expect(ISE.class); expectedException.expect(ISE.class);
} else {
expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
} }
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByTimeoutContextOverride()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setContext(ImmutableMap.<String, Object>of("timeout", Integer.valueOf(60000)))
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test @Test
@ -897,12 +966,36 @@ public class GroupByQueryRunnerTest
.setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 0, "bufferGrouperMaxSize", 1)) .setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 0, "bufferGrouperMaxSize", 1))
.build(); .build();
List<Row> expectedResults = null;
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(ISE.class); expectedException.expect(ISE.class);
expectedException.expectMessage("Grouping resources exhausted"); expectedException.expectMessage("Grouping resources exhausted");
} else {
expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
} }
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test @Test
@ -4560,6 +4653,34 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test
public void testSubqueryWithContextTimeout()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList())
.setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("count")))
.setGranularity(QueryRunnerTestHelper.allGran)
.setContext(ImmutableMap.<String, Object>of("timeout", 10000))
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "count", 18L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test @Test
public void testSubqueryWithOuterCardinalityAggregator() public void testSubqueryWithOuterCardinalityAggregator()
{ {
@ -4608,6 +4729,50 @@ public class GroupByQueryRunnerTest
} }
} }
@Test
public void testSubqueryWithOuterCountAggregator()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setGranularity(QueryRunnerTestHelper.dayGran)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(new OrderByColumnSpec("alias", OrderByColumnSpec.Direction.ASCENDING)),
null
)
)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList())
.setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("count")))
.setGranularity(QueryRunnerTestHelper.allGran)
.build();
// v1 strategy throws an exception for this query because it tries to merge the noop outer
// and default inner limit specs, then apply the resulting spec to the outer query, which
// fails because the inner limit spec refers to columns that don't exist in the outer
// query. I'm not sure why it does this.
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
expectedException.expect(ISE.class);
expectedException.expectMessage("Unknown column in order clause");
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} else {
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "count", 18L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
}
@Test @Test
public void testSubqueryWithOuterJavascriptAggregators() public void testSubqueryWithOuterJavascriptAggregators()
{ {