From ee0d4ea589234fb2d0467197939faafdd37804b4 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 6 Jun 2019 14:39:32 -0700 Subject: [PATCH] add bloom filter fallback aggregator when types are unknown (#7719) --- .../bloom/BloomFilterAggregatorFactory.java | 149 ++++++++---------- .../bloom/BloomFilterMergeAggregator.java | 2 +- .../bloom/ObjectBloomFilterAggregator.java | 66 ++++++++ .../bloom/StringBloomFilterAggregator.java | 6 +- .../bloom/BloomFilterGroupByQueryTest.java | 79 +++++++++- 5 files changed, 215 insertions(+), 87 deletions(-) create mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index 2c633f2dde5..53839e82f7a 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -82,95 +82,13 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory @Override public Aggregator factorize(ColumnSelectorFactory columnFactory) { - ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension()); - - if (capabilities == null) { - BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); - if (selector instanceof NilColumnValueSelector) { - // BloomKFilter must be the same size so we cannot use a constant for the empty agg - return new NoopBloomFilterAggregator(maxNumEntries, true); - } - throw new IAE( - "Cannot create bloom filter buffer aggregator for column selector type [%s]", - selector.getClass().getName() - ); - } - ValueType type = capabilities.getType(); - switch (type) { - case STRING: - return new StringBloomFilterAggregator( - columnFactory.makeDimensionSelector(field), - maxNumEntries, - true - ); - case LONG: - return new LongBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - true - ); - case FLOAT: - return new FloatBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - true - ); - case DOUBLE: - return new DoubleBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - true - ); - default: - throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type); - } + return factorizeInternal(columnFactory, true); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) { - ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension()); - - if (capabilities == null) { - BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); - if (selector instanceof NilColumnValueSelector) { - return new NoopBloomFilterAggregator(maxNumEntries, false); - } - throw new IAE( - "Cannot create bloom filter buffer aggregator for column selector type [%s]", - selector.getClass().getName() - ); - } - - ValueType type = capabilities.getType(); - switch (type) { - case STRING: - return new StringBloomFilterAggregator( - columnFactory.makeDimensionSelector(field), - maxNumEntries, - false - ); - case LONG: - return new LongBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - false - ); - case FLOAT: - return new FloatBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - false - ); - case DOUBLE: - return new DoubleBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - false - ); - default: - throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type); - } + return factorizeInternal(columnFactory, false); } @Override @@ -310,4 +228,67 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory ", maxNumEntries=" + maxNumEntries + '}'; } + + private BaseBloomFilterAggregator factorizeInternal(ColumnSelectorFactory columnFactory, boolean onHeap) + { + if (field == null || field.getDimension() == null) { + return new NoopBloomFilterAggregator(maxNumEntries, onHeap); + } + + ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension()); + + if (capabilities != null) { + ValueType type = capabilities.getType(); + switch (type) { + case STRING: + return new StringBloomFilterAggregator( + columnFactory.makeDimensionSelector(field), + maxNumEntries, + onHeap + ); + case LONG: + return new LongBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); + case FLOAT: + return new FloatBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); + case DOUBLE: + return new DoubleBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); + case COMPLEX: + // in an ideal world, we would check complex type, but until then assume it's a bloom filter + return new BloomFilterMergeAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); + default: + throw new IAE( + "Cannot create bloom filter %s for invalid column type [%s]", + onHeap ? "aggregator" : "buffer aggregator", + type + ); + } + } else { + BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); + if (selector instanceof NilColumnValueSelector) { + return new NoopBloomFilterAggregator(maxNumEntries, onHeap); + } + // no column capabilities, use fallback 'object' aggregator + return new ObjectBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); + } + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java index 011f2f6c563..6855d83a35e 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator> { - public BloomFilterMergeAggregator(ColumnValueSelector selector, int maxNumEntries, boolean onHeap) + BloomFilterMergeAggregator(ColumnValueSelector selector, int maxNumEntries, boolean onHeap) { super(selector, maxNumEntries, onHeap); } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java new file mode 100644 index 00000000000..bc97fab800a --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; + +import java.nio.ByteBuffer; + +/** + * Handles "unknown" columns by examining what comes out of the selector + */ +class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator +{ + ObjectBloomFilterAggregator( + ColumnValueSelector selector, + int maxNumEntries, + boolean onHeap + ) + { + super(selector, maxNumEntries, onHeap); + } + + @Override + void bufferAdd(ByteBuffer buf) + { + final Object object = selector.getObject(); + if (object instanceof ByteBuffer) { + final ByteBuffer other = (ByteBuffer) object; + BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); + } else { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + if (object instanceof Long) { + BloomKFilter.addLong(buf, selector.getLong()); + } else if (object instanceof Double) { + BloomKFilter.addDouble(buf, selector.getDouble()); + } else if (object instanceof Float) { + BloomKFilter.addFloat(buf, selector.getFloat()); + } else { + StringBloomFilterAggregator.stringBufferAdd(buf, (DimensionSelector) selector); + } + } else { + BloomKFilter.addBytes(buf, null, 0, 0); + } + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java index f3f6daec75e..db65ca5b2a6 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator { - StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap) { super(selector, maxNumEntries, onHeap); @@ -34,6 +33,11 @@ public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator @Override public void bufferAdd(ByteBuffer buf) + { + stringBufferAdd(buf, selector); + } + + static void stringBufferAdd(ByteBuffer buf, DimensionSelector selector) { if (selector.getRow().size() > 1) { selector.getRow().forEach(v -> { diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java index ce3b9322d74..5661e7b9e47 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java @@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.segment.TestHelper; import org.junit.After; import org.junit.Assert; @@ -61,6 +62,7 @@ public class BloomFilterGroupByQueryTest } private AggregationTestHelper helper; + private boolean isV2; @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @@ -72,6 +74,7 @@ public class BloomFilterGroupByQueryTest config, tempFolder ); + isV2 = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2); } @Parameterized.Parameters(name = "{0}") @@ -93,7 +96,6 @@ public class BloomFilterGroupByQueryTest @Test public void testQuery() throws Exception { - String query = "{" + "\"queryType\": \"groupBy\"," + "\"dataSource\": \"test_datasource\"," @@ -115,6 +117,81 @@ public class BloomFilterGroupByQueryTest Assert.assertFalse(filter.testString("entertainment")); } + @Test + public void testNestedQuery() throws Exception + { + if (!isV2) { + return; + } + + String query = "{" + + "\"queryType\": \"groupBy\"," + + "\"dataSource\": {" + + "\"type\": \"query\"," + + "\"query\": {" + + "\"queryType\":\"groupBy\"," + + "\"dataSource\": \"test_datasource\"," + + "\"intervals\": [ \"1970/2050\" ]," + + "\"granularity\":\"ALL\"," + + "\"dimensions\":[]," + + "\"aggregations\": [{ \"type\":\"longSum\", \"name\":\"innerSum\", \"fieldName\":\"count\"}]" + + "}" + + "}," + + "\"granularity\": \"ALL\"," + + "\"dimensions\": []," + + "\"aggregations\": [" + + " { \"type\": \"bloom\", \"name\": \"bloom\", \"field\": \"innerSum\" }" + + "]," + + "\"intervals\": [ \"1970/2050\" ]" + + "}"; + + MapBasedRow row = ingestAndQuery(query); + + + BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("bloom")); + Assert.assertTrue(filter.testLong(13L)); + Assert.assertFalse(filter.testLong(5L)); + } + + + @Test + public void testNestedQueryComplex() throws Exception + { + if (!isV2) { + return; + } + + String query = "{" + + "\"queryType\": \"groupBy\"," + + "\"dataSource\": {" + + "\"type\": \"query\"," + + "\"query\": {" + + "\"queryType\":\"groupBy\"," + + "\"dataSource\": \"test_datasource\"," + + "\"intervals\": [ \"1970/2050\" ]," + + "\"granularity\":\"ALL\"," + + "\"dimensions\":[]," + + "\"filter\":{ \"type\":\"selector\", \"dimension\":\"market\", \"value\":\"upfront\"}," + + "\"aggregations\": [{ \"type\":\"bloom\", \"name\":\"innerBloom\", \"field\":\"quality\"}]" + + "}" + + "}," + + "\"granularity\": \"ALL\"," + + "\"dimensions\": []," + + "\"aggregations\": [" + + " { \"type\": \"bloom\", \"name\": \"innerBloom\", \"field\": \"innerBloom\" }" + + "]," + + "\"intervals\": [ \"1970/2050\" ]" + + "}"; + + MapBasedRow row = ingestAndQuery(query); + + + BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("innerBloom")); + Assert.assertTrue(filter.testString("mezzanine")); + Assert.assertTrue(filter.testString("premium")); + Assert.assertFalse(filter.testString("entertainment")); + } + @Test public void testQueryFakeDimension() throws Exception {