add bloom filter fallback aggregator when types are unknown (#7719)

This commit is contained in:
Clint Wylie 2019-06-06 14:39:32 -07:00 committed by Gian Merlino
parent 4dd446bfdd
commit ee0d4ea589
5 changed files with 215 additions and 87 deletions

View File

@ -82,95 +82,13 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory columnFactory)
{
ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
if (capabilities == null) {
BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
if (selector instanceof NilColumnValueSelector) {
// BloomKFilter must be the same size so we cannot use a constant for the empty agg
return new NoopBloomFilterAggregator(maxNumEntries, true);
}
throw new IAE(
"Cannot create bloom filter buffer aggregator for column selector type [%s]",
selector.getClass().getName()
);
}
ValueType type = capabilities.getType();
switch (type) {
case STRING:
return new StringBloomFilterAggregator(
columnFactory.makeDimensionSelector(field),
maxNumEntries,
true
);
case LONG:
return new LongBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
true
);
case FLOAT:
return new FloatBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
true
);
case DOUBLE:
return new DoubleBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
true
);
default:
throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type);
}
return factorizeInternal(columnFactory, true);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
{
ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
if (capabilities == null) {
BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
if (selector instanceof NilColumnValueSelector) {
return new NoopBloomFilterAggregator(maxNumEntries, false);
}
throw new IAE(
"Cannot create bloom filter buffer aggregator for column selector type [%s]",
selector.getClass().getName()
);
}
ValueType type = capabilities.getType();
switch (type) {
case STRING:
return new StringBloomFilterAggregator(
columnFactory.makeDimensionSelector(field),
maxNumEntries,
false
);
case LONG:
return new LongBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
false
);
case FLOAT:
return new FloatBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
false
);
case DOUBLE:
return new DoubleBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
false
);
default:
throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type);
}
return factorizeInternal(columnFactory, false);
}
@Override
@ -310,4 +228,67 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
", maxNumEntries=" + maxNumEntries +
'}';
}
private BaseBloomFilterAggregator factorizeInternal(ColumnSelectorFactory columnFactory, boolean onHeap)
{
if (field == null || field.getDimension() == null) {
return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
}
ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
if (capabilities != null) {
ValueType type = capabilities.getType();
switch (type) {
case STRING:
return new StringBloomFilterAggregator(
columnFactory.makeDimensionSelector(field),
maxNumEntries,
onHeap
);
case LONG:
return new LongBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
case FLOAT:
return new FloatBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
case DOUBLE:
return new DoubleBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
case COMPLEX:
// in an ideal world, we would check complex type, but until then assume it's a bloom filter
return new BloomFilterMergeAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
default:
throw new IAE(
"Cannot create bloom filter %s for invalid column type [%s]",
onHeap ? "aggregator" : "buffer aggregator",
type
);
}
} else {
BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
if (selector instanceof NilColumnValueSelector) {
return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
}
// no column capabilities, use fallback 'object' aggregator
return new ObjectBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
}
}
}

View File

@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>>
{
public BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
{
super(selector, maxNumEntries, onHeap);
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.bloom;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
/**
* Handles "unknown" columns by examining what comes out of the selector
*/
class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<ColumnValueSelector>
{
ObjectBloomFilterAggregator(
ColumnValueSelector selector,
int maxNumEntries,
boolean onHeap
)
{
super(selector, maxNumEntries, onHeap);
}
@Override
void bufferAdd(ByteBuffer buf)
{
final Object object = selector.getObject();
if (object instanceof ByteBuffer) {
final ByteBuffer other = (ByteBuffer) object;
BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
} else {
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
if (object instanceof Long) {
BloomKFilter.addLong(buf, selector.getLong());
} else if (object instanceof Double) {
BloomKFilter.addDouble(buf, selector.getDouble());
} else if (object instanceof Float) {
BloomKFilter.addFloat(buf, selector.getFloat());
} else {
StringBloomFilterAggregator.stringBufferAdd(buf, (DimensionSelector) selector);
}
} else {
BloomKFilter.addBytes(buf, null, 0, 0);
}
}
}
}

View File

@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector>
{
StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap)
{
super(selector, maxNumEntries, onHeap);
@ -34,6 +33,11 @@ public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator
@Override
public void bufferAdd(ByteBuffer buf)
{
stringBufferAdd(buf, selector);
}
static void stringBufferAdd(ByteBuffer buf, DimensionSelector selector)
{
if (selector.getRow().size() > 1) {
selector.getRow().forEach(v -> {

View File

@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.TestHelper;
import org.junit.After;
import org.junit.Assert;
@ -61,6 +62,7 @@ public class BloomFilterGroupByQueryTest
}
private AggregationTestHelper helper;
private boolean isV2;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@ -72,6 +74,7 @@ public class BloomFilterGroupByQueryTest
config,
tempFolder
);
isV2 = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2);
}
@Parameterized.Parameters(name = "{0}")
@ -93,7 +96,6 @@ public class BloomFilterGroupByQueryTest
@Test
public void testQuery() throws Exception
{
String query = "{"
+ "\"queryType\": \"groupBy\","
+ "\"dataSource\": \"test_datasource\","
@ -115,6 +117,81 @@ public class BloomFilterGroupByQueryTest
Assert.assertFalse(filter.testString("entertainment"));
}
@Test
public void testNestedQuery() throws Exception
{
if (!isV2) {
return;
}
String query = "{"
+ "\"queryType\": \"groupBy\","
+ "\"dataSource\": {"
+ "\"type\": \"query\","
+ "\"query\": {"
+ "\"queryType\":\"groupBy\","
+ "\"dataSource\": \"test_datasource\","
+ "\"intervals\": [ \"1970/2050\" ],"
+ "\"granularity\":\"ALL\","
+ "\"dimensions\":[],"
+ "\"aggregations\": [{ \"type\":\"longSum\", \"name\":\"innerSum\", \"fieldName\":\"count\"}]"
+ "}"
+ "},"
+ "\"granularity\": \"ALL\","
+ "\"dimensions\": [],"
+ "\"aggregations\": ["
+ " { \"type\": \"bloom\", \"name\": \"bloom\", \"field\": \"innerSum\" }"
+ "],"
+ "\"intervals\": [ \"1970/2050\" ]"
+ "}";
MapBasedRow row = ingestAndQuery(query);
BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("bloom"));
Assert.assertTrue(filter.testLong(13L));
Assert.assertFalse(filter.testLong(5L));
}
@Test
public void testNestedQueryComplex() throws Exception
{
if (!isV2) {
return;
}
String query = "{"
+ "\"queryType\": \"groupBy\","
+ "\"dataSource\": {"
+ "\"type\": \"query\","
+ "\"query\": {"
+ "\"queryType\":\"groupBy\","
+ "\"dataSource\": \"test_datasource\","
+ "\"intervals\": [ \"1970/2050\" ],"
+ "\"granularity\":\"ALL\","
+ "\"dimensions\":[],"
+ "\"filter\":{ \"type\":\"selector\", \"dimension\":\"market\", \"value\":\"upfront\"},"
+ "\"aggregations\": [{ \"type\":\"bloom\", \"name\":\"innerBloom\", \"field\":\"quality\"}]"
+ "}"
+ "},"
+ "\"granularity\": \"ALL\","
+ "\"dimensions\": [],"
+ "\"aggregations\": ["
+ " { \"type\": \"bloom\", \"name\": \"innerBloom\", \"field\": \"innerBloom\" }"
+ "],"
+ "\"intervals\": [ \"1970/2050\" ]"
+ "}";
MapBasedRow row = ingestAndQuery(query);
BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("innerBloom"));
Assert.assertTrue(filter.testString("mezzanine"));
Assert.assertTrue(filter.testString("premium"));
Assert.assertFalse(filter.testString("entertainment"));
}
@Test
public void testQueryFakeDimension() throws Exception
{