mirror of https://github.com/apache/druid.git
parent
bcd20441be
commit
9ad34a3f03
|
@ -21,10 +21,14 @@ package io.druid.query.groupby;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
import io.druid.collections.StupidPool;
|
import io.druid.collections.StupidPool;
|
||||||
import io.druid.data.input.MapBasedInputRow;
|
import io.druid.data.input.MapBasedInputRow;
|
||||||
import io.druid.data.input.MapBasedRow;
|
import io.druid.data.input.MapBasedRow;
|
||||||
import io.druid.data.input.Row;
|
import io.druid.data.input.Row;
|
||||||
|
import io.druid.data.input.impl.DimensionSchema;
|
||||||
|
import io.druid.data.input.impl.DimensionsSpec;
|
||||||
|
import io.druid.data.input.impl.StringDimensionSchema;
|
||||||
import io.druid.granularity.QueryGranularity;
|
import io.druid.granularity.QueryGranularity;
|
||||||
import io.druid.java.util.common.ISE;
|
import io.druid.java.util.common.ISE;
|
||||||
import io.druid.java.util.common.Pair;
|
import io.druid.java.util.common.Pair;
|
||||||
|
@ -33,8 +37,10 @@ import io.druid.java.util.common.guava.Sequence;
|
||||||
import io.druid.java.util.common.guava.Sequences;
|
import io.druid.java.util.common.guava.Sequences;
|
||||||
import io.druid.query.ResourceLimitExceededException;
|
import io.druid.query.ResourceLimitExceededException;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.PostAggregator;
|
||||||
import io.druid.query.dimension.DimensionSpec;
|
import io.druid.query.dimension.DimensionSpec;
|
||||||
import io.druid.segment.incremental.IncrementalIndex;
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
|
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||||
import io.druid.segment.incremental.IndexSizeExceededException;
|
import io.druid.segment.incremental.IndexSizeExceededException;
|
||||||
import io.druid.segment.incremental.OffheapIncrementalIndex;
|
import io.druid.segment.incremental.OffheapIncrementalIndex;
|
||||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||||
|
@ -42,6 +48,7 @@ import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
public class GroupByQueryHelper
|
public class GroupByQueryHelper
|
||||||
|
@ -88,13 +95,32 @@ public class GroupByQueryHelper
|
||||||
|
|
||||||
final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);
|
final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);
|
||||||
|
|
||||||
|
// All groupBy dimensions are strings, for now, as long as they don't conflict with any non-dimensions.
|
||||||
|
// This should get cleaned up if/when https://github.com/druid-io/druid/pull/3686 makes name conflicts impossible.
|
||||||
|
final Set<String> otherNames = Sets.newHashSet();
|
||||||
|
for (AggregatorFactory agg : aggs) {
|
||||||
|
otherNames.add(agg.getName());
|
||||||
|
}
|
||||||
|
for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
|
||||||
|
otherNames.add(postAggregator.getName());
|
||||||
|
}
|
||||||
|
final List<DimensionSchema> dimensionSchemas = Lists.newArrayList();
|
||||||
|
for (DimensionSpec dimension : query.getDimensions()) {
|
||||||
|
if (!otherNames.contains(dimension.getOutputName())) {
|
||||||
|
dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
|
||||||
|
.withDimensionsSpec(new DimensionsSpec(dimensionSchemas, null, null))
|
||||||
|
.withMetrics(aggs.toArray(new AggregatorFactory[aggs.size()]))
|
||||||
|
.withQueryGranularity(gran)
|
||||||
|
.withMinTimestamp(granTimeStart)
|
||||||
|
.build();
|
||||||
|
|
||||||
if (query.getContextValue("useOffheap", false)) {
|
if (query.getContextValue("useOffheap", false)) {
|
||||||
index = new OffheapIncrementalIndex(
|
index = new OffheapIncrementalIndex(
|
||||||
// use granularity truncated min timestamp
|
indexSchema,
|
||||||
// since incoming truncated timestamps may precede timeStart
|
|
||||||
granTimeStart,
|
|
||||||
gran,
|
|
||||||
aggs.toArray(new AggregatorFactory[aggs.size()]),
|
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
sortResults,
|
sortResults,
|
||||||
|
@ -103,11 +129,7 @@ public class GroupByQueryHelper
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
index = new OnheapIncrementalIndex(
|
index = new OnheapIncrementalIndex(
|
||||||
// use granularity truncated min timestamp
|
indexSchema,
|
||||||
// since incoming truncated timestamps may precede timeStart
|
|
||||||
granTimeStart,
|
|
||||||
gran,
|
|
||||||
aggs.toArray(new AggregatorFactory[aggs.size()]),
|
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
sortResults,
|
sortResults,
|
||||||
|
|
|
@ -87,6 +87,7 @@ import io.druid.query.filter.OrDimFilter;
|
||||||
import io.druid.query.filter.RegexDimFilter;
|
import io.druid.query.filter.RegexDimFilter;
|
||||||
import io.druid.query.filter.SearchQueryDimFilter;
|
import io.druid.query.filter.SearchQueryDimFilter;
|
||||||
import io.druid.query.filter.SelectorDimFilter;
|
import io.druid.query.filter.SelectorDimFilter;
|
||||||
|
import io.druid.query.groupby.having.DimensionSelectorHavingSpec;
|
||||||
import io.druid.query.groupby.having.EqualToHavingSpec;
|
import io.druid.query.groupby.having.EqualToHavingSpec;
|
||||||
import io.druid.query.groupby.having.GreaterThanHavingSpec;
|
import io.druid.query.groupby.having.GreaterThanHavingSpec;
|
||||||
import io.druid.query.groupby.having.HavingSpec;
|
import io.druid.query.groupby.having.HavingSpec;
|
||||||
|
@ -3981,6 +3982,103 @@ public class GroupByQueryRunnerTest
|
||||||
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGroupByTimeExtractionNamedUnderUnderTime()
|
||||||
|
{
|
||||||
|
GroupByQuery query = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
|
||||||
|
.setDimensions(
|
||||||
|
Lists.newArrayList(
|
||||||
|
new DefaultDimensionSpec("market", "market"),
|
||||||
|
new ExtractionDimensionSpec(
|
||||||
|
Column.TIME_COLUMN_NAME,
|
||||||
|
Column.TIME_COLUMN_NAME,
|
||||||
|
new TimeFormatExtractionFn("EEEE", null, null, null),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
QueryRunnerTestHelper.indexDoubleSum
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setPostAggregatorSpecs(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||||
|
.setGranularity(QueryRunnerTestHelper.allGran)
|
||||||
|
.setDimFilter(
|
||||||
|
new OrDimFilter(
|
||||||
|
Arrays.<DimFilter>asList(
|
||||||
|
new SelectorDimFilter("market", "spot", null),
|
||||||
|
new SelectorDimFilter("market", "upfront", null)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setLimitSpec(new DefaultLimitSpec(ImmutableList.<OrderByColumnSpec>of(), 1))
|
||||||
|
.build();
|
||||||
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow(
|
||||||
|
"1970-01-01",
|
||||||
|
"__time",
|
||||||
|
"Friday",
|
||||||
|
"market",
|
||||||
|
"spot",
|
||||||
|
"index",
|
||||||
|
13219.574157714844,
|
||||||
|
"rows",
|
||||||
|
117L,
|
||||||
|
"addRowsIndexConstant",
|
||||||
|
13337.574157714844
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||||
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGroupByWithUnderUnderTimeAsDimensionNameWithHavingAndLimit()
|
||||||
|
{
|
||||||
|
GroupByQuery query = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "__time")))
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
new LongSumAggregatorFactory("idx", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.setHavingSpec(
|
||||||
|
new OrHavingSpec(
|
||||||
|
ImmutableList.<HavingSpec>of(
|
||||||
|
new DimensionSelectorHavingSpec("__time", "automotive", null),
|
||||||
|
new DimensionSelectorHavingSpec("__time", "business", null)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setLimitSpec(
|
||||||
|
new DefaultLimitSpec(
|
||||||
|
ImmutableList.of(new OrderByColumnSpec("__time", OrderByColumnSpec.Direction.DESCENDING)),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "__time", "business", "rows", 1L, "idx", 118L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "__time", "automotive", "rows", 1L, "idx", 135L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "__time", "business", "rows", 1L, "idx", 112L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "__time", "automotive", "rows", 1L, "idx", 147L)
|
||||||
|
);
|
||||||
|
|
||||||
|
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||||
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmptySubquery()
|
public void testEmptySubquery()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue