mirror of https://github.com/apache/druid.git
Support filtering on long columns (including __time) (#3180)
* Support filtering on __time column * Rename DruidPredicate * Add docs for ValueMatcherFactory, add comment on getColumnCapabilities * Combine ValueMatcherFactory predicate methods to accept DruidCompositePredicate * Address PR comments (support filter on all long columns) * Use predicate factory instead of composite predicate * Address PR comments * Lazily initialize long handling in selector/in filter * Move long value parsing from InFilter to InDimFilter, make long value parsing thread-safe * Add multithreaded selector/in filter test * Fix non-final lock object in SelectorDimFilter
This commit is contained in:
parent
cd7337fc8a
commit
a42ccb6d19
|
@ -29,6 +29,8 @@ import com.metamx.collections.bitmap.MutableBitmap;
|
|||
import com.metamx.collections.bitmap.RoaringBitmapFactory;
|
||||
import com.metamx.collections.spatial.ImmutableRTree;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.data.BitmapSerdeFactory;
|
||||
import io.druid.segment.data.GenericIndexed;
|
||||
|
@ -63,16 +65,35 @@ public class DimensionPredicateFilterBenchmark
|
|||
|
||||
private static final DimensionPredicateFilter IS_EVEN = new DimensionPredicateFilter(
|
||||
"foo",
|
||||
new Predicate<String>()
|
||||
new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
if (input == null) {
|
||||
return false;
|
||||
}
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
if (input == null) {
|
||||
return false;
|
||||
}
|
||||
return Integer.parseInt(input.toString()) % 2 == 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return Integer.parseInt(input) % 2 == 0;
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
null
|
||||
|
|
|
@ -43,7 +43,10 @@ import io.druid.query.extraction.ExtractionFn;
|
|||
import io.druid.query.extraction.JavaScriptExtractionFn;
|
||||
import io.druid.query.filter.AndDimFilter;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.BoundDimFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.OrDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
|
@ -56,9 +59,11 @@ import io.druid.segment.LongColumnSelector;
|
|||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.QueryableIndexStorageAdapter;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.AndFilter;
|
||||
import io.druid.segment.filter.BoundFilter;
|
||||
import io.druid.segment.filter.DimensionPredicateFilter;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import io.druid.segment.filter.OrFilter;
|
||||
|
@ -67,6 +72,7 @@ import io.druid.segment.incremental.IncrementalIndex;
|
|||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
import org.joda.time.Interval;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
|
@ -109,6 +115,10 @@ public class FilterPartitionBenchmark
|
|||
private QueryableIndex qIndex;
|
||||
private File indexFile;
|
||||
|
||||
private Filter timeFilterNone;
|
||||
private Filter timeFilterHalf;
|
||||
private Filter timeFilterAll;
|
||||
|
||||
private BenchmarkSchemaInfo schemaInfo;
|
||||
|
||||
private static String JS_FN = "function(str) { return 'super-' + str; }";
|
||||
|
@ -168,6 +178,38 @@ public class FilterPartitionBenchmark
|
|||
new IndexSpec()
|
||||
);
|
||||
qIndex = INDEX_IO.loadIndex(indexFile);
|
||||
|
||||
Interval interval = schemaInfo.getDataInterval();
|
||||
timeFilterNone = new BoundFilter(new BoundDimFilter(
|
||||
Column.TIME_COLUMN_NAME,
|
||||
String.valueOf(Long.MAX_VALUE),
|
||||
String.valueOf(Long.MAX_VALUE),
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
null
|
||||
));
|
||||
|
||||
long halfEnd = (interval.getEndMillis() + interval.getStartMillis()) / 2;
|
||||
timeFilterHalf = new BoundFilter(new BoundDimFilter(
|
||||
Column.TIME_COLUMN_NAME,
|
||||
String.valueOf(interval.getStartMillis()),
|
||||
String.valueOf(halfEnd),
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
null
|
||||
));
|
||||
|
||||
timeFilterAll = new BoundFilter(new BoundDimFilter(
|
||||
Column.TIME_COLUMN_NAME,
|
||||
String.valueOf(interval.getStartMillis()),
|
||||
String.valueOf(interval.getEndMillis()),
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
null
|
||||
));
|
||||
}
|
||||
|
||||
private IncrementalIndex makeIncIndex()
|
||||
|
@ -215,6 +257,51 @@ public class FilterPartitionBenchmark
|
|||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public void timeFilterNone(Blackhole blackhole) throws Exception
|
||||
{
|
||||
StorageAdapter sa = new QueryableIndexStorageAdapter(qIndex);
|
||||
Sequence<Cursor> cursors = sa.makeCursors(timeFilterNone, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);
|
||||
|
||||
Sequence<List<Long>> longListSeq = readCursorsLong(cursors, blackhole);
|
||||
List<Long> strings = Sequences.toList(Sequences.limit(longListSeq, 1), Lists.<List<Long>>newArrayList()).get(0);
|
||||
for (Long st : strings) {
|
||||
blackhole.consume(st);
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public void timeFilterHalf(Blackhole blackhole) throws Exception
|
||||
{
|
||||
StorageAdapter sa = new QueryableIndexStorageAdapter(qIndex);
|
||||
Sequence<Cursor> cursors = sa.makeCursors(timeFilterHalf, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);
|
||||
|
||||
Sequence<List<Long>> longListSeq = readCursorsLong(cursors, blackhole);
|
||||
List<Long> strings = Sequences.toList(Sequences.limit(longListSeq, 1), Lists.<List<Long>>newArrayList()).get(0);
|
||||
for (Long st : strings) {
|
||||
blackhole.consume(st);
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public void timeFilterAll(Blackhole blackhole) throws Exception
|
||||
{
|
||||
StorageAdapter sa = new QueryableIndexStorageAdapter(qIndex);
|
||||
Sequence<Cursor> cursors = sa.makeCursors(timeFilterAll, schemaInfo.getDataInterval(), QueryGranularities.ALL, false);
|
||||
|
||||
Sequence<List<Long>> longListSeq = readCursorsLong(cursors, blackhole);
|
||||
List<Long> strings = Sequences.toList(Sequences.limit(longListSeq, 1), Lists.<List<Long>>newArrayList()).get(0);
|
||||
for (Long st : strings) {
|
||||
blackhole.consume(st);
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
|
@ -442,7 +529,6 @@ public class FilterPartitionBenchmark
|
|||
public List<Long> apply(Cursor input)
|
||||
{
|
||||
List<Long> longvals = new ArrayList<Long>();
|
||||
|
||||
LongColumnSelector selector = input.makeLongColumnSelector("sumLongSequential");
|
||||
while (!input.isDone()) {
|
||||
long rowval = selector.get();
|
||||
|
@ -476,11 +562,11 @@ public class FilterPartitionBenchmark
|
|||
{
|
||||
public NoBitmapDimensionPredicateFilter(
|
||||
final String dimension,
|
||||
final Predicate<String> predicate,
|
||||
final DruidPredicateFactory predicateFactory,
|
||||
final ExtractionFn extractionFn
|
||||
)
|
||||
{
|
||||
super(dimension, predicate, extractionFn);
|
||||
super(dimension, predicateFactory, extractionFn);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -510,21 +596,37 @@ public class FilterPartitionBenchmark
|
|||
return new NoBitmapSelectorFilter(dimension, value);
|
||||
} else {
|
||||
final String valueOrNull = Strings.emptyToNull(value);
|
||||
final Predicate<String> predicate = new Predicate<String>()
|
||||
|
||||
final DruidPredicateFactory predicateFactory = new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return Objects.equals(valueOrNull, input);
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return Objects.equals(valueOrNull, input);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return value;
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
return new NoBitmapDimensionPredicateFilter(dimension, predicate, extractionFn);
|
||||
|
||||
return new NoBitmapDimensionPredicateFilter(dimension, predicateFactory, extractionFn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,316 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.benchmark;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.google.common.io.Files;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
|
||||
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
|
||||
import io.druid.benchmark.datagen.BenchmarkSchemas;
|
||||
import io.druid.benchmark.query.QueryBenchmarkUtil;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.granularity.QueryGranularities;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.js.JavaScriptConfig;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.FinalizeResultsQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.FilteredAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.extraction.JavaScriptExtractionFn;
|
||||
import io.druid.query.filter.AndDimFilter;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.BoundDimFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.InDimFilter;
|
||||
import io.druid.query.filter.JavaScriptDimFilter;
|
||||
import io.druid.query.filter.OrDimFilter;
|
||||
import io.druid.query.filter.RegexDimFilter;
|
||||
import io.druid.query.filter.SearchQueryDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.query.search.search.ContainsSearchQuerySpec;
|
||||
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||
import io.druid.query.spec.QuerySegmentSpec;
|
||||
import io.druid.query.timeseries.TimeseriesQuery;
|
||||
import io.druid.query.timeseries.TimeseriesQueryEngine;
|
||||
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
|
||||
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
|
||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.IndexMergerV9;
|
||||
import io.druid.segment.IndexSpec;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.QueryableIndexStorageAdapter;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.AndFilter;
|
||||
import io.druid.segment.filter.DimensionPredicateFilter;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import io.druid.segment.filter.OrFilter;
|
||||
import io.druid.segment.filter.SelectorFilter;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@Fork(jvmArgsPrepend = "-server", value = 1)
|
||||
@Warmup(iterations = 10)
|
||||
@Measurement(iterations = 25)
|
||||
public class FilteredAggregatorBenchmark
|
||||
{
|
||||
@Param({"75000"})
|
||||
private int rowsPerSegment;
|
||||
|
||||
@Param({"basic"})
|
||||
private String schema;
|
||||
|
||||
private static final Logger log = new Logger(FilteredAggregatorBenchmark.class);
|
||||
private static final int RNG_SEED = 9999;
|
||||
private static final IndexMergerV9 INDEX_MERGER_V9;
|
||||
private static final IndexIO INDEX_IO;
|
||||
public static final ObjectMapper JSON_MAPPER;
|
||||
private IncrementalIndex incIndex;
|
||||
private IncrementalIndex incIndexFilteredAgg;
|
||||
private AggregatorFactory[] filteredMetrics;
|
||||
private QueryableIndex qIndex;
|
||||
private File indexFile;
|
||||
private DimFilter filter;
|
||||
private List<InputRow> inputRows;
|
||||
private QueryRunnerFactory factory;
|
||||
private BenchmarkSchemaInfo schemaInfo;
|
||||
private TimeseriesQuery query;
|
||||
|
||||
private static String JS_FN = "function(str) { return 'super-' + str; }";
|
||||
private static ExtractionFn JS_EXTRACTION_FN = new JavaScriptExtractionFn(JS_FN, false, JavaScriptConfig.getDefault());
|
||||
|
||||
static {
|
||||
JSON_MAPPER = new DefaultObjectMapper();
|
||||
INDEX_IO = new IndexIO(
|
||||
JSON_MAPPER,
|
||||
new ColumnConfig()
|
||||
{
|
||||
@Override
|
||||
public int columnCacheSizeBytes()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
);
|
||||
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
|
||||
}
|
||||
|
||||
@Setup
|
||||
public void setup() throws IOException
|
||||
{
|
||||
log.info("SETUP CALLED AT " + System.currentTimeMillis());
|
||||
|
||||
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
|
||||
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
|
||||
}
|
||||
|
||||
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
|
||||
|
||||
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
|
||||
schemaInfo.getColumnSchemas(),
|
||||
RNG_SEED,
|
||||
schemaInfo.getDataInterval(),
|
||||
rowsPerSegment
|
||||
);
|
||||
|
||||
incIndex = makeIncIndex(schemaInfo.getAggsArray());
|
||||
|
||||
filter = new OrDimFilter(
|
||||
Arrays.asList(
|
||||
new BoundDimFilter("dimSequential", "-1", "-1", true, true, true, null),
|
||||
new JavaScriptDimFilter("dimSequential", "function(x) { return false }", null, JavaScriptConfig.getDefault()),
|
||||
new RegexDimFilter("dimSequential", "X", null),
|
||||
new SearchQueryDimFilter("dimSequential", new ContainsSearchQuerySpec("X", false), null),
|
||||
new InDimFilter("dimSequential", Arrays.asList("X"), null)
|
||||
)
|
||||
);
|
||||
filteredMetrics = new AggregatorFactory[1];
|
||||
filteredMetrics[0] = new FilteredAggregatorFactory(new CountAggregatorFactory("rows"), filter);
|
||||
incIndexFilteredAgg = makeIncIndex(filteredMetrics);
|
||||
|
||||
inputRows = new ArrayList<>();
|
||||
for (int j = 0; j < rowsPerSegment; j++) {
|
||||
InputRow row = gen.nextRow();
|
||||
if (j % 10000 == 0) {
|
||||
log.info(j + " rows generated.");
|
||||
}
|
||||
incIndex.add(row);
|
||||
inputRows.add(row);
|
||||
}
|
||||
|
||||
File tmpFile = Files.createTempDir();
|
||||
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
|
||||
tmpFile.deleteOnExit();
|
||||
|
||||
indexFile = INDEX_MERGER_V9.persist(
|
||||
incIndex,
|
||||
tmpFile,
|
||||
new IndexSpec()
|
||||
);
|
||||
qIndex = INDEX_IO.loadIndex(indexFile);
|
||||
|
||||
factory = new TimeseriesQueryRunnerFactory(
|
||||
new TimeseriesQueryQueryToolChest(
|
||||
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
),
|
||||
new TimeseriesQueryEngine(),
|
||||
QueryBenchmarkUtil.NOOP_QUERYWATCHER
|
||||
);
|
||||
|
||||
BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
|
||||
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Arrays.asList(basicSchema.getDataInterval()));
|
||||
List<AggregatorFactory> queryAggs = new ArrayList<>();
|
||||
queryAggs.add(filteredMetrics[0]);
|
||||
|
||||
query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("blah")
|
||||
.granularity(QueryGranularities.ALL)
|
||||
.intervals(intervalSpec)
|
||||
.aggregators(queryAggs)
|
||||
.descending(false)
|
||||
.build();
|
||||
}
|
||||
|
||||
private IncrementalIndex makeIncIndex(AggregatorFactory[] metrics)
|
||||
{
|
||||
return new OnheapIncrementalIndex(
|
||||
new IncrementalIndexSchema.Builder()
|
||||
.withQueryGranularity(QueryGranularities.NONE)
|
||||
.withMetrics(metrics)
|
||||
.withDimensionsSpec(new DimensionsSpec(null, null, null))
|
||||
.build(),
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
rowsPerSegment
|
||||
);
|
||||
}
|
||||
|
||||
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
|
||||
{
|
||||
QueryToolChest toolChest = factory.getToolchest();
|
||||
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
|
||||
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
|
||||
toolChest
|
||||
);
|
||||
|
||||
Sequence<T> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
|
||||
return Sequences.toList(queryResult, Lists.<T>newArrayList());
|
||||
}
|
||||
|
||||
// Filtered agg doesn't work with ingestion, cardinality is not supported in incremental index
|
||||
// See https://github.com/druid-io/druid/issues/3164
|
||||
// @Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public void ingest(Blackhole blackhole) throws Exception
|
||||
{
|
||||
incIndexFilteredAgg = makeIncIndex(filteredMetrics);
|
||||
for (InputRow row : inputRows) {
|
||||
int rv = incIndexFilteredAgg.add(row);
|
||||
blackhole.consume(rv);
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public void querySingleIncrementalIndex(Blackhole blackhole) throws Exception
|
||||
{
|
||||
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
|
||||
factory,
|
||||
"incIndex",
|
||||
new IncrementalIndexSegment(incIndex, "incIndex")
|
||||
);
|
||||
|
||||
List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(factory, runner, query);
|
||||
for (Result<TimeseriesResultValue> result : results) {
|
||||
blackhole.consume(result);
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
|
||||
{
|
||||
final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
|
||||
factory,
|
||||
"qIndex",
|
||||
new QueryableIndexSegment("qIndex", qIndex)
|
||||
);
|
||||
|
||||
List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(factory, runner, query);
|
||||
for (Result<TimeseriesResultValue> result : results) {
|
||||
blackhole.consume(result);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,8 +30,18 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
|
|||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.granularity.QueryGranularities;
|
||||
import io.druid.js.JavaScriptConfig;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.filter.AndDimFilter;
|
||||
import io.druid.query.filter.BoundDimFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.InDimFilter;
|
||||
import io.druid.query.filter.JavaScriptDimFilter;
|
||||
import io.druid.query.filter.OrDimFilter;
|
||||
import io.druid.query.filter.RegexDimFilter;
|
||||
import io.druid.query.filter.SearchQueryDimFilter;
|
||||
import io.druid.query.search.search.ContainsSearchQuerySpec;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
|
@ -55,6 +65,7 @@ import org.openjdk.jmh.infra.Blackhole;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -145,4 +156,39 @@ public class IncrementalIndexReadBenchmark
|
|||
cursor.advance();
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public void readWithFilters(Blackhole blackhole) throws Exception
|
||||
{
|
||||
DimFilter filter = new OrDimFilter(
|
||||
Arrays.asList(
|
||||
new BoundDimFilter("dimSequential", "-1", "-1", true, true, true, null),
|
||||
new JavaScriptDimFilter("dimSequential", "function(x) { return false }", null, JavaScriptConfig.getDefault()),
|
||||
new RegexDimFilter("dimSequential", "X", null),
|
||||
new SearchQueryDimFilter("dimSequential", new ContainsSearchQuerySpec("X", false), null),
|
||||
new InDimFilter("dimSequential", Arrays.asList("X"), null)
|
||||
)
|
||||
);
|
||||
|
||||
IncrementalIndexStorageAdapter sa = new IncrementalIndexStorageAdapter(incIndex);
|
||||
Sequence<Cursor> cursors = sa.makeCursors(filter.toFilter(), schemaInfo.getDataInterval(), QueryGranularities.ALL, false);
|
||||
Cursor cursor = Sequences.toList(Sequences.limit(cursors, 1), Lists.<Cursor>newArrayList()).get(0);
|
||||
|
||||
List<DimensionSelector> selectors = new ArrayList<>();
|
||||
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null)));
|
||||
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimZipf", null)));
|
||||
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimUniform", null)));
|
||||
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequentialHalfNull", null)));
|
||||
|
||||
cursor.reset();
|
||||
while (!cursor.isDone()) {
|
||||
for (DimensionSelector selector : selectors) {
|
||||
IndexedInts row = selector.getRow();
|
||||
blackhole.consume(selector.lookupName(row.get(0)));
|
||||
}
|
||||
cursor.advance();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -332,3 +332,40 @@ The following matches dimension values in `[product_1, product_3, product_5]` fo
|
|||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Filtering on the Timestamp Column
|
||||
Filters can also be applied to the timestamp column. The timestamp column has long millisecond values.
|
||||
|
||||
To refer to the timestamp column, use the string `__time` as the dimension name.
|
||||
|
||||
The filter parameters (e.g., the selector value for the SelectorFilter) should still be provided as Strings, even though the timestamp column itself holds long values.
|
||||
|
||||
If the user wishes to interpret the timestamp with a specific format, timezone, or locale, the [Time Format Extraction Function](./dimensionspecs.html#time-format-extraction-function) is useful.
|
||||
|
||||
Note that the timestamp column does not have a bitmap index, so filtering on the timestamp requires a scan of the column, and query performance will be affected accordingly. Where possible, exclude time ranges by narrowing the query interval instead, which will be faster.
|
||||
|
||||
**Example**
|
||||
|
||||
Filtering on a long timestamp value:
|
||||
```json
|
||||
"filter": {
|
||||
"type": "selector",
|
||||
"dimension": "__time",
|
||||
"value": "124457387532"
|
||||
}
|
||||
```
|
||||
|
||||
Filtering on day of week:
|
||||
```json
|
||||
"filter": {
|
||||
"type": "selector",
|
||||
"dimension": "__time",
|
||||
"value": "Friday",
|
||||
"extractionFn": {
|
||||
"type": "timeFormat",
|
||||
"format": "EEEE",
|
||||
"timeZone": "America/New_York",
|
||||
"locale": "en"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
|
|
@ -23,12 +23,18 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import com.metamx.common.UOE;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
import io.druid.segment.filter.Filters;
|
||||
|
@ -221,6 +227,13 @@ public class FilteredAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final Comparable value)
|
||||
{
|
||||
if (getTypeForDimension(dimension) == ValueType.LONG) {
|
||||
return Filters.getLongValueMatcher(
|
||||
columnSelectorFactory.makeLongColumnSelector(dimension),
|
||||
value
|
||||
);
|
||||
}
|
||||
|
||||
final DimensionSelector selector = columnSelectorFactory.makeDimensionSelector(
|
||||
new DefaultDimensionSpec(dimension, dimension)
|
||||
);
|
||||
|
@ -256,8 +269,20 @@ public class FilteredAggregatorFactory extends AggregatorFactory
|
|||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final Predicate predicate)
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
ValueType type = getTypeForDimension(dimension);
|
||||
switch (type) {
|
||||
case LONG:
|
||||
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
|
||||
case STRING:
|
||||
return makeStringValueMatcher(dimension, predicateFactory.makeStringPredicate());
|
||||
default:
|
||||
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
|
||||
}
|
||||
}
|
||||
|
||||
public ValueMatcher makeStringValueMatcher(final String dimension, final Predicate<String> predicate)
|
||||
{
|
||||
final DimensionSelector selector = columnSelectorFactory.makeDimensionSelector(
|
||||
new DefaultDimensionSpec(dimension, dimension)
|
||||
|
@ -299,5 +324,25 @@ public class FilteredAggregatorFactory extends AggregatorFactory
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate)
|
||||
{
|
||||
return Filters.getLongPredicateMatcher(
|
||||
columnSelectorFactory.makeLongColumnSelector(dimension),
|
||||
predicate
|
||||
);
|
||||
}
|
||||
|
||||
private ValueType getTypeForDimension(String dimension)
|
||||
{
|
||||
// FilteredAggregatorFactory is sometimes created from a ColumnSelectorFactory that
|
||||
// has no knowledge of column capabilities/types.
|
||||
// Default to LONG for __time, STRING for everything else.
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return ValueType.LONG;
|
||||
}
|
||||
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimension);
|
||||
return capabilities == null ? ValueType.STRING : capabilities.getType();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.filter;
|
||||
|
||||
/**
 * A predicate over primitive long values.
 *
 * java.util.function.LongPredicate is only available in Java 8+, so this interface is used instead
 * to avoid boxing when a long predicate is needed.
 */
|
||||
public interface DruidLongPredicate
|
||||
{
|
||||
/**
 * Tests a single primitive long value.
 *
 * @param input the long value to test
 *
 * @return true if the value is accepted by this predicate, false otherwise
 */
boolean applyLong(long input);
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.filter;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
|
||||
public interface DruidPredicateFactory
|
||||
{
|
||||
public Predicate<String> makeStringPredicate();
|
||||
|
||||
public DruidLongPredicate makeLongPredicate();
|
||||
}
|
|
@ -24,11 +24,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableSortedSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Range;
|
||||
import com.google.common.collect.RangeSet;
|
||||
import com.google.common.collect.TreeRangeSet;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.StringUtils;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.lookup.LookupExtractionFn;
|
||||
|
@ -37,15 +39,22 @@ import io.druid.segment.filter.InFilter;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class InDimFilter implements DimFilter
|
||||
{
|
||||
// determined through benchmark that binary search on long[] is faster than HashSet until ~16 elements
|
||||
// Hashing threshold is not applied to String for now, String still uses ImmutableSortedSet
|
||||
public static final int LONG_HASHING_THRESHOLD = 16;
|
||||
|
||||
private final ImmutableSortedSet<String> values;
|
||||
private final String dimension;
|
||||
private final ExtractionFn extractionFn;
|
||||
private final Supplier<DruidLongPredicate> longPredicateSupplier;
|
||||
|
||||
@JsonCreator
|
||||
public InDimFilter(
|
||||
|
@ -71,6 +80,7 @@ public class InDimFilter implements DimFilter
|
|||
);
|
||||
this.dimension = dimension;
|
||||
this.extractionFn = extractionFn;
|
||||
this.longPredicateSupplier = getLongPredicateSupplier();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -170,7 +180,7 @@ public class InDimFilter implements DimFilter
|
|||
@Override
|
||||
public Filter toFilter()
|
||||
{
|
||||
return new InFilter(dimension, values, extractionFn);
|
||||
return new InFilter(dimension, values, longPredicateSupplier, extractionFn);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -216,4 +226,80 @@ public class InDimFilter implements DimFilter
|
|||
result = 31 * result + (extractionFn != null ? extractionFn.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
// As the set of filtered values can be large, parsing them as longs should be done only if needed, and only once.
|
||||
// Pass in a common long predicate supplier to all filters created by .toFilter(), so that
|
||||
// we only compute the long hashset/array once per query.
|
||||
// This supplier must be thread-safe, since this DimFilter will be accessed in the query runners.
|
||||
private Supplier<DruidLongPredicate> getLongPredicateSupplier()
|
||||
{
|
||||
return new Supplier<DruidLongPredicate>()
|
||||
{
|
||||
private final Object initLock = new Object();
|
||||
private volatile boolean longsInitialized = false;
|
||||
private volatile boolean useLongHash;
|
||||
private volatile long[] longArray;
|
||||
private volatile HashSet<Long> longHashSet;
|
||||
|
||||
private void initLongValues()
|
||||
{
|
||||
if (longsInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (initLock) {
|
||||
if (longsInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Long> longs = new ArrayList<>();
|
||||
for (String value : values) {
|
||||
Long longValue = Longs.tryParse(value);
|
||||
if (longValue != null) {
|
||||
longs.add(longValue);
|
||||
}
|
||||
}
|
||||
|
||||
useLongHash = longs.size() > LONG_HASHING_THRESHOLD;
|
||||
if (useLongHash) {
|
||||
longHashSet = new HashSet<Long>(longs);
|
||||
} else {
|
||||
longArray = new long[longs.size()];
|
||||
for (int i = 0; i < longs.size(); i++) {
|
||||
longArray[i] = longs.get(i).longValue();
|
||||
}
|
||||
Arrays.sort(longArray);
|
||||
}
|
||||
|
||||
longsInitialized = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate get()
|
||||
{
|
||||
initLongValues();
|
||||
|
||||
if (useLongHash) {
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return longHashSet.contains(input);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return Arrays.binarySearch(longArray, input) >= 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.mozilla.javascript.Context;
|
|||
import org.mozilla.javascript.Function;
|
||||
import org.mozilla.javascript.ScriptableObject;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class JavaScriptDimFilter implements DimFilter
|
||||
|
@ -43,7 +44,7 @@ public class JavaScriptDimFilter implements DimFilter
|
|||
private final ExtractionFn extractionFn;
|
||||
private final JavaScriptConfig config;
|
||||
|
||||
private final JavaScriptPredicate predicate;
|
||||
private final JavaScriptPredicateFactory predicateFactory;
|
||||
|
||||
@JsonCreator
|
||||
public JavaScriptDimFilter(
|
||||
|
@ -61,9 +62,9 @@ public class JavaScriptDimFilter implements DimFilter
|
|||
this.config = config;
|
||||
|
||||
if (config.isDisabled()) {
|
||||
this.predicate = null;
|
||||
this.predicateFactory = null;
|
||||
} else {
|
||||
this.predicate = new JavaScriptPredicate(function, extractionFn);
|
||||
this.predicateFactory = new JavaScriptPredicateFactory(function, extractionFn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,7 +116,7 @@ public class JavaScriptDimFilter implements DimFilter
|
|||
throw new ISE("JavaScript is disabled");
|
||||
}
|
||||
|
||||
return new JavaScriptFilter(dimension, predicate);
|
||||
return new JavaScriptFilter(dimension, predicateFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,14 +166,14 @@ public class JavaScriptDimFilter implements DimFilter
|
|||
return result;
|
||||
}
|
||||
|
||||
public static class JavaScriptPredicate implements Predicate<String>
|
||||
public static class JavaScriptPredicateFactory implements DruidPredicateFactory
|
||||
{
|
||||
final ScriptableObject scope;
|
||||
final Function fnApply;
|
||||
final String script;
|
||||
final ExtractionFn extractionFn;
|
||||
|
||||
public JavaScriptPredicate(final String script, final ExtractionFn extractionFn)
|
||||
public JavaScriptPredicateFactory(final String script, final ExtractionFn extractionFn)
|
||||
{
|
||||
Preconditions.checkNotNull(script, "script must not be null");
|
||||
this.script = script;
|
||||
|
@ -191,7 +192,33 @@ public class JavaScriptDimFilter implements DimFilter
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(final String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return applyObject(input);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
// Can't avoid boxing here because the Mozilla JS Function.call() only accepts Object[]
|
||||
return applyObject(input);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public boolean applyObject(final Object input)
|
||||
{
|
||||
// one and only one context per thread
|
||||
final Context cx = Context.enter();
|
||||
|
@ -203,12 +230,12 @@ public class JavaScriptDimFilter implements DimFilter
|
|||
}
|
||||
}
|
||||
|
||||
public boolean applyInContext(Context cx, String input)
|
||||
public boolean applyInContext(Context cx, Object input)
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
input = extractionFn.apply(input);
|
||||
}
|
||||
return Context.toBoolean(fnApply.call(cx, scope, scope, new String[]{input}));
|
||||
return Context.toBoolean(fnApply.call(cx, scope, scope, new Object[]{input}));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -221,7 +248,7 @@ public class JavaScriptDimFilter implements DimFilter
|
|||
return false;
|
||||
}
|
||||
|
||||
JavaScriptPredicate that = (JavaScriptPredicate) o;
|
||||
JavaScriptPredicateFactory that = (JavaScriptPredicateFactory) o;
|
||||
|
||||
if (!script.equals(that.script)) {
|
||||
return false;
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Range;
|
||||
import com.google.common.collect.RangeSet;
|
||||
import com.google.common.collect.TreeRangeSet;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.StringUtils;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.segment.filter.DimensionPredicateFilter;
|
||||
|
@ -44,6 +45,11 @@ public class SelectorDimFilter implements DimFilter
|
|||
private final String value;
|
||||
private final ExtractionFn extractionFn;
|
||||
|
||||
private final Object initLock = new Object();
|
||||
private volatile boolean longsInitialized = false;
|
||||
private volatile Long valueAsLong;
|
||||
|
||||
|
||||
@JsonCreator
|
||||
public SelectorDimFilter(
|
||||
@JsonProperty("dimension") String dimension,
|
||||
|
@ -88,21 +94,51 @@ public class SelectorDimFilter implements DimFilter
|
|||
return new SelectorFilter(dimension, value);
|
||||
} else {
|
||||
final String valueOrNull = Strings.emptyToNull(value);
|
||||
final Predicate<String> predicate = new Predicate<String>()
|
||||
|
||||
final DruidPredicateFactory predicateFactory = new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return Objects.equals(valueOrNull, input);
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return Objects.equals(valueOrNull, input);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return value;
|
||||
initLongValue();
|
||||
|
||||
if (valueAsLong == null) {
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// store the primitive, so we don't unbox for every comparison
|
||||
final long unboxedLong = valueAsLong.longValue();
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return input == unboxedLong;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
return new DimensionPredicateFilter(dimension, predicate, extractionFn);
|
||||
return new DimensionPredicateFilter(dimension, predicateFactory, extractionFn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -174,4 +210,19 @@ public class SelectorDimFilter implements DimFilter
|
|||
result = 31 * result + (extractionFn != null ? extractionFn.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
private void initLongValue()
|
||||
{
|
||||
if (longsInitialized) {
|
||||
return;
|
||||
}
|
||||
synchronized (initLock) {
|
||||
if (longsInitialized) {
|
||||
return;
|
||||
}
|
||||
valueAsLong = Longs.tryParse(value);
|
||||
longsInitialized = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,11 +20,48 @@
|
|||
package io.druid.query.filter;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
/**
|
||||
* A ValueMatcherFactory is an object associated with a collection of rows (e.g., an IncrementalIndexStorageAdapter)
|
||||
* that generates ValueMatchers for filtering on the associated collection of rows.
|
||||
*
|
||||
* A ValueMatcher is an object that decides whether a row value matches a value or predicate
|
||||
* associated with the ValueMatcher.
|
||||
*
|
||||
* The ValueMatcher is expected to track the current row to be matched with a stateful
|
||||
* object (e.g., a ColumnSelectorFactory). The ValueMatcher has no responsibility for moving the current
|
||||
* "row pointer", this is handled outside of the ValueMatcher.
|
||||
*
|
||||
* The ValueMatcherFactory/ValueMatcher classes are used for filtering rows during column scans.
|
||||
*/
|
||||
public interface ValueMatcherFactory
|
||||
{
|
||||
/**
|
||||
* Create a ValueMatcher that compares row values to the provided value.
|
||||
*
|
||||
* An implementation of this method should be able to handle dimensions of various types.
|
||||
*
|
||||
* @param dimension The dimension to filter.
|
||||
* @param value The value to match against.
|
||||
*
|
||||
* @return An object that matches row values on the provided value.
|
||||
*/
|
||||
public ValueMatcher makeValueMatcher(String dimension, Comparable value);
|
||||
public ValueMatcher makeValueMatcher(String dimension, Predicate predicate);
|
||||
|
||||
|
||||
/**
|
||||
* Create a ValueMatcher that applies a predicate to row values.
|
||||
*
|
||||
* The caller provides a predicate factory that can create a predicate for each value type supported by Druid.
|
||||
* See {@link DruidPredicateFactory} for more information.
|
||||
*
|
||||
* When creating the ValueMatcher, the ValueMatcherFactory implementation should decide what type of predicate
|
||||
* to create from the predicate factory based on the ValueType of the specified dimension.
|
||||
*
|
||||
* @param dimension The dimension to filter.
|
||||
* @param predicateFactory Predicate factory
|
||||
* @return An object that applies a predicate to row values
|
||||
*/
|
||||
public ValueMatcher makeValueMatcher(String dimension, DruidPredicateFactory predicateFactory);
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ import io.druid.segment.DimensionSelector;
|
|||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
|
@ -669,5 +670,14 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
// getColumnCapabilities() is only used by FilteredAggregatorFactory for determining dimension types.
|
||||
// Since FilteredAggregatorFactory only works with ColumnSelectorFactory implementations
|
||||
// that support makeDimensionSelector(), this method is also left unsupported.
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,12 +25,15 @@ import com.metamx.collections.bitmap.ImmutableBitmap;
|
|||
import com.metamx.collections.spatial.ImmutableRTree;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.DictionaryEncodedColumn;
|
||||
import io.druid.segment.column.GenericColumn;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.data.IndexedIterable;
|
||||
import io.druid.segment.filter.Filters;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
|
@ -115,12 +118,14 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
|
|||
public BitmapIndex getBitmapIndex(String dimension)
|
||||
{
|
||||
final Column column = index.getColumn(dimension);
|
||||
|
||||
if (column == null) {
|
||||
// Create a BitmapIndex for null columns so that filters applied to null columns can use
|
||||
if (column == null || !columnSupportsFiltering(column)) {
|
||||
// for missing columns and columns with types that do not support filtering,
|
||||
// treat the column as if it were a String column full of nulls.
|
||||
// Create a BitmapIndex so that filters applied to null columns can use
|
||||
// bitmap indexes. Filters check for the presence of a bitmap index, this is used to determine
|
||||
// whether the filter is applied in the pre or post filtering stage.
|
||||
return new BitmapIndex() {
|
||||
return new BitmapIndex()
|
||||
{
|
||||
@Override
|
||||
public int getCardinality()
|
||||
{
|
||||
|
@ -175,7 +180,7 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
|
|||
public ImmutableBitmap getBitmapIndex(String dimension, String value)
|
||||
{
|
||||
final Column column = index.getColumn(dimension);
|
||||
if (column == null) {
|
||||
if (column == null || !columnSupportsFiltering(column)) {
|
||||
if (Strings.isNullOrEmpty(value)) {
|
||||
return bitmapFactory.complement(bitmapFactory.makeEmptyImmutableBitmap(), getNumRows());
|
||||
} else {
|
||||
|
@ -201,4 +206,10 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
|
|||
|
||||
return column.getSpatialIndex().getRTree();
|
||||
}
|
||||
|
||||
private static boolean columnSupportsFiltering(Column column)
|
||||
{
|
||||
ValueType columnType = column.getCapabilities().getType();
|
||||
return Filters.FILTERABLE_TYPES.contains(columnType);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.segment;
|
||||
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
/**
|
||||
* Factory class for MetricSelectors
|
||||
|
@ -30,4 +31,5 @@ public interface ColumnSelectorFactory
|
|||
public FloatColumnSelector makeFloatColumnSelector(String columnName);
|
||||
public LongColumnSelector makeLongColumnSelector(String columnName);
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String columnName);
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName);
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.collections.bitmap.ImmutableBitmap;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.UOE;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
@ -38,6 +40,8 @@ import io.druid.query.dimension.DefaultDimensionSpec;
|
|||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.BooleanFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
|
@ -54,6 +58,7 @@ import io.druid.segment.data.IndexedInts;
|
|||
import io.druid.segment.data.Offset;
|
||||
import io.druid.segment.filter.AndFilter;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.roaringbitmap.IntIterator;
|
||||
|
@ -184,11 +189,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String column)
|
||||
{
|
||||
Column columnObj = index.getColumn(column);
|
||||
if (columnObj == null) {
|
||||
return null;
|
||||
}
|
||||
return columnObj.getCapabilities();
|
||||
return getColumnCapabilites(index, column);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -314,6 +315,15 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
);
|
||||
}
|
||||
|
||||
private static ColumnCapabilities getColumnCapabilites(ColumnSelector index, String columnName)
|
||||
{
|
||||
Column columnObj = index.getColumn(columnName);
|
||||
if (columnObj == null) {
|
||||
return null;
|
||||
}
|
||||
return columnObj.getCapabilities();
|
||||
}
|
||||
|
||||
private interface CursorAdvancer
|
||||
{
|
||||
public void advance();
|
||||
|
@ -790,6 +800,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Returns the capabilities of the named column in the underlying index.
 *
 * @param columnName name of the column
 *
 * @return whatever the static getColumnCapabilites helper returns for this index and column
 */
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
// Delegates to the static helper shared with the storage adapter's own getColumnCapabilities().
return getColumnCapabilites(index, columnName);
|
||||
}
|
||||
}
|
||||
|
||||
if (postFilter == null) {
|
||||
|
@ -841,6 +857,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
return new QueryableIndexBaseCursor()
|
||||
{
|
||||
CursorOffsetHolderValueMatcherFactory valueMatcherFactory = new CursorOffsetHolderValueMatcherFactory(
|
||||
index,
|
||||
this
|
||||
);
|
||||
RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory(
|
||||
|
@ -981,20 +998,28 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
|
||||
private static class CursorOffsetHolderValueMatcherFactory implements ValueMatcherFactory
|
||||
{
|
||||
private final ColumnSelector index;
|
||||
private final ColumnSelectorFactory cursor;
|
||||
|
||||
/**
 * @param index  column selector stored for later column capability lookups
 * @param cursor column selector factory stored for later creation of dimension and long selectors
 */
public CursorOffsetHolderValueMatcherFactory(ColumnSelector index, ColumnSelectorFactory cursor)
{
  this.cursor = cursor;
  this.index = index;
}
|
||||
|
||||
// Currently unused, except by unit tests, since filters always support bitmap indexes currently.
|
||||
// This will change when non-String dimensions are added.
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(String dimension, final Comparable value)
|
||||
{
|
||||
if (getTypeForDimension(dimension) == ValueType.LONG) {
|
||||
return Filters.getLongValueMatcher(
|
||||
cursor.makeLongColumnSelector(dimension),
|
||||
value
|
||||
);
|
||||
}
|
||||
|
||||
final DimensionSelector selector = cursor.makeDimensionSelector(
|
||||
new DefaultDimensionSpec(dimension, dimension)
|
||||
);
|
||||
|
@ -1026,10 +1051,21 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
}
|
||||
|
||||
// Currently unused, except by unit tests, since filters always support bitmap indexes currently.
|
||||
// This will change when non-String dimensions are added.
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(String dimension, final Predicate predicate)
|
||||
public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
ValueType type = getTypeForDimension(dimension);
|
||||
switch (type) {
|
||||
case LONG:
|
||||
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
|
||||
case STRING:
|
||||
return makeStringValueMatcher(dimension, predicateFactory.makeStringPredicate());
|
||||
default:
|
||||
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
|
||||
}
|
||||
}
|
||||
|
||||
private ValueMatcher makeStringValueMatcher(String dimension, final Predicate<String> predicate)
|
||||
{
|
||||
final DimensionSelector selector = cursor.makeDimensionSelector(
|
||||
new DefaultDimensionSpec(dimension, dimension)
|
||||
|
@ -1055,6 +1091,20 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private ValueMatcher makeLongValueMatcher(String dimension, final DruidLongPredicate predicate)
|
||||
{
|
||||
return Filters.getLongPredicateMatcher(
|
||||
cursor.makeLongColumnSelector(dimension),
|
||||
predicate
|
||||
);
|
||||
}
|
||||
|
||||
private ValueType getTypeForDimension(String dimension)
|
||||
{
|
||||
ColumnCapabilities capabilities = getColumnCapabilites(index, dimension);
|
||||
return capabilities == null ? ValueType.STRING : capabilities.getType();
|
||||
}
|
||||
}
|
||||
|
||||
private static class CursorOffsetHolderRowOffsetMatcherFactory implements RowOffsetMatcherFactory
|
||||
|
|
|
@ -24,13 +24,13 @@ import com.metamx.collections.bitmap.ImmutableBitmap;
|
|||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.BoundDimFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.query.ordering.StringComparators;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
|
@ -57,14 +57,7 @@ public class BoundFilter implements Filter
|
|||
return Filters.matchPredicate(
|
||||
boundDimFilter.getDimension(),
|
||||
selector,
|
||||
new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return doesMatch(input);
|
||||
}
|
||||
}
|
||||
getPredicateFactory().makeStringPredicate()
|
||||
);
|
||||
} else {
|
||||
final BitmapIndex bitmapIndex = selector.getBitmapIndex(boundDimFilter.getDimension());
|
||||
|
@ -142,17 +135,7 @@ public class BoundFilter implements Filter
|
|||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(
|
||||
boundDimFilter.getDimension(),
|
||||
new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return doesMatch(input);
|
||||
}
|
||||
}
|
||||
);
|
||||
return factory.makeValueMatcher(boundDimFilter.getDimension(), getPredicateFactory());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -161,6 +144,40 @@ public class BoundFilter implements Filter
|
|||
return selector.getBitmapIndex(boundDimFilter.getDimension()) != null;
|
||||
}
|
||||
|
||||
private DruidPredicateFactory getPredicateFactory()
|
||||
{
|
||||
return new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return doesMatch(input);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
// When BoundFilter has a 'numeric' comparator (see https://github.com/druid-io/druid/issues/2989)
|
||||
// this should be optimized to compare on longs instead of using string conversion.
|
||||
return doesMatch(String.valueOf(input));
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private boolean doesMatch(String input)
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
|
|
|
@ -24,41 +24,63 @@ import com.google.common.base.Predicate;
|
|||
import com.metamx.collections.bitmap.ImmutableBitmap;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DimensionPredicateFilter implements Filter
|
||||
{
|
||||
private final String dimension;
|
||||
private final Predicate<String> predicate;
|
||||
private final DruidPredicateFactory predicateFactory;
|
||||
private final String basePredicateString;
|
||||
private final ExtractionFn extractionFn;
|
||||
|
||||
public DimensionPredicateFilter(
|
||||
final String dimension,
|
||||
final Predicate<String> predicate,
|
||||
final DruidPredicateFactory predicateFactory,
|
||||
final ExtractionFn extractionFn
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(predicate, "predicate");
|
||||
Preconditions.checkNotNull(predicateFactory, "predicateFactory");
|
||||
this.dimension = Preconditions.checkNotNull(dimension, "dimension");
|
||||
this.basePredicateString = predicate.toString();
|
||||
this.basePredicateString = predicateFactory.toString();
|
||||
this.extractionFn = extractionFn;
|
||||
|
||||
if (extractionFn == null) {
|
||||
this.predicate = predicate;
|
||||
this.predicateFactory = predicateFactory;
|
||||
} else {
|
||||
this.predicate = new Predicate<String>()
|
||||
this.predicateFactory = new DruidPredicateFactory()
|
||||
{
|
||||
final Predicate<String> baseStringPredicate = predicateFactory.makeStringPredicate();
|
||||
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return predicate.apply(extractionFn.apply(input));
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return baseStringPredicate.apply(extractionFn.apply(input));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return baseStringPredicate.apply(extractionFn.apply(input));
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -67,13 +89,13 @@ public class DimensionPredicateFilter implements Filter
|
|||
@Override
|
||||
public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector)
|
||||
{
|
||||
return Filters.matchPredicate(dimension, selector, predicate);
|
||||
return Filters.matchPredicate(dimension, selector, predicateFactory.makeStringPredicate());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(dimension, predicate);
|
||||
return factory.makeValueMatcher(dimension, predicateFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,18 +24,28 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.collections.bitmap.ImmutableBitmap;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.parsers.ParseException;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.BooleanFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -43,6 +53,7 @@ import java.util.List;
|
|||
*/
|
||||
public class Filters
|
||||
{
|
||||
public static final List<ValueType> FILTERABLE_TYPES = ImmutableList.of(ValueType.STRING, ValueType.LONG);
|
||||
private static final String CTX_KEY_USE_FILTER_CNF = "useFilterCNF";
|
||||
|
||||
/**
|
||||
|
@ -157,6 +168,48 @@ public class Filters
|
|||
);
|
||||
}
|
||||
|
||||
public static ValueMatcher getLongValueMatcher(
|
||||
final LongColumnSelector longSelector,
|
||||
Comparable value
|
||||
)
|
||||
{
|
||||
if (value == null) {
|
||||
return new BooleanValueMatcher(false);
|
||||
}
|
||||
|
||||
final Long longValue = Longs.tryParse(value.toString());
|
||||
if (longValue == null) {
|
||||
return new BooleanValueMatcher(false);
|
||||
}
|
||||
|
||||
return new ValueMatcher()
|
||||
{
|
||||
// store the primitive, so we don't unbox for every comparison
|
||||
final long unboxedLong = longValue.longValue();
|
||||
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return longSelector.get() == unboxedLong;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static ValueMatcher getLongPredicateMatcher(
|
||||
final LongColumnSelector longSelector,
|
||||
final DruidLongPredicate predicate
|
||||
)
|
||||
{
|
||||
return new ValueMatcher()
|
||||
{
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return predicate.applyLong(longSelector.get());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static Filter convertToCNFFromQueryContext(Query query, Filter filter)
|
||||
{
|
||||
if (filter == null) {
|
||||
|
|
|
@ -22,15 +22,16 @@ package io.druid.segment.filter;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.metamx.collections.bitmap.ImmutableBitmap;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -41,12 +42,19 @@ public class InFilter implements Filter
|
|||
private final String dimension;
|
||||
private final Set<String> values;
|
||||
private final ExtractionFn extractionFn;
|
||||
private final Supplier<DruidLongPredicate> longPredicateSupplier;
|
||||
|
||||
public InFilter(String dimension, Set<String> values, ExtractionFn extractionFn)
|
||||
public InFilter(
|
||||
String dimension,
|
||||
Set<String> values,
|
||||
Supplier<DruidLongPredicate> longPredicateSupplier,
|
||||
ExtractionFn extractionFn
|
||||
)
|
||||
{
|
||||
this.dimension = dimension;
|
||||
this.values = values;
|
||||
this.extractionFn = extractionFn;
|
||||
this.longPredicateSupplier = longPredicateSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -69,15 +77,7 @@ public class InFilter implements Filter
|
|||
return Filters.matchPredicate(
|
||||
dimension,
|
||||
selector,
|
||||
new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
// InDimFilter converts all null "values" to empty.
|
||||
return values.contains(Strings.nullToEmpty(extractionFn.apply(input)));
|
||||
}
|
||||
}
|
||||
getPredicateFactory().makeStringPredicate()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -85,19 +85,7 @@ public class InFilter implements Filter
|
|||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(
|
||||
dimension, new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
input = extractionFn.apply(input);
|
||||
}
|
||||
return values.contains(Strings.nullToEmpty(input));
|
||||
}
|
||||
}
|
||||
);
|
||||
return factory.makeValueMatcher(dimension, getPredicateFactory());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,4 +93,51 @@ public class InFilter implements Filter
|
|||
{
|
||||
return selector.getBitmapIndex(dimension) != null;
|
||||
}
|
||||
|
||||
private DruidPredicateFactory getPredicateFactory()
|
||||
{
|
||||
return new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return values.contains(Strings.nullToEmpty(extractionFn.apply(input)));
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return values.contains(Strings.nullToEmpty(input));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return values.contains(extractionFn.apply(input));
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return longPredicateSupplier.get();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,28 +20,31 @@
|
|||
package io.druid.segment.filter;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import com.metamx.collections.bitmap.ImmutableBitmap;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.JavaScriptDimFilter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import org.mozilla.javascript.Context;
|
||||
|
||||
public class JavaScriptFilter implements Filter
|
||||
{
|
||||
private final String dimension;
|
||||
private final JavaScriptDimFilter.JavaScriptPredicate predicate;
|
||||
private final JavaScriptDimFilter.JavaScriptPredicateFactory predicateFactory;
|
||||
|
||||
public JavaScriptFilter(
|
||||
String dimension,
|
||||
JavaScriptDimFilter.JavaScriptPredicate predicate
|
||||
JavaScriptDimFilter.JavaScriptPredicateFactory predicate
|
||||
)
|
||||
{
|
||||
this.dimension = dimension;
|
||||
this.predicate = predicate;
|
||||
this.predicateFactory = predicate;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -54,7 +57,7 @@ public class JavaScriptFilter implements Filter
|
|||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return predicate.applyInContext(cx, input);
|
||||
return predicateFactory.applyInContext(cx, input);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -69,7 +72,7 @@ public class JavaScriptFilter implements Filter
|
|||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
{
|
||||
// suboptimal, since we need create one context per call to predicate.apply()
|
||||
return factory.makeValueMatcher(dimension, predicate);
|
||||
return factory.makeValueMatcher(dimension, predicateFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,8 @@ package io.druid.segment.filter;
|
|||
|
||||
import com.google.common.base.Predicate;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -36,12 +38,32 @@ public class RegexFilter extends DimensionPredicateFilter
|
|||
{
|
||||
super(
|
||||
dimension,
|
||||
new Predicate<String>()
|
||||
new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return (input != null) && pattern.matcher(input).find();
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return (input != null) && pattern.matcher(input).find();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return pattern.matcher(String.valueOf(input)).find();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Predicate;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.search.search.SearchQuerySpec;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -40,20 +42,32 @@ public class SearchQueryFilter extends DimensionPredicateFilter
|
|||
{
|
||||
super(
|
||||
dimension,
|
||||
new Predicate<String>()
|
||||
new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return query.accept(input);
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable String input)
|
||||
{
|
||||
return query.accept(input);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return "SearchQueryFilter{" +
|
||||
", query=" + query +
|
||||
'}';
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return query.accept(String.valueOf(input));
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
extractionFn
|
||||
|
|
|
@ -23,11 +23,11 @@ import com.google.common.base.Predicate;
|
|||
import com.metamx.collections.bitmap.ImmutableBitmap;
|
||||
import com.metamx.collections.spatial.search.Bound;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.incremental.SpatialDimensionRowTransformer;
|
||||
|
||||
/**
|
||||
|
@ -58,17 +58,37 @@ public class SpatialFilter implements Filter
|
|||
{
|
||||
return factory.makeValueMatcher(
|
||||
dimension,
|
||||
new Predicate()
|
||||
new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Object input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
if (input instanceof String) {
|
||||
final float[] coordinate = SpatialDimensionRowTransformer.decode((String) input);
|
||||
return bound.contains(coordinate);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
if (input == null) {
|
||||
return false;
|
||||
}
|
||||
final float[] coordinate = SpatialDimensionRowTransformer.decode(input);
|
||||
return bound.contains(coordinate);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
// SpatialFilter does not currently support longs
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -253,6 +253,16 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
// This ColumnSelectorFactory implementation has no knowledge of column capabilities.
|
||||
// However, this method may still be called by FilteredAggregatorFactory's ValueMatcherFactory
|
||||
// to check column types.
|
||||
// Just return null, the caller will assume default types in that case.
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(
|
||||
DimensionSpec dimensionSpec
|
||||
|
@ -429,6 +439,11 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
columnCapabilities.put(dimSchema.getName(), capabilities);
|
||||
}
|
||||
|
||||
// __time capabilities
|
||||
ColumnCapabilitiesImpl timeCapabilities = new ColumnCapabilitiesImpl();
|
||||
timeCapabilities.setType(ValueType.LONG);
|
||||
columnCapabilities.put(Column.TIME_COLUMN_NAME, timeCapabilities);
|
||||
|
||||
// This should really be more generic
|
||||
List<SpatialDimensionSchema> spatialDimensions = dimensionsSpec.getSpatialDimensions();
|
||||
if (!spatialDimensions.isEmpty()) {
|
||||
|
|
|
@ -27,12 +27,15 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.common.UOE;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
|
@ -48,10 +51,12 @@ import io.druid.segment.SingleScanTimeDimSelector;
|
|||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.data.ListIndexed;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import io.druid.segment.serde.ComplexMetricSerde;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -213,11 +218,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
new Function<Long, Cursor>()
|
||||
{
|
||||
EntryHolder currEntry = new EntryHolder();
|
||||
private final ValueMatcher filterMatcher;
|
||||
|
||||
{
|
||||
filterMatcher = makeFilterMatcher(filter, currEntry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cursor apply(@Nullable final Long input)
|
||||
|
@ -226,6 +226,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
|
||||
return new Cursor()
|
||||
{
|
||||
private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this, currEntry);
|
||||
private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter;
|
||||
private ConcurrentNavigableMap<IncrementalIndex.TimeAndDims, Integer> cursorMap;
|
||||
final DateTime time;
|
||||
|
@ -599,6 +600,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
return index.getCapabilities(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -613,11 +620,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
return value == null;
|
||||
}
|
||||
|
||||
private ValueMatcher makeFilterMatcher(final Filter filter, final EntryHolder holder)
|
||||
private ValueMatcher makeFilterMatcher(final Filter filter, final Cursor cursor, final EntryHolder holder)
|
||||
{
|
||||
return filter == null
|
||||
? new BooleanValueMatcher(true)
|
||||
: filter.makeMatcher(new EntryHolderValueMatcherFactory(holder));
|
||||
: filter.makeMatcher(new CursorAndEntryHolderValueMatcherFactory(cursor, holder));
|
||||
}
|
||||
|
||||
private static class EntryHolder
|
||||
|
@ -645,20 +652,28 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
}
|
||||
|
||||
private class EntryHolderValueMatcherFactory implements ValueMatcherFactory
|
||||
|
||||
private class CursorAndEntryHolderValueMatcherFactory implements ValueMatcherFactory
|
||||
{
|
||||
private final EntryHolder holder;
|
||||
private final Cursor cursor;
|
||||
|
||||
public EntryHolderValueMatcherFactory(
|
||||
public CursorAndEntryHolderValueMatcherFactory(
|
||||
Cursor cursor,
|
||||
EntryHolder holder
|
||||
)
|
||||
{
|
||||
this.cursor = cursor;
|
||||
this.holder = holder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(String dimension, final Comparable value)
|
||||
{
|
||||
if (getTypeForDimension(dimension) == ValueType.LONG) {
|
||||
return Filters.getLongValueMatcher(cursor.makeLongColumnSelector(dimension), value);
|
||||
}
|
||||
|
||||
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
|
||||
if (dimensionDesc == null) {
|
||||
return new BooleanValueMatcher(isComparableNullOrEmpty(value));
|
||||
|
@ -701,7 +716,20 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(String dimension, final Predicate predicate)
|
||||
public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
ValueType type = getTypeForDimension(dimension);
|
||||
switch (type) {
|
||||
case LONG:
|
||||
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
|
||||
case STRING:
|
||||
return makeStringValueMatcher(dimension, predicateFactory.makeStringPredicate());
|
||||
default:
|
||||
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
|
||||
}
|
||||
}
|
||||
|
||||
private ValueMatcher makeStringValueMatcher(String dimension, final Predicate<String> predicate)
|
||||
{
|
||||
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
|
||||
if (dimensionDesc == null) {
|
||||
|
@ -721,7 +749,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
|
||||
for (int dimVal : dims[dimIndex]) {
|
||||
if (predicate.apply(dimDim.getValue(dimVal))) {
|
||||
if (predicate.apply((String) dimDim.getValue(dimVal))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -729,6 +757,17 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate)
|
||||
{
|
||||
return Filters.getLongPredicateMatcher(cursor.makeLongColumnSelector(dimension), predicate);
|
||||
}
|
||||
|
||||
private ValueType getTypeForDimension(String dimension)
|
||||
{
|
||||
ColumnCapabilities capabilities = index.getCapabilities(dimension);
|
||||
return capabilities == null ? ValueType.STRING : capabilities.getType();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,6 +34,7 @@ import io.druid.segment.DimensionSelector;
|
|||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -514,6 +515,12 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
return prev != null ? prev : newSelector;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
return delegate.getColumnCapabilities(columnName);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -40,6 +40,9 @@ import io.druid.segment.DimensionSelector;
|
|||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.ArrayBasedIndexedInts;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import org.junit.Assert;
|
||||
|
@ -162,6 +165,24 @@ public class FilteredAggregatorTest
|
|||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
ColumnCapabilitiesImpl caps;
|
||||
if (columnName.equals("value")) {
|
||||
caps = new ColumnCapabilitiesImpl();
|
||||
caps.setType(ValueType.FLOAT);
|
||||
caps.setDictionaryEncoded(false);
|
||||
caps.setHasBitmapIndexes(false);
|
||||
} else {
|
||||
caps = new ColumnCapabilitiesImpl();
|
||||
caps.setType(ValueType.STRING);
|
||||
caps.setDictionaryEncoded(true);
|
||||
caps.setHasBitmapIndexes(true);
|
||||
}
|
||||
return caps;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import io.druid.segment.DimensionSelector;
|
|||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -69,6 +70,12 @@ public class JavaScriptAggregatorTest
|
|||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
static {
|
||||
|
|
|
@ -26,6 +26,7 @@ import io.druid.segment.DimensionSelector;
|
|||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
||||
public class TestColumnSelectorFactory implements ColumnSelectorFactory
|
||||
{
|
||||
|
@ -86,4 +87,10 @@ public class TestColumnSelectorFactory implements ColumnSelectorFactory
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,15 +36,14 @@ import io.druid.data.input.impl.MapInputRowParser;
|
|||
import io.druid.data.input.impl.TimeAndDimsParseSpec;
|
||||
import io.druid.data.input.impl.TimestampSpec;
|
||||
import io.druid.js.JavaScriptConfig;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.FilteredAggregatorFactory;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.extraction.JavaScriptExtractionFn;
|
||||
import io.druid.query.filter.AndDimFilter;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.OrDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
|
@ -91,11 +90,11 @@ public class FilterPartitionTest extends BaseFilterTest
|
|||
{
|
||||
public NoBitmapDimensionPredicateFilter(
|
||||
final String dimension,
|
||||
final Predicate<String> predicate,
|
||||
final DruidPredicateFactory predicateFactory,
|
||||
final ExtractionFn extractionFn
|
||||
)
|
||||
{
|
||||
super(dimension, predicate, extractionFn);
|
||||
super(dimension, predicateFactory, extractionFn);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,15 +124,36 @@ public class FilterPartitionTest extends BaseFilterTest
|
|||
return new NoBitmapSelectorFilter(dimension, value);
|
||||
} else {
|
||||
final String valueOrNull = Strings.emptyToNull(value);
|
||||
final Predicate<String> predicate = new Predicate<String>()
|
||||
final DruidPredicateFactory predicateFactory = new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return Objects.equals(valueOrNull, input);
|
||||
return new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
{
|
||||
return Objects.equals(valueOrNull, input);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return Objects.equals(valueOrNull, String.valueOf(input));
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
return new NoBitmapDimensionPredicateFilter(dimension, predicate, extractionFn);
|
||||
|
||||
return new NoBitmapDimensionPredicateFilter(dimension, predicateFactory, extractionFn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,179 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.filter;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.common.Pair;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.data.input.impl.MapInputRowParser;
|
||||
import io.druid.data.input.impl.TimeAndDimsParseSpec;
|
||||
import io.druid.data.input.impl.TimestampSpec;
|
||||
import io.druid.js.JavaScriptConfig;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.extraction.MapLookupExtractor;
|
||||
import io.druid.query.filter.BoundDimFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.InDimFilter;
|
||||
import io.druid.query.filter.JavaScriptDimFilter;
|
||||
import io.druid.query.filter.RegexDimFilter;
|
||||
import io.druid.query.filter.SearchQueryDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.query.lookup.LookupExtractionFn;
|
||||
import io.druid.query.lookup.LookupExtractor;
|
||||
import io.druid.query.search.search.ContainsSearchQuerySpec;
|
||||
import io.druid.segment.IndexBuilder;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class InvalidFilteringTest extends BaseFilterTest
|
||||
{
|
||||
private static final String COUNT_COLUMN = "count";
|
||||
private static final String TIMESTAMP_COLUMN = "ts";
|
||||
|
||||
private static final InputRowParser<Map<String, Object>> PARSER = new MapInputRowParser(
|
||||
new TimeAndDimsParseSpec(
|
||||
new TimestampSpec(TIMESTAMP_COLUMN, "millis", new DateTime("2000")),
|
||||
new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim0", "dim1", "dim2", "dim3")),
|
||||
null,
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
private static final InputRow row0 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "dim1", "", "dim2", ImmutableList.of("a", "b")));
|
||||
private static final InputRow row1 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "dim1", "10", "dim2", ImmutableList.of()));
|
||||
private static final InputRow row2 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "dim1", "2", "dim2", ImmutableList.of("")));
|
||||
private static final InputRow row3 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "dim1", "1", "dim2", ImmutableList.of("a")));
|
||||
private static final InputRow row4 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "dim1", "def", "dim2", ImmutableList.of("c")));
|
||||
private static final InputRow row5 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 6L, "dim0", "6", "dim1", "abc"));
|
||||
|
||||
private static final List<InputRow> ROWS = ImmutableList.of(
|
||||
row0,
|
||||
row1,
|
||||
row2,
|
||||
row3,
|
||||
row4,
|
||||
row5
|
||||
);
|
||||
|
||||
public InvalidFilteringTest(
|
||||
String testName,
|
||||
IndexBuilder indexBuilder,
|
||||
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
|
||||
boolean optimize
|
||||
)
|
||||
{
|
||||
super(testName, ROWS, overrideIndexBuilderSchema(indexBuilder), finisher, optimize);
|
||||
}
|
||||
|
||||
private static IndexBuilder overrideIndexBuilderSchema(IndexBuilder indexBuilder)
|
||||
{
|
||||
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder().withMetrics(new AggregatorFactory[]{
|
||||
new CountAggregatorFactory("count"),
|
||||
new HyperUniquesAggregatorFactory("hyperion", "dim1"),
|
||||
new DoubleMaxAggregatorFactory("dmax", "dim0")
|
||||
}).build();
|
||||
|
||||
return indexBuilder.schema(schema);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception
|
||||
{
|
||||
BaseFilterTest.tearDown(InvalidFilteringTest.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterTheUnfilterable()
|
||||
{
|
||||
// single value matching
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter("hyperion", "a string", null),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter("hyperion", null, null),
|
||||
ImmutableList.<String>of("1", "2", "3", "4", "5", "6")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter("dmax", "another string", null),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter("dmax", null, null),
|
||||
ImmutableList.<String>of("1", "2", "3", "4", "5", "6")
|
||||
);
|
||||
|
||||
// predicate based matching
|
||||
assertFilterMatches(
|
||||
new InDimFilter("hyperion", Arrays.asList("hello", "world"), null),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new InDimFilter("hyperion", Arrays.asList("hello", "world", null), null),
|
||||
ImmutableList.<String>of("1", "2", "3", "4", "5", "6")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new InDimFilter("dmax", Arrays.asList("hello", "world"), null),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new InDimFilter("dmax", Arrays.asList("hello", "world", null), null),
|
||||
ImmutableList.<String>of("1", "2", "3", "4", "5", "6")
|
||||
);
|
||||
}
|
||||
|
||||
private void assertFilterMatches(
|
||||
final DimFilter filter,
|
||||
final List<String> expectedRows
|
||||
)
|
||||
{
|
||||
Assert.assertEquals(filter.toString(), expectedRows, selectColumnValuesMatchingFilter(filter, "dim0"));
|
||||
Assert.assertEquals(filter.toString(), expectedRows.size(), selectCountUsingFilteredAggregator(filter));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,320 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.filter;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.Pair;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.data.input.impl.MapInputRowParser;
|
||||
import io.druid.data.input.impl.TimeAndDimsParseSpec;
|
||||
import io.druid.data.input.impl.TimestampSpec;
|
||||
import io.druid.js.JavaScriptConfig;
|
||||
import io.druid.query.extraction.MapLookupExtractor;
|
||||
import io.druid.query.filter.BoundDimFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.InDimFilter;
|
||||
import io.druid.query.filter.JavaScriptDimFilter;
|
||||
import io.druid.query.filter.RegexDimFilter;
|
||||
import io.druid.query.filter.SearchQueryDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.query.lookup.LookupExtractionFn;
|
||||
import io.druid.query.lookup.LookupExtractor;
|
||||
import io.druid.query.search.search.ContainsSearchQuerySpec;
|
||||
import io.druid.segment.IndexBuilder;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RunnableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class LongFilteringTest extends BaseFilterTest
|
||||
{
|
||||
private static final String COUNT_COLUMN = "count";
|
||||
private static final String TIMESTAMP_COLUMN = "ts";
|
||||
private static int EXECUTOR_NUM_THREADS = 16;
|
||||
private static int EXECUTOR_NUM_TASKS = 2000;
|
||||
|
||||
private static final InputRowParser<Map<String, Object>> PARSER = new MapInputRowParser(
|
||||
new TimeAndDimsParseSpec(
|
||||
new TimestampSpec(TIMESTAMP_COLUMN, "millis", new DateTime("2000")),
|
||||
new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim0", "dim1", "dim2", "dim3")),
|
||||
null,
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
private static final InputRow row0 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "dim1", "", "dim2", ImmutableList.of("a", "b")));
|
||||
private static final InputRow row1 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "dim1", "10", "dim2", ImmutableList.of()));
|
||||
private static final InputRow row2 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "dim1", "2", "dim2", ImmutableList.of("")));
|
||||
private static final InputRow row3 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "dim1", "1", "dim2", ImmutableList.of("a")));
|
||||
private static final InputRow row4 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "dim1", "def", "dim2", ImmutableList.of("c")));
|
||||
private static final InputRow row5 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 6L, "dim0", "6", "dim1", "abc"));
|
||||
|
||||
private static final List<InputRow> ROWS = ImmutableList.of(
|
||||
row0,
|
||||
row1, row1,
|
||||
row2, row2, row2,
|
||||
row3, row3, row3, row3,
|
||||
row4, row4, row4, row4, row4,
|
||||
row5, row5, row5, row5, row5, row5
|
||||
);
|
||||
|
||||
public LongFilteringTest(
|
||||
String testName,
|
||||
IndexBuilder indexBuilder,
|
||||
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
|
||||
boolean optimize
|
||||
)
|
||||
{
|
||||
super(testName, ROWS, indexBuilder, finisher, optimize);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception
|
||||
{
|
||||
BaseFilterTest.tearDown(LongFilteringTest.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeFilterAsLong()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(COUNT_COLUMN, "0", null),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(COUNT_COLUMN, "3", null),
|
||||
ImmutableList.<String>of("3")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(COUNT_COLUMN, "2", "5", false, false, true, null),
|
||||
ImmutableList.<String>of("2", "3", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(COUNT_COLUMN, "1", "4", true, true, true, null),
|
||||
ImmutableList.<String>of("2", "3")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new InDimFilter(COUNT_COLUMN, Arrays.asList("2", "4", "8"), null),
|
||||
ImmutableList.<String>of("2", "4")
|
||||
);
|
||||
|
||||
// cross the hashing threshold to test hashset implementation, filter on even values
|
||||
List<String> infilterValues = new ArrayList<>(InDimFilter.LONG_HASHING_THRESHOLD * 2);
|
||||
for (int i = 0; i < InDimFilter.LONG_HASHING_THRESHOLD * 2; i++) {
|
||||
infilterValues.add(String.valueOf(i * 2));
|
||||
}
|
||||
assertFilterMatches(
|
||||
new InDimFilter(COUNT_COLUMN, infilterValues, null),
|
||||
ImmutableList.<String>of("2", "4", "6")
|
||||
);
|
||||
|
||||
String jsFn = "function(x) { return(x === 3 || x === 5) }";
|
||||
assertFilterMatches(
|
||||
new JavaScriptDimFilter(COUNT_COLUMN, jsFn, null, JavaScriptConfig.getDefault()),
|
||||
ImmutableList.<String>of("3", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new RegexDimFilter(COUNT_COLUMN, "4", null),
|
||||
ImmutableList.<String>of("4")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SearchQueryDimFilter(COUNT_COLUMN, new ContainsSearchQuerySpec("2", true), null),
|
||||
ImmutableList.<String>of("2")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLongFilterWithExtractionFn()
|
||||
{
|
||||
final Map<String, String> stringMap = new HashMap<>();
|
||||
stringMap.put("1", "Monday");
|
||||
stringMap.put("2", "Tuesday");
|
||||
stringMap.put("3", "Wednesday");
|
||||
stringMap.put("4", "Thursday");
|
||||
stringMap.put("5", "Friday");
|
||||
stringMap.put("6", "Saturday");
|
||||
LookupExtractor mapExtractor = new MapLookupExtractor(stringMap, false);
|
||||
LookupExtractionFn exfn = new LookupExtractionFn(mapExtractor, false, "UNKNOWN", false, true);
|
||||
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(COUNT_COLUMN, "Monday", exfn),
|
||||
ImmutableList.<String>of("1")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(COUNT_COLUMN, "Notaday", exfn),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(COUNT_COLUMN, "Fridax", "Fridaz", false, false, true, exfn),
|
||||
ImmutableList.<String>of("5")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(COUNT_COLUMN, "Friday", "Friday", true, true, true, exfn),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new InDimFilter(COUNT_COLUMN, Arrays.asList("Caturday", "Saturday", "Tuesday"), exfn),
|
||||
ImmutableList.<String>of("2", "6")
|
||||
);
|
||||
|
||||
// test InFilter HashSet implementation
|
||||
List<String> bigList = Arrays.asList(
|
||||
"Saturday", "Tuesday",
|
||||
"Caturday", "Xanaday", "Vojuday", "Gribaday", "Kipoday", "Dheferday", "Fakeday", "Qeearaday",
|
||||
"Hello", "World", "1", "2", "3", "4", "5", "6", "7"
|
||||
);
|
||||
assertFilterMatches(
|
||||
new InDimFilter(COUNT_COLUMN, bigList, exfn),
|
||||
ImmutableList.<String>of("2", "6")
|
||||
);
|
||||
|
||||
String jsFn = "function(x) { return(x === 'Wednesday' || x === 'Thursday') }";
|
||||
assertFilterMatches(
|
||||
new JavaScriptDimFilter(COUNT_COLUMN, jsFn, exfn, JavaScriptConfig.getDefault()),
|
||||
ImmutableList.<String>of("3", "4")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new RegexDimFilter(COUNT_COLUMN, ".*day", exfn),
|
||||
ImmutableList.<String>of("1", "2", "3", "4", "5", "6")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SearchQueryDimFilter(COUNT_COLUMN, new ContainsSearchQuerySpec("s", true), exfn),
|
||||
ImmutableList.<String>of("2", "3", "4")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectorAndInFilterMultithreaded()
|
||||
{
|
||||
assertFilterMatchesMultithreaded(
|
||||
new SelectorDimFilter(COUNT_COLUMN, "3", null),
|
||||
ImmutableList.<String>of("3")
|
||||
);
|
||||
|
||||
assertFilterMatchesMultithreaded(
|
||||
new InDimFilter(COUNT_COLUMN, Arrays.asList("2", "4", "8"), null),
|
||||
ImmutableList.<String>of("2", "4")
|
||||
);
|
||||
|
||||
// cross the hashing threshold to test hashset implementation, filter on even values
|
||||
List<String> infilterValues = new ArrayList<>(InDimFilter.LONG_HASHING_THRESHOLD * 2);
|
||||
for (int i = 0; i < InDimFilter.LONG_HASHING_THRESHOLD * 2; i++) {
|
||||
infilterValues.add(String.valueOf(i * 2));
|
||||
}
|
||||
assertFilterMatchesMultithreaded(
|
||||
new InDimFilter(COUNT_COLUMN, infilterValues, null),
|
||||
ImmutableList.<String>of("2", "4", "6")
|
||||
);
|
||||
}
|
||||
|
||||
private void assertFilterMatches(
|
||||
final DimFilter filter,
|
||||
final List<String> expectedRows
|
||||
)
|
||||
{
|
||||
Assert.assertEquals(filter.toString(), expectedRows, selectColumnValuesMatchingFilter(filter, "dim0"));
|
||||
Assert.assertEquals(filter.toString(), expectedRows.size(), selectCountUsingFilteredAggregator(filter));
|
||||
}
|
||||
|
||||
private void assertFilterMatchesMultithreaded(
|
||||
final DimFilter filter,
|
||||
final List<String> expectedRows
|
||||
)
|
||||
{
|
||||
testWithExecutor(filter, expectedRows);
|
||||
}
|
||||
|
||||
private Runnable makeFilterRunner(
|
||||
final DimFilter filter,
|
||||
final List<String> expectedRows
|
||||
)
|
||||
{
|
||||
return new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
Assert.assertEquals(filter.toString(), expectedRows, selectColumnValuesMatchingFilter(filter, "dim0"));
|
||||
Assert.assertEquals(filter.toString(), expectedRows.size(), selectCountUsingFilteredAggregator(filter));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void testWithExecutor(
|
||||
final DimFilter filter,
|
||||
final List<String> expectedRows
|
||||
)
|
||||
{
|
||||
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(EXECUTOR_NUM_THREADS));
|
||||
|
||||
List<ListenableFuture<?>> futures = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < EXECUTOR_NUM_TASKS; i++) {
|
||||
Runnable runnable = makeFilterRunner(filter, expectedRows);
|
||||
ListenableFuture fut = executor.submit(runnable);
|
||||
futures.add(fut);
|
||||
}
|
||||
|
||||
try {
|
||||
Futures.allAsList(futures).get(60, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
Assert.fail(ex.getMessage());
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,240 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.filter;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.common.Pair;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.data.input.impl.MapInputRowParser;
|
||||
import io.druid.data.input.impl.TimeAndDimsParseSpec;
|
||||
import io.druid.data.input.impl.TimestampSpec;
|
||||
import io.druid.js.JavaScriptConfig;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.extraction.MapLookupExtractor;
|
||||
import io.druid.query.extraction.TimeFormatExtractionFn;
|
||||
import io.druid.query.filter.BoundDimFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.InDimFilter;
|
||||
import io.druid.query.filter.JavaScriptDimFilter;
|
||||
import io.druid.query.filter.RegexDimFilter;
|
||||
import io.druid.query.filter.SearchQueryDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.query.lookup.LookupExtractionFn;
|
||||
import io.druid.query.lookup.LookupExtractor;
|
||||
import io.druid.query.search.search.ContainsSearchQuerySpec;
|
||||
import io.druid.segment.IndexBuilder;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.Column;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TimeFilteringTest extends BaseFilterTest
|
||||
{
|
||||
private static final String TIMESTAMP_COLUMN = "ts";
|
||||
|
||||
private static final InputRowParser<Map<String, Object>> PARSER = new MapInputRowParser(
|
||||
new TimeAndDimsParseSpec(
|
||||
new TimestampSpec(TIMESTAMP_COLUMN, "millis", new DateTime("2000")),
|
||||
new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim0", "dim1", "dim2", "dim3")),
|
||||
null,
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
private static final List<InputRow> ROWS = ImmutableList.of(
|
||||
PARSER.parse(ImmutableMap.<String, Object>of("ts", 0L, "dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
|
||||
PARSER.parse(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "dim1", "10", "dim2", ImmutableList.of())),
|
||||
PARSER.parse(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
|
||||
PARSER.parse(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
|
||||
PARSER.parse(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))),
|
||||
PARSER.parse(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "dim1", "abc"))
|
||||
);
|
||||
|
||||
public TimeFilteringTest(
|
||||
String testName,
|
||||
IndexBuilder indexBuilder,
|
||||
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
|
||||
boolean optimize
|
||||
)
|
||||
{
|
||||
super(testName, ROWS, indexBuilder, finisher, optimize);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception
|
||||
{
|
||||
BaseFilterTest.tearDown(TimeFilteringTest.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeFilterAsLong()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(Column.TIME_COLUMN_NAME, "0", null),
|
||||
ImmutableList.<String>of("0")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(Column.TIME_COLUMN_NAME, "9000", null),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(Column.TIME_COLUMN_NAME, "0", "4", false, false, true, null),
|
||||
ImmutableList.<String>of("0", "1", "2", "3", "4")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(Column.TIME_COLUMN_NAME, "0", "4", true, true, true, null),
|
||||
ImmutableList.<String>of("1", "2", "3")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new InDimFilter(Column.TIME_COLUMN_NAME, Arrays.asList("2", "4", "8"), null),
|
||||
ImmutableList.<String>of("2", "4")
|
||||
);
|
||||
|
||||
// cross the hashing threshold to test hashset implementation, filter on even values
|
||||
List<String> infilterValues = new ArrayList<>(InDimFilter.LONG_HASHING_THRESHOLD * 2);
|
||||
for (int i = 0; i < InDimFilter.LONG_HASHING_THRESHOLD * 2; i++) {
|
||||
infilterValues.add(String.valueOf(i*2));
|
||||
}
|
||||
assertFilterMatches(
|
||||
new InDimFilter(Column.TIME_COLUMN_NAME, infilterValues, null),
|
||||
ImmutableList.<String>of("0", "2", "4")
|
||||
);
|
||||
|
||||
String jsFn = "function(x) { return(x === 3 || x === 5) }";
|
||||
assertFilterMatches(
|
||||
new JavaScriptDimFilter(Column.TIME_COLUMN_NAME, jsFn, null, JavaScriptConfig.getDefault()),
|
||||
ImmutableList.<String>of("3", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new RegexDimFilter(Column.TIME_COLUMN_NAME, "4", null),
|
||||
ImmutableList.<String>of("4")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SearchQueryDimFilter(Column.TIME_COLUMN_NAME, new ContainsSearchQuerySpec("2", true), null),
|
||||
ImmutableList.<String>of("2")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeFilterWithExtractionFn()
|
||||
{
|
||||
final Map<String, String> stringMap = new HashMap<>();
|
||||
stringMap.put("0", "Monday");
|
||||
stringMap.put("1", "Tuesday");
|
||||
stringMap.put("2", "Wednesday");
|
||||
stringMap.put("3", "Thursday");
|
||||
stringMap.put("4", "Friday");
|
||||
stringMap.put("5", "Saturday");
|
||||
LookupExtractor mapExtractor = new MapLookupExtractor(stringMap, false);
|
||||
LookupExtractionFn exfn = new LookupExtractionFn(mapExtractor, false, "UNKNOWN", false, true);
|
||||
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(Column.TIME_COLUMN_NAME, "Monday", exfn),
|
||||
ImmutableList.<String>of("0")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(Column.TIME_COLUMN_NAME, "Notaday", exfn),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(Column.TIME_COLUMN_NAME, "Fridax", "Fridaz", false, false, true, exfn),
|
||||
ImmutableList.<String>of("4")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new BoundDimFilter(Column.TIME_COLUMN_NAME, "Friday", "Friday", true, true, true, exfn),
|
||||
ImmutableList.<String>of()
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new InDimFilter(Column.TIME_COLUMN_NAME, Arrays.asList("Caturday", "Saturday", "Tuesday"), exfn),
|
||||
ImmutableList.<String>of("1", "5")
|
||||
);
|
||||
|
||||
// test InFilter HashSet implementation
|
||||
List<String> bigList = Arrays.asList(
|
||||
"Saturday", "Tuesday",
|
||||
"Caturday", "Xanaday", "Vojuday", "Gribaday", "Kipoday", "Dheferday", "Fakeday", "Qeearaday",
|
||||
"Hello", "World", "1", "2", "3", "4", "5", "6", "7"
|
||||
);
|
||||
assertFilterMatches(
|
||||
new InDimFilter(Column.TIME_COLUMN_NAME, bigList, exfn),
|
||||
ImmutableList.<String>of("1", "5")
|
||||
);
|
||||
|
||||
String jsFn = "function(x) { return(x === 'Wednesday' || x === 'Thursday') }";
|
||||
assertFilterMatches(
|
||||
new JavaScriptDimFilter(Column.TIME_COLUMN_NAME, jsFn, exfn, JavaScriptConfig.getDefault()),
|
||||
ImmutableList.<String>of("2", "3")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new RegexDimFilter(Column.TIME_COLUMN_NAME, ".*day", exfn),
|
||||
ImmutableList.<String>of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(
|
||||
new SearchQueryDimFilter(Column.TIME_COLUMN_NAME, new ContainsSearchQuerySpec("s", true), exfn),
|
||||
ImmutableList.<String>of("1", "2", "3")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeFilterWithTimeFormatExtractionFn()
|
||||
{
|
||||
ExtractionFn exfn = new TimeFormatExtractionFn("EEEE", DateTimeZone.forID("America/New_York"), "en");
|
||||
assertFilterMatches(
|
||||
new SelectorDimFilter(Column.TIME_COLUMN_NAME, "Wednesday", exfn),
|
||||
ImmutableList.<String>of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
}
|
||||
|
||||
private void assertFilterMatches(
|
||||
final DimFilter filter,
|
||||
final List<String> expectedRows
|
||||
)
|
||||
{
|
||||
Assert.assertEquals(filter.toString(), expectedRows, selectColumnValuesMatchingFilter(filter, "dim0"));
|
||||
Assert.assertEquals(filter.toString(), expectedRows.size(), selectCountUsingFilteredAggregator(filter));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue