mirror of https://github.com/apache/druid.git
Add ability to pass in Bloom filter from Hive Queries (#6222)
* Bloom filter initial implementation; fix checkstyle; address review comments. Fix weird failure; address review comments. Revert "Fix weird failure" (this reverts commit a13a83ad7887e679f6d539191b52aeaaea85b613). * fix test * address review comment
This commit is contained in:
parent
0f2dfe6fe8
commit
c9d281a2e9
|
@ -45,7 +45,6 @@
|
|||
<!-- the default value is a repeated flag from the command line, since blank value is not allowed -->
|
||||
<druid.distribution.pulldeps.opts>--clean</druid.distribution.pulldeps.opts>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>dist</id>
|
||||
|
@ -91,6 +90,8 @@
|
|||
<argument>-c</argument>
|
||||
<argument>org.apache.druid.extensions:druid-avro-extensions</argument>
|
||||
<argument>-c</argument>
|
||||
<argument>org.apache.druid.extensions:druid-bloom-filter</argument>
|
||||
<argument>-c</argument>
|
||||
<argument>org.apache.druid.extensions:druid-datasketches</argument>
|
||||
<argument>-c</argument>
|
||||
<argument>org.apache.druid.extensions:druid-hdfs-storage</argument>
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
---
|
||||
layout: doc_page
|
||||
---
|
||||
|
||||
# Druid Bloom Filter
|
||||
|
||||
Make sure to [include](../../operations/including-extensions.html) `druid-bloom-filter` as an extension.
|
||||
|
||||
BloomFilter is a probabilistic data structure for set membership check.
|
||||
Following are some characteristics of BloomFilter:
|
||||
- BloomFilters are highly space efficient when compared to using a HashSet.
|
||||
- Because of the probabilistic nature of a bloom filter, false positives (an element is not present in the bloom filter but test() says true) are possible
|
||||
- false negatives are not possible (if element is present then test() will never say false).
|
||||
- The false positive probability is configurable (default: 5%); depending on its value, the storage requirement may increase or decrease.
|
||||
- The lower the false positive probability, the greater the space requirement.
|
||||
- Bloom filters are sensitive to number of elements that will be inserted in the bloom filter.
|
||||
- During the creation of a bloom filter, the expected number of entries must be specified. If the number of insertions exceeds the specified initial number of entries, the false positive probability will increase accordingly.
|
||||
|
||||
Internally, this implementation of bloom filter uses Murmur3 fast non-cryptographic hash algorithm.
|
||||
|
||||
### Json Representation of Bloom Filter
|
||||
```json
|
||||
{
|
||||
"type" : "bloom",
|
||||
"dimension" : <dimension_name>,
|
||||
"bloomKFilter" : <serialized_bytes_for_BloomKFilter>,
|
||||
"extractionFn" : <extraction_fn>
|
||||
}
|
||||
```
|
||||
|
||||
|Property |Description |required? |
|
||||
|-------------------------|------------------------------|----------------------------------|
|
||||
|`type` |Filter Type. Should always be `bloom`|yes|
|
||||
|`dimension` |The dimension to filter over. | yes |
|
||||
|`bloomKFilter` |Base64 encoded Binary representation of `org.apache.hive.common.util.BloomKFilter`| yes |
|
||||
|`extractionFn`|[Extraction function](./../dimensionspecs.html#extraction-functions) to apply to the dimension values |no|
|
||||
|
||||
|
||||
### Serialized Format for BloomKFilter
|
||||
Serialized BloomKFilter format:
|
||||
- 1 byte for the number of hash functions.
|
||||
- 1 big endian int(That is how OutputStream works) for the number of longs in the bitset
|
||||
- big endian longs in the BloomKFilter bitset
|
||||
|
||||
Note: `org.apache.hive.common.util.BloomKFilter` provides a serialize method which can be used to serialize bloom filters to outputStream.
|
|
@ -23,6 +23,7 @@ Core extensions are maintained by Druid committers.
|
|||
|----|-----------|----|
|
||||
|druid-avro-extensions|Support for data in Apache Avro data format.|[link](../development/extensions-core/avro.html)|
|
||||
|druid-basic-security|Support for Basic HTTP authentication and role-based access control.|[link](../development/extensions-core/druid-basic-security.html)|
|
||||
|druid-bloom-filter|Support for providing Bloom filters in druid queries.|[link](../development/extensions-core/bloom-filter.html)|
|
||||
|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)|
|
||||
|druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)|
|
||||
|druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.apache.druid.extensions</groupId>
|
||||
<artifactId>druid-bloom-filter</artifactId>
|
||||
<name>druid-bloom-filter</name>
|
||||
<description>druid-bloom-filter</description>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.13.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-storage-api</artifactId>
|
||||
<version>2.7.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.guice;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.google.inject.Binder;
|
||||
import org.apache.druid.initialization.DruidModule;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class BloomFilterExtensionModule implements DruidModule
|
||||
{
|
||||
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return Collections.singletonList(new BloomFilterSerializersModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.druid.guice;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import org.apache.druid.query.filter.BloomDimFilter;
|
||||
import org.apache.hive.common.util.BloomKFilter;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
public class BloomFilterSerializersModule extends SimpleModule
|
||||
{
|
||||
public static String BLOOM_FILTER_TYPE_NAME = "bloom";
|
||||
|
||||
public BloomFilterSerializersModule()
|
||||
{
|
||||
registerSubtypes(
|
||||
new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME)
|
||||
);
|
||||
addSerializer(BloomKFilter.class, new BloomKFilterSerializer());
|
||||
addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer());
|
||||
}
|
||||
|
||||
public static class BloomKFilterSerializer extends StdSerializer<BloomKFilter>
|
||||
{
|
||||
|
||||
public BloomKFilterSerializer()
|
||||
{
|
||||
super(BloomKFilter.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(
|
||||
BloomKFilter bloomKFilter, JsonGenerator jsonGenerator, SerializerProvider serializerProvider
|
||||
) throws IOException
|
||||
{
|
||||
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
|
||||
BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter);
|
||||
byte[] bytes = byteArrayOutputStream.toByteArray();
|
||||
jsonGenerator.writeBinary(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
public static class BloomKFilterDeserializer extends StdDeserializer<BloomKFilter>
|
||||
{
|
||||
|
||||
protected BloomKFilterDeserializer()
|
||||
{
|
||||
super(BloomKFilter.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BloomKFilter deserialize(
|
||||
JsonParser jsonParser, DeserializationContext deserializationContext
|
||||
) throws IOException, JsonProcessingException
|
||||
{
|
||||
byte[] bytes = jsonParser.getBinaryValue();
|
||||
return BloomKFilter.deserialize(new ByteArrayInputStream(bytes));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,236 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.filter;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.RangeSet;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.query.extraction.ExtractionFn;
|
||||
import org.apache.druid.segment.filter.DimensionPredicateFilter;
|
||||
import org.apache.hive.common.util.BloomKFilter;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
|
||||
/**
|
||||
*/
|
||||
/**
 * A {@link DimFilter} that matches dimension values against a pre-built
 * {@link BloomKFilter} (e.g. one handed over from a Hive query). Matching is
 * probabilistic: false positives are possible, false negatives are not.
 */
public class BloomDimFilter implements DimFilter
{

  private final String dimension;
  private final BloomKFilter bloomKFilter;
  private final ExtractionFn extractionFn;   // nullable; applied to values before testing

  /**
   * @param dimension    dimension to filter on; must not be null
   * @param bloomKFilter pre-populated bloom filter to test values against; must not be null
   * @param extractionFn optional extraction function applied to dimension values, may be null
   */
  @JsonCreator
  public BloomDimFilter(
      @JsonProperty("dimension") String dimension,
      @JsonProperty("bloomKFilter") BloomKFilter bloomKFilter,
      @JsonProperty("extractionFn") ExtractionFn extractionFn
  )
  {
    Preconditions.checkArgument(dimension != null, "dimension must not be null");
    Preconditions.checkNotNull(bloomKFilter);
    this.dimension = dimension;
    this.bloomKFilter = bloomKFilter;
    this.extractionFn = extractionFn;
  }

  /**
   * Cache key layout: cache-type id, dimension, separator, extractionFn key (empty if none),
   * separator, then the serialized bloom filter bytes.
   */
  @Override
  public byte[] getCacheKey()
  {
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    try {
      // ByteArrayOutputStream cannot actually fail here, but serialize() declares IOException.
      BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter);
    }
    catch (IOException e) {
      throw new IllegalStateException(StringUtils.format("Exception when generating cache key for [%s]", this), e);
    }
    byte[] bloomFilterBytes = byteArrayOutputStream.toByteArray();
    return new CacheKeyBuilder(DimFilterUtils.BLOOM_DIM_FILTER_CACHE_ID)
        .appendString(dimension)
        .appendByte(DimFilterUtils.STRING_SEPARATOR)
        .appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey())
        .appendByte(DimFilterUtils.STRING_SEPARATOR)
        .appendByteArray(bloomFilterBytes)
        .build();
  }


  // No optimization possible for a bloom filter test; returns itself unchanged.
  @Override
  public DimFilter optimize()
  {
    return this;
  }

  /**
   * Builds a predicate-based filter. Null values (for every type) are tested via
   * testBytes(null, 0, 0), matching how BloomKFilter represents an added null entry.
   */
  @Override
  public Filter toFilter()
  {
    return new DimensionPredicateFilter(
        dimension,
        new DruidPredicateFactory()
        {
          @Override
          public Predicate<String> makeStringPredicate()
          {
            return str -> {
              if (str == null) {
                return bloomKFilter.testBytes(null, 0, 0);
              }
              return bloomKFilter.testString(str);
            };
          }

          @Override
          public DruidLongPredicate makeLongPredicate()
          {
            return new DruidLongPredicate()
            {
              @Override
              public boolean applyLong(long input)
              {
                return bloomKFilter.testLong(input);
              }

              @Override
              public boolean applyNull()
              {
                return bloomKFilter.testBytes(null, 0, 0);
              }
            };
          }

          @Override
          public DruidFloatPredicate makeFloatPredicate()
          {
            return new DruidFloatPredicate()
            {
              @Override
              public boolean applyFloat(float input)
              {
                return bloomKFilter.testFloat(input);
              }

              @Override
              public boolean applyNull()
              {
                return bloomKFilter.testBytes(null, 0, 0);
              }
            };
          }

          @Override
          public DruidDoublePredicate makeDoublePredicate()
          {
            return new DruidDoublePredicate()
            {
              @Override
              public boolean applyDouble(double input)
              {
                return bloomKFilter.testDouble(input);
              }

              @Override
              public boolean applyNull()
              {
                return bloomKFilter.testBytes(null, 0, 0);
              }
            };
          }
        },
        extractionFn
    );
  }

  @JsonProperty
  public String getDimension()
  {
    return dimension;
  }

  @JsonProperty
  public BloomKFilter getBloomKFilter()
  {
    return bloomKFilter;
  }

  @JsonProperty
  public ExtractionFn getExtractionFn()
  {
    return extractionFn;
  }

  @Override
  public String toString()
  {
    if (extractionFn != null) {
      return StringUtils.format("%s(%s) = %s", extractionFn, dimension, bloomKFilter);
    } else {
      return StringUtils.format("%s = %s", dimension, bloomKFilter);
    }
  }

  // NOTE(review): equality delegates to BloomKFilter.equals; if BloomKFilter does not
  // override equals/hashCode, two filters with identical contents will compare by
  // identity — verify against the hive-storage-api implementation.
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    BloomDimFilter that = (BloomDimFilter) o;

    if (!dimension.equals(that.dimension)) {
      return false;
    }
    // bloomKFilter can never be null (checked in the constructor); the null guard is defensive.
    if (bloomKFilter != null ? !bloomKFilter.equals(that.bloomKFilter) : that.bloomKFilter != null) {
      return false;
    }
    return extractionFn != null ? extractionFn.equals(that.extractionFn) : that.extractionFn == null;
  }

  // A bloom filter cannot be expressed as value ranges, so no range set is available.
  @Override
  public RangeSet<String> getDimensionRangeSet(String dimension)
  {
    return null;
  }

  @Override
  public HashSet<String> getRequiredColumns()
  {
    return Sets.newHashSet(dimension);
  }

  @Override
  public int hashCode()
  {
    int result = dimension.hashCode();
    result = 31 * result + (bloomKFilter != null ? bloomKFilter.hashCode() : 0);
    result = 31 * result + (extractionFn != null ? extractionFn.hashCode() : 0);
    return result;
  }
}
|
|
@ -0,0 +1 @@
|
|||
org.apache.druid.guice.BloomFilterExtensionModule
|
|
@ -0,0 +1,388 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.filter;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||
import org.apache.druid.data.input.impl.InputRowParser;
|
||||
import org.apache.druid.data.input.impl.MapInputRowParser;
|
||||
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
|
||||
import org.apache.druid.data.input.impl.TimestampSpec;
|
||||
import org.apache.druid.guice.BloomFilterSerializersModule;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.query.extraction.MapLookupExtractor;
|
||||
import org.apache.druid.query.extraction.TimeDimExtractionFn;
|
||||
import org.apache.druid.query.lookup.LookupExtractionFn;
|
||||
import org.apache.druid.query.lookup.LookupExtractor;
|
||||
import org.apache.druid.segment.IndexBuilder;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.filter.BaseFilterTest;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.apache.hive.common.util.BloomKFilter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class BloomDimFilterTest extends BaseFilterTest
|
||||
{
|
||||
private static final String TIMESTAMP_COLUMN = "timestamp";
|
||||
|
||||
private static final InputRowParser<Map<String, Object>> PARSER = new MapInputRowParser(
|
||||
new TimeAndDimsParseSpec(
|
||||
new TimestampSpec(TIMESTAMP_COLUMN, "iso", DateTimes.of("2000")),
|
||||
new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim0", "dim1", "dim2", "dim3", "dim6")),
|
||||
null,
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
private static final List<InputRow> ROWS = ImmutableList.of(
|
||||
PARSER.parseBatch(ImmutableMap.of(
|
||||
"dim0",
|
||||
"0",
|
||||
"dim1",
|
||||
"",
|
||||
"dim2",
|
||||
ImmutableList.of("a", "b"),
|
||||
"dim6",
|
||||
"2017-07-25"
|
||||
)).get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of(), "dim6", "2017-07-25"))
|
||||
.get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""), "dim6", "2017-05-25"))
|
||||
.get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "5", "dim1", "abc")).get(0)
|
||||
);
|
||||
|
||||
public BloomDimFilterTest(
|
||||
String testName,
|
||||
IndexBuilder indexBuilder,
|
||||
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
|
||||
boolean cnf,
|
||||
boolean optimize
|
||||
)
|
||||
{
|
||||
super(
|
||||
testName,
|
||||
ROWS,
|
||||
indexBuilder.schema(
|
||||
new IncrementalIndexSchema.Builder()
|
||||
.withDimensionsSpec(PARSER.getParseSpec().getDimensionsSpec()).build()
|
||||
),
|
||||
finisher,
|
||||
cnf,
|
||||
optimize
|
||||
);
|
||||
}
|
||||
|
||||
private static DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass()
|
||||
{
|
||||
mapper.registerModule(new BloomFilterSerializersModule());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception
|
||||
{
|
||||
BaseFilterTest.tearDown(BloomDimFilterTest.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerde() throws IOException
|
||||
{
|
||||
BloomKFilter bloomFilter = new BloomKFilter(1500);
|
||||
bloomFilter.addString("myTestString");
|
||||
BloomDimFilter bloomDimFilter = new BloomDimFilter(
|
||||
"abc",
|
||||
bloomFilter,
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
);
|
||||
DimFilter filter = mapper.readValue(mapper.writeValueAsBytes(bloomDimFilter), DimFilter.class);
|
||||
Assert.assertTrue(filter instanceof BloomDimFilter);
|
||||
BloomDimFilter serde = (BloomDimFilter) filter;
|
||||
Assert.assertEquals(bloomDimFilter.getDimension(), serde.getDimension());
|
||||
Assert.assertEquals(bloomDimFilter.getExtractionFn(), serde.getExtractionFn());
|
||||
Assert.assertTrue(bloomDimFilter.getBloomKFilter().testString("myTestString"));
|
||||
Assert.assertFalse(bloomDimFilter.getBloomKFilter().testString("not_match"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithTimeExtractionFnNull()
|
||||
{
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim0",
|
||||
bloomKFilter(1000, null, ""),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim6",
|
||||
bloomKFilter(1000, null, ""),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of("3", "4", "5"));
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim6",
|
||||
bloomKFilter(1000, "2017-07"),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim6",
|
||||
bloomKFilter(1000, "2017-05"),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of("2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleValueStringColumnWithoutNulls()
|
||||
{
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, (String) null), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, ""), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "0"), null), ImmutableList.of("0"));
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "1"), null), ImmutableList.of("1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleValueStringColumnWithNulls()
|
||||
{
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, (String) null), null), ImmutableList.of("0"));
|
||||
} else {
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, (String) null), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, ""), null), ImmutableList.of("0"));
|
||||
}
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "10"), null), ImmutableList.of("1"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "2"), null), ImmutableList.of("2"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "1"), null), ImmutableList.of("3"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "def"), null), ImmutableList.of("4"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "abc"), null), ImmutableList.of("5"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "ab"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiValueStringColumn()
|
||||
{
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim2", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("1", "2", "5")
|
||||
);
|
||||
} else {
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim2", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("1", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, ""), null), ImmutableList.of("2"));
|
||||
}
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "a"), null), ImmutableList.of("0", "3"));
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "b"), null), ImmutableList.of("0"));
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "c"), null), ImmutableList.of("4"));
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "d"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingColumnSpecifiedInDimensionList()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim3", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, ""), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "a"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "b"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "c"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingColumnNotSpecifiedInDimensionList()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim4", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, ""), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "a"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "b"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "c"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpressionVirtualColumn()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("expr", bloomKFilter(1000, 1.1F), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("expr", bloomKFilter(1000, 1.2F), null), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("exprDouble", bloomKFilter(1000, 2.1D), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("exprDouble", bloomKFilter(1000, 2.2D), null), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("exprLong", bloomKFilter(1000, 3L), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("exprLong", bloomKFilter(1000, 4L), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectorWithLookupExtractionFn()
|
||||
{
|
||||
final Map<String, String> stringMap = ImmutableMap.of(
|
||||
"1", "HELLO",
|
||||
"a", "HELLO",
|
||||
"def", "HELLO",
|
||||
"abc", "UNKNOWN"
|
||||
);
|
||||
LookupExtractor mapExtractor = new MapLookupExtractor(stringMap, false);
|
||||
LookupExtractionFn lookupFn = new LookupExtractionFn(mapExtractor, false, "UNKNOWN", false, true);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of("1"));
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "2", "3", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of("3", "4"));
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim1", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of("0", "3"));
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim2", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim3", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim4", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
|
||||
final Map<String, String> stringMap2 = ImmutableMap.of(
|
||||
"2", "5"
|
||||
);
|
||||
LookupExtractor mapExtractor2 = new MapLookupExtractor(stringMap2, false);
|
||||
LookupExtractionFn lookupFn2 = new LookupExtractionFn(mapExtractor2, true, null, false, true);
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "5"), lookupFn2), ImmutableList.of("2", "5"));
|
||||
|
||||
final Map<String, String> stringMap3 = ImmutableMap.of(
|
||||
"1", ""
|
||||
);
|
||||
LookupExtractor mapExtractor3 = new MapLookupExtractor(stringMap3, false);
|
||||
LookupExtractionFn lookupFn3 = new LookupExtractionFn(mapExtractor3, false, null, false, true);
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
// Nulls and empty strings are considered equivalent
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, (String) null), lookupFn3),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
} else {
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, (String) null), lookupFn3),
|
||||
ImmutableList.of("0", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, ""), lookupFn3),
|
||||
ImmutableList.of("1")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, String... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (String value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addString(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, Float... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (Float value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addFloat(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, Double... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (Double value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addDouble(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, Long... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (Long value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addLong(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
}
|
1
pom.xml
1
pom.xml
|
@ -116,6 +116,7 @@
|
|||
<!-- Core extensions -->
|
||||
<module>extensions-core/avro-extensions</module>
|
||||
<module>extensions-core/datasketches</module>
|
||||
<module>extensions-core/druid-bloom-filter</module>
|
||||
<module>extensions-core/druid-kerberos</module>
|
||||
<module>extensions-core/hdfs-storage</module>
|
||||
<module>extensions-core/histogram</module>
|
||||
|
|
|
@ -52,8 +52,10 @@ public class DimFilterUtils
|
|||
static final byte COLUMN_COMPARISON_CACHE_ID = 0xD;
|
||||
static final byte EXPRESSION_CACHE_ID = 0xE;
|
||||
static final byte TRUE_CACHE_ID = 0xF;
|
||||
public static byte BLOOM_DIM_FILTER_CACHE_ID = 0x10;
|
||||
public static final byte STRING_SEPARATOR = (byte) 0xFF;
|
||||
|
||||
|
||||
static byte[] computeCacheKey(byte cacheIdKey, List<DimFilter> filters)
|
||||
{
|
||||
if (filters.size() == 1) {
|
||||
|
|
|
@ -82,7 +82,9 @@ public abstract class BaseFilterTest
|
|||
{
|
||||
private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create(
|
||||
ImmutableList.of(
|
||||
new ExpressionVirtualColumn("expr", "1.0 + 0.1", ValueType.FLOAT, TestExprMacroTable.INSTANCE)
|
||||
new ExpressionVirtualColumn("expr", "1.0 + 0.1", ValueType.FLOAT, TestExprMacroTable.INSTANCE),
|
||||
new ExpressionVirtualColumn("exprDouble", "1.0 + 1.1", ValueType.DOUBLE, TestExprMacroTable.INSTANCE),
|
||||
new ExpressionVirtualColumn("exprLong", "1 + 2", ValueType.LONG, TestExprMacroTable.INSTANCE)
|
||||
)
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue