Refactoring of Storage Adapters (#4710)

* Factor QueryableIndexColumnSelectorFactory and IncrementalIndexColumnSelectorFactory out of QueryableIndexStorageAdapter and IncrementalIndexStorageAdapter; Add Offset.getBaseReadableOffset(); Remove OffsetHolder interface; Replace Cursor extends ColumnSelectorFactory with composition; Reduce indirection in ColumnValueSelectors created by QueryableIndexColumnSelectorFactory

* Don't override clone() in FilteredOffset (the prev. implementation was broken); Some warnings fixed

* Simplify Cursors in QueryableIndexStorageAdapter

* Address comments

* Remove unused and unimplemented methods from GenericColumn interface

* Comments
This commit is contained in:
Roman Leventov 2017-08-28 20:07:31 -05:00 committed by Charles Allen
parent 594a66f3c0
commit 4d109a358a
45 changed files with 1538 additions and 1303 deletions

View File

@ -176,7 +176,7 @@ public class ExpressionBenchmark
Sequences.map(
cursors,
cursor -> {
final BufferAggregator bufferAggregator = aggregatorFactory.apply(cursor);
final BufferAggregator bufferAggregator = aggregatorFactory.apply(cursor.getColumnSelectorFactory());
bufferAggregator.init(aggregationBuffer, 0);
while (!cursor.isDone()) {

View File

@ -514,7 +514,9 @@ public class FilterPartitionBenchmark
{
List<String> strings = new ArrayList<String>();
List<DimensionSelector> selectors = new ArrayList<>();
selectors.add(input.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null)));
selectors.add(
input.getColumnSelectorFactory().makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null))
);
//selectors.add(input.makeDimensionSelector(new DefaultDimensionSpec("dimB", null)));
while (!input.isDone()) {
for (DimensionSelector selector : selectors) {
@ -540,7 +542,7 @@ public class FilterPartitionBenchmark
public List<Long> apply(Cursor input)
{
List<Long> longvals = new ArrayList<Long>();
LongColumnSelector selector = input.makeLongColumnSelector("sumLongSequential");
LongColumnSelector selector = input.getColumnSelectorFactory().makeLongColumnSelector("sumLongSequential");
while (!input.isDone()) {
long rowval = selector.getLong();
blackhole.consume(rowval);

View File

@ -144,10 +144,10 @@ public class IncrementalIndexReadBenchmark
Cursor cursor = Sequences.toList(Sequences.limit(cursors, 1), Lists.<Cursor>newArrayList()).get(0);
List<DimensionSelector> selectors = new ArrayList<>();
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimZipf", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimUniform", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequentialHalfNull", null)));
selectors.add(makeDimensionSelector(cursor, "dimSequential"));
selectors.add(makeDimensionSelector(cursor, "dimZipf"));
selectors.add(makeDimensionSelector(cursor, "dimUniform"));
selectors.add(makeDimensionSelector(cursor, "dimSequentialHalfNull"));
cursor.reset();
while (!cursor.isDone()) {
@ -179,10 +179,10 @@ public class IncrementalIndexReadBenchmark
Cursor cursor = Sequences.toList(Sequences.limit(cursors, 1), Lists.<Cursor>newArrayList()).get(0);
List<DimensionSelector> selectors = new ArrayList<>();
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequential", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimZipf", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimUniform", null)));
selectors.add(cursor.makeDimensionSelector(new DefaultDimensionSpec("dimSequentialHalfNull", null)));
selectors.add(makeDimensionSelector(cursor, "dimSequential"));
selectors.add(makeDimensionSelector(cursor, "dimZipf"));
selectors.add(makeDimensionSelector(cursor, "dimUniform"));
selectors.add(makeDimensionSelector(cursor, "dimSequentialHalfNull"));
cursor.reset();
while (!cursor.isDone()) {
@ -205,4 +205,9 @@ public class IncrementalIndexReadBenchmark
null
);
}
private static DimensionSelector makeDimensionSelector(Cursor cursor, String name)
{
return cursor.getColumnSelectorFactory().makeDimensionSelector(new DefaultDimensionSpec(name, null));
}
}

View File

@ -129,19 +129,21 @@ public class ScanQueryEngine
@Override
public Iterator<ScanResultValue> make()
{
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final LongColumnSelector timestampColumnSelector =
cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final List<ColumnSelectorPlus<SelectQueryEngine.SelectColumnSelectorStrategy>> selectorPlusList = Arrays.asList(
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
Lists.newArrayList(dims),
cursor
cursor.getColumnSelectorFactory()
)
);
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
final ObjectColumnSelector metricSelector =
cursor.getColumnSelectorFactory().makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
}
final int batchSize = query.getBatchSize();

View File

@ -332,7 +332,7 @@ public class GroupByQueryEngine
);
}
final DimensionSelector selector = cursor.makeDimensionSelector(dimSpec);
final DimensionSelector selector = cursor.getColumnSelectorFactory().makeDimensionSelector(dimSpec);
if (selector != null) {
if (selector.getValueCardinality() == DimensionSelector.CARDINALITY_UNKNOWN) {
throw new UnsupportedOperationException(
@ -349,7 +349,7 @@ public class GroupByQueryEngine
sizesRequired = new int[aggregatorSpecs.size()];
for (int i = 0; i < aggregatorSpecs.size(); ++i) {
AggregatorFactory aggregatorSpec = aggregatorSpecs.get(i);
aggregators[i] = aggregatorSpec.factorizeBuffered(cursor);
aggregators[i] = aggregatorSpec.factorizeBuffered(cursor.getColumnSelectorFactory());
metricNames[i] = aggregatorSpec.getName();
sizesRequired[i] = aggregatorSpec.getMaxIntermediateSize();
}

View File

@ -142,7 +142,7 @@ public class GroupByQueryEngineV2
.createColumnSelectorPluses(
STRATEGY_FACTORY,
query.getDimensions(),
cursor
cursor.getColumnSelectorFactory()
);
GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus);
@ -434,7 +434,7 @@ public class GroupByQueryEngineV2
return new BufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
cursor,
cursor.getColumnSelectorFactory(),
query.getAggregatorSpecs()
.toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]),
querySpecificConfig.getBufferGrouperMaxSize(),
@ -587,7 +587,7 @@ public class GroupByQueryEngineV2
{
return new BufferArrayGrouper(
Suppliers.ofInstance(buffer),
cursor,
cursor.getColumnSelectorFactory(),
query.getAggregatorSpecs()
.toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]),
cardinality

View File

@ -270,12 +270,9 @@ public class SegmentAnalyzer
@Override
public Long accumulate(Long accumulated, Cursor cursor)
{
DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
columnName,
columnName
)
);
DimensionSelector selector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec(columnName, columnName));
if (selector == null) {
return accumulated;
}

View File

@ -117,7 +117,7 @@ public class CursorOnlyStrategy extends SearchStrategy
DimensionHandlerUtils.createColumnSelectorPluses(
SearchQueryRunner.SEARCH_COLUMN_SELECTOR_STRATEGY_FACTORY,
dimsToSearch,
cursor
cursor.getColumnSelectorFactory()
)
);

View File

@ -241,13 +241,14 @@ public class SelectQueryEngine
query.isDescending()
);
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final LongColumnSelector timestampColumnSelector =
cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final List<ColumnSelectorPlus<SelectColumnSelectorStrategy>> selectorPlusList = Arrays.asList(
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
Lists.newArrayList(dims),
cursor
cursor.getColumnSelectorFactory()
)
);
@ -257,7 +258,8 @@ public class SelectQueryEngine
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
final ObjectColumnSelector metricSelector =
cursor.getColumnSelectorFactory().makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
builder.addMetric(metric);
}

View File

@ -103,7 +103,8 @@ public class TimeBoundaryQueryRunnerFactory
if (cursor.isDone()) {
return null;
}
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final LongColumnSelector timestampColumnSelector =
cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final DateTime timestamp = DateTimes.utc(timestampColumnSelector.getLong());
return new Result<>(adapter.getInterval().getStart(), timestamp);
}

View File

@ -66,7 +66,7 @@ public class TimeseriesQueryEngine
String[] aggregatorNames = new String[aggregatorSpecs.size()];
for (int i = 0; i < aggregatorSpecs.size(); i++) {
aggregators[i] = aggregatorSpecs.get(i).factorize(cursor);
aggregators[i] = aggregatorSpecs.get(i).factorize(cursor.getColumnSelectorFactory());
aggregatorNames[i] = aggregatorSpecs.get(i).getName();
}

View File

@ -45,7 +45,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
int aggregatorIndex = 0;
for (AggregatorFactory spec : aggregatorSpecs) {
aggregators[aggregatorIndex] = spec.factorize(cursor);
aggregators[aggregatorIndex] = spec.factorize(cursor.getColumnSelectorFactory());
++aggregatorIndex;
}
return aggregators;
@ -56,7 +56,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
BufferAggregator[] aggregators = new BufferAggregator[aggregatorSpecs.size()];
int aggregatorIndex = 0;
for (AggregatorFactory spec : aggregatorSpecs) {
aggregators[aggregatorIndex] = spec.factorizeBuffered(cursor);
aggregators[aggregatorIndex] = spec.factorizeBuffered(cursor.getColumnSelectorFactory());
++aggregatorIndex;
}
return aggregators;

View File

@ -85,7 +85,7 @@ public class TopNMapFn
final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
STRATEGY_FACTORY,
query.getDimensionSpec(),
cursor
cursor.getColumnSelectorFactory()
);
if (selectorPlus.getSelector() == null) {

View File

@ -27,6 +27,7 @@ import io.druid.extendedset.intset.EmptyIntIterator;
import io.druid.java.util.common.RE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Offset;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import org.roaringbitmap.IntIterator;
@ -202,6 +203,12 @@ public class BitmapOffset extends Offset
value = valueForReset;
}
@Override
public ReadableOffset getBaseReadableOffset()
{
return this;
}
@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public Offset clone()

View File

@ -24,8 +24,9 @@ import org.joda.time.DateTime;
/**
*/
public interface Cursor extends ColumnSelectorFactory
public interface Cursor
{
ColumnSelectorFactory getColumnSelectorFactory();
DateTime getTime();
void advance();
void advanceUninterruptibly();

View File

@ -134,13 +134,14 @@ public final class DimensionHandlerUtils
* @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
* @param cursor Used to create value selectors for columns.
* @param columnSelectorFactory Used to create value selectors for columns.
* @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
*/
public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorPlus<ColumnSelectorStrategyClass>[] createColumnSelectorPluses(
public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
ColumnSelectorPlus<ColumnSelectorStrategyClass>[] createColumnSelectorPluses(
ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
List<DimensionSpec> dimensionSpecs,
ColumnSelectorFactory cursor
ColumnSelectorFactory columnSelectorFactory
)
{
int dimCount = dimensionSpecs.size();
@ -150,12 +151,12 @@ public final class DimensionHandlerUtils
final String dimName = dimSpec.getDimension();
final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec(
dimSpec,
cursor
columnSelectorFactory
);
ColumnSelectorStrategyClass strategy = makeStrategy(
strategyFactory,
dimSpec,
cursor.getColumnCapabilities(dimSpec.getDimension()),
columnSelectorFactory.getColumnCapabilities(dimSpec.getDimension()),
selector
);
final ColumnSelectorPlus<ColumnSelectorStrategyClass> selectorPlus = new ColumnSelectorPlus<>(

View File

@ -25,7 +25,7 @@ import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.TimeAndDimsHolder;
import javax.annotation.Nullable;
@ -217,7 +217,7 @@ public interface DimensionIndexer
*/
DimensionSelector makeDimensionSelector(
DimensionSpec spec,
IncrementalIndexStorageAdapter.EntryHolder currEntry,
TimeAndDimsHolder currEntry,
IncrementalIndex.DimensionDesc desc
);
@ -229,10 +229,7 @@ public interface DimensionIndexer
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
LongColumnSelector makeLongColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
);
LongColumnSelector makeLongColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc);
/**
@ -242,10 +239,7 @@ public interface DimensionIndexer
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
FloatColumnSelector makeFloatColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
);
FloatColumnSelector makeFloatColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc);
/**
@ -255,10 +249,7 @@ public interface DimensionIndexer
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
DoubleColumnSelector makeDoubleColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
);
DoubleColumnSelector makeDoubleColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc);
/**
* Compares the row values for this DimensionIndexer's dimension from a TimeAndDims key.

View File

@ -26,7 +26,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.TimeAndDimsHolder;
import javax.annotation.Nullable;
import java.util.List;
@ -86,19 +86,16 @@ public class DoubleDimensionIndexer implements DimensionIndexer<Double, Double,
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
DimensionSpec spec,
TimeAndDimsHolder currEntry,
IncrementalIndex.DimensionDesc desc
)
{
return new DoubleWrappingDimensionSelector(
makeDoubleColumnSelector(currEntry, desc),
spec.getExtractionFn()
);
return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(currEntry, desc), spec.getExtractionFn());
}
@Override
public LongColumnSelector makeLongColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
public LongColumnSelector makeLongColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc)
{
final int dimIndex = desc.getIndex();
class IndexerLongColumnSelector implements LongColumnSelector
@ -127,9 +124,7 @@ public class DoubleDimensionIndexer implements DimensionIndexer<Double, Double,
}
@Override
public FloatColumnSelector makeFloatColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
public FloatColumnSelector makeFloatColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc)
{
final int dimIndex = desc.getIndex();
class IndexerFloatColumnSelector implements FloatColumnSelector
@ -158,9 +153,7 @@ public class DoubleDimensionIndexer implements DimensionIndexer<Double, Double,
}
@Override
public DoubleColumnSelector makeDoubleColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
public DoubleColumnSelector makeDoubleColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc)
{
final int dimIndex = desc.getIndex();
class IndexerDoubleColumnSelector implements DoubleColumnSelector

View File

@ -27,31 +27,32 @@ import io.druid.query.filter.RowOffsetMatcherFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Offset;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.OffsetHolder;
import org.roaringbitmap.IntIterator;
public final class FilteredOffset extends Offset
{
private Offset baseOffset;
private final Offset baseOffset;
private final ValueMatcher filterMatcher;
FilteredOffset(
HistoricalCursor cursor,
Offset baseOffset,
ColumnSelectorFactory columnSelectorFactory,
boolean descending,
Filter postFilter,
ColumnSelectorBitmapIndexSelector bitmapIndexSelector
)
{
this.baseOffset = baseOffset;
RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory(
cursor,
baseOffset.getBaseReadableOffset(),
descending
);
if (postFilter instanceof BooleanFilter) {
filterMatcher = ((BooleanFilter) postFilter).makeMatcher(
bitmapIndexSelector,
cursor,
columnSelectorFactory,
rowOffsetMatcherFactory
);
} else {
@ -60,45 +61,19 @@ public final class FilteredOffset extends Offset
postFilter.getBitmapIndex(bitmapIndexSelector)
);
} else {
filterMatcher = postFilter.makeMatcher(cursor);
}
}
}
void reset(Offset baseOffset)
{
this.baseOffset = baseOffset;
if (baseOffset.withinBounds()) {
if (!filterMatcher.matches()) {
BaseQuery.checkInterrupted();
incrementInterruptibly();
filterMatcher = postFilter.makeMatcher(columnSelectorFactory);
}
}
incrementIfNeededOnCreationOrReset();
}
@Override
public void increment()
{
baseOffset.increment();
while (baseOffset.withinBounds() && !Thread.currentThread().isInterrupted()) {
if (filterMatcher.matches()) {
while (!Thread.currentThread().isInterrupted()) {
baseOffset.increment();
if (!baseOffset.withinBounds() || filterMatcher.matches()) {
return;
} else {
baseOffset.increment();
}
}
}
void incrementInterruptibly()
{
baseOffset.increment();
while (baseOffset.withinBounds()) {
BaseQuery.checkInterrupted();
if (filterMatcher.matches()) {
return;
} else {
baseOffset.increment();
}
}
}
@ -113,14 +88,32 @@ public final class FilteredOffset extends Offset
public void reset()
{
baseOffset.reset();
incrementIfNeededOnCreationOrReset();
}
private void incrementIfNeededOnCreationOrReset()
{
if (baseOffset.withinBounds()) {
if (!filterMatcher.matches()) {
increment();
// increment() returns early if it detects the current Thread is interrupted. It will leave this
// FilteredOffset in an illegal state, because it may point to an offset that should be filtered. So must to
// call BaseQuery.checkInterrupted() and thereby throw a QueryInterruptedException.
BaseQuery.checkInterrupted();
}
}
}
@Override
public ReadableOffset getBaseReadableOffset()
{
return baseOffset.getBaseReadableOffset();
}
@Override
public Offset clone()
{
FilteredOffset offset = (FilteredOffset) super.clone();
offset.baseOffset = offset.baseOffset.clone();
return offset;
throw new UnsupportedOperationException("FilteredOffset should not be cloned");
}
@Override
@ -138,12 +131,12 @@ public final class FilteredOffset extends Offset
private static class CursorOffsetHolderRowOffsetMatcherFactory implements RowOffsetMatcherFactory
{
private final OffsetHolder holder;
private final ReadableOffset offset;
private final boolean descending;
CursorOffsetHolderRowOffsetMatcherFactory(OffsetHolder holder, boolean descending)
CursorOffsetHolderRowOffsetMatcherFactory(ReadableOffset offset, boolean descending)
{
this.holder = holder;
this.offset = offset;
this.descending = descending;
}
@ -168,7 +161,7 @@ public final class FilteredOffset extends Offset
@Override
public boolean matches()
{
int currentOffset = holder.getReadableOffset().getOffset();
int currentOffset = offset.getOffset();
while (iterOffset > currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
@ -179,8 +172,7 @@ public final class FilteredOffset extends Offset
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("holder", holder);
inspector.visit("offset", holder.getReadableOffset());
inspector.visit("offset", offset);
inspector.visit("iter", iter);
}
};
@ -192,7 +184,7 @@ public final class FilteredOffset extends Offset
@Override
public boolean matches()
{
int currentOffset = holder.getReadableOffset().getOffset();
int currentOffset = offset.getOffset();
while (iterOffset < currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
@ -203,8 +195,7 @@ public final class FilteredOffset extends Offset
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("holder", holder);
inspector.visit("offset", holder.getReadableOffset());
inspector.visit("offset", offset);
inspector.visit("iter", iter);
}
};

View File

@ -26,7 +26,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.TimeAndDimsHolder;
import javax.annotation.Nullable;
import java.util.List;
@ -87,7 +87,7 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
DimensionSpec spec, TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc
)
{
return new FloatWrappingDimensionSelector(
@ -98,7 +98,7 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
@Override
public LongColumnSelector makeLongColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final TimeAndDimsHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
@ -130,7 +130,7 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
@Override
public FloatColumnSelector makeFloatColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final TimeAndDimsHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
@ -161,7 +161,8 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
@Override
public DoubleColumnSelector makeDoubleColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
final TimeAndDimsHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();

View File

@ -26,7 +26,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.TimeAndDimsHolder;
import javax.annotation.Nullable;
import java.util.List;
@ -87,7 +87,7 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
DimensionSpec spec, TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc
)
{
return new LongWrappingDimensionSelector(
@ -98,7 +98,7 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
@Override
public LongColumnSelector makeLongColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final TimeAndDimsHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
@ -129,7 +129,7 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
@Override
public FloatColumnSelector makeFloatColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final TimeAndDimsHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
@ -161,7 +161,8 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
@Override
public DoubleColumnSelector makeDoubleColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
final TimeAndDimsHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Offset;
import io.druid.segment.data.ReadableOffset;
public class NoFilterOffset extends Offset
{
@ -55,6 +56,12 @@ public class NoFilterOffset extends Offset
currentOffset = initialOffset;
}
@Override
public ReadableOffset getBaseReadableOffset()
{
return this;
}
@Override
public Offset clone()
{

View File

@ -0,0 +1,385 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.collect.Maps;
import io.druid.java.util.common.io.Closer;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ReadableOffset;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.Map;
/**
* The basic implementation of {@link ColumnSelectorFactory} over a historical segment (i. e. {@link QueryableIndex}).
* It's counterpart for incremental index is {@link io.druid.segment.incremental.IncrementalIndexColumnSelectorFactory}.
*/
class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory
{
private final QueryableIndex index;
private final VirtualColumns virtualColumns;
private final boolean descending;
private final Closer closer;
protected final ReadableOffset offset;
private final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
private final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
private final Map<String, Object> objectColumnCache = Maps.newHashMap();
QueryableIndexColumnSelectorFactory(
QueryableIndex index,
VirtualColumns virtualColumns,
boolean descending,
Closer closer,
ReadableOffset offset
)
{
this.index = index;
this.virtualColumns = virtualColumns;
this.descending = descending;
this.closer = closer;
this.offset = offset;
}
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
if (virtualColumns.exists(dimensionSpec.getDimension())) {
return virtualColumns.makeDimensionSelector(dimensionSpec, this);
}
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
}
private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensionSpec)
{
final String dimension = dimensionSpec.getDimension();
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
final Column columnDesc = index.getColumn(dimension);
if (columnDesc == null) {
return DimensionSelectorUtils.constantSelector(null, extractionFn);
}
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return new SingleScanTimeDimSelector(
makeLongColumnSelector(dimension),
extractionFn,
descending
);
}
if (columnDesc.getCapabilities().getType() == ValueType.LONG) {
return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn);
}
if (columnDesc.getCapabilities().getType() == ValueType.FLOAT) {
return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn);
}
if (columnDesc.getCapabilities().getType() == ValueType.DOUBLE) {
return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn);
}
DictionaryEncodedColumn<String> cachedColumn = dictionaryColumnCache.get(dimension);
if (cachedColumn == null) {
cachedColumn = columnDesc.getDictionaryEncoding();
closer.register(cachedColumn);
dictionaryColumnCache.put(dimension, cachedColumn);
}
final DictionaryEncodedColumn<String> column = cachedColumn;
if (column == null) {
return DimensionSelectorUtils.constantSelector(null, extractionFn);
} else {
return column.makeDimensionSelector(offset, extractionFn);
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeFloatColumnSelector(columnName, this);
}
GenericColumn cachedMetricVals = genericColumnCache.get(columnName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(columnName);
if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) {
cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return ZeroFloatColumnSelector.instance();
}
return cachedMetricVals.makeFloatSingleValueRowSelector(offset);
}
@Override
public DoubleColumnSelector makeDoubleColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeDoubleColumnSelector(columnName, this);
}
GenericColumn cachedMetricVals = genericColumnCache.get(columnName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(columnName);
if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) {
cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return ZeroDoubleColumnSelector.instance();
}
return cachedMetricVals.makeDoubleSingleValueRowSelector(offset);
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeLongColumnSelector(columnName, this);
}
GenericColumn cachedMetricVals = genericColumnCache.get(columnName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(columnName);
if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) {
cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return ZeroLongColumnSelector.instance();
}
return cachedMetricVals.makeLongSingleValueRowSelector(offset);
}
@Nullable
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
if (virtualColumns.exists(column)) {
return virtualColumns.makeObjectColumnSelector(column, this);
}
Object cachedColumnVals = objectColumnCache.get(column);
if (cachedColumnVals == null) {
Column holder = index.getColumn(column);
if (holder != null) {
final ColumnCapabilities capabilities = holder.getCapabilities();
if (capabilities.isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
} else if (capabilities.getType() == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
} else {
cachedColumnVals = holder.getGenericColumn();
}
}
if (cachedColumnVals != null) {
closer.register((Closeable) cachedColumnVals);
objectColumnCache.put(column, cachedColumnVals);
}
}
if (cachedColumnVals == null) {
return null;
}
if (cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if (columnVals.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multi-value GenericColumns"
);
}
if (type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class<Float> classOfObject()
{
return Float.class;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(offset.getOffset());
}
};
}
if (type == ValueType.DOUBLE) {
return new ObjectColumnSelector<Double>()
{
@Override
public Class<Double> classOfObject()
{
return Double.class;
}
@Override
public Double get()
{
return columnVals.getDoubleSingleValueRow(offset.getOffset());
}
};
}
if (type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class<Long> classOfObject()
{
return Long.class;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(offset.getOffset());
}
};
}
if (type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class<String> classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(offset.getOffset());
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn<String> columnVals = (DictionaryEncodedColumn) cachedColumnVals;
if (columnVals.hasMultipleValues()) {
return new ObjectColumnSelector<Object>()
{
@Override
public Class<Object> classOfObject()
{
return Object.class;
}
@Override
@Nullable
public Object get()
{
final IndexedInts multiValueRow = columnVals.getMultiValueRow(offset.getOffset());
if (multiValueRow.size() == 0) {
return null;
} else if (multiValueRow.size() == 1) {
return columnVals.lookupName(multiValueRow.get(0));
} else {
final String[] strings = new String[multiValueRow.size()];
for (int i = 0; i < multiValueRow.size(); i++) {
strings[i] = columnVals.lookupName(multiValueRow.get(i));
}
return strings;
}
}
};
} else {
return new ObjectColumnSelector<String>()
{
@Override
public Class<String> classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(offset.getOffset()));
}
};
}
}
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(offset.getOffset());
}
};
}
@Override
@Nullable
public ColumnCapabilities getColumnCapabilities(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.getColumnCapabilities(columnName);
}
return QueryableIndexStorageAdapter.getColumnCapabilites(index, columnName);
}
}

View File

@ -22,7 +22,6 @@ package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.java.util.common.DateTimes;
@ -34,29 +33,22 @@ import io.druid.query.BaseQuery;
import io.druid.query.BitmapResultFactory;
import io.druid.query.DefaultBitmapResultFactory;
import io.druid.query.QueryMetrics;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.Filter;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.filter.AndFilter;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -69,9 +61,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
private final QueryableIndex index;
public QueryableIndexStorageAdapter(
QueryableIndex index
)
public QueryableIndexStorageAdapter(QueryableIndex index)
{
this.index = index;
}
@ -140,6 +130,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
@Override
@Nullable
public Comparable getMinValue(String dimension)
{
Column column = index.getColumn(dimension);
@ -151,6 +142,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
@Override
@Nullable
public Comparable getMaxValue(String dimension)
{
Column column = index.getColumn(dimension);
@ -168,6 +160,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
@Override
@Nullable
public ColumnCapabilities getColumnCapabilities(String column)
{
return getColumnCapabilites(index, column);
@ -199,7 +192,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Nullable QueryMetrics<?> queryMetrics
)
{
Interval actualInterval = interval;
DateTime minTime = getMinTime();
long minDataTimestamp = minTime.getMillis();
@ -207,16 +199,11 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
long maxDataTimestamp = maxTime.getMillis();
final Interval dataInterval = new Interval(minTime, gran.bucketEnd(maxTime));
if (!actualInterval.overlaps(dataInterval)) {
if (!interval.overlaps(dataInterval)) {
return Sequences.empty();
}
if (actualInterval.getStart().isBefore(dataInterval.getStart())) {
actualInterval = actualInterval.withStart(dataInterval.getStart());
}
if (actualInterval.getEnd().isAfter(dataInterval.getEnd())) {
actualInterval = actualInterval.withEnd(dataInterval.getEnd());
}
final Interval actualInterval = interval.overlap(dataInterval);
final ColumnSelectorBitmapIndexSelector selector = new ColumnSelectorBitmapIndexSelector(
index.getBitmapFactoryForDimensions(),
@ -324,7 +311,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
);
}
private static ColumnCapabilities getColumnCapabilites(ColumnSelector index, String columnName)
@Nullable
static ColumnCapabilities getColumnCapabilites(ColumnSelector index, String columnName)
{
Column columnObj = index.getColumn(columnName);
if (columnObj == null) {
@ -375,10 +363,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
final Offset baseOffset = offset.clone();
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
final Closer closer = Closer.create();
@ -432,488 +416,27 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
);
final Offset initOffset = offset.clone();
final Offset baseCursorOffset = offset.clone();
final ColumnSelectorFactory columnSelectorFactory = new QueryableIndexColumnSelectorFactory(
index,
virtualColumns,
descending,
closer,
baseCursorOffset.getBaseReadableOffset()
);
final DateTime myBucket = gran.toDateTime(inputInterval.getStartMillis());
abstract class QueryableIndexBaseCursor<OffsetType extends Offset> implements HistoricalCursor
{
OffsetType cursorOffset;
@Override
public OffsetType getOffset()
{
return cursorOffset;
}
@Override
public ReadableOffset getReadableOffset()
{
return cursorOffset;
}
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec dimensionSpec
)
{
if (virtualColumns.exists(dimensionSpec.getDimension())) {
return virtualColumns.makeDimensionSelector(dimensionSpec, this);
}
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
}
private DimensionSelector makeDimensionSelectorUndecorated(
DimensionSpec dimensionSpec
)
{
final String dimension = dimensionSpec.getDimension();
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
final Column columnDesc = index.getColumn(dimension);
if (columnDesc == null) {
return DimensionSelectorUtils.constantSelector(null, extractionFn);
}
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return new SingleScanTimeDimSelector(
makeLongColumnSelector(dimension),
extractionFn,
descending
);
}
if (columnDesc.getCapabilities().getType() == ValueType.LONG) {
return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn);
}
if (columnDesc.getCapabilities().getType() == ValueType.FLOAT) {
return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn);
}
if (columnDesc.getCapabilities().getType() == ValueType.DOUBLE) {
return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn);
}
DictionaryEncodedColumn<String> cachedColumn = dictionaryColumnCache.get(dimension);
if (cachedColumn == null) {
cachedColumn = columnDesc.getDictionaryEncoding();
closer.register(cachedColumn);
dictionaryColumnCache.put(dimension, cachedColumn);
}
final DictionaryEncodedColumn<String> column = cachedColumn;
if (column == null) {
return DimensionSelectorUtils.constantSelector(null, extractionFn);
} else {
return column.makeDimensionSelector(this, extractionFn);
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeFloatColumnSelector(columnName, this);
}
GenericColumn cachedMetricVals = genericColumnCache.get(columnName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(columnName);
if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) {
cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return ZeroFloatColumnSelector.instance();
}
final GenericColumn metricVals = cachedMetricVals;
return new HistoricalFloatColumnSelector()
{
@Override
public float getFloat()
{
return metricVals.getFloatSingleValueRow(getReadableOffset().getOffset());
}
@Override
public float get(int offset)
{
return metricVals.getFloatSingleValueRow(offset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", getReadableOffset());
}
};
}
@Override
public DoubleColumnSelector makeDoubleColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeDoubleColumnSelector(columnName, this);
}
GenericColumn cachedMetricVals = genericColumnCache.get(columnName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(columnName);
if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) {
cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return ZeroDoubleColumnSelector.instance();
}
final GenericColumn metricVals = cachedMetricVals;
return new DoubleColumnSelector()
{
@Override
public double getDouble()
{
return metricVals.getDoubleSingleValueRow(getReadableOffset().getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", getReadableOffset());
}
};
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeLongColumnSelector(columnName, this);
}
GenericColumn cachedMetricVals = genericColumnCache.get(columnName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(columnName);
if (holder != null && ValueType.isNumeric(holder.getCapabilities().getType())) {
cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return ZeroLongColumnSelector.instance();
}
final GenericColumn metricVals = cachedMetricVals;
return new LongColumnSelector()
{
@Override
public long getLong()
{
return metricVals.getLongSingleValueRow(getReadableOffset().getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", getReadableOffset());
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
if (virtualColumns.exists(column)) {
return virtualColumns.makeObjectColumnSelector(column, this);
}
Object cachedColumnVals = objectColumnCache.get(column);
if (cachedColumnVals == null) {
Column holder = index.getColumn(column);
if (holder != null) {
final ColumnCapabilities capabilities = holder.getCapabilities();
if (capabilities.isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
} else if (capabilities.getType() == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
} else {
cachedColumnVals = holder.getGenericColumn();
}
}
if (cachedColumnVals != null) {
closer.register((Closeable) cachedColumnVals);
objectColumnCache.put(column, cachedColumnVals);
}
}
if (cachedColumnVals == null) {
return null;
}
if (cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if (columnVals.hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multi-value GenericColumns"
);
}
if (type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.class;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(getReadableOffset().getOffset());
}
};
}
if (type == ValueType.DOUBLE) {
return new ObjectColumnSelector<Double>()
{
@Override
public Class classOfObject()
{
return Double.class;
}
@Override
public Double get()
{
return columnVals.getDoubleSingleValueRow(getReadableOffset().getOffset());
}
};
}
if (type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.class;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(getReadableOffset().getOffset());
}
};
}
if (type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(getReadableOffset().getOffset());
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn<String> columnVals = (DictionaryEncodedColumn) cachedColumnVals;
if (columnVals.hasMultipleValues()) {
return new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
int currentOffset = getReadableOffset().getOffset();
final IndexedInts multiValueRow = columnVals.getMultiValueRow(currentOffset);
if (multiValueRow.size() == 0) {
return null;
} else if (multiValueRow.size() == 1) {
return columnVals.lookupName(multiValueRow.get(0));
} else {
final String[] strings = new String[multiValueRow.size()];
for (int i = 0; i < multiValueRow.size(); i++) {
strings[i] = columnVals.lookupName(multiValueRow.get(i));
}
return strings;
}
}
};
} else {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
int currentOffset = getReadableOffset().getOffset();
return columnVals.lookupName(columnVals.getSingleValueRow(currentOffset));
}
};
}
}
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(getReadableOffset().getOffset());
}
};
}
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.getColumnCapabilities(columnName);
}
return getColumnCapabilites(index, columnName);
}
}
if (postFilter == null) {
return new QueryableIndexBaseCursor<Offset>()
{
{
reset();
}
@Override
public void advance()
{
BaseQuery.checkInterrupted();
cursorOffset.increment();
}
@Override
public void advanceUninterruptibly()
{
cursorOffset.increment();
}
@Override
public void reset()
{
cursorOffset = initOffset.clone();
}
};
return new QueryableIndexCursor(baseCursorOffset, columnSelectorFactory, myBucket);
} else {
return new QueryableIndexBaseCursor<FilteredOffset>()
{
private Offset baseOffset;
{
cursorOffset = new FilteredOffset(this, descending, postFilter, bitmapIndexSelector);
reset();
}
@Override
public ReadableOffset getReadableOffset()
{
return baseOffset;
}
@Override
public void advance()
{
BaseQuery.checkInterrupted();
cursorOffset.incrementInterruptibly();
}
@Override
public void advanceUninterruptibly()
{
if (!Thread.currentThread().isInterrupted()) {
cursorOffset.increment();
}
}
@Override
public void reset()
{
baseOffset = initOffset.clone();
cursorOffset.reset(baseOffset);
}
};
FilteredOffset filteredOffset = new FilteredOffset(
baseCursorOffset,
columnSelectorFactory,
descending,
postFilter,
bitmapIndexSelector
);
return new QueryableIndexCursor(filteredOffset, columnSelectorFactory, myBucket);
}
}
@ -924,14 +447,92 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
private static class QueryableIndexCursor implements HistoricalCursor
{
private final Offset cursorOffset;
private final ColumnSelectorFactory columnSelectorFactory;
private final DateTime bucketStart;
QueryableIndexCursor(Offset cursorOffset, ColumnSelectorFactory columnSelectorFactory, DateTime bucketStart)
{
this.cursorOffset = cursorOffset;
this.columnSelectorFactory = columnSelectorFactory;
this.bucketStart = bucketStart;
}
@Override
public Offset getOffset()
{
return cursorOffset;
}
@Override
public ColumnSelectorFactory getColumnSelectorFactory()
{
return columnSelectorFactory;
}
@Override
public DateTime getTime()
{
return bucketStart;
}
@Override
public void advance()
{
cursorOffset.increment();
// Must call BaseQuery.checkInterrupted() after cursorOffset.increment(), not before, because
// FilteredOffset.increment() is a potentially long, not an "instant" operation (unlike to all other subclasses
// of Offset) and it returns early on interruption, leaving itself in an illegal state. We should not let
// aggregators, etc. access this illegal state and throw a QueryInterruptedException by calling
// BaseQuery.checkInterrupted().
BaseQuery.checkInterrupted();
}
@Override
public void advanceUninterruptibly()
{
cursorOffset.increment();
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
cursorOffset.reset();
}
}
public abstract static class TimestampCheckingOffset extends Offset
{
protected final Offset baseOffset;
protected final GenericColumn timestamps;
protected final long timeLimit;
protected final boolean allWithinThreshold;
final Offset baseOffset;
final GenericColumn timestamps;
final long timeLimit;
final boolean allWithinThreshold;
public TimestampCheckingOffset(
TimestampCheckingOffset(
Offset baseOffset,
GenericColumn timestamps,
long timeLimit,
@ -969,6 +570,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
baseOffset.reset();
}
@Override
public ReadableOffset getBaseReadableOffset()
{
return baseOffset.getBaseReadableOffset();
}
protected abstract boolean timeInRange(long current);
@Override
@ -977,6 +584,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
baseOffset.increment();
}
@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public Offset clone()
{
@ -994,7 +602,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
public static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset
{
public AscendingTimestampCheckingOffset(
AscendingTimestampCheckingOffset(
Offset baseOffset,
GenericColumn timestamps,
long timeLimit,
@ -1017,6 +625,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
"<" + timeLimit + "::" + baseOffset;
}
@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public Offset clone()
{
@ -1026,7 +635,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
public static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset
{
public DescendingTimestampCheckingOffset(
DescendingTimestampCheckingOffset(
Offset baseOffset,
GenericColumn timestamps,
long timeLimit,
@ -1050,6 +659,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
"::" + baseOffset;
}
@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public Offset clone()
{

View File

@ -48,7 +48,9 @@ public interface StorageAdapter extends CursorFactory
public int getDimensionCardinality(String column);
public DateTime getMinTime();
public DateTime getMaxTime();
@Nullable
public Comparable getMinValue(String column);
@Nullable
public Comparable getMaxValue(String column);
public Capabilities getCapabilities();

View File

@ -40,7 +40,7 @@ import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.TimeAndDimsHolder;
import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
@ -370,7 +370,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
@Override
public DimensionSelector makeDimensionSelector(
final DimensionSpec spec,
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final TimeAndDimsHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
@ -553,25 +553,19 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public LongColumnSelector makeLongColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
public LongColumnSelector makeLongColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc)
{
return ZeroLongColumnSelector.instance();
}
@Override
public FloatColumnSelector makeFloatColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
public FloatColumnSelector makeFloatColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc)
{
return ZeroFloatColumnSelector.instance();
}
@Override
public DoubleColumnSelector makeDoubleColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
public DoubleColumnSelector makeDoubleColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc)
{
return ZeroDoubleColumnSelector.instance();
}

View File

@ -22,7 +22,7 @@ package io.druid.segment.column;
import io.druid.query.extraction.ExtractionFn;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.historical.OffsetHolder;
import io.druid.segment.data.ReadableOffset;
import java.io.Closeable;
@ -38,5 +38,5 @@ public interface DictionaryEncodedColumn<ActualType extends Comparable> extends
public int lookupId(ActualType name);
public int getCardinality();
DimensionSelector makeDimensionSelector(OffsetHolder offsetHolder, ExtractionFn extractionFn);
DimensionSelector makeDimensionSelector(ReadableOffset offset, ExtractionFn extractionFn);
}

View File

@ -21,9 +21,10 @@ package io.druid.segment.column;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import java.io.Closeable;
@ -37,18 +38,18 @@ public interface GenericColumn extends HotLoopCallee, Closeable
@CalledFromHotLoop
public String getStringSingleValueRow(int rowNum);
@CalledFromHotLoop
public Indexed<String> getStringMultiValueRow(int rowNum);
float getFloatSingleValueRow(int rowNum);
HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset);
@CalledFromHotLoop
public float getFloatSingleValueRow(int rowNum);
@CalledFromHotLoop
public IndexedFloats getFloatMultiValueRow(int rowNum);
@CalledFromHotLoop
public long getLongSingleValueRow(int rowNum);
@CalledFromHotLoop
public IndexedLongs getLongMultiValueRow(int rowNum);
long getLongSingleValueRow(int rowNum);
LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset);
@CalledFromHotLoop
double getDoubleSingleValueRow(int rowNum);
DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset);
@Override
void close();

View File

@ -20,10 +20,11 @@
package io.druid.segment.column;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Indexed;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.data.IndexedDoubles;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
public class IndexedDoublesGenericColumn implements GenericColumn
@ -59,12 +60,6 @@ public class IndexedDoublesGenericColumn implements GenericColumn
throw new UnsupportedOperationException();
}
@Override
public Indexed<String> getStringMultiValueRow(int rowNum)
{
throw new UnsupportedOperationException();
}
@Override
public float getFloatSingleValueRow(int rowNum)
{
@ -72,9 +67,9 @@ public class IndexedDoublesGenericColumn implements GenericColumn
}
@Override
public IndexedFloats getFloatMultiValueRow(int rowNum)
public HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset)
{
throw new UnsupportedOperationException();
return column.makeFloatColumnSelector(offset);
}
@Override
@ -84,9 +79,9 @@ public class IndexedDoublesGenericColumn implements GenericColumn
}
@Override
public IndexedLongs getLongMultiValueRow(int rowNum)
public LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset)
{
throw new UnsupportedOperationException();
return column.makeLongColumnSelector(offset);
}
@Override
@ -95,6 +90,12 @@ public class IndexedDoublesGenericColumn implements GenericColumn
return column.get(rowNum);
}
@Override
public DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset)
{
return column.makeDoubleColumnSelector(offset);
}
@Override
public void close()
{

View File

@ -20,9 +20,11 @@
package io.druid.segment.column;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Indexed;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
/**
*/
@ -59,12 +61,6 @@ public class IndexedFloatsGenericColumn implements GenericColumn
throw new UnsupportedOperationException();
}
@Override
public Indexed<String> getStringMultiValueRow(int rowNum)
{
throw new UnsupportedOperationException();
}
@Override
public float getFloatSingleValueRow(int rowNum)
{
@ -72,9 +68,9 @@ public class IndexedFloatsGenericColumn implements GenericColumn
}
@Override
public IndexedFloats getFloatMultiValueRow(int rowNum)
public HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset)
{
throw new UnsupportedOperationException();
return column.makeFloatColumnSelector(offset);
}
@Override
@ -84,9 +80,9 @@ public class IndexedFloatsGenericColumn implements GenericColumn
}
@Override
public IndexedLongs getLongMultiValueRow(int rowNum)
public LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset)
{
throw new UnsupportedOperationException();
return column.makeLongColumnSelector(offset);
}
@Override
@ -95,6 +91,12 @@ public class IndexedFloatsGenericColumn implements GenericColumn
return (double) column.get(rowNum);
}
@Override
public DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset)
{
return column.makeDoubleColumnSelector(offset);
}
@Override
public void close()
{

View File

@ -20,9 +20,11 @@
package io.druid.segment.column;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.data.IndexedLongs;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
/**
*/
@ -59,12 +61,6 @@ public class IndexedLongsGenericColumn implements GenericColumn
throw new UnsupportedOperationException();
}
@Override
public Indexed<String> getStringMultiValueRow(int rowNum)
{
throw new UnsupportedOperationException();
}
@Override
public float getFloatSingleValueRow(int rowNum)
{
@ -72,9 +68,9 @@ public class IndexedLongsGenericColumn implements GenericColumn
}
@Override
public IndexedFloats getFloatMultiValueRow(int rowNum)
public HistoricalFloatColumnSelector makeFloatSingleValueRowSelector(ReadableOffset offset)
{
throw new UnsupportedOperationException();
return column.makeFloatColumnSelector(offset);
}
@Override
@ -84,9 +80,9 @@ public class IndexedLongsGenericColumn implements GenericColumn
}
@Override
public IndexedLongs getLongMultiValueRow(int rowNum)
public LongColumnSelector makeLongSingleValueRowSelector(ReadableOffset offset)
{
throw new UnsupportedOperationException();
return column.makeLongColumnSelector(offset);
}
@Override
@ -95,6 +91,12 @@ public class IndexedLongsGenericColumn implements GenericColumn
return (double) column.get(rowNum);
}
@Override
public DoubleColumnSelector makeDoubleSingleValueRowSelector(ReadableOffset offset)
{
return column.makeDoubleColumnSelector(offset);
}
@Override
public void close()
{

View File

@ -31,10 +31,10 @@ import io.druid.segment.IdLookup;
import io.druid.segment.data.CachingIndexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.data.SingleIndexedInt;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.historical.HistoricalDimensionSelector;
import io.druid.segment.historical.OffsetHolder;
import io.druid.segment.historical.SingleValueHistoricalDimensionSelector;
import javax.annotation.Nullable;
@ -105,10 +105,7 @@ public class SimpleDictionaryEncodedColumn
}
@Override
public HistoricalDimensionSelector makeDimensionSelector(
final OffsetHolder offsetHolder,
final ExtractionFn extractionFn
)
public HistoricalDimensionSelector makeDimensionSelector(final ReadableOffset offset, final ExtractionFn extractionFn)
{
abstract class QueryableDimensionSelector implements HistoricalDimensionSelector, IdLookup
{
@ -158,7 +155,7 @@ public class SimpleDictionaryEncodedColumn
@Override
public IndexedInts getRow()
{
return multiValueColumn.get(offsetHolder.getReadableOffset().getOffset());
return multiValueColumn.get(offset.getOffset());
}
@Override
@ -183,8 +180,7 @@ public class SimpleDictionaryEncodedColumn
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("multiValueColumn", multiValueColumn);
inspector.visit("offsetHolder", offsetHolder);
inspector.visit("offset", offsetHolder.getReadableOffset());
inspector.visit("offset", offset);
inspector.visit("extractionFn", extractionFn);
}
}
@ -202,7 +198,7 @@ public class SimpleDictionaryEncodedColumn
@Override
public int getRowValue()
{
return column.get(offsetHolder.getReadableOffset().getOffset());
return column.get(offset.getOffset());
}
@Override
@ -273,8 +269,7 @@ public class SimpleDictionaryEncodedColumn
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", column);
inspector.visit("offsetHolder", offsetHolder);
inspector.visit("offset", offsetHolder.getReadableOffset());
inspector.visit("offset", offset);
inspector.visit("extractionFn", extractionFn);
}
}

View File

@ -19,6 +19,11 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import java.io.Closeable;
public interface IndexedDoubles extends Closeable
@ -29,5 +34,68 @@ public interface IndexedDoubles extends Closeable
@Override
void close();
default DoubleColumnSelector makeDoubleColumnSelector(ReadableOffset offset)
{
return new DoubleColumnSelector()
{
@Override
public double getDouble()
{
return IndexedDoubles.this.get(offset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedDoubles.this);
inspector.visit("offset", offset);
}
};
}
default HistoricalFloatColumnSelector makeFloatColumnSelector(ReadableOffset offset)
{
return new HistoricalFloatColumnSelector()
{
@Override
public float get(int offset)
{
return (float) IndexedDoubles.this.get(offset);
}
@Override
public float getFloat()
{
return (float) IndexedDoubles.this.get(offset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedDoubles.this);
inspector.visit("offset", offset);
}
};
}
default LongColumnSelector makeLongColumnSelector(ReadableOffset offset)
{
return new LongColumnSelector()
{
@Override
public long getLong()
{
return (long) IndexedDoubles.this.get(offset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedDoubles.this);
inspector.visit("offset", offset);
}
};
}
}

View File

@ -19,6 +19,11 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import java.io.Closeable;
/**
@ -32,4 +37,67 @@ public interface IndexedFloats extends Closeable
@Override
void close();
default HistoricalFloatColumnSelector makeFloatColumnSelector(ReadableOffset offset)
{
return new HistoricalFloatColumnSelector()
{
@Override
public float getFloat()
{
return IndexedFloats.this.get(offset.getOffset());
}
@Override
public float get(int offset)
{
return IndexedFloats.this.get(offset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedFloats.this);
inspector.visit("offset", offset);
}
};
}
default DoubleColumnSelector makeDoubleColumnSelector(ReadableOffset offset)
{
return new DoubleColumnSelector()
{
@Override
public double getDouble()
{
return IndexedFloats.this.get(offset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedFloats.this);
inspector.visit("offset", offset);
}
};
}
default LongColumnSelector makeLongColumnSelector(ReadableOffset offset)
{
return new LongColumnSelector()
{
@Override
public long getLong()
{
return (long) IndexedFloats.this.get(offset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedFloats.this);
inspector.visit("offset", offset);
}
};
}
}

View File

@ -19,6 +19,11 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import java.io.Closeable;
/**
@ -32,4 +37,67 @@ public interface IndexedLongs extends Closeable
@Override
void close();
default LongColumnSelector makeLongColumnSelector(ReadableOffset offset)
{
return new LongColumnSelector()
{
@Override
public long getLong()
{
return IndexedLongs.this.get(offset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedLongs.this);
inspector.visit("offset", offset);
}
};
}
default HistoricalFloatColumnSelector makeFloatColumnSelector(ReadableOffset offset)
{
return new HistoricalFloatColumnSelector()
{
@Override
public float getFloat()
{
return (float) IndexedLongs.this.get(offset.getOffset());
}
@Override
public float get(int offset)
{
return (float) IndexedLongs.this.get(offset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedLongs.this);
inspector.visit("offset", offset);
}
};
}
default DoubleColumnSelector makeDoubleColumnSelector(ReadableOffset offset)
{
return new DoubleColumnSelector()
{
@Override
public double getDouble()
{
return (double) IndexedLongs.this.get(offset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("indexed", IndexedLongs.this);
inspector.visit("offset", offset);
}
};
}
}

View File

@ -33,6 +33,10 @@ import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
* io.druid.query.topn.Historical1SimpleDoubleAggPooledTopNScannerPrototype} and {@link
* io.druid.query.topn.HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype} during
* specialization, and specialized version of those prototypes must be able to any subclass of Offset.
*
* This interface is the core "pointer" interface that is used to create {@link io.druid.segment.ColumnValueSelector}s
* over historical segments. It's counterpart for incremental index is {@link
* io.druid.segment.incremental.TimeAndDimsHolder}.
*/
@SubclassesMustBePublic
public abstract class Offset implements ReadableOffset, Cloneable
@ -48,6 +52,13 @@ public abstract class Offset implements ReadableOffset, Cloneable
*/
public abstract void reset();
/**
* Returns the same offset ("this") or a readable "view" of this offset, which always returns the same value from
* {@link #getOffset()}, as this offset. This method is useful for "unwrapping" such offsets as {@link
* io.druid.segment.FilteredOffset} and reduce reference indirection, when only {@link ReadableOffset} API is needed.
*/
public abstract ReadableOffset getBaseReadableOffset();
@Override
public Offset clone()
{

View File

@ -20,7 +20,9 @@
package io.druid.segment.historical;
import io.druid.segment.Cursor;
import io.druid.segment.data.Offset;
public interface HistoricalCursor extends Cursor, OffsetHolder
public interface HistoricalCursor extends Cursor
{
Offset getOffset();
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.historical;
import io.druid.segment.data.Offset;
import io.druid.segment.data.ReadableOffset;
public interface OffsetHolder
{
Offset getOffset();
/**
* Should return the same, or a "view" of the same offset as {@link #getOffset()}. The difference is that smaller
* interface allows to return unwrapped underlying offset sometimes, e. g. {@link
* io.druid.segment.FilteredOffset#baseOffset}, instead of the wrapper {@link io.druid.segment.FilteredOffset}.
*/
ReadableOffset getReadableOffset();
}

View File

@ -0,0 +1,341 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionIndexer;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.DoubleWrappingDimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.FloatWrappingDimensionSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.LongWrappingDimensionSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.SingleScanTimeDimSelector;
import io.druid.segment.VirtualColumns;
import io.druid.segment.ZeroDoubleColumnSelector;
import io.druid.segment.ZeroFloatColumnSelector;
import io.druid.segment.ZeroLongColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import javax.annotation.Nullable;
/**
* The basic implementation of {@link ColumnSelectorFactory} over an {@link IncrementalIndex}. It's counterpart for
* historical segments is {@link io.druid.segment.QueryableIndexColumnSelectorFactory}.
*/
class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory
{
private final IncrementalIndex<?> index;
private final VirtualColumns virtualColumns;
private final boolean descending;
private final TimeAndDimsHolder timeAndDimsHolder;
IncrementalIndexColumnSelectorFactory(
IncrementalIndex<?> index,
VirtualColumns virtualColumns,
boolean descending,
TimeAndDimsHolder timeAndDimsHolder
)
{
this.index = index;
this.virtualColumns = virtualColumns;
this.descending = descending;
this.timeAndDimsHolder = timeAndDimsHolder;
}
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
if (virtualColumns.exists(dimensionSpec.getDimension())) {
return virtualColumns.makeDimensionSelector(dimensionSpec, this);
}
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
}
private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensionSpec)
{
final String dimension = dimensionSpec.getDimension();
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return new SingleScanTimeDimSelector(
makeLongColumnSelector(dimension),
extractionFn,
descending
);
}
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionSpec.getDimension());
if (dimensionDesc == null) {
// not a dimension, column may be a metric
ColumnCapabilities capabilities = getColumnCapabilities(dimension);
if (capabilities == null) {
return DimensionSelectorUtils.constantSelector(null, extractionFn);
}
if (capabilities.getType() == ValueType.LONG) {
return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn);
}
if (capabilities.getType() == ValueType.FLOAT) {
return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn);
}
if (capabilities.getType() == ValueType.DOUBLE) {
return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn);
}
// if we can't wrap the base column, just return a column of all nulls
return DimensionSelectorUtils.constantSelector(null, extractionFn);
} else {
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeDimensionSelector(dimensionSpec, timeAndDimsHolder, dimensionDesc);
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeFloatColumnSelector(columnName, this);
}
final Integer dimIndex = index.getDimensionIndex(columnName);
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeFloatColumnSelector(timeAndDimsHolder, dimensionDesc);
}
final Integer metricIndexInt = index.getMetricIndex(columnName);
if (metricIndexInt == null) {
return ZeroFloatColumnSelector.instance();
}
final int metricIndex = metricIndexInt;
return new FloatColumnSelector()
{
@Override
public float getFloat()
{
return index.getMetricFloatValue(timeAndDimsHolder.getValue(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}
@Override
public DoubleColumnSelector makeDoubleColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeDoubleColumnSelector(columnName, this);
}
final Integer dimIndex = index.getDimensionIndex(columnName);
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeDoubleColumnSelector(timeAndDimsHolder, dimensionDesc);
}
final Integer metricIndexInt = index.getMetricIndex(columnName);
if (metricIndexInt == null) {
return ZeroDoubleColumnSelector.instance();
}
final int metricIndex = metricIndexInt;
return new DoubleColumnSelector()
{
@Override
public double getDouble()
{
return index.getMetricDoubleValue(timeAndDimsHolder.getValue(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeLongColumnSelector(columnName, this);
}
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
class TimeLongColumnSelector implements LongColumnSelector
{
@Override
public long getLong()
{
return timeAndDimsHolder.getKey().getTimestamp();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}
return new TimeLongColumnSelector();
}
final Integer dimIndex = index.getDimensionIndex(columnName);
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeLongColumnSelector(timeAndDimsHolder, dimensionDesc);
}
final Integer metricIndexInt = index.getMetricIndex(columnName);
if (metricIndexInt == null) {
return ZeroLongColumnSelector.instance();
}
final int metricIndex = metricIndexInt;
return new LongColumnSelector()
{
@Override
public long getLong()
{
return index.getMetricLongValue(timeAndDimsHolder.getValue(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
if (virtualColumns.exists(column)) {
return virtualColumns.makeObjectColumnSelector(column, this);
}
if (column.equals(Column.TIME_COLUMN_NAME)) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class<Long> classOfObject()
{
return Long.class;
}
@Override
public Long get()
{
return timeAndDimsHolder.getKey().getTimestamp();
}
};
}
final Integer metricIndexInt = index.getMetricIndex(column);
if (metricIndexInt != null) {
final int metricIndex = metricIndexInt;
final Class classOfObject = index.getMetricClass(column);
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return classOfObject;
}
@Override
public Object get()
{
return index.getMetricObjectValue(
timeAndDimsHolder.getValue(),
metricIndex
);
}
};
}
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(column);
if (dimensionDesc == null) {
return null;
} else {
final int dimensionIndex = dimensionDesc.getIndex();
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
IncrementalIndex.TimeAndDims key = timeAndDimsHolder.getKey();
if (key == null) {
return null;
}
Object[] dims = key.getDims();
if (dimensionIndex >= dims.length) {
return null;
}
return indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList(
dims[dimensionIndex], DimensionIndexer.ARRAY
);
}
};
}
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.getColumnCapabilities(columnName);
}
return index.getCapabilities(columnName);
}
}

View File

@ -19,7 +19,6 @@
package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@ -28,33 +27,17 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BaseQuery;
import io.druid.query.QueryMetrics;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.Capabilities;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionIndexer;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.DoubleWrappingDimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.FloatWrappingDimensionSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.LongWrappingDimensionSelector;
import io.druid.segment.Metadata;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.SingleScanTimeDimSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.ZeroDoubleColumnSelector;
import io.druid.segment.ZeroFloatColumnSelector;
import io.druid.segment.ZeroLongColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.filter.BooleanValueMatcher;
@ -70,9 +53,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
private final IncrementalIndex<?> index;
public IncrementalIndexStorageAdapter(
IncrementalIndex<?> index
)
public IncrementalIndexStorageAdapter(IncrementalIndex<?> index)
{
this.index = index;
}
@ -92,7 +73,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Indexed<String> getAvailableDimensions()
{
return new ListIndexed<String>(index.getDimensionNames(), String.class);
return new ListIndexed<>(index.getDimensionNames(), String.class);
}
@Override
@ -135,6 +116,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return index.getMaxTime();
}
@Nullable
@Override
public Comparable getMinValue(String column)
{
@ -147,6 +129,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return indexer.getMinValue();
}
@Nullable
@Override
public Comparable getMaxValue(String column)
{
@ -198,508 +181,29 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return Sequences.empty();
}
Interval actualIntervalTmp = interval;
final Interval dataInterval = new Interval(getMinTime(), gran.bucketEnd(getMaxTime()));
if (!actualIntervalTmp.overlaps(dataInterval)) {
if (!interval.overlaps(dataInterval)) {
return Sequences.empty();
}
if (actualIntervalTmp.getStart().isBefore(dataInterval.getStart())) {
actualIntervalTmp = actualIntervalTmp.withStart(dataInterval.getStart());
}
if (actualIntervalTmp.getEnd().isAfter(dataInterval.getEnd())) {
actualIntervalTmp = actualIntervalTmp.withEnd(dataInterval.getEnd());
}
final Interval actualInterval = actualIntervalTmp;
final Interval actualInterval = interval.overlap(dataInterval);
Iterable<Interval> iterable = gran.getIterable(actualInterval);
if (descending) {
iterable = Lists.reverse(ImmutableList.copyOf(iterable));
}
return Sequences.map(
Sequences.simple(iterable),
new Function<Interval, Cursor>()
{
EntryHolder currEntry = new EntryHolder();
@Override
public Cursor apply(@Nullable final Interval interval)
{
final long timeStart = Math.max(interval.getStartMillis(), actualInterval.getStartMillis());
return new Cursor()
{
private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this);
private final int maxRowIndex;
private Iterator<IncrementalIndex.TimeAndDims> baseIter;
private Iterable<IncrementalIndex.TimeAndDims> cursorIterable;
private boolean emptyRange;
final DateTime time;
int numAdvanced = -1;
boolean done;
{
maxRowIndex = index.getLastRowIndex();
cursorIterable = index.getFacts().timeRangeIterable(
descending,
timeStart,
Math.min(actualInterval.getEndMillis(), gran.increment(interval.getStart()).getMillis())
);
emptyRange = !cursorIterable.iterator().hasNext();
time = gran.toDateTime(interval.getStartMillis());
reset();
}
@Override
public DateTime getTime()
{
return time;
}
@Override
public void advance()
{
if (!baseIter.hasNext()) {
done = true;
return;
}
while (baseIter.hasNext()) {
BaseQuery.checkInterrupted();
IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) {
return;
}
}
done = true;
}
@Override
public void advanceUninterruptibly()
{
if (!baseIter.hasNext()) {
done = true;
return;
}
while (baseIter.hasNext()) {
if (Thread.currentThread().isInterrupted()) {
return;
}
IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) {
return;
}
}
done = true;
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
return done;
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
baseIter = cursorIterable.iterator();
if (numAdvanced == -1) {
numAdvanced = 0;
} else {
Iterators.advance(baseIter, numAdvanced);
}
BaseQuery.checkInterrupted();
boolean foundMatched = false;
while (baseIter.hasNext()) {
IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
numAdvanced++;
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) {
foundMatched = true;
break;
}
numAdvanced++;
}
done = !foundMatched && (emptyRange || !baseIter.hasNext());
}
private boolean beyondMaxRowIndex(int rowIndex)
{
// ignore rows whose rowIndex is beyond the maxRowIndex
// rows are order by timestamp, not rowIndex,
// so we still need to go through all rows to skip rows added after cursor created
return rowIndex > maxRowIndex;
}
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec dimensionSpec
)
{
if (virtualColumns.exists(dimensionSpec.getDimension())) {
return virtualColumns.makeDimensionSelector(dimensionSpec, this);
}
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
}
private DimensionSelector makeDimensionSelectorUndecorated(
DimensionSpec dimensionSpec
)
{
final String dimension = dimensionSpec.getDimension();
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
DimensionSelector selector = new SingleScanTimeDimSelector(
makeLongColumnSelector(dimension),
extractionFn,
descending
);
return selector;
}
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionSpec.getDimension());
if (dimensionDesc == null) {
// not a dimension, column may be a metric
ColumnCapabilities capabilities = getColumnCapabilities(dimension);
if (capabilities == null) {
return DimensionSelectorUtils.constantSelector(null, extractionFn);
}
if (capabilities.getType() == ValueType.LONG) {
return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn);
}
if (capabilities.getType() == ValueType.FLOAT) {
return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn);
}
if (capabilities.getType() == ValueType.DOUBLE) {
return new DoubleWrappingDimensionSelector(makeDoubleColumnSelector(dimension), extractionFn);
}
// if we can't wrap the base column, just return a column of all nulls
return DimensionSelectorUtils.constantSelector(null, extractionFn);
} else {
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeDimensionSelector(dimensionSpec, currEntry, dimensionDesc);
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeFloatColumnSelector(columnName, this);
}
final Integer dimIndex = index.getDimensionIndex(columnName);
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeFloatColumnSelector(
currEntry,
dimensionDesc
);
}
final Integer metricIndexInt = index.getMetricIndex(columnName);
if (metricIndexInt == null) {
return ZeroFloatColumnSelector.instance();
}
final int metricIndex = metricIndexInt;
return new FloatColumnSelector()
{
@Override
public float getFloat()
{
return index.getMetricFloatValue(currEntry.getValue(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeLongColumnSelector(columnName, this);
}
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
class TimeLongColumnSelector implements LongColumnSelector
{
@Override
public long getLong()
{
return currEntry.getKey().getTimestamp();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}
return new TimeLongColumnSelector();
}
final Integer dimIndex = index.getDimensionIndex(columnName);
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeLongColumnSelector(
currEntry,
dimensionDesc
);
}
final Integer metricIndexInt = index.getMetricIndex(columnName);
if (metricIndexInt == null) {
return ZeroLongColumnSelector.instance();
}
final int metricIndex = metricIndexInt;
return new LongColumnSelector()
{
@Override
public long getLong()
{
return index.getMetricLongValue(
currEntry.getValue(),
metricIndex
);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
if (virtualColumns.exists(column)) {
return virtualColumns.makeObjectColumnSelector(column, this);
}
if (column.equals(Column.TIME_COLUMN_NAME)) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.class;
}
@Override
public Long get()
{
return currEntry.getKey().getTimestamp();
}
};
}
final Integer metricIndexInt = index.getMetricIndex(column);
if (metricIndexInt != null) {
final int metricIndex = metricIndexInt;
final Class classOfObject = index.getMetricClass(column);
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return classOfObject;
}
@Override
public Object get()
{
return index.getMetricObjectValue(
currEntry.getValue(),
metricIndex
);
}
};
}
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(column);
if (dimensionDesc == null) {
return null;
} else {
final int dimensionIndex = dimensionDesc.getIndex();
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
IncrementalIndex.TimeAndDims key = currEntry.getKey();
if (key == null) {
return null;
}
Object[] dims = key.getDims();
if (dimensionIndex >= dims.length) {
return null;
}
return indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList(
dims[dimensionIndex], DimensionIndexer.ARRAY
);
}
};
}
}
@Override
public DoubleColumnSelector makeDoubleColumnSelector(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeDoubleColumnSelector(columnName, this);
}
final Integer dimIndex = index.getDimensionIndex(columnName);
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeDoubleColumnSelector(
currEntry,
dimensionDesc
);
}
final Integer metricIndexInt = index.getMetricIndex(columnName);
if (metricIndexInt == null) {
return ZeroDoubleColumnSelector.instance();
}
final int metricIndex = metricIndexInt;
return new DoubleColumnSelector()
{
@Override
public double getDouble()
{
return index.getMetricDoubleValue(currEntry.getValue(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
if (virtualColumns.exists(columnName)) {
return virtualColumns.getColumnCapabilities(columnName);
}
return index.getCapabilities(columnName);
}
};
}
}
);
return Sequences
.simple(iterable)
.map(i -> new IncrementalIndexCursor(virtualColumns, descending, filter, i, actualInterval, gran));
}
private ValueMatcher makeFilterMatcher(final Filter filter, final Cursor cursor)
{
return filter == null
? BooleanValueMatcher.of(true)
: filter.makeMatcher(cursor);
}
public static class EntryHolder
{
IncrementalIndex.TimeAndDims currEntry = null;
public IncrementalIndex.TimeAndDims get()
{
return currEntry;
}
public void set(IncrementalIndex.TimeAndDims currEntry)
{
this.currEntry = currEntry;
}
public IncrementalIndex.TimeAndDims getKey()
{
return currEntry;
}
public int getValue()
{
return currEntry.getRowIndex();
}
: filter.makeMatcher(cursor.getColumnSelectorFactory());
}
@Override
@ -707,4 +211,172 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
return index.getMetadata();
}
private class IncrementalIndexCursor implements Cursor
{
private TimeAndDimsHolder currEntry;
private final ColumnSelectorFactory columnSelectorFactory;
private final ValueMatcher filterMatcher;
private final int maxRowIndex;
private Iterator<IncrementalIndex.TimeAndDims> baseIter;
private Iterable<IncrementalIndex.TimeAndDims> cursorIterable;
private boolean emptyRange;
private final DateTime time;
private int numAdvanced;
private boolean done;
IncrementalIndexCursor(
VirtualColumns virtualColumns,
boolean descending,
Filter filter,
Interval interval,
Interval actualInterval,
Granularity gran
)
{
currEntry = new TimeAndDimsHolder();
columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(index, virtualColumns, descending, currEntry);
filterMatcher = makeFilterMatcher(filter, this);
numAdvanced = -1;
maxRowIndex = index.getLastRowIndex();
final long timeStart = Math.max(interval.getStartMillis(), actualInterval.getStartMillis());
cursorIterable = index.getFacts().timeRangeIterable(
descending,
timeStart,
Math.min(actualInterval.getEndMillis(), gran.increment(interval.getStart()).getMillis())
);
emptyRange = !cursorIterable.iterator().hasNext();
time = gran.toDateTime(interval.getStartMillis());
reset();
}
@Override
public ColumnSelectorFactory getColumnSelectorFactory()
{
return columnSelectorFactory;
}
@Override
public DateTime getTime()
{
return time;
}
@Override
public void advance()
{
if (!baseIter.hasNext()) {
done = true;
return;
}
while (baseIter.hasNext()) {
BaseQuery.checkInterrupted();
IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) {
return;
}
}
done = true;
}
@Override
public void advanceUninterruptibly()
{
if (!baseIter.hasNext()) {
done = true;
return;
}
while (baseIter.hasNext()) {
if (Thread.currentThread().isInterrupted()) {
return;
}
IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) {
return;
}
}
done = true;
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
return done;
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
baseIter = cursorIterable.iterator();
if (numAdvanced == -1) {
numAdvanced = 0;
} else {
Iterators.advance(baseIter, numAdvanced);
}
BaseQuery.checkInterrupted();
boolean foundMatched = false;
while (baseIter.hasNext()) {
IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
numAdvanced++;
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) {
foundMatched = true;
break;
}
numAdvanced++;
}
done = !foundMatched && (emptyRange || !baseIter.hasNext());
}
private boolean beyondMaxRowIndex(int rowIndex)
{
// ignore rows whose rowIndex is beyond the maxRowIndex
// rows are order by timestamp, not rowIndex,
// so we still need to go through all rows to skip rows added after cursor created
return rowIndex > maxRowIndex;
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;
/**
* This interface is the core "pointer" interface that is used to create {@link io.druid.segment.ColumnValueSelector}s
* over incremental index. It's counterpart for historical segments is {@link io.druid.segment.data.Offset}.
*/
public class TimeAndDimsHolder
{
IncrementalIndex.TimeAndDims currEntry = null;
public IncrementalIndex.TimeAndDims get()
{
return currEntry;
}
public void set(IncrementalIndex.TimeAndDims currEntry)
{
this.currEntry = currEntry;
}
/**
* This method doesn't have well-defined semantics ("key" of what?), should be removed in favor of {@link #get()}.
*/
public IncrementalIndex.TimeAndDims getKey()
{
return currEntry;
}
/**
* This method doesn't have well-defined semantics ("value" of what?), should be removed in favor of chaining
* get().getRowIndex().
*/
public int getValue()
{
return currEntry.getRowIndex();
}
}

View File

@ -322,9 +322,9 @@ public abstract class BaseFilterTest
@Override
public List<String> apply(Cursor input)
{
final DimensionSelector selector = input.makeDimensionSelector(
new DefaultDimensionSpec(selectColumn, selectColumn)
);
final DimensionSelector selector = input
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec(selectColumn, selectColumn));
final List<String> values = Lists.newArrayList();
@ -355,7 +355,7 @@ public abstract class BaseFilterTest
Aggregator agg = new FilteredAggregatorFactory(
new CountAggregatorFactory("count"),
maybeOptimize(filter)
).factorize(input);
).factorize(input.getColumnSelectorFactory());
for (; !input.isDone(); input.advance()) {
agg.aggregate();
@ -417,9 +417,9 @@ public abstract class BaseFilterTest
@Override
public List<String> apply(Cursor input)
{
final DimensionSelector selector = input.makeDimensionSelector(
new DefaultDimensionSpec(selectColumn, selectColumn)
);
final DimensionSelector selector = input
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec(selectColumn, selectColumn));
final List<String> values = Lists.newArrayList();

View File

@ -278,7 +278,9 @@ public class IncrementalIndexStorageAdapterTest
Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.<Cursor>newArrayList()).get(0);
DimensionSelector dimSelector;
dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
index.add(
@ -292,7 +294,9 @@ public class IncrementalIndexStorageAdapterTest
// Cursor reset should not be affected by out of order values
cursor.reset();
dimSelector = cursor.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("sally", "sally"));
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
}
}
@ -430,12 +434,9 @@ public class IncrementalIndexStorageAdapterTest
@Override
public Object apply(Cursor cursor)
{
DimensionSelector dimSelector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
DimensionSelector dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("billy", "billy"));
int cardinality = dimSelector.getValueCardinality();
//index gets more rows at this point, while other thread is iterating over the cursor
@ -513,12 +514,9 @@ public class IncrementalIndexStorageAdapterTest
@Override
public Object apply(Cursor cursor)
{
DimensionSelector dimSelector1A = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
DimensionSelector dimSelector1A = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("billy", "billy"));
int cardinalityA = dimSelector1A.getValueCardinality();
//index gets more rows at this point, while other thread is iterating over the cursor
@ -535,12 +533,9 @@ public class IncrementalIndexStorageAdapterTest
throw new RuntimeException(ex);
}
DimensionSelector dimSelector1B = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
DimensionSelector dimSelector1B = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("billy", "billy"));
//index gets more rows at this point, while other thread is iterating over the cursor
try {
index.add(
@ -562,19 +557,13 @@ public class IncrementalIndexStorageAdapterTest
throw new RuntimeException(ex);
}
DimensionSelector dimSelector1C = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
DimensionSelector dimSelector1C = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("billy", "billy"));
DimensionSelector dimSelector2D = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy2",
"billy2"
)
);
DimensionSelector dimSelector2D = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("billy2", "billy2"));
//index gets more rows at this point, while other thread is iterating over the cursor
try {
index.add(
@ -596,12 +585,9 @@ public class IncrementalIndexStorageAdapterTest
throw new RuntimeException(ex);
}
DimensionSelector dimSelector3E = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy3",
"billy3"
)
);
DimensionSelector dimSelector3E = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("billy3", "billy3"));
int rowNumInCursor = 0;
// and then, cursoring continues in the other thread

View File

@ -85,13 +85,14 @@ public class IngestSegmentFirehose implements Firehose
@Override
public Sequence<InputRow> apply(final Cursor cursor)
{
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final LongColumnSelector timestampColumnSelector =
cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(dim, dim)
);
final DimensionSelector dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec(dim, dim));
// dimSelector is null if the dimension is not present
if (dimSelector != null) {
dimSelectors.put(dim, dimSelector);
@ -100,7 +101,8 @@ public class IngestSegmentFirehose implements Firehose
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
final ObjectColumnSelector metricSelector =
cursor.getColumnSelectorFactory().makeObjectColumnSelector(metric);
if (metricSelector != null) {
metSelectors.put(metric, metricSelector);
}

View File

@ -278,7 +278,9 @@ public class DumpSegment extends GuiceRunnable
final List<ObjectColumnSelector> selectors = Lists.newArrayList();
for (String columnName : columnNames) {
selectors.add(makeSelector(columnName, index.getColumn(columnName), cursor));
selectors.add(
makeSelector(columnName, index.getColumn(columnName), cursor.getColumnSelectorFactory())
);
}
while (!cursor.isDone()) {