Fix FilteredAggregators at ingestion time and in groupBy v2 nested queries. (#3312)

The common theme between the two is they both create "fake" DimensionSelectors
that work on top of Rows. They both do it because there isn't really any
dictionary for the underlying Rows, they're just a stream of data. The fix for
both is to allow a DimensionSelector to tell callers that it has no dictionary
by returning CARDINALITY_UNKNOWN from getValueCardinality. The callers, in
turn, can avoid using it in ways that assume it has a dictionary.

Fixes #3311.
This commit is contained in:
Gian Merlino 2016-08-02 17:39:40 -07:00 committed by Fangjin Yang
parent ae3e0015b6
commit 0299ac73b8
16 changed files with 188 additions and 84 deletions

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.metamx.common.UOE;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.DruidLongPredicate;
@ -43,6 +42,7 @@ import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public class FilteredAggregatorFactory extends AggregatorFactory
{
@ -246,7 +246,12 @@ public class FilteredAggregatorFactory extends AggregatorFactory
return new BooleanValueMatcher(valueString == null);
}
final int cardinality = selector.getValueCardinality();
if (cardinality >= 0) {
// Dictionary-encoded dimension. Compare by id instead of by value to save time.
final int valueId = selector.lookupId(valueString);
return new ValueMatcher()
{
@Override
@ -267,6 +272,29 @@ public class FilteredAggregatorFactory extends AggregatorFactory
}
}
};
} else {
// Not dictionary-encoded. Skip the optimization.
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return valueString == null;
} else {
for (int i = 0; i < size; ++i) {
if (Objects.equals(selector.lookupName(row.get(i)), valueString)) {
return true;
}
}
return false;
}
}
};
}
}
public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory)
@ -294,8 +322,10 @@ public class FilteredAggregatorFactory extends AggregatorFactory
return new BooleanValueMatcher(doesMatchNull);
}
// Check every value in the dimension, as a String.
final int cardinality = selector.getValueCardinality();
if (cardinality >= 0) {
// Dictionary-encoded dimension. Check every value; build a bitset of matching ids.
final BitSet valueIds = new BitSet(cardinality);
for (int i = 0; i < cardinality; i++) {
if (predicate.apply(selector.lookupName(i))) {
@ -323,6 +353,29 @@ public class FilteredAggregatorFactory extends AggregatorFactory
}
}
};
} else {
// Not dictionary-encoded. Skip the optimization.
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return doesMatchNull;
} else {
for (int i = 0; i < size; ++i) {
if (predicate.apply(selector.lookupName(row.get(i)))) {
return true;
}
}
return false;
}
}
};
}
}
private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate)

View File

@ -74,7 +74,10 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
return selector;
}
int selectorCardinality = selector.getValueCardinality();
final int selectorCardinality = selector.getValueCardinality();
if (selectorCardinality < 0) {
throw new UnsupportedOperationException("Cannot decorate a selector with no dictionary");
}
int cardinality = isWhitelist ? values.size() : selectorCardinality - values.size();
int count = 0;

View File

