diff --git a/benchmarks/src/main/java/io/druid/benchmark/ExpressionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ExpressionBenchmark.java index ecc32d3fec6..7954ac782b1 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/ExpressionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/ExpressionBenchmark.java @@ -176,7 +176,7 @@ public class ExpressionBenchmark Sequences.map( cursors, cursor -> { - final BufferAggregator bufferAggregator = aggregatorFactory.apply(cursor); + final BufferAggregator bufferAggregator = aggregatorFactory.apply(cursor.getColumnSelectorFactory()); bufferAggregator.init(aggregationBuffer, 0); while (!cursor.isDone()) { diff --git a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java index cf547d4c833..0ec205463d3 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java @@ -514,7 +514,9 @@ public class FilterPartitionBenchmark { List strings = new ArrayList(); List selectors = new ArrayList<>(); - selectors.add(input.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null))); + selectors.add( + input.getColumnSelectorFactory().makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null)) + ); //selectors.add(input.makeDimensionSelector(new DefaultDimensionSpec("dimB", null))); while (!input.isDone()) { for (DimensionSelector selector : selectors) { @@ -540,7 +542,7 @@ public class FilterPartitionBenchmark public List apply(Cursor input) { List longvals = new ArrayList(); - LongColumnSelector selector = input.makeLongColumnSelector("sumLongSequential"); + LongColumnSelector selector = input.getColumnSelectorFactory().makeLongColumnSelector("sumLongSequential"); while (!input.isDone()) { long rowval = selector.getLong(); blackhole.consume(rowval); diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java index 57adb66c88a..b61a46474de 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java @@ -144,10 +144,10 @@ public class IncrementalIndexReadBenchmark Cursor cursor = Sequences.toList(Sequences.limit(cursors, 1), Lists.newArrayList()).get(0); List selectors = new ArrayList<>(); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null))); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimZipf", null))); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimUniform", null))); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequentialHalfNull", null))); + selectors.add(makeDimensionSelector(cursor, "dimSequential")); + selectors.add(makeDimensionSelector(cursor, "dimZipf")); + selectors.add(makeDimensionSelector(cursor, "dimUniform")); + selectors.add(makeDimensionSelector(cursor, "dimSequentialHalfNull")); cursor.reset(); while (!cursor.isDone()) { @@ -179,10 +179,10 @@ public class IncrementalIndexReadBenchmark Cursor cursor = Sequences.toList(Sequences.limit(cursors, 1), Lists.newArrayList()).get(0); List selectors = new ArrayList<>(); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null))); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimZipf", null))); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimUniform", null))); - selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequentialHalfNull", null))); + selectors.add(makeDimensionSelector(cursor, "dimSequential")); + selectors.add(makeDimensionSelector(cursor, "dimZipf")); + selectors.add(makeDimensionSelector(cursor, "dimUniform")); + selectors.add(makeDimensionSelector(cursor, "dimSequentialHalfNull")); cursor.reset(); while (!cursor.isDone()) { @@ -205,4 +205,9 @@ public class IncrementalIndexReadBenchmark null ); } + + private static DimensionSelector makeDimensionSelector(Cursor cursor, String name) + { + return cursor.getColumnSelectorFactory().makeDimensionSelector(new DefaultDimensionSpec(name, null)); + } } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index bb23fdf5b72..ebaf70906d3 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -129,19 +129,21 @@ public class ScanQueryEngine @Override public Iterator make() { - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + final LongColumnSelector timestampColumnSelector = + cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME); final List> selectorPlusList = Arrays.asList( DimensionHandlerUtils.createColumnSelectorPluses( STRATEGY_FACTORY, Lists.newArrayList(dims), - cursor + cursor.getColumnSelectorFactory() ) ); final Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { - final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + final ObjectColumnSelector metricSelector = + cursor.getColumnSelectorFactory().makeObjectColumnSelector(metric); metSelectors.put(metric, metricSelector); } final int batchSize = query.getBatchSize(); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index d7af8929c39..298bee8b96b 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -332,7 +332,7 @@ public class GroupByQueryEngine ); } - final DimensionSelector selector = cursor.makeDimensionSelector(dimSpec); + final DimensionSelector selector = cursor.getColumnSelectorFactory().makeDimensionSelector(dimSpec); if (selector != null) { if (selector.getValueCardinality() == DimensionSelector.CARDINALITY_UNKNOWN) { throw new UnsupportedOperationException( @@ -349,7 +349,7 @@ public class GroupByQueryEngine sizesRequired = new int[aggregatorSpecs.size()]; for (int i = 0; i < aggregatorSpecs.size(); ++i) { AggregatorFactory aggregatorSpec = aggregatorSpecs.get(i); - aggregators[i] = aggregatorSpec.factorizeBuffered(cursor); + aggregators[i] = aggregatorSpec.factorizeBuffered(cursor.getColumnSelectorFactory()); metricNames[i] = aggregatorSpec.getName(); sizesRequired[i] = aggregatorSpec.getMaxIntermediateSize(); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 8465340ccc8..084b8a82bc1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -142,7 +142,7 @@ public class GroupByQueryEngineV2 .createColumnSelectorPluses( STRATEGY_FACTORY, query.getDimensions(), - cursor + cursor.getColumnSelectorFactory() ); GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus); @@ -434,7 +434,7 @@ public class GroupByQueryEngineV2 return new BufferHashGrouper<>( Suppliers.ofInstance(buffer), keySerde, - cursor, + cursor.getColumnSelectorFactory(), query.getAggregatorSpecs() .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]), querySpecificConfig.getBufferGrouperMaxSize(), @@ -587,7 +587,7 @@ public class GroupByQueryEngineV2 { return new BufferArrayGrouper( Suppliers.ofInstance(buffer), - cursor, + cursor.getColumnSelectorFactory(), query.getAggregatorSpecs() .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]), cardinality diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java index 60719648a95..36c36d256b4 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java @@ -270,12 +270,9 @@ public class SegmentAnalyzer @Override public Long accumulate(Long accumulated, Cursor cursor) { - DimensionSelector selector = cursor.makeDimensionSelector( - new DefaultDimensionSpec( - columnName, - columnName - ) - ); + DimensionSelector selector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec(columnName, columnName)); if (selector == null) { return accumulated; } diff --git a/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java b/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java index 88c253cd76f..668b963d197 100644 --- a/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java +++ b/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java @@ -117,7 +117,7 @@ public class CursorOnlyStrategy extends SearchStrategy DimensionHandlerUtils.createColumnSelectorPluses( SearchQueryRunner.SEARCH_COLUMN_SELECTOR_STRATEGY_FACTORY, dimsToSearch, - cursor + cursor.getColumnSelectorFactory() ) ); diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index 36ae0de6ddf..78a0e1829e1 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -241,13 +241,14 @@ public class SelectQueryEngine query.isDescending() ); - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + final LongColumnSelector timestampColumnSelector = + cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME); final List> selectorPlusList = Arrays.asList( DimensionHandlerUtils.createColumnSelectorPluses( STRATEGY_FACTORY, Lists.newArrayList(dims), - cursor + cursor.getColumnSelectorFactory() ) ); @@ -257,7 +258,8 @@ public class SelectQueryEngine final Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { - final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + final ObjectColumnSelector metricSelector = + cursor.getColumnSelectorFactory().makeObjectColumnSelector(metric); metSelectors.put(metric, metricSelector); builder.addMetric(metric); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index fbeba284998..01ad3166b9c 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -103,7 +103,8 @@ public class TimeBoundaryQueryRunnerFactory if (cursor.isDone()) { return null; } - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + final LongColumnSelector timestampColumnSelector = + cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME); final DateTime timestamp = DateTimes.utc(timestampColumnSelector.getLong()); return new Result<>(adapter.getInterval().getStart(), timestamp); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java index bbe57f5df3d..4fa93de263b 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java @@ -66,7 +66,7 @@ public class TimeseriesQueryEngine String[] aggregatorNames = new String[aggregatorSpecs.size()]; for (int i = 0; i < aggregatorSpecs.size(); i++) { - aggregators[i] = aggregatorSpecs.get(i).factorize(cursor); + aggregators[i] = aggregatorSpecs.get(i).factorize(cursor.getColumnSelectorFactory()); aggregatorNames[i] = aggregatorSpecs.get(i).getName(); } diff --git a/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java index 8430c3878a8..2b1e4b61edf 100644 --- a/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java @@ -45,7 +45,7 @@ public abstract class BaseTopNAlgorithm The strategy type created by the provided strategy factory. * @param strategyFactory A factory provided by query engines that generates type-handling strategies * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for - * @param cursor Used to create value selectors for columns. + * @param columnSelectorFactory Used to create value selectors for columns. * @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs */ - public static ColumnSelectorPlus[] createColumnSelectorPluses( + public static + ColumnSelectorPlus[] createColumnSelectorPluses( ColumnSelectorStrategyFactory strategyFactory, List dimensionSpecs, - ColumnSelectorFactory cursor + ColumnSelectorFactory columnSelectorFactory ) { int dimCount = dimensionSpecs.size(); @@ -150,12 +151,12 @@ public final class DimensionHandlerUtils final String dimName = dimSpec.getDimension(); final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec( dimSpec, - cursor + columnSelectorFactory ); ColumnSelectorStrategyClass strategy = makeStrategy( strategyFactory, dimSpec, - cursor.getColumnCapabilities(dimSpec.getDimension()), + columnSelectorFactory.getColumnCapabilities(dimSpec.getDimension()), selector ); final ColumnSelectorPlus selectorPlus = new ColumnSelectorPlus<>( diff --git a/processing/src/main/java/io/druid/segment/DimensionIndexer.java b/processing/src/main/java/io/druid/segment/DimensionIndexer.java index dd885f05c5d..2a21ece1480 100644 --- a/processing/src/main/java/io/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DimensionIndexer.java @@ -25,7 +25,7 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.segment.column.ValueType; import io.druid.segment.data.Indexed; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexStorageAdapter; +import io.druid.segment.incremental.TimeAndDimsHolder; import javax.annotation.Nullable; @@ -217,7 +217,7 @@ public interface DimensionIndexer */ DimensionSelector makeDimensionSelector( DimensionSpec spec, - IncrementalIndexStorageAdapter.EntryHolder currEntry, + TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc ); @@ -229,10 +229,7 @@ public interface DimensionIndexer * @param desc Descriptor object for this dimension within an IncrementalIndex * @return A new object that reads rows from currEntry */ - LongColumnSelector makeLongColumnSelector( - IncrementalIndexStorageAdapter.EntryHolder currEntry, - IncrementalIndex.DimensionDesc desc - ); + LongColumnSelector makeLongColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc); /** @@ -242,10 +239,7 @@ public interface DimensionIndexer * @param desc Descriptor object for this dimension within an IncrementalIndex * @return A new object that reads rows from currEntry */ - FloatColumnSelector makeFloatColumnSelector( - IncrementalIndexStorageAdapter.EntryHolder currEntry, - IncrementalIndex.DimensionDesc desc - ); + FloatColumnSelector makeFloatColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc); /** @@ -255,10 +249,7 @@ public interface DimensionIndexer * @param desc Descriptor object for this dimension within an IncrementalIndex * @return A new object that reads rows from currEntry */ - DoubleColumnSelector makeDoubleColumnSelector( - IncrementalIndexStorageAdapter.EntryHolder currEntry, - IncrementalIndex.DimensionDesc desc - ); + DoubleColumnSelector makeDoubleColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc); /** * Compares the row values for this DimensionIndexer's dimension from a TimeAndDims key. diff --git a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java index 8334dfd1f46..b3b71e30005 100644 --- a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java @@ -26,7 +26,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.column.ValueType; import io.druid.segment.data.Indexed; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexStorageAdapter; +import io.druid.segment.incremental.TimeAndDimsHolder; import javax.annotation.Nullable; import java.util.List; @@ -86,19 +86,16 @@ public class DoubleDimensionIndexer implements DimensionIndexer currentOffset && iter.hasNext()) { iterOffset = iter.next(); } @@ -179,8 +172,7 @@ public final class FilteredOffset extends Offset @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - inspector.visit("holder", holder); - inspector.visit("offset", holder.getReadableOffset()); + inspector.visit("offset", offset); inspector.visit("iter", iter); } }; @@ -192,7 +184,7 @@ public final class FilteredOffset extends Offset @Override public boolean matches() { - int currentOffset = holder.getReadableOffset().getOffset(); + int currentOffset = offset.getOffset(); while (iterOffset < currentOffset && iter.hasNext()) { iterOffset = iter.next(); } @@ -203,8 +195,7 @@ public final class FilteredOffset extends Offset @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - inspector.visit("holder", holder); - inspector.visit("offset", holder.getReadableOffset()); + inspector.visit("offset", offset); inspector.visit("iter", iter); } }; diff --git a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java index f8844d7e0ed..ce925e04ddf 100644 --- a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java @@ -26,7 +26,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.column.ValueType; import io.druid.segment.data.Indexed; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexStorageAdapter; +import io.druid.segment.incremental.TimeAndDimsHolder; import javax.annotation.Nullable; import java.util.List; @@ -87,7 +87,7 @@ public class FloatDimensionIndexer implements DimensionIndexer @Override public DimensionSelector makeDimensionSelector( - DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc + DimensionSpec spec, TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc ) { return new LongWrappingDimensionSelector( @@ -98,7 +98,7 @@ public class LongDimensionIndexer implements DimensionIndexer @Override public LongColumnSelector makeLongColumnSelector( - final IncrementalIndexStorageAdapter.EntryHolder currEntry, + final TimeAndDimsHolder currEntry, final IncrementalIndex.DimensionDesc desc ) { @@ -129,7 +129,7 @@ public class LongDimensionIndexer implements DimensionIndexer @Override public FloatColumnSelector makeFloatColumnSelector( - final IncrementalIndexStorageAdapter.EntryHolder currEntry, + final TimeAndDimsHolder currEntry, final IncrementalIndex.DimensionDesc desc ) { @@ -161,7 +161,8 @@ public class LongDimensionIndexer implements DimensionIndexer @Override public DoubleColumnSelector makeDoubleColumnSelector( - IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc + final TimeAndDimsHolder currEntry, + final IncrementalIndex.DimensionDesc desc ) { final int dimIndex = desc.getIndex(); diff --git a/processing/src/main/java/io/druid/segment/NoFilterOffset.java b/processing/src/main/java/io/druid/segment/NoFilterOffset.java index 6f3c5432b60..8fdf37d1cdc 100644 --- a/processing/src/main/java/io/druid/segment/NoFilterOffset.java +++ b/processing/src/main/java/io/druid/segment/NoFilterOffset.java @@ -21,6 +21,7 @@ package io.druid.segment; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.data.Offset; +import io.druid.segment.data.ReadableOffset; public class NoFilterOffset extends Offset { @@ -55,6 +56,12 @@ public class NoFilterOffset extends Offset currentOffset = initialOffset; } + @Override + public ReadableOffset getBaseReadableOffset() + { + return this; + } + @Override public Offset clone() { diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexColumnSelectorFactory.java b/processing/src/main/java/io/druid/segment/QueryableIndexColumnSelectorFactory.java new file mode 100644 index 00000000000..41f8115284f --- /dev/null +++ b/processing/src/main/java/io/druid/segment/QueryableIndexColumnSelectorFactory.java @@ -0,0 +1,385 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment; + +import com.google.common.collect.Maps; +import io.druid.java.util.common.io.Closer; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.extraction.ExtractionFn; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ComplexColumn; +import io.druid.segment.column.DictionaryEncodedColumn; +import io.druid.segment.column.GenericColumn; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.data.ReadableOffset; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.Map; + +/** + * The basic implementation of {@link ColumnSelectorFactory} over a historical segment (i. e. {@link QueryableIndex}). + * It's counterpart for incremental index is {@link io.druid.segment.incremental.IncrementalIndexColumnSelectorFactory}. + */ +class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory +{ + private final QueryableIndex index; + private final VirtualColumns virtualColumns; + private final boolean descending; + private final Closer closer; + protected final ReadableOffset offset; + + private final Map dictionaryColumnCache = Maps.newHashMap(); + private final Map genericColumnCache = Maps.newHashMap(); + private final Map objectColumnCache = Maps.newHashMap(); + + QueryableIndexColumnSelectorFactory( + QueryableIndex index, + VirtualColumns virtualColumns, + boolean descending, + Closer closer, + ReadableOffset offset + ) + { + this.index = index; + this.virtualColumns = virtualColumns; + this.descending = descending; + this.closer = closer; + this.offset = offset; + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (virtualColumns.exists(dimensionSpec.getDimension())) { + return virtualColumns.makeDimensionSelector(dimensionSpec, this); + } + + return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); + } + + private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensionSpec) + { + final String dimension = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + + final Column columnDesc = index.getColumn(dimension); + if (columnDesc == null) { + return DimensionSelectorUtils.constantSelector(null, extractionFn); + } + + if (dimension.equals(Column.TIME_COLUMN_NAME)) { + return new SingleScanTimeDimSelector( + makeLongColumnSelector(dimension), + extractionFn, + descending + ); + } + + if (columnDesc.getCapabilities().getType() == ValueType.LONG) { + return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn); + } + + if (columnDesc.getCapabilities().getType() == ValueType.FLOAT) { + return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn); + } + + if (columnDesc.getCapabilities().getType() == ValueType.DOUBLE) { + return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn); + } + + DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimension); + if (cachedColumn == null) { + cachedColumn = columnDesc.getDictionaryEncoding(); + closer.register(cachedColumn); + dictionaryColumnCache.put(dimension, cachedColumn); + } + + final DictionaryEncodedColumn column = cachedColumn; + if (column == null) { + return DimensionSelectorUtils.constantSelector(null, extractionFn); + } else { + return column.makeDimensionSelector(offset, extractionFn); + } + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.makeFloatColumnSelector(columnName, this); + } + + GenericColumn cachedMetricVals = genericColumnCache.get(columnName); + + if (cachedMetricVals == null) { + Column holder = index.getColumn(columnName); + if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) { + cachedMetricVals = holder.getGenericColumn(); + closer.register(cachedMetricVals); + genericColumnCache.put(columnName, cachedMetricVals); + } + } + + if (cachedMetricVals == null) { + return ZeroFloatColumnSelector.instance(); + } + + return cachedMetricVals.makeFloatSingleValueRowSelector(offset); + } + + @Override + public DoubleColumnSelector makeDoubleColumnSelector(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.makeDoubleColumnSelector(columnName, this); + } + + GenericColumn cachedMetricVals = genericColumnCache.get(columnName); + + if (cachedMetricVals == null) { + Column holder = index.getColumn(columnName); + if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) { + cachedMetricVals = holder.getGenericColumn(); + closer.register(cachedMetricVals); + genericColumnCache.put(columnName, cachedMetricVals); + } + } + + if (cachedMetricVals == null) { + return ZeroDoubleColumnSelector.instance(); + } + + return cachedMetricVals.makeDoubleSingleValueRowSelector(offset); + } + + @Override + public LongColumnSelector makeLongColumnSelector(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.makeLongColumnSelector(columnName, this); + } + + GenericColumn cachedMetricVals = genericColumnCache.get(columnName); + + if (cachedMetricVals == null) { + Column holder = index.getColumn(columnName); + if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) { + cachedMetricVals = holder.getGenericColumn(); + closer.register(cachedMetricVals); + genericColumnCache.put(columnName, cachedMetricVals); + } + } + + if (cachedMetricVals == null) { + return ZeroLongColumnSelector.instance(); + } + + return cachedMetricVals.makeLongSingleValueRowSelector(offset); + } + + @Nullable + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + if (virtualColumns.exists(column)) { + return virtualColumns.makeObjectColumnSelector(column, this); + } + + Object cachedColumnVals = objectColumnCache.get(column); + + if (cachedColumnVals == null) { + Column holder = index.getColumn(column); + + if (holder != null) { + final ColumnCapabilities capabilities = holder.getCapabilities(); + + if (capabilities.isDictionaryEncoded()) { + cachedColumnVals = holder.getDictionaryEncoding(); + } else if (capabilities.getType() == ValueType.COMPLEX) { + cachedColumnVals = holder.getComplexColumn(); + } else { + cachedColumnVals = holder.getGenericColumn(); + } + } + + if (cachedColumnVals != null) { + closer.register((Closeable) cachedColumnVals); + objectColumnCache.put(column, cachedColumnVals); + } + } + + if (cachedColumnVals == null) { + return null; + } + + if (cachedColumnVals instanceof GenericColumn) { + final GenericColumn columnVals = (GenericColumn) cachedColumnVals; + final ValueType type = columnVals.getType(); + + if (columnVals.hasMultipleValues()) { + throw new UnsupportedOperationException( + "makeObjectColumnSelector does not support multi-value GenericColumns" + ); + } + + if (type == ValueType.FLOAT) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.class; + } + + @Override + public Float get() + { + return columnVals.getFloatSingleValueRow(offset.getOffset()); + } + }; + } + if (type == ValueType.DOUBLE) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Double.class; + } + + @Override + public Double get() + { + return columnVals.getDoubleSingleValueRow(offset.getOffset()); + } + }; + } + if (type == ValueType.LONG) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Long.class; + } + + @Override + public Long get() + { + return columnVals.getLongSingleValueRow(offset.getOffset()); + } + }; + } + if (type == ValueType.STRING) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.getStringSingleValueRow(offset.getOffset()); + } + }; + } + } + + if (cachedColumnVals instanceof DictionaryEncodedColumn) { + final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals; + if (columnVals.hasMultipleValues()) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + @Nullable + public Object get() + { + final IndexedInts multiValueRow = columnVals.getMultiValueRow(offset.getOffset()); + if (multiValueRow.size() == 0) { + return null; + } else if (multiValueRow.size() == 1) { + return columnVals.lookupName(multiValueRow.get(0)); + } else { + final String[] strings = new String[multiValueRow.size()]; + for (int i = 0; i < multiValueRow.size(); i++) { + strings[i] = columnVals.lookupName(multiValueRow.get(i)); + } + return strings; + } + } + }; + } else { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.lookupName(columnVals.getSingleValueRow(offset.getOffset())); + } + }; + } + } + + final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return columnVals.getClazz(); + } + + @Override + public Object get() + { + return columnVals.getRowValue(offset.getOffset()); + } + }; + } + + @Override + @Nullable + public ColumnCapabilities getColumnCapabilities(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.getColumnCapabilities(columnName); + } + + return QueryableIndexStorageAdapter.getColumnCapabilites(index, columnName); + } +} diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index 57d05f27e9f..4037b3fe8e9 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -22,7 +22,6 @@ package io.druid.segment; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.java.util.common.DateTimes; @@ -34,29 +33,22 @@ import io.druid.query.BaseQuery; import io.druid.query.BitmapResultFactory; import io.druid.query.DefaultBitmapResultFactory; import io.druid.query.QueryMetrics; -import io.druid.query.dimension.DimensionSpec; -import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.Filter; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ComplexColumn; -import io.druid.segment.column.DictionaryEncodedColumn; import io.druid.segment.column.GenericColumn; -import io.druid.segment.column.ValueType; import io.druid.segment.data.Indexed; -import io.druid.segment.data.IndexedInts; import io.druid.segment.data.Offset; import io.druid.segment.data.ReadableOffset; import io.druid.segment.filter.AndFilter; import io.druid.segment.historical.HistoricalCursor; -import io.druid.segment.historical.HistoricalFloatColumnSelector; import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -69,9 +61,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter { private final QueryableIndex index; - public QueryableIndexStorageAdapter( - QueryableIndex index - ) + public QueryableIndexStorageAdapter(QueryableIndex index) { this.index = index; } @@ -140,6 +130,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } @Override + @Nullable public Comparable getMinValue(String dimension) { Column column = index.getColumn(dimension); @@ -151,6 +142,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } @Override + @Nullable public Comparable getMaxValue(String dimension) { Column column = index.getColumn(dimension); @@ -168,6 +160,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } @Override + @Nullable public ColumnCapabilities getColumnCapabilities(String column) { return getColumnCapabilites(index, column); @@ -199,7 +192,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Nullable QueryMetrics queryMetrics ) { - Interval actualInterval = interval; DateTime minTime = getMinTime(); long minDataTimestamp = minTime.getMillis(); @@ -207,16 +199,11 @@ public class QueryableIndexStorageAdapter implements StorageAdapter long maxDataTimestamp = maxTime.getMillis(); final Interval dataInterval = new Interval(minTime, gran.bucketEnd(maxTime)); - if (!actualInterval.overlaps(dataInterval)) { + if (!interval.overlaps(dataInterval)) { return Sequences.empty(); } - if (actualInterval.getStart().isBefore(dataInterval.getStart())) { - actualInterval = actualInterval.withStart(dataInterval.getStart()); - } - if (actualInterval.getEnd().isAfter(dataInterval.getEnd())) { - actualInterval = actualInterval.withEnd(dataInterval.getEnd()); - } + final Interval actualInterval = interval.overlap(dataInterval); final ColumnSelectorBitmapIndexSelector selector = new ColumnSelectorBitmapIndexSelector( index.getBitmapFactoryForDimensions(), @@ -324,7 +311,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter ); } - private static ColumnCapabilities getColumnCapabilites(ColumnSelector index, String columnName) + @Nullable + static ColumnCapabilities getColumnCapabilites(ColumnSelector index, String columnName) { Column columnObj = index.getColumn(columnName); if (columnObj == null) { @@ -375,10 +363,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter { final Offset baseOffset = offset.clone(); - final Map dictionaryColumnCache = Maps.newHashMap(); - final Map genericColumnCache = Maps.newHashMap(); - final Map objectColumnCache = Maps.newHashMap(); - final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn(); final Closer closer = Closer.create(); @@ -432,488 +416,27 @@ public class QueryableIndexStorageAdapter implements StorageAdapter ); - final Offset initOffset = offset.clone(); + final Offset baseCursorOffset = offset.clone(); + final ColumnSelectorFactory columnSelectorFactory = new QueryableIndexColumnSelectorFactory( + index, + virtualColumns, + descending, + closer, + baseCursorOffset.getBaseReadableOffset() + ); final DateTime myBucket = gran.toDateTime(inputInterval.getStartMillis()); - abstract class QueryableIndexBaseCursor implements HistoricalCursor - { - OffsetType cursorOffset; - - @Override - public OffsetType getOffset() - { - return cursorOffset; - } - - @Override - public ReadableOffset getReadableOffset() - { - return cursorOffset; - } - - @Override - public DateTime getTime() - { - return myBucket; - } - - @Override - public void advanceTo(int offset) - { - int count = 0; - while (count < offset && !isDone()) { - advance(); - count++; - } - } - - @Override - public boolean isDone() - { - return !cursorOffset.withinBounds(); - } - - @Override - public boolean isDoneOrInterrupted() - { - return isDone() || Thread.currentThread().isInterrupted(); - } - - @Override - public DimensionSelector makeDimensionSelector( - DimensionSpec dimensionSpec - ) - { - if (virtualColumns.exists(dimensionSpec.getDimension())) { - return virtualColumns.makeDimensionSelector(dimensionSpec, this); - } - - return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); - } - - private DimensionSelector makeDimensionSelectorUndecorated( - DimensionSpec dimensionSpec - ) - { - final String dimension = dimensionSpec.getDimension(); - final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); - - final Column columnDesc = index.getColumn(dimension); - if (columnDesc == null) { - return DimensionSelectorUtils.constantSelector(null, extractionFn); - } - - if (dimension.equals(Column.TIME_COLUMN_NAME)) { - return new SingleScanTimeDimSelector( - makeLongColumnSelector(dimension), - extractionFn, - descending - ); - } - - if (columnDesc.getCapabilities().getType() == ValueType.LONG) { - return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn); - } - - if (columnDesc.getCapabilities().getType() == ValueType.FLOAT) { - return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn); - } - - if (columnDesc.getCapabilities().getType() == ValueType.DOUBLE) { - return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn); - } - DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimension); - if (cachedColumn == null) { - cachedColumn = columnDesc.getDictionaryEncoding(); - closer.register(cachedColumn); - dictionaryColumnCache.put(dimension, cachedColumn); - } - - final DictionaryEncodedColumn column = cachedColumn; - if (column == null) { - return DimensionSelectorUtils.constantSelector(null, extractionFn); - } else { - return column.makeDimensionSelector(this, extractionFn); - } - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.makeFloatColumnSelector(columnName, this); - } - - GenericColumn cachedMetricVals = genericColumnCache.get(columnName); - - if (cachedMetricVals == null) { - Column holder = index.getColumn(columnName); - if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) { - cachedMetricVals = holder.getGenericColumn(); - closer.register(cachedMetricVals); - genericColumnCache.put(columnName, cachedMetricVals); - } - } - - if (cachedMetricVals == null) { - return ZeroFloatColumnSelector.instance(); - } - - final GenericColumn metricVals = cachedMetricVals; - return new HistoricalFloatColumnSelector() - { - @Override - public float getFloat() - { - return metricVals.getFloatSingleValueRow(getReadableOffset().getOffset()); - } - - @Override - public float get(int offset) - { - return metricVals.getFloatSingleValueRow(offset); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("metricVals", metricVals); - inspector.visit("cursorOffset", getReadableOffset()); - } - }; - } - - @Override - public DoubleColumnSelector makeDoubleColumnSelector(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.makeDoubleColumnSelector(columnName, this); - } - - GenericColumn cachedMetricVals = genericColumnCache.get(columnName); - - if (cachedMetricVals == null) { - Column holder = index.getColumn(columnName); - if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) { - cachedMetricVals = holder.getGenericColumn(); - closer.register(cachedMetricVals); - genericColumnCache.put(columnName, cachedMetricVals); - } - } - - if (cachedMetricVals == null) { - return ZeroDoubleColumnSelector.instance(); - } - - final GenericColumn metricVals = cachedMetricVals; - return new DoubleColumnSelector() - { - @Override - public double getDouble() - { - return metricVals.getDoubleSingleValueRow(getReadableOffset().getOffset()); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("metricVals", metricVals); - inspector.visit("cursorOffset", getReadableOffset()); - } - }; - } - - @Override - public LongColumnSelector makeLongColumnSelector(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.makeLongColumnSelector(columnName, this); - } - - GenericColumn cachedMetricVals = genericColumnCache.get(columnName); - - if (cachedMetricVals == null) { - Column holder = index.getColumn(columnName); - if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) { - cachedMetricVals = holder.getGenericColumn(); - closer.register(cachedMetricVals); - genericColumnCache.put(columnName, cachedMetricVals); - } - } - - if (cachedMetricVals == null) { - return ZeroLongColumnSelector.instance(); - } - - final GenericColumn metricVals = cachedMetricVals; - return new LongColumnSelector() - { - @Override - public long getLong() - { - return metricVals.getLongSingleValueRow(getReadableOffset().getOffset()); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("metricVals", metricVals); - inspector.visit("cursorOffset", getReadableOffset()); - } - }; - } - - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - if (virtualColumns.exists(column)) { - return virtualColumns.makeObjectColumnSelector(column, this); - } - - Object cachedColumnVals = objectColumnCache.get(column); - - if (cachedColumnVals == null) { - Column holder = index.getColumn(column); - - if (holder != null) { - final ColumnCapabilities capabilities = holder.getCapabilities(); - - if (capabilities.isDictionaryEncoded()) { - cachedColumnVals = holder.getDictionaryEncoding(); - } else if (capabilities.getType() == ValueType.COMPLEX) { - cachedColumnVals = holder.getComplexColumn(); - } else { - cachedColumnVals = holder.getGenericColumn(); - } - } - - if (cachedColumnVals != null) { - closer.register((Closeable) cachedColumnVals); - objectColumnCache.put(column, cachedColumnVals); - } - } - - if (cachedColumnVals == null) { - return null; - } - - if (cachedColumnVals instanceof GenericColumn) { - final GenericColumn columnVals = (GenericColumn) cachedColumnVals; - final ValueType type = columnVals.getType(); - - if (columnVals.hasMultipleValues()) { - throw new UnsupportedOperationException( - "makeObjectColumnSelector does not support multi-value GenericColumns" - ); - } - - if (type == ValueType.FLOAT) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Float.class; - } - - @Override - public Float get() - { - return columnVals.getFloatSingleValueRow(getReadableOffset().getOffset()); - } - }; - } - if (type == ValueType.DOUBLE) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Double.class; - } - - @Override - public Double get() - { - return columnVals.getDoubleSingleValueRow(getReadableOffset().getOffset()); - } - }; - } - if (type == ValueType.LONG) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Long.class; - } - - @Override - public Long get() - { - return columnVals.getLongSingleValueRow(getReadableOffset().getOffset()); - } - }; - } - if (type == ValueType.STRING) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return String.class; - } - - @Override - public String get() - { - return columnVals.getStringSingleValueRow(getReadableOffset().getOffset()); - } - }; - } - } - - if (cachedColumnVals instanceof DictionaryEncodedColumn) { - final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals; - if (columnVals.hasMultipleValues()) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - int currentOffset = getReadableOffset().getOffset(); - final IndexedInts multiValueRow = columnVals.getMultiValueRow(currentOffset); - if (multiValueRow.size() == 0) { - return null; - } else if (multiValueRow.size() == 1) { - return columnVals.lookupName(multiValueRow.get(0)); - } else { - final String[] strings = new String[multiValueRow.size()]; - for (int i = 0; i < multiValueRow.size(); i++) { - strings[i] = columnVals.lookupName(multiValueRow.get(i)); - } - return strings; - } - } - }; - } else { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return String.class; - } - - @Override - public String get() - { - int currentOffset = getReadableOffset().getOffset(); - return columnVals.lookupName(columnVals.getSingleValueRow(currentOffset)); - } - }; - } - } - - final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals; - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return columnVals.getClazz(); - } - - @Override - public Object get() - { - return columnVals.getRowValue(getReadableOffset().getOffset()); - } - }; - } - - @Override - public ColumnCapabilities getColumnCapabilities(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.getColumnCapabilities(columnName); - } - - return getColumnCapabilites(index, columnName); - } - } - if (postFilter == null) { - return new QueryableIndexBaseCursor() - { - { - reset(); - } - - @Override - public void advance() - { - BaseQuery.checkInterrupted(); - cursorOffset.increment(); - } - - @Override - public void advanceUninterruptibly() - { - cursorOffset.increment(); - } - - @Override - public void reset() - { - cursorOffset = initOffset.clone(); - } - }; + return new QueryableIndexCursor(baseCursorOffset, columnSelectorFactory, myBucket); } else { - return new QueryableIndexBaseCursor() - { - private Offset baseOffset; - - { - cursorOffset = new FilteredOffset(this, descending, postFilter, bitmapIndexSelector); - reset(); - } - - @Override - public ReadableOffset getReadableOffset() - { - return baseOffset; - } - - @Override - public void advance() - { - BaseQuery.checkInterrupted(); - cursorOffset.incrementInterruptibly(); - } - - @Override - public void advanceUninterruptibly() - { - if (!Thread.currentThread().isInterrupted()) { - cursorOffset.increment(); - } - } - - @Override - public void reset() - { - baseOffset = initOffset.clone(); - cursorOffset.reset(baseOffset); - } - }; + FilteredOffset filteredOffset = new FilteredOffset( + baseCursorOffset, + columnSelectorFactory, + descending, + postFilter, + bitmapIndexSelector + ); + return new QueryableIndexCursor(filteredOffset, columnSelectorFactory, myBucket); } } @@ -924,14 +447,92 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } + private static class QueryableIndexCursor implements HistoricalCursor + { + private final Offset cursorOffset; + private final ColumnSelectorFactory columnSelectorFactory; + private final DateTime bucketStart; + + QueryableIndexCursor(Offset cursorOffset, ColumnSelectorFactory columnSelectorFactory, DateTime bucketStart) + { + this.cursorOffset = cursorOffset; + this.columnSelectorFactory = columnSelectorFactory; + this.bucketStart = bucketStart; + } + + @Override + public Offset getOffset() + { + return cursorOffset; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return columnSelectorFactory; + } + + @Override + public DateTime getTime() + { + return bucketStart; + } + + @Override + public void advance() + { + cursorOffset.increment(); + // Must call BaseQuery.checkInterrupted() after cursorOffset.increment(), not before, because + // FilteredOffset.increment() is a potentially long, not an "instant" operation (unlike to all other subclasses + // of Offset) and it returns early on interruption, leaving itself in an illegal state. We should not let + // aggregators, etc. access this illegal state and throw a QueryInterruptedException by calling + // BaseQuery.checkInterrupted(). + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + cursorOffset.increment(); + } + + @Override + public void advanceTo(int offset) + { + int count = 0; + while (count < offset && !isDone()) { + advance(); + count++; + } + } + + @Override + public boolean isDone() + { + return !cursorOffset.withinBounds(); + } + + @Override + public boolean isDoneOrInterrupted() + { + return isDone() || Thread.currentThread().isInterrupted(); + } + + @Override + public void reset() + { + cursorOffset.reset(); + } + } + public abstract static class TimestampCheckingOffset extends Offset { - protected final Offset baseOffset; - protected final GenericColumn timestamps; - protected final long timeLimit; - protected final boolean allWithinThreshold; + final Offset baseOffset; + final GenericColumn timestamps; + final long timeLimit; + final boolean allWithinThreshold; - public TimestampCheckingOffset( + TimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, long timeLimit, @@ -969,6 +570,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter baseOffset.reset(); } + @Override + public ReadableOffset getBaseReadableOffset() + { + return baseOffset.getBaseReadableOffset(); + } + protected abstract boolean timeInRange(long current); @Override @@ -977,6 +584,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter baseOffset.increment(); } + @SuppressWarnings("MethodDoesntCallSuperMethod") @Override public Offset clone() { @@ -994,7 +602,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter public static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset { - public AscendingTimestampCheckingOffset( + AscendingTimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, long timeLimit, @@ -1017,6 +625,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter "<" + timeLimit + "::" + baseOffset; } + @SuppressWarnings("MethodDoesntCallSuperMethod") @Override public Offset clone() { @@ -1026,7 +635,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter public static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset { - public DescendingTimestampCheckingOffset( + DescendingTimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, long timeLimit, @@ -1050,6 +659,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter "::" + baseOffset; } + @SuppressWarnings("MethodDoesntCallSuperMethod") @Override public Offset clone() { diff --git a/processing/src/main/java/io/druid/segment/StorageAdapter.java b/processing/src/main/java/io/druid/segment/StorageAdapter.java index 62e3dfa250d..24f12c43760 100644 --- a/processing/src/main/java/io/druid/segment/StorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/StorageAdapter.java @@ -48,7 +48,9 @@ public interface StorageAdapter extends CursorFactory public int getDimensionCardinality(String column); public DateTime getMinTime(); public DateTime getMaxTime(); + @Nullable public Comparable getMinValue(String column); + @Nullable public Comparable getMaxValue(String column); public Capabilities getCapabilities(); diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index 6a7ab2b9ada..d7d81d3ef54 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -40,7 +40,7 @@ import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedIterable; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexStorageAdapter; +import io.druid.segment.incremental.TimeAndDimsHolder; import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.objects.Object2IntMap; @@ -370,7 +370,7 @@ public class StringDimensionIndexer implements DimensionIndexer extends public int lookupId(ActualType name); public int getCardinality(); - DimensionSelector makeDimensionSelector(OffsetHolder offsetHolder, ExtractionFn extractionFn); + DimensionSelector makeDimensionSelector(ReadableOffset offset, ExtractionFn extractionFn); } diff --git a/processing/src/main/java/io/druid/segment/column/GenericColumn.java b/processing/src/main/java/io/druid/segment/column/GenericColumn.java index 6c1c1f7618e..f19e3fd1d17 100644 --- a/processing/src/main/java/io/druid/segment/column/GenericColumn.java +++ b/processing/src/main/java/io/druid/segment/column/GenericColumn.java @@ -21,9 +21,10 @@ package io.druid.segment.column; import io.druid.query.monomorphicprocessing.CalledFromHotLoop; import io.druid.query.monomorphicprocessing.HotLoopCallee; -import io.druid.segment.data.Indexed; -import io.druid.segment.data.IndexedFloats; -import io.druid.segment.data.IndexedLongs; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.data.ReadableOffset; +import io.druid.segment.historical.HistoricalFloatColumnSelector; import java.io.Closeable; @@ -37,18 +38,18 @@ public interface GenericColumn extends HotLoopCallee, Closeable @CalledFromHotLoop public String getStringSingleValueRow(int rowNum); + @CalledFromHotLoop - public Indexed getStringMultiValueRow(int rowNum); + float getFloatSingleValueRow(int rowNum); + HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset); + @CalledFromHotLoop - public float getFloatSingleValueRow(int rowNum); - @CalledFromHotLoop - public IndexedFloats getFloatMultiValueRow(int rowNum); - @CalledFromHotLoop - public long getLongSingleValueRow(int rowNum); - @CalledFromHotLoop - public IndexedLongs getLongMultiValueRow(int rowNum); + long getLongSingleValueRow(int rowNum); + LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset); + @CalledFromHotLoop double getDoubleSingleValueRow(int rowNum); + DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset); @Override void close(); diff --git a/processing/src/main/java/io/druid/segment/column/IndexedDoublesGenericColumn.java b/processing/src/main/java/io/druid/segment/column/IndexedDoublesGenericColumn.java index ed053136881..9a69b3bdd6a 100644 --- a/processing/src/main/java/io/druid/segment/column/IndexedDoublesGenericColumn.java +++ b/processing/src/main/java/io/druid/segment/column/IndexedDoublesGenericColumn.java @@ -20,10 +20,11 @@ package io.druid.segment.column; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import io.druid.segment.data.Indexed; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.LongColumnSelector; import io.druid.segment.data.IndexedDoubles; -import io.druid.segment.data.IndexedFloats; -import io.druid.segment.data.IndexedLongs; +import io.druid.segment.data.ReadableOffset; +import io.druid.segment.historical.HistoricalFloatColumnSelector; public class IndexedDoublesGenericColumn implements GenericColumn @@ -59,12 +60,6 @@ public class IndexedDoublesGenericColumn implements GenericColumn throw new UnsupportedOperationException(); } - @Override - public Indexed getStringMultiValueRow(int rowNum) - { - throw new UnsupportedOperationException(); - } - @Override public float getFloatSingleValueRow(int rowNum) { @@ -72,9 +67,9 @@ public class IndexedDoublesGenericColumn implements GenericColumn } @Override - public IndexedFloats getFloatMultiValueRow(int rowNum) + public HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset) { - throw new UnsupportedOperationException(); + return column.makeFloatColumnSelector(offset); } @Override @@ -84,9 +79,9 @@ public class IndexedDoublesGenericColumn implements GenericColumn } @Override - public IndexedLongs getLongMultiValueRow(int rowNum) + public LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset) { - throw new UnsupportedOperationException(); + return column.makeLongColumnSelector(offset); } @Override @@ -95,6 +90,12 @@ public class IndexedDoublesGenericColumn implements GenericColumn return column.get(rowNum); } + @Override + public DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset) + { + return column.makeDoubleColumnSelector(offset); + } + @Override public void close() { diff --git a/processing/src/main/java/io/druid/segment/column/IndexedFloatsGenericColumn.java b/processing/src/main/java/io/druid/segment/column/IndexedFloatsGenericColumn.java index af02877a364..0e9416f9c8e 100644 --- a/processing/src/main/java/io/druid/segment/column/IndexedFloatsGenericColumn.java +++ b/processing/src/main/java/io/druid/segment/column/IndexedFloatsGenericColumn.java @@ -20,9 +20,11 @@ package io.druid.segment.column; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import io.druid.segment.data.Indexed; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.LongColumnSelector; import io.druid.segment.data.IndexedFloats; -import io.druid.segment.data.IndexedLongs; +import io.druid.segment.data.ReadableOffset; +import io.druid.segment.historical.HistoricalFloatColumnSelector; /** */ @@ -59,12 +61,6 @@ public class IndexedFloatsGenericColumn implements GenericColumn throw new UnsupportedOperationException(); } - @Override - public Indexed getStringMultiValueRow(int rowNum) - { - throw new UnsupportedOperationException(); - } - @Override public float getFloatSingleValueRow(int rowNum) { @@ -72,9 +68,9 @@ public class IndexedFloatsGenericColumn implements GenericColumn } @Override - public IndexedFloats getFloatMultiValueRow(int rowNum) + public HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset) { - throw new UnsupportedOperationException(); + return column.makeFloatColumnSelector(offset); } @Override @@ -84,9 +80,9 @@ public class IndexedFloatsGenericColumn implements GenericColumn } @Override - public IndexedLongs getLongMultiValueRow(int rowNum) + public LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset) { - throw new UnsupportedOperationException(); + return column.makeLongColumnSelector(offset); } @Override @@ -95,6 +91,12 @@ public class IndexedFloatsGenericColumn implements GenericColumn return (double) column.get(rowNum); } + @Override + public DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset) + { + return column.makeDoubleColumnSelector(offset); + } + @Override public void close() { diff --git a/processing/src/main/java/io/druid/segment/column/IndexedLongsGenericColumn.java b/processing/src/main/java/io/druid/segment/column/IndexedLongsGenericColumn.java index ecfd3330066..93e163f6458 100644 --- a/processing/src/main/java/io/druid/segment/column/IndexedLongsGenericColumn.java +++ b/processing/src/main/java/io/druid/segment/column/IndexedLongsGenericColumn.java @@ -20,9 +20,11 @@ package io.druid.segment.column; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import io.druid.segment.data.Indexed; -import io.druid.segment.data.IndexedFloats; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.LongColumnSelector; import io.druid.segment.data.IndexedLongs; +import io.druid.segment.data.ReadableOffset; +import io.druid.segment.historical.HistoricalFloatColumnSelector; /** */ @@ -59,12 +61,6 @@ public class IndexedLongsGenericColumn implements GenericColumn throw new UnsupportedOperationException(); } - @Override - public Indexed getStringMultiValueRow(int rowNum) - { - throw new UnsupportedOperationException(); - } - @Override public float getFloatSingleValueRow(int rowNum) { @@ -72,9 +68,9 @@ public class IndexedLongsGenericColumn implements GenericColumn } @Override - public IndexedFloats getFloatMultiValueRow(int rowNum) + public HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset) { - throw new UnsupportedOperationException(); + return column.makeFloatColumnSelector(offset); } @Override @@ -84,9 +80,9 @@ public class IndexedLongsGenericColumn implements GenericColumn } @Override - public IndexedLongs getLongMultiValueRow(int rowNum) + public LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset) { - throw new UnsupportedOperationException(); + return column.makeLongColumnSelector(offset); } @Override @@ -95,6 +91,12 @@ public class IndexedLongsGenericColumn implements GenericColumn return (double) column.get(rowNum); } + @Override + public DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset) + { + return column.makeDoubleColumnSelector(offset); + } + @Override public void close() { diff --git a/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java b/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java index 16d309e898d..fb39014125a 100644 --- a/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java +++ b/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java @@ -31,10 +31,10 @@ import io.druid.segment.IdLookup; import io.druid.segment.data.CachingIndexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedMultivalue; +import io.druid.segment.data.ReadableOffset; import io.druid.segment.data.SingleIndexedInt; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.historical.HistoricalDimensionSelector; -import io.druid.segment.historical.OffsetHolder; import io.druid.segment.historical.SingleValueHistoricalDimensionSelector; import javax.annotation.Nullable; @@ -105,10 +105,7 @@ public class SimpleDictionaryEncodedColumn } @Override - public HistoricalDimensionSelector makeDimensionSelector( - final OffsetHolder offsetHolder, - final ExtractionFn extractionFn - ) + public HistoricalDimensionSelector makeDimensionSelector(final ReadableOffset offset, final ExtractionFn extractionFn) { abstract class QueryableDimensionSelector implements HistoricalDimensionSelector, IdLookup { @@ -158,7 +155,7 @@ public class SimpleDictionaryEncodedColumn @Override public IndexedInts getRow() { - return multiValueColumn.get(offsetHolder.getReadableOffset().getOffset()); + return multiValueColumn.get(offset.getOffset()); } @Override @@ -183,8 +180,7 @@ public class SimpleDictionaryEncodedColumn public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("multiValueColumn", multiValueColumn); - inspector.visit("offsetHolder", offsetHolder); - inspector.visit("offset", offsetHolder.getReadableOffset()); + inspector.visit("offset", offset); inspector.visit("extractionFn", extractionFn); } } @@ -202,7 +198,7 @@ public class SimpleDictionaryEncodedColumn @Override public int getRowValue() { - return column.get(offsetHolder.getReadableOffset().getOffset()); + return column.get(offset.getOffset()); } @Override @@ -273,8 +269,7 @@ public class SimpleDictionaryEncodedColumn public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("column", column); - inspector.visit("offsetHolder", offsetHolder); - inspector.visit("offset", offsetHolder.getReadableOffset()); + inspector.visit("offset", offset); inspector.visit("extractionFn", extractionFn); } } diff --git a/processing/src/main/java/io/druid/segment/data/IndexedDoubles.java b/processing/src/main/java/io/druid/segment/data/IndexedDoubles.java index a5109ff05a8..84fbb8b21de 100644 --- a/processing/src/main/java/io/druid/segment/data/IndexedDoubles.java +++ b/processing/src/main/java/io/druid/segment/data/IndexedDoubles.java @@ -19,6 +19,11 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.historical.HistoricalFloatColumnSelector; + import java.io.Closeable; public interface IndexedDoubles extends Closeable @@ -29,5 +34,68 @@ public interface IndexedDoubles extends Closeable @Override void close(); + + default DoubleColumnSelector makeDoubleColumnSelector(ReadableOffset offset) + { + return new DoubleColumnSelector() + { + @Override + public double getDouble() + { + return IndexedDoubles.this.get(offset.getOffset()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedDoubles.this); + inspector.visit("offset", offset); + } + }; + } + + default HistoricalFloatColumnSelector makeFloatColumnSelector(ReadableOffset offset) + { + return new HistoricalFloatColumnSelector() + { + @Override + public float get(int offset) + { + return (float) IndexedDoubles.this.get(offset); + } + + @Override + public float getFloat() + { + return (float) IndexedDoubles.this.get(offset.getOffset()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedDoubles.this); + inspector.visit("offset", offset); + } + }; + } + + default LongColumnSelector makeLongColumnSelector(ReadableOffset offset) + { + return new LongColumnSelector() + { + @Override + public long getLong() + { + return (long) IndexedDoubles.this.get(offset.getOffset()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedDoubles.this); + inspector.visit("offset", offset); + } + }; + } } diff --git a/processing/src/main/java/io/druid/segment/data/IndexedFloats.java b/processing/src/main/java/io/druid/segment/data/IndexedFloats.java index 8a6f3f51651..82b6f7a6468 100644 --- a/processing/src/main/java/io/druid/segment/data/IndexedFloats.java +++ b/processing/src/main/java/io/druid/segment/data/IndexedFloats.java @@ -19,6 +19,11 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.historical.HistoricalFloatColumnSelector; + import java.io.Closeable; /** @@ -32,4 +37,67 @@ public interface IndexedFloats extends Closeable @Override void close(); + + default HistoricalFloatColumnSelector makeFloatColumnSelector(ReadableOffset offset) + { + return new HistoricalFloatColumnSelector() + { + @Override + public float getFloat() + { + return IndexedFloats.this.get(offset.getOffset()); + } + + @Override + public float get(int offset) + { + return IndexedFloats.this.get(offset); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedFloats.this); + inspector.visit("offset", offset); + } + }; + } + + default DoubleColumnSelector makeDoubleColumnSelector(ReadableOffset offset) + { + return new DoubleColumnSelector() + { + @Override + public double getDouble() + { + return IndexedFloats.this.get(offset.getOffset()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedFloats.this); + inspector.visit("offset", offset); + } + }; + } + + default LongColumnSelector makeLongColumnSelector(ReadableOffset offset) + { + return new LongColumnSelector() + { + @Override + public long getLong() + { + return (long) IndexedFloats.this.get(offset.getOffset()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedFloats.this); + inspector.visit("offset", offset); + } + }; + } } diff --git a/processing/src/main/java/io/druid/segment/data/IndexedLongs.java b/processing/src/main/java/io/druid/segment/data/IndexedLongs.java index c9b1bc67815..be40e1df5fa 100644 --- a/processing/src/main/java/io/druid/segment/data/IndexedLongs.java +++ b/processing/src/main/java/io/druid/segment/data/IndexedLongs.java @@ -19,6 +19,11 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.historical.HistoricalFloatColumnSelector; + import java.io.Closeable; /** @@ -32,4 +37,67 @@ public interface IndexedLongs extends Closeable @Override void close(); + + default LongColumnSelector makeLongColumnSelector(ReadableOffset offset) + { + return new LongColumnSelector() + { + @Override + public long getLong() + { + return IndexedLongs.this.get(offset.getOffset()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedLongs.this); + inspector.visit("offset", offset); + } + }; + } + + default HistoricalFloatColumnSelector makeFloatColumnSelector(ReadableOffset offset) + { + return new HistoricalFloatColumnSelector() + { + @Override + public float getFloat() + { + return (float) IndexedLongs.this.get(offset.getOffset()); + } + + @Override + public float get(int offset) + { + return (float) IndexedLongs.this.get(offset); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedLongs.this); + inspector.visit("offset", offset); + } + }; + } + + default DoubleColumnSelector makeDoubleColumnSelector(ReadableOffset offset) + { + return new DoubleColumnSelector() + { + @Override + public double getDouble() + { + return (double) IndexedLongs.this.get(offset.getOffset()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("indexed", IndexedLongs.this); + inspector.visit("offset", offset); + } + }; + } } diff --git a/processing/src/main/java/io/druid/segment/data/Offset.java b/processing/src/main/java/io/druid/segment/data/Offset.java index 77f949d7a61..dacf0b7cfb4 100644 --- a/processing/src/main/java/io/druid/segment/data/Offset.java +++ b/processing/src/main/java/io/druid/segment/data/Offset.java @@ -33,6 +33,10 @@ import io.druid.query.monomorphicprocessing.CalledFromHotLoop; * io.druid.query.topn.Historical1SimpleDoubleAggPooledTopNScannerPrototype} and {@link * io.druid.query.topn.HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype} during * specialization, and specialized version of those prototypes must be able to any subclass of Offset. + * + * This interface is the core "pointer" interface that is used to create {@link io.druid.segment.ColumnValueSelector}s + * over historical segments. It's counterpart for incremental index is {@link + * io.druid.segment.incremental.TimeAndDimsHolder}. */ @SubclassesMustBePublic public abstract class Offset implements ReadableOffset, Cloneable @@ -48,6 +52,13 @@ public abstract class Offset implements ReadableOffset, Cloneable */ public abstract void reset(); + /** + * Returns the same offset ("this") or a readable "view" of this offset, which always returns the same value from + * {@link #getOffset()}, as this offset. This method is useful for "unwrapping" such offsets as {@link + * io.druid.segment.FilteredOffset} and reduce reference indirection, when only {@link ReadableOffset} API is needed. + */ + public abstract ReadableOffset getBaseReadableOffset(); + @Override public Offset clone() { diff --git a/processing/src/main/java/io/druid/segment/historical/HistoricalCursor.java b/processing/src/main/java/io/druid/segment/historical/HistoricalCursor.java index ce289ce27d0..db9f0f89dc2 100644 --- a/processing/src/main/java/io/druid/segment/historical/HistoricalCursor.java +++ b/processing/src/main/java/io/druid/segment/historical/HistoricalCursor.java @@ -20,7 +20,9 @@ package io.druid.segment.historical; import io.druid.segment.Cursor; +import io.druid.segment.data.Offset; -public interface HistoricalCursor extends Cursor, OffsetHolder +public interface HistoricalCursor extends Cursor { + Offset getOffset(); } diff --git a/processing/src/main/java/io/druid/segment/historical/OffsetHolder.java b/processing/src/main/java/io/druid/segment/historical/OffsetHolder.java deleted file mode 100644 index 9d773dc5370..00000000000 --- a/processing/src/main/java/io/druid/segment/historical/OffsetHolder.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment.historical; - -import io.druid.segment.data.Offset; -import io.druid.segment.data.ReadableOffset; - -public interface OffsetHolder -{ - Offset getOffset(); - - /** - * Should return the same, or a "view" of the same offset as {@link #getOffset()}. The difference is that smaller - * interface allows to return unwrapped underlying offset sometimes, e. g. {@link - * io.druid.segment.FilteredOffset#baseOffset}, instead of the wrapper {@link io.druid.segment.FilteredOffset}. - */ - ReadableOffset getReadableOffset(); -} diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java new file mode 100644 index 00000000000..6433b277412 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java @@ -0,0 +1,341 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.incremental; + +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.extraction.ExtractionFn; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionIndexer; +import io.druid.segment.DimensionSelector; +import io.druid.segment.DimensionSelectorUtils; +import io.druid.segment.DoubleColumnSelector; +import io.druid.segment.DoubleWrappingDimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.FloatWrappingDimensionSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.LongWrappingDimensionSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.SingleScanTimeDimSelector; +import io.druid.segment.VirtualColumns; +import io.druid.segment.ZeroDoubleColumnSelector; +import io.druid.segment.ZeroFloatColumnSelector; +import io.druid.segment.ZeroLongColumnSelector; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ValueType; + +import javax.annotation.Nullable; + +/** + * The basic implementation of {@link ColumnSelectorFactory} over an {@link IncrementalIndex}. It's counterpart for + * historical segments is {@link io.druid.segment.QueryableIndexColumnSelectorFactory}. + */ +class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory +{ + private final IncrementalIndex index; + private final VirtualColumns virtualColumns; + private final boolean descending; + private final TimeAndDimsHolder timeAndDimsHolder; + + IncrementalIndexColumnSelectorFactory( + IncrementalIndex index, + VirtualColumns virtualColumns, + boolean descending, + TimeAndDimsHolder timeAndDimsHolder + ) + { + this.index = index; + this.virtualColumns = virtualColumns; + this.descending = descending; + this.timeAndDimsHolder = timeAndDimsHolder; + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (virtualColumns.exists(dimensionSpec.getDimension())) { + return virtualColumns.makeDimensionSelector(dimensionSpec, this); + } + + return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); + } + + private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensionSpec) + { + final String dimension = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + + if (dimension.equals(Column.TIME_COLUMN_NAME)) { + return new SingleScanTimeDimSelector( + makeLongColumnSelector(dimension), + extractionFn, + descending + ); + } + + final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionSpec.getDimension()); + if (dimensionDesc == null) { + // not a dimension, column may be a metric + ColumnCapabilities capabilities = getColumnCapabilities(dimension); + if (capabilities == null) { + return DimensionSelectorUtils.constantSelector(null, extractionFn); + } + if (capabilities.getType() == ValueType.LONG) { + return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn); + } + if (capabilities.getType() == ValueType.FLOAT) { + return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn); + } + if (capabilities.getType() == ValueType.DOUBLE) { + return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn); + } + + // if we can't wrap the base column, just return a column of all nulls + return DimensionSelectorUtils.constantSelector(null, extractionFn); + } else { + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + return indexer.makeDimensionSelector(dimensionSpec, timeAndDimsHolder, dimensionDesc); + } + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.makeFloatColumnSelector(columnName, this); + } + + final Integer dimIndex = index.getDimensionIndex(columnName); + if (dimIndex != null) { + final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName); + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + return indexer.makeFloatColumnSelector(timeAndDimsHolder, dimensionDesc); + } + + final Integer metricIndexInt = index.getMetricIndex(columnName); + if (metricIndexInt == null) { + return ZeroFloatColumnSelector.instance(); + } + + final int metricIndex = metricIndexInt; + return new FloatColumnSelector() + { + @Override + public float getFloat() + { + return index.getMetricFloatValue(timeAndDimsHolder.getValue(), metricIndex); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("index", index); + } + }; + } + + @Override + public DoubleColumnSelector makeDoubleColumnSelector(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.makeDoubleColumnSelector(columnName, this); + } + + final Integer dimIndex = index.getDimensionIndex(columnName); + if (dimIndex != null) { + final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName); + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + return indexer.makeDoubleColumnSelector(timeAndDimsHolder, dimensionDesc); + } + + final Integer metricIndexInt = index.getMetricIndex(columnName); + if (metricIndexInt == null) { + return ZeroDoubleColumnSelector.instance(); + } + + final int metricIndex = metricIndexInt; + return new DoubleColumnSelector() + { + @Override + public double getDouble() + { + return index.getMetricDoubleValue(timeAndDimsHolder.getValue(), metricIndex); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("index", index); + } + }; + } + + @Override + public LongColumnSelector makeLongColumnSelector(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.makeLongColumnSelector(columnName, this); + } + + if (columnName.equals(Column.TIME_COLUMN_NAME)) { + class TimeLongColumnSelector implements LongColumnSelector + { + @Override + public long getLong() + { + return timeAndDimsHolder.getKey().getTimestamp(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect + } + } + return new TimeLongColumnSelector(); + } + + final Integer dimIndex = index.getDimensionIndex(columnName); + if (dimIndex != null) { + final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName); + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + return indexer.makeLongColumnSelector(timeAndDimsHolder, dimensionDesc); + } + + final Integer metricIndexInt = index.getMetricIndex(columnName); + if (metricIndexInt == null) { + return ZeroLongColumnSelector.instance(); + } + + final int metricIndex = metricIndexInt; + + return new LongColumnSelector() + { + @Override + public long getLong() + { + return index.getMetricLongValue(timeAndDimsHolder.getValue(), metricIndex); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("index", index); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + if (virtualColumns.exists(column)) { + return virtualColumns.makeObjectColumnSelector(column, this); + } + + if (column.equals(Column.TIME_COLUMN_NAME)) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Long.class; + } + + @Override + public Long get() + { + return timeAndDimsHolder.getKey().getTimestamp(); + } + }; + } + + final Integer metricIndexInt = index.getMetricIndex(column); + if (metricIndexInt != null) { + final int metricIndex = metricIndexInt; + final Class classOfObject = index.getMetricClass(column); + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return classOfObject; + } + + @Override + public Object get() + { + return index.getMetricObjectValue( + timeAndDimsHolder.getValue(), + metricIndex + ); + } + }; + } + + IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(column); + + if (dimensionDesc == null) { + return null; + } else { + + final int dimensionIndex = dimensionDesc.getIndex(); + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + public Object get() + { + IncrementalIndex.TimeAndDims key = timeAndDimsHolder.getKey(); + if (key == null) { + return null; + } + + Object[] dims = key.getDims(); + if (dimensionIndex >= dims.length) { + return null; + } + + return indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList( + dims[dimensionIndex], DimensionIndexer.ARRAY + ); + } + }; + } + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String columnName) + { + if (virtualColumns.exists(columnName)) { + return virtualColumns.getColumnCapabilities(columnName); + } + + return index.getCapabilities(columnName); + } +} diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index a96d2b3d08d..325ff4f8902 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -19,7 +19,6 @@ package io.druid.segment.incremental; -import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -28,33 +27,17 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.BaseQuery; import io.druid.query.QueryMetrics; -import io.druid.query.dimension.DimensionSpec; -import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.Capabilities; +import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.Cursor; import io.druid.segment.DimensionIndexer; -import io.druid.segment.DimensionSelector; -import io.druid.segment.DimensionSelectorUtils; -import io.druid.segment.DoubleColumnSelector; -import io.druid.segment.DoubleWrappingDimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.FloatWrappingDimensionSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.LongWrappingDimensionSelector; import io.druid.segment.Metadata; -import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.SingleScanTimeDimSelector; import io.druid.segment.StorageAdapter; import io.druid.segment.VirtualColumns; -import io.druid.segment.ZeroDoubleColumnSelector; -import io.druid.segment.ZeroFloatColumnSelector; -import io.druid.segment.ZeroLongColumnSelector; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ValueType; import io.druid.segment.data.Indexed; import io.druid.segment.data.ListIndexed; import io.druid.segment.filter.BooleanValueMatcher; @@ -70,9 +53,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { private final IncrementalIndex index; - public IncrementalIndexStorageAdapter( - IncrementalIndex index - ) + public IncrementalIndexStorageAdapter(IncrementalIndex index) { this.index = index; } @@ -92,7 +73,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Indexed getAvailableDimensions() { - return new ListIndexed(index.getDimensionNames(), String.class); + return new ListIndexed<>(index.getDimensionNames(), String.class); } @Override @@ -135,6 +116,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return index.getMaxTime(); } + @Nullable @Override public Comparable getMinValue(String column) { @@ -147,6 +129,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return indexer.getMinValue(); } + @Nullable @Override public Comparable getMaxValue(String column) { @@ -198,508 +181,29 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return Sequences.empty(); } - Interval actualIntervalTmp = interval; - final Interval dataInterval = new Interval(getMinTime(), gran.bucketEnd(getMaxTime())); - if (!actualIntervalTmp.overlaps(dataInterval)) { + if (!interval.overlaps(dataInterval)) { return Sequences.empty(); } - if (actualIntervalTmp.getStart().isBefore(dataInterval.getStart())) { - actualIntervalTmp = actualIntervalTmp.withStart(dataInterval.getStart()); - } - if (actualIntervalTmp.getEnd().isAfter(dataInterval.getEnd())) { - actualIntervalTmp = actualIntervalTmp.withEnd(dataInterval.getEnd()); - } - - final Interval actualInterval = actualIntervalTmp; + final Interval actualInterval = interval.overlap(dataInterval); Iterable iterable = gran.getIterable(actualInterval); if (descending) { iterable = Lists.reverse(ImmutableList.copyOf(iterable)); } - return Sequences.map( - Sequences.simple(iterable), - new Function() - { - EntryHolder currEntry = new EntryHolder(); - - @Override - public Cursor apply(@Nullable final Interval interval) - { - final long timeStart = Math.max(interval.getStartMillis(), actualInterval.getStartMillis()); - - return new Cursor() - { - private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this); - private final int maxRowIndex; - private Iterator baseIter; - private Iterable cursorIterable; - private boolean emptyRange; - final DateTime time; - int numAdvanced = -1; - boolean done; - - { - maxRowIndex = index.getLastRowIndex(); - cursorIterable = index.getFacts().timeRangeIterable( - descending, - timeStart, - Math.min(actualInterval.getEndMillis(), gran.increment(interval.getStart()).getMillis()) - ); - emptyRange = !cursorIterable.iterator().hasNext(); - time = gran.toDateTime(interval.getStartMillis()); - - reset(); - } - - @Override - public DateTime getTime() - { - return time; - } - - @Override - public void advance() - { - if (!baseIter.hasNext()) { - done = true; - return; - } - - while (baseIter.hasNext()) { - BaseQuery.checkInterrupted(); - - IncrementalIndex.TimeAndDims entry = baseIter.next(); - if (beyondMaxRowIndex(entry.getRowIndex())) { - continue; - } - - currEntry.set(entry); - - if (filterMatcher.matches()) { - return; - } - } - - done = true; - } - - @Override - public void advanceUninterruptibly() - { - if (!baseIter.hasNext()) { - done = true; - return; - } - - while (baseIter.hasNext()) { - if (Thread.currentThread().isInterrupted()) { - return; - } - - IncrementalIndex.TimeAndDims entry = baseIter.next(); - if (beyondMaxRowIndex(entry.getRowIndex())) { - continue; - } - - currEntry.set(entry); - - if (filterMatcher.matches()) { - return; - } - } - - done = true; - } - - @Override - public void advanceTo(int offset) - { - int count = 0; - while (count < offset && !isDone()) { - advance(); - count++; - } - } - - @Override - public boolean isDone() - { - return done; - } - - @Override - public boolean isDoneOrInterrupted() - { - return isDone() || Thread.currentThread().isInterrupted(); - } - - @Override - public void reset() - { - baseIter = cursorIterable.iterator(); - - if (numAdvanced == -1) { - numAdvanced = 0; - } else { - Iterators.advance(baseIter, numAdvanced); - } - - BaseQuery.checkInterrupted(); - - boolean foundMatched = false; - while (baseIter.hasNext()) { - IncrementalIndex.TimeAndDims entry = baseIter.next(); - if (beyondMaxRowIndex(entry.getRowIndex())) { - numAdvanced++; - continue; - } - currEntry.set(entry); - if (filterMatcher.matches()) { - foundMatched = true; - break; - } - - numAdvanced++; - } - - done = !foundMatched && (emptyRange || !baseIter.hasNext()); - } - - private boolean beyondMaxRowIndex(int rowIndex) - { - // ignore rows whose rowIndex is beyond the maxRowIndex - // rows are order by timestamp, not rowIndex, - // so we still need to go through all rows to skip rows added after cursor created - return rowIndex > maxRowIndex; - } - - @Override - public DimensionSelector makeDimensionSelector( - DimensionSpec dimensionSpec - ) - { - if (virtualColumns.exists(dimensionSpec.getDimension())) { - return virtualColumns.makeDimensionSelector(dimensionSpec, this); - } - - return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); - } - - private DimensionSelector makeDimensionSelectorUndecorated( - DimensionSpec dimensionSpec - ) - { - final String dimension = dimensionSpec.getDimension(); - final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); - - if (dimension.equals(Column.TIME_COLUMN_NAME)) { - DimensionSelector selector = new SingleScanTimeDimSelector( - makeLongColumnSelector(dimension), - extractionFn, - descending - ); - return selector; - } - - final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionSpec.getDimension()); - if (dimensionDesc == null) { - // not a dimension, column may be a metric - ColumnCapabilities capabilities = getColumnCapabilities(dimension); - if (capabilities == null) { - return DimensionSelectorUtils.constantSelector(null, extractionFn); - } - if (capabilities.getType() == ValueType.LONG) { - return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn); - } - if (capabilities.getType() == ValueType.FLOAT) { - return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn); - } - if (capabilities.getType() == ValueType.DOUBLE) { - return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn); - } - - // if we can't wrap the base column, just return a column of all nulls - return DimensionSelectorUtils.constantSelector(null, extractionFn); - } else { - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - return indexer.makeDimensionSelector(dimensionSpec, currEntry, dimensionDesc); - } - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.makeFloatColumnSelector(columnName, this); - } - - final Integer dimIndex = index.getDimensionIndex(columnName); - if (dimIndex != null) { - final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName); - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - return indexer.makeFloatColumnSelector( - currEntry, - dimensionDesc - ); - } - - final Integer metricIndexInt = index.getMetricIndex(columnName); - if (metricIndexInt == null) { - return ZeroFloatColumnSelector.instance(); - } - - final int metricIndex = metricIndexInt; - return new FloatColumnSelector() - { - @Override - public float getFloat() - { - return index.getMetricFloatValue(currEntry.getValue(), metricIndex); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("index", index); - } - }; - } - - @Override - public LongColumnSelector makeLongColumnSelector(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.makeLongColumnSelector(columnName, this); - } - - if (columnName.equals(Column.TIME_COLUMN_NAME)) { - class TimeLongColumnSelector implements LongColumnSelector - { - @Override - public long getLong() - { - return currEntry.getKey().getTimestamp(); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - // nothing to inspect - } - } - return new TimeLongColumnSelector(); - } - - final Integer dimIndex = index.getDimensionIndex(columnName); - if (dimIndex != null) { - final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName); - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - return indexer.makeLongColumnSelector( - currEntry, - dimensionDesc - ); - } - - final Integer metricIndexInt = index.getMetricIndex(columnName); - if (metricIndexInt == null) { - return ZeroLongColumnSelector.instance(); - } - - final int metricIndex = metricIndexInt; - - return new LongColumnSelector() - { - @Override - public long getLong() - { - return index.getMetricLongValue( - currEntry.getValue(), - metricIndex - ); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("index", index); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - if (virtualColumns.exists(column)) { - return virtualColumns.makeObjectColumnSelector(column, this); - } - - if (column.equals(Column.TIME_COLUMN_NAME)) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Long.class; - } - - @Override - public Long get() - { - return currEntry.getKey().getTimestamp(); - } - }; - } - - final Integer metricIndexInt = index.getMetricIndex(column); - if (metricIndexInt != null) { - final int metricIndex = metricIndexInt; - final Class classOfObject = index.getMetricClass(column); - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return classOfObject; - } - - @Override - public Object get() - { - return index.getMetricObjectValue( - currEntry.getValue(), - metricIndex - ); - } - }; - } - - IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(column); - - if (dimensionDesc == null) { - return null; - } else { - - final int dimensionIndex = dimensionDesc.getIndex(); - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - IncrementalIndex.TimeAndDims key = currEntry.getKey(); - if (key == null) { - return null; - } - - Object[] dims = key.getDims(); - if (dimensionIndex >= dims.length) { - return null; - } - - return indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList( - dims[dimensionIndex], DimensionIndexer.ARRAY - ); - } - }; - } - } - - @Override - public DoubleColumnSelector makeDoubleColumnSelector(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.makeDoubleColumnSelector(columnName, this); - } - - final Integer dimIndex = index.getDimensionIndex(columnName); - if (dimIndex != null) { - final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName); - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - return indexer.makeDoubleColumnSelector( - currEntry, - dimensionDesc - ); - } - - final Integer metricIndexInt = index.getMetricIndex(columnName); - if (metricIndexInt == null) { - return ZeroDoubleColumnSelector.instance(); - } - - final int metricIndex = metricIndexInt; - return new DoubleColumnSelector() - { - @Override - public double getDouble() - { - return index.getMetricDoubleValue(currEntry.getValue(), metricIndex); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("index", index); - } - }; - } - - @Nullable - @Override - public ColumnCapabilities getColumnCapabilities(String columnName) - { - if (virtualColumns.exists(columnName)) { - return virtualColumns.getColumnCapabilities(columnName); - } - - return index.getCapabilities(columnName); - } - }; - } - } - ); + return Sequences + .simple(iterable) + .map(i -> new IncrementalIndexCursor(virtualColumns, descending, filter, i, actualInterval, gran)); } private ValueMatcher makeFilterMatcher(final Filter filter, final Cursor cursor) { return filter == null ? BooleanValueMatcher.of(true) - : filter.makeMatcher(cursor); - } - - public static class EntryHolder - { - IncrementalIndex.TimeAndDims currEntry = null; - - public IncrementalIndex.TimeAndDims get() - { - return currEntry; - } - - public void set(IncrementalIndex.TimeAndDims currEntry) - { - this.currEntry = currEntry; - } - - public IncrementalIndex.TimeAndDims getKey() - { - return currEntry; - } - - public int getValue() - { - return currEntry.getRowIndex(); - } + : filter.makeMatcher(cursor.getColumnSelectorFactory()); } @Override @@ -707,4 +211,172 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { return index.getMetadata(); } + + private class IncrementalIndexCursor implements Cursor + { + private TimeAndDimsHolder currEntry; + private final ColumnSelectorFactory columnSelectorFactory; + private final ValueMatcher filterMatcher; + private final int maxRowIndex; + private Iterator baseIter; + private Iterable cursorIterable; + private boolean emptyRange; + private final DateTime time; + private int numAdvanced; + private boolean done; + + IncrementalIndexCursor( + VirtualColumns virtualColumns, + boolean descending, + Filter filter, + Interval interval, + Interval actualInterval, + Granularity gran + ) + { + currEntry = new TimeAndDimsHolder(); + columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(index, virtualColumns, descending, currEntry); + filterMatcher = makeFilterMatcher(filter, this); + numAdvanced = -1; + maxRowIndex = index.getLastRowIndex(); + final long timeStart = Math.max(interval.getStartMillis(), actualInterval.getStartMillis()); + cursorIterable = index.getFacts().timeRangeIterable( + descending, + timeStart, + Math.min(actualInterval.getEndMillis(), gran.increment(interval.getStart()).getMillis()) + ); + emptyRange = !cursorIterable.iterator().hasNext(); + time = gran.toDateTime(interval.getStartMillis()); + + reset(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return columnSelectorFactory; + } + + @Override + public DateTime getTime() + { + return time; + } + + @Override + public void advance() + { + if (!baseIter.hasNext()) { + done = true; + return; + } + + while (baseIter.hasNext()) { + BaseQuery.checkInterrupted(); + + IncrementalIndex.TimeAndDims entry = baseIter.next(); + if (beyondMaxRowIndex(entry.getRowIndex())) { + continue; + } + + currEntry.set(entry); + + if (filterMatcher.matches()) { + return; + } + } + + done = true; + } + + @Override + public void advanceUninterruptibly() + { + if (!baseIter.hasNext()) { + done = true; + return; + } + + while (baseIter.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + return; + } + + IncrementalIndex.TimeAndDims entry = baseIter.next(); + if (beyondMaxRowIndex(entry.getRowIndex())) { + continue; + } + + currEntry.set(entry); + + if (filterMatcher.matches()) { + return; + } + } + + done = true; + } + + @Override + public void advanceTo(int offset) + { + int count = 0; + while (count < offset && !isDone()) { + advance(); + count++; + } + } + + @Override + public boolean isDone() + { + return done; + } + + @Override + public boolean isDoneOrInterrupted() + { + return isDone() || Thread.currentThread().isInterrupted(); + } + + @Override + public void reset() + { + baseIter = cursorIterable.iterator(); + + if (numAdvanced == -1) { + numAdvanced = 0; + } else { + Iterators.advance(baseIter, numAdvanced); + } + + BaseQuery.checkInterrupted(); + + boolean foundMatched = false; + while (baseIter.hasNext()) { + IncrementalIndex.TimeAndDims entry = baseIter.next(); + if (beyondMaxRowIndex(entry.getRowIndex())) { + numAdvanced++; + continue; + } + currEntry.set(entry); + if (filterMatcher.matches()) { + foundMatched = true; + break; + } + + numAdvanced++; + } + + done = !foundMatched && (emptyRange || !baseIter.hasNext()); + } + + private boolean beyondMaxRowIndex(int rowIndex) + { + // ignore rows whose rowIndex is beyond the maxRowIndex + // rows are order by timestamp, not rowIndex, + // so we still need to go through all rows to skip rows added after cursor created + return rowIndex > maxRowIndex; + } + } } diff --git a/processing/src/main/java/io/druid/segment/incremental/TimeAndDimsHolder.java b/processing/src/main/java/io/druid/segment/incremental/TimeAndDimsHolder.java new file mode 100644 index 00000000000..664885897f5 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/TimeAndDimsHolder.java @@ -0,0 +1,56 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.incremental; + +/** + * This interface is the core "pointer" interface that is used to create {@link io.druid.segment.ColumnValueSelector}s + * over incremental index. It's counterpart for historical segments is {@link io.druid.segment.data.Offset}. + */ +public class TimeAndDimsHolder +{ + IncrementalIndex.TimeAndDims currEntry = null; + + public IncrementalIndex.TimeAndDims get() + { + return currEntry; + } + + public void set(IncrementalIndex.TimeAndDims currEntry) + { + this.currEntry = currEntry; + } + + /** + * This method doesn't have well-defined semantics ("key" of what?), should be removed in favor of {@link #get()}. + */ + public IncrementalIndex.TimeAndDims getKey() + { + return currEntry; + } + + /** + * This method doesn't have well-defined semantics ("value" of what?), should be removed in favor of chaining + * get().getRowIndex(). + */ + public int getValue() + { + return currEntry.getRowIndex(); + } +} diff --git a/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java b/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java index 61f4dd76447..58bc2bcec7c 100644 --- a/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java @@ -322,9 +322,9 @@ public abstract class BaseFilterTest @Override public List apply(Cursor input) { - final DimensionSelector selector = input.makeDimensionSelector( - new DefaultDimensionSpec(selectColumn, selectColumn) - ); + final DimensionSelector selector = input + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec(selectColumn, selectColumn)); final List values = Lists.newArrayList(); @@ -355,7 +355,7 @@ public abstract class BaseFilterTest Aggregator agg = new FilteredAggregatorFactory( new CountAggregatorFactory("count"), maybeOptimize(filter) - ).factorize(input); + ).factorize(input.getColumnSelectorFactory()); for (; !input.isDone(); input.advance()) { agg.aggregate(); @@ -417,9 +417,9 @@ public abstract class BaseFilterTest @Override public List apply(Cursor input) { - final DimensionSelector selector = input.makeDimensionSelector( - new DefaultDimensionSpec(selectColumn, selectColumn) - ); + final DimensionSelector selector = input + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec(selectColumn, selectColumn)); final List values = Lists.newArrayList(); diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 8cc1375ddb0..d3cf377e442 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -278,7 +278,9 @@ public class IncrementalIndexStorageAdapterTest Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.newArrayList()).get(0); DimensionSelector dimSelector; - dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); + dimSelector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); index.add( @@ -292,7 +294,9 @@ public class IncrementalIndexStorageAdapterTest // Cursor reset should not be affected by out of order values cursor.reset(); - dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); + dimSelector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("sally", "sally")); Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); } } @@ -430,12 +434,9 @@ public class IncrementalIndexStorageAdapterTest @Override public Object apply(Cursor cursor) { - DimensionSelector dimSelector = cursor.makeDimensionSelector( - new DefaultDimensionSpec( - "billy", - "billy" - ) - ); + DimensionSelector dimSelector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("billy", "billy")); int cardinality = dimSelector.getValueCardinality(); //index gets more rows at this point, while other thread is iterating over the cursor @@ -513,12 +514,9 @@ public class IncrementalIndexStorageAdapterTest @Override public Object apply(Cursor cursor) { - DimensionSelector dimSelector1A = cursor.makeDimensionSelector( - new DefaultDimensionSpec( - "billy", - "billy" - ) - ); + DimensionSelector dimSelector1A = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("billy", "billy")); int cardinalityA = dimSelector1A.getValueCardinality(); //index gets more rows at this point, while other thread is iterating over the cursor @@ -535,12 +533,9 @@ public class IncrementalIndexStorageAdapterTest throw new RuntimeException(ex); } - DimensionSelector dimSelector1B = cursor.makeDimensionSelector( - new DefaultDimensionSpec( - "billy", - "billy" - ) - ); + DimensionSelector dimSelector1B = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("billy", "billy")); //index gets more rows at this point, while other thread is iterating over the cursor try { index.add( @@ -562,19 +557,13 @@ public class IncrementalIndexStorageAdapterTest throw new RuntimeException(ex); } - DimensionSelector dimSelector1C = cursor.makeDimensionSelector( - new DefaultDimensionSpec( - "billy", - "billy" - ) - ); + DimensionSelector dimSelector1C = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("billy", "billy")); - DimensionSelector dimSelector2D = cursor.makeDimensionSelector( - new DefaultDimensionSpec( - "billy2", - "billy2" - ) - ); + DimensionSelector dimSelector2D = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("billy2", "billy2")); //index gets more rows at this point, while other thread is iterating over the cursor try { index.add( @@ -596,12 +585,9 @@ public class IncrementalIndexStorageAdapterTest throw new RuntimeException(ex); } - DimensionSelector dimSelector3E = cursor.makeDimensionSelector( - new DefaultDimensionSpec( - "billy3", - "billy3" - ) - ); + DimensionSelector dimSelector3E = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("billy3", "billy3")); int rowNumInCursor = 0; // and then, cursoring continues in the other thread diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java index 83c02d6f3bd..1e4c5a725cd 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -85,13 +85,14 @@ public class IngestSegmentFirehose implements Firehose @Override public Sequence apply(final Cursor cursor) { - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + final LongColumnSelector timestampColumnSelector = + cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME); final Map dimSelectors = Maps.newHashMap(); for (String dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector( - new DefaultDimensionSpec(dim, dim) - ); + final DimensionSelector dimSelector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec(dim, dim)); // dimSelector is null if the dimension is not present if (dimSelector != null) { dimSelectors.put(dim, dimSelector); @@ -100,7 +101,8 @@ public class IngestSegmentFirehose implements Firehose final Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { - final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + final ObjectColumnSelector metricSelector = + cursor.getColumnSelectorFactory().makeObjectColumnSelector(metric); if (metricSelector != null) { metSelectors.put(metric, metricSelector); } diff --git a/services/src/main/java/io/druid/cli/DumpSegment.java b/services/src/main/java/io/druid/cli/DumpSegment.java index f2d6935e2c8..ce23c12e1e2 100644 --- a/services/src/main/java/io/druid/cli/DumpSegment.java +++ b/services/src/main/java/io/druid/cli/DumpSegment.java @@ -278,7 +278,9 @@ public class DumpSegment extends GuiceRunnable final List selectors = Lists.newArrayList(); for (String columnName : columnNames) { - selectors.add(makeSelector(columnName, index.getColumn(columnName), cursor)); + selectors.add( + makeSelector(columnName, index.getColumn(columnName), cursor.getColumnSelectorFactory()) + ); } while (!cursor.isDone()) {