From fa5c3bb014e557ee6dc6f2278f2ed64cf1584700 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sat, 19 Dec 2015 02:58:54 -0600 Subject: [PATCH] adding decorate(DimensionSelector) to DimensionSpec to enable support for arbitrary filtering/transformations to returned dimension values --- .../CardinalityAggregatorFactory.java | 3 +- .../query/dimension/DefaultDimensionSpec.java | 7 ++ .../druid/query/dimension/DimensionSpec.java | 15 ++-- .../dimension/ExtractionDimensionSpec.java | 7 ++ .../query/groupby/GroupByQueryEngine.java | 5 +- .../druid/query/search/SearchQueryRunner.java | 2 +- .../druid/query/select/SelectQueryEngine.java | 3 +- .../java/io/druid/query/topn/TopNMapFn.java | 3 +- .../druid/segment/ColumnSelectorFactory.java | 6 +- .../segment/QueryableIndexStorageAdapter.java | 15 +++- .../segment/filter/ExtractionFilter.java | 5 +- .../druid/segment/filter/SelectorFilter.java | 5 +- .../segment/incremental/IncrementalIndex.java | 15 +++- .../IncrementalIndexStorageAdapter.java | 14 ++- .../aggregation/FilteredAggregatorTest.java | 88 ++++++++++--------- .../IncrementalIndexStorageAdapterTest.java | 5 +- .../firehose/IngestSegmentFirehose.java | 5 +- 17 files changed, 133 insertions(+), 70 deletions(-) diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index a87dbb51240..6b177116e03 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -33,6 +33,7 @@ import io.druid.query.aggregation.Aggregators; import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; import org.apache.commons.codec.binary.Base64; @@ -107,7 +108,7 @@ public class CardinalityAggregatorFactory implements AggregatorFactory @Override public DimensionSelector apply(@Nullable String input) { - return columnFactory.makeDimensionSelector(input, null); + return columnFactory.makeDimensionSelector(new DefaultDimensionSpec(input, input)); } } ), Predicates.notNull() diff --git a/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java index 4ffb0a9fa67..86ecfd38f96 100644 --- a/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/DefaultDimensionSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.StringUtils; import io.druid.query.extraction.ExtractionFn; +import io.druid.segment.DimensionSelector; import java.nio.ByteBuffer; @@ -66,6 +67,12 @@ public class DefaultDimensionSpec implements DimensionSpec return null; } + @Override + public DimensionSelector decorate(DimensionSelector selector) + { + return selector; + } + @Override public byte[] getCacheKey() { diff --git a/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java index 58b0f24fb74..0b36f791a03 100644 --- a/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java @@ -22,6 +22,7 @@ package io.druid.query.dimension; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.query.extraction.ExtractionFn; +import io.druid.segment.DimensionSelector; /** */ @@ -32,13 +33,17 @@ import io.druid.query.extraction.ExtractionFn; }) public interface DimensionSpec { - public String getDimension(); + String getDimension(); - public String getOutputName(); + String getOutputName(); - public ExtractionFn getExtractionFn(); + //ExtractionFn can be implemented with decorate(..) fn + @Deprecated + ExtractionFn getExtractionFn(); - public byte[] getCacheKey(); + DimensionSelector decorate(DimensionSelector selector); - public boolean preservesOrdering(); + byte[] getCacheKey(); + + boolean preservesOrdering(); } diff --git a/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java index b5fb9332935..ca30b89b0d4 100644 --- a/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/ExtractionDimensionSpec.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.metamx.common.StringUtils; import io.druid.query.extraction.ExtractionFn; +import io.druid.segment.DimensionSelector; import java.nio.ByteBuffer; @@ -77,6 +78,12 @@ public class ExtractionDimensionSpec implements DimensionSpec return extractionFn; } + @Override + public DimensionSelector decorate(DimensionSelector selector) + { + return selector; + } + @Override public byte[] getCacheKey() { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index a750a273fe3..bdb8dae2d84 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -316,10 +316,7 @@ public class GroupByQueryEngine for (int i = 0; i < dimensionSpecs.size(); ++i) { final DimensionSpec dimSpec = dimensionSpecs.get(i); - final DimensionSelector selector = cursor.makeDimensionSelector( - dimSpec.getDimension(), - dimSpec.getExtractionFn() - ); + final DimensionSelector selector = cursor.makeDimensionSelector(dimSpec); if (selector != null) { dimensions.add(selector); dimNames.add(dimSpec.getOutputName()); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java index 102f70c34a3..2be205b83e2 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java @@ -175,7 +175,7 @@ public class SearchQueryRunner implements QueryRunner> for (DimensionSpec dim : dimsToSearch) { dimSelectors.put( dim.getOutputName(), - cursor.makeDimensionSelector(dim.getDimension(), dim.getExtractionFn()) + cursor.makeDimensionSelector(dim) ); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index d905ef232c6..87d9735ad4e 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -26,6 +26,7 @@ import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; import io.druid.query.QueryRunnerHelper; import io.druid.query.Result; +import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; import io.druid.segment.LongColumnSelector; @@ -89,7 +90,7 @@ public class SelectQueryEngine final Map dimSelectors = Maps.newHashMap(); for (String dim : dims) { // switching to using DimensionSpec for select would allow the use of extractionFn here. - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); + final DimensionSelector dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec(dim, dim)); dimSelectors.put(dim, dimSelector); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java index cbc586e5109..d31e84fa791 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java @@ -43,8 +43,7 @@ public class TopNMapFn implements Function> public Result apply(Cursor cursor) { final DimensionSelector dimSelector = cursor.makeDimensionSelector( - query.getDimensionSpec().getDimension(), - query.getDimensionSpec().getExtractionFn() + query.getDimensionSpec() ); if (dimSelector == null) { return null; diff --git a/processing/src/main/java/io/druid/segment/ColumnSelectorFactory.java b/processing/src/main/java/io/druid/segment/ColumnSelectorFactory.java index cae2784b2c9..2aff7f831c1 100644 --- a/processing/src/main/java/io/druid/segment/ColumnSelectorFactory.java +++ b/processing/src/main/java/io/druid/segment/ColumnSelectorFactory.java @@ -19,16 +19,14 @@ package io.druid.segment; -import io.druid.query.extraction.ExtractionFn; - -import javax.annotation.Nullable; +import io.druid.query.dimension.DimensionSpec; /** * Factory class for MetricSelectors */ public interface ColumnSelectorFactory { - public DimensionSelector makeDimensionSelector(String dimensionName, @Nullable ExtractionFn extractionFn); + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec); public FloatColumnSelector makeFloatColumnSelector(String columnName); public LongColumnSelector makeLongColumnSelector(String columnName); public ObjectColumnSelector makeObjectColumnSelector(String columnName); diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index f067ceb578c..e67cf05df40 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; import io.druid.query.QueryInterruptedException; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.Filter; import io.druid.segment.column.Column; @@ -44,7 +45,6 @@ import io.druid.segment.data.Offset; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Iterator; @@ -296,10 +296,19 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public DimensionSelector makeDimensionSelector( - final String dimension, - @Nullable final ExtractionFn extractionFn + DimensionSpec dimensionSpec ) { + return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); + } + + private DimensionSelector makeDimensionSelectorUndecorated( + DimensionSpec dimensionSpec + ) + { + final String dimension = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + final Column columnDesc = index.getColumn(dimension); if (columnDesc == null) { return NULL_DIMENSION_SELECTOR; diff --git a/processing/src/main/java/io/druid/segment/filter/ExtractionFilter.java b/processing/src/main/java/io/druid/segment/filter/ExtractionFilter.java index cfea3f3737c..7be2e97da44 100644 --- a/processing/src/main/java/io/druid/segment/filter/ExtractionFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/ExtractionFilter.java @@ -23,6 +23,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.metamx.collections.bitmap.ImmutableBitmap; +import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; @@ -124,7 +125,9 @@ public class ExtractionFilter implements Filter @Override public ValueMatcher makeMatcher(ColumnSelectorFactory columnSelectorFactory) { - final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimension, null); + final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector( + new DefaultDimensionSpec(dimension, dimension) + ); if (dimensionSelector == null) { return new BooleanValueMatcher(value.equals(Strings.nullToEmpty(fn.apply(null)))); } else { diff --git a/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java b/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java index aacf9a95de4..861eb35e245 100644 --- a/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java @@ -21,6 +21,7 @@ package io.druid.segment.filter; import com.google.common.base.Strings; import com.metamx.collections.bitmap.ImmutableBitmap; +import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; @@ -60,7 +61,9 @@ public class SelectorFilter implements Filter @Override public ValueMatcher makeMatcher(ColumnSelectorFactory columnSelectorFactory) { - final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimension, null); + final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector( + new DefaultDimensionSpec(dimension, dimension) + ); // Missing columns match a null or empty string value and don't match anything else if (dimensionSelector == null) { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 64323924299..cf7ff364c8c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -38,6 +38,7 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; @@ -169,8 +170,20 @@ public abstract class IncrementalIndex implements Iterable, } @Override - public DimensionSelector makeDimensionSelector(final String dimension, final ExtractionFn extractionFn) + public DimensionSelector makeDimensionSelector( + DimensionSpec dimensionSpec + ) { + return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); + } + + private DimensionSelector makeDimensionSelectorUndecorated( + DimensionSpec dimensionSpec + ) + { + final String dimension = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + return new DimensionSelector() { @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index b5163ce522c..9f3b058e2b5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; import io.druid.query.QueryInterruptedException; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; @@ -294,10 +295,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public DimensionSelector makeDimensionSelector( - final String dimension, - @Nullable final ExtractionFn extractionFn + DimensionSpec dimensionSpec ) { + return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); + } + + private DimensionSelector makeDimensionSelectorUndecorated( + DimensionSpec dimensionSpec + ) + { + final String dimension = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + if (dimension.equals(Column.TIME_COLUMN_NAME)) { return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn); } diff --git a/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java index 8f1cbb8e240..7f41e2be25c 100644 --- a/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java @@ -20,6 +20,7 @@ package io.druid.query.aggregation; import com.google.common.collect.Lists; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.AndDimFilter; import io.druid.query.filter.DimFilter; @@ -73,53 +74,58 @@ public class FilteredAggregatorTest return new ColumnSelectorFactory() { @Override - public DimensionSelector makeDimensionSelector(String dimensionName, ExtractionFn extractionFn) + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) { + final String dimensionName = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + if (dimensionName.equals("dim")) { - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - if (selector.getIndex() % 3 == 2) { - return new ArrayBasedIndexedInts(new int[]{1}); - } else { - return new ArrayBasedIndexedInts(new int[]{0}); - } - } + return dimensionSpec.decorate( + new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + if (selector.getIndex() % 3 == 2) { + return new ArrayBasedIndexedInts(new int[]{1}); + } else { + return new ArrayBasedIndexedInts(new int[]{0}); + } + } - @Override - public int getValueCardinality() - { - return 2; - } + @Override + public int getValueCardinality() + { + return 2; + } - @Override - public String lookupName(int id) - { - switch (id) { - case 0: - return "a"; - case 1: - return "b"; - default: - throw new IllegalArgumentException(); - } - } + @Override + public String lookupName(int id) + { + switch (id) { + case 0: + return "a"; + case 1: + return "b"; + default: + throw new IllegalArgumentException(); + } + } - @Override - public int lookupId(String name) - { - switch (name) { - case "a": - return 0; - case "b": - return 1; - default: - throw new IllegalArgumentException(); + @Override + public int lookupId(String name) + { + switch (name) { + case "a": + return 0; + case "b": + return 1; + default: + throw new IllegalArgumentException(); + } + } } - } - }; + ); } else { throw new UnsupportedOperationException(); } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 6cea28faa4f..6816f40ed99 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -36,6 +36,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.JavaScriptAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.filter.DimFilters; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; @@ -260,7 +261,7 @@ public class IncrementalIndexStorageAdapterTest Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.newArrayList()).get(0); DimensionSelector dimSelector; - dimSelector = cursor.makeDimensionSelector("sally", null); + dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); index.add( @@ -274,7 +275,7 @@ public class IncrementalIndexStorageAdapterTest // Cursor reset should not be affected by out of order values cursor.reset(); - dimSelector = cursor.makeDimensionSelector("sally", null); + dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java index ad1b6f705bf..3b885b0b8cb 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -31,6 +31,7 @@ import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; +import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.filter.DimFilter; import io.druid.query.select.EventHolder; import io.druid.segment.Cursor; @@ -85,7 +86,9 @@ public class IngestSegmentFirehose implements Firehose final Map dimSelectors = Maps.newHashMap(); for (String dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); + final DimensionSelector dimSelector = cursor.makeDimensionSelector( + new DefaultDimensionSpec(dim, dim) + ); // dimSelector is null if the dimension is not present if (dimSelector != null) { dimSelectors.put(dim, dimSelector);