@ -68,7 +68,12 @@ public class RegexFilteredDimensionSpec extends BaseFilteredDimensionSpec
int count = 0;
final Map<Integer,Integer> forwardMapping = new HashMap<>();
for (int i = 0; i < selector.getValueCardinality(); i++) {
final int selectorCardinality = selector.getValueCardinality();
if (selectorCardinality < 0) {
throw new UnsupportedOperationException("Cannot decorate a selector with no dictionary");
}
for (int i = 0; i < selectorCardinality; i++) {
if (compiledRegex.matcher(Strings.nullToEmpty(selector.lookupName(i))).matches()) {
forwardMapping.put(i, count++);
}

View File

@ -68,6 +68,8 @@ import java.util.NoSuchElementException;
*/
public class GroupByQueryEngine
{
private static final int MISSING_VALUE = -1;
private final Supplier<GroupByQueryConfig> config;
private final StupidPool<ByteBuffer> intermediateResultsBufferPool;
@ -189,7 +191,7 @@ public class GroupByQueryEngine
final IndexedInts row = dimSelector.getRow();
if (row == null || row.size() == 0) {
ByteBuffer newKey = key.duplicate();
newKey.putInt(dimSelector.getValueCardinality());
newKey.putInt(MISSING_VALUE);
unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));
} else {
for (Integer dimValue : row) {
@ -405,7 +407,7 @@ public class GroupByQueryEngine
for (int i = 0; i < dimensions.size(); ++i) {
final DimensionSelector dimSelector = dimensions.get(i);
final int dimVal = keyBuffer.getInt();
if (dimSelector.getValueCardinality() != dimVal) {
if (MISSING_VALUE != dimVal) {
theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal));
}
}

View File

@ -35,7 +35,6 @@ import com.metamx.common.guava.Accumulator;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
@ -527,7 +526,7 @@ public class RowBasedGrouperHelper
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown");
return DimensionSelector.CARDINALITY_UNKNOWN;
}
@Override

View File

@ -61,7 +61,6 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
return new TopNParams(
dimSelector,
cursor,
dimSelector.getValueCardinality(),
Integer.MAX_VALUE
);
}
@ -129,7 +128,7 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
private int[] getDimValSelectorForTopNMetric(TopNParams params, TopNResultBuilder resultBuilder)
{
int[] dimValSelector = new int[params.getDimSelector().getValueCardinality()];
int[] dimValSelector = new int[params.getCardinality()];
Arrays.fill(dimValSelector, SKIP_POSITION_VALUE);
Iterator<DimValHolder> dimValIter = resultBuilder.getTopNIterator();

View File

@ -196,6 +196,10 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
ignoreAfterThreshold = false;
ignoreFirstN = 0;
keepOnlyN = dimSelector.getValueCardinality();
if (keepOnlyN < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with no dictionary");
}
}
@Override

View File

@ -54,7 +54,6 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
return new TopNParams(
dimSelector,
cursor,
dimSelector.getValueCardinality(),
Integer.MAX_VALUE
);
}

View File

@ -67,6 +67,10 @@ public class PooledTopNAlgorithm
final int cardinality = dimSelector.getValueCardinality();
if (cardinality < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with no dictionary");
}
final TopNMetricSpecBuilder<int[]> arrayProvider = new BaseArrayProvider<int[]>(
dimSelector,
query,
@ -102,7 +106,6 @@ public class PooledTopNAlgorithm
return PooledTopNParams.builder()
.withDimSelector(dimSelector)
.withCursor(cursor)
.withCardinality(cardinality)
.withResultsBufHolder(resultsBufHolder)
.withResultsBuf(resultsBuf)
.withArrayProvider(arrayProvider)
@ -517,7 +520,6 @@ public class PooledTopNAlgorithm
public PooledTopNParams(
DimensionSelector dimSelector,
Cursor cursor,
int cardinality,
ResourceHolder<ByteBuffer> resultsBufHolder,
ByteBuffer resultsBuf,
int[] aggregatorSizes,
@ -526,7 +528,7 @@ public class PooledTopNAlgorithm
TopNMetricSpecBuilder<int[]> arrayProvider
)
{
super(dimSelector, cursor, cardinality, numValuesPerPass);
super(dimSelector, cursor, numValuesPerPass);
this.resultsBufHolder = resultsBufHolder;
this.resultsBuf = resultsBuf;
@ -569,7 +571,6 @@ public class PooledTopNAlgorithm
{
private DimensionSelector dimSelector;
private Cursor cursor;
private int cardinality;
private ResourceHolder<ByteBuffer> resultsBufHolder;
private ByteBuffer resultsBuf;
private int[] aggregatorSizes;
@ -581,7 +582,6 @@ public class PooledTopNAlgorithm
{
dimSelector = null;
cursor = null;
cardinality = 0;
resultsBufHolder = null;
resultsBuf = null;
aggregatorSizes = null;
@ -602,12 +602,6 @@ public class PooledTopNAlgorithm
return this;
}
public Builder withCardinality(int cardinality)
{
this.cardinality = cardinality;
return this;
}
public Builder withResultsBufHolder(ResourceHolder<ByteBuffer> resultsBufHolder)
{
this.resultsBufHolder = resultsBufHolder;
@ -649,7 +643,6 @@ public class PooledTopNAlgorithm
return new PooledTopNParams(
dimSelector,
cursor,
cardinality,
resultsBufHolder,
resultsBuf,
aggregatorSizes,

View File

@ -45,7 +45,6 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
return new TopNParams(
dimSelector,
cursor,
dimSelector.getValueCardinality(),
Integer.MAX_VALUE
);
}

View File

@ -34,14 +34,17 @@ public class TopNParams
protected TopNParams(
DimensionSelector dimSelector,
Cursor cursor,
int cardinality,
int numValuesPerPass
)
{
this.dimSelector = dimSelector;
this.cursor = cursor;
this.cardinality = cardinality;
this.cardinality = dimSelector.getValueCardinality();
this.numValuesPerPass = numValuesPerPass;
if (cardinality < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension without a dictionary");
}
}
public DimensionSelector getDimSelector()

View File

@ -23,6 +23,8 @@ package io.druid.segment;import io.druid.segment.data.IndexedInts;
*/
public interface DimensionSelector
{
public static int CARDINALITY_UNKNOWN = -1;
/**
* Gets all values for the row inside of an IntBuffer. I.e. one possible implementation could be
*
@ -42,7 +44,12 @@ public interface DimensionSelector
*
* Value cardinality would be 2.
*
* @return the value cardinality
* Cardinality may be unknown (e.g. the selector used by IncrementalIndex while reading input rows),
* in which case this method will return -1. If cardinality is unknown, you should assume this
* dimension selector has no dictionary, and avoid storing ids, calling "lookupId", or calling "lookupName"
* outside of the context of operating on a single row.
*
* @return the value cardinality, or -1 if unknown.
*/
public int getValueCardinality();

View File

@ -333,7 +333,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
return DimensionSelector.CARDINALITY_UNKNOWN;
}
@Override

View File

@ -4589,20 +4589,12 @@ public class GroupByQueryRunnerTest
.setGranularity(QueryRunnerTestHelper.allGran)
.build();
// v2 strategy would throw an exception for this because the dimension selector in GroupByRowProcessor is not
// dictionary encoded, but instead require the row to be set when doing lookup. Null Pointer Exception occurs
// when the filter aggregator is initialized with no row set.
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(NullPointerException.class);
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} else {
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "rows", 837L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
}
@Test
public void testSubqueryWithOuterTimeFilter()

View File

@ -48,7 +48,11 @@ import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.ordering.StringComparators;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@ -289,6 +293,48 @@ public class IncrementalIndexTest
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
}
@Test
public void testFilteredAggregators() throws Exception
{
long timestamp = System.currentTimeMillis();
IncrementalIndex index = closer.closeLater(
indexCreator.createIndex(new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_selector_filtered"),
new SelectorDimFilter("dim2", "2", null)
),
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_bound_filtered"),
new BoundDimFilter("dim2", "2", "3", false, true, null, null, StringComparators.NUMERIC)
)
})
);
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
Assert.assertEquals(
Arrays.asList("count", "count_selector_filtered", "count_bound_filtered"),
index.getMetricNames()
);
Assert.assertEquals(2, index.size());
final Iterator<Row> rows = index.iterator();
Row row = rows.next();
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("1"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("2"), row.getDimension("dim2"));
Assert.assertEquals(1L, row.getLongMetric("count"));
Assert.assertEquals(1L, row.getLongMetric("count_selector_filtered"));
Assert.assertEquals(1L, row.getLongMetric("count_bound_filtered"));
row = rows.next();
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(1L, row.getLongMetric("count"));
Assert.assertEquals(0L, row.getLongMetric("count_selector_filtered"));
Assert.assertEquals(0L, row.getLongMetric("count_bound_filtered"));
}
@Test
public void testSingleThreadedIndexingAndQuery() throws Exception
{

View File

@ -143,10 +143,10 @@ public class IncrementalIndexStorageAdapterTest
Assert.assertEquals(2, results.size());
MapBasedRow row = (MapBasedRow) results.get(0);
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1L), row.getEvent());
row = (MapBasedRow) results.get(1);
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1L), row.getEvent());
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
}
@Test