diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index c174a64e20e..eda2fe19cfe 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -154,16 +154,17 @@ public class QueryableIndexStorageAdapter implements StorageAdapter actualInterval = actualInterval.withEnd(dataInterval.getEnd()); } - final Sequence sequence; + final Offset offset; if (filter == null) { - sequence = new NoFilterCursorSequenceBuilder(index, actualInterval, gran).build(); + offset = new NoFilterOffset(0, index.getNumRows()); } else { - Offset offset = new ConciseOffset(filter.goConcise(new ColumnSelectorBitmapIndexSelector(index))); - - sequence = new CursorSequenceBuilder(index, actualInterval, gran, offset).build(); + offset = new ConciseOffset(filter.goConcise(new ColumnSelectorBitmapIndexSelector(index))); } - return Sequences.filter(sequence, Predicates.notNull()); + return Sequences.filter( + new CursorSequenceBuilder(index, actualInterval, gran, offset).build(), + Predicates.notNull() + ); } private static class CursorSequenceBuilder @@ -280,8 +281,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter final Column columnDesc = index.getColumn(dimensionName); if (cachedColumn == null && columnDesc != null) { - cachedColumn = columnDesc.getDictionaryEncoding(); - dictionaryColumnCache.put(dimensionName, cachedColumn); + cachedColumn = columnDesc.getDictionaryEncoding(); + dictionaryColumnCache.put(dimensionName, cachedColumn); } final DictionaryEncodedColumn column = cachedColumn; @@ -514,7 +515,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return columnVals.lookupName(multiValueRow.get(0)); } else { final String[] strings = new String[multiValueRow.size()]; - for (int i = 0 ; i < multiValueRow.size() ; i++) { + for (int i = 0; i < multiValueRow.size(); i++) { strings[i] = columnVals.lookupName(multiValueRow.get(i)); } return strings; @@ -575,7 +576,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter CloseQuietly.close(complexColumn); } for (Object column : objectColumnCache.values()) { - if(column instanceof Closeable) { + if (column instanceof Closeable) { CloseQuietly.close((Closeable) column); } } @@ -590,6 +591,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter private final Offset baseOffset; private final GenericColumn timestamps; private final long threshold; + private final boolean allWithinThreshold; public TimestampCheckingOffset( Offset baseOffset, @@ -600,6 +602,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter this.baseOffset = baseOffset; this.timestamps = timestamps; this.threshold = threshold; + // checks if all the values are within the Threshold specified, skips timestamp lookups and checks if all values are within threshold. + this.allWithinThreshold = timestamps.getLongSingleValueRow(timestamps.length() - 1) < threshold; } @Override @@ -617,7 +621,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public boolean withinBounds() { - return baseOffset.withinBounds() && timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold; + return baseOffset.withinBounds() && (allWithinThreshold + || timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold); } @Override @@ -627,416 +632,39 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - private static class NoFilterCursorSequenceBuilder + private static class NoFilterOffset implements Offset { - private final ColumnSelector index; - private final Interval interval; - private final QueryGranularity gran; + private final int rowCount; + private volatile int currentOffset; - public NoFilterCursorSequenceBuilder( - ColumnSelector index, - Interval interval, - QueryGranularity gran - ) + NoFilterOffset(int currentOffset, int rowCount) { - this.index = index; - this.interval = interval; - this.gran = gran; + this.currentOffset = currentOffset; + this.rowCount = rowCount; } - /** - * This produces iterators of Cursor objects that must be fully processed (until isDone() returns true) before the - * next Cursor is processed. It is *not* safe to pass these cursors off to another thread for parallel processing - * - * @return - */ - public Sequence build() + @Override + public void increment() { - final Map dictionaryColumnCache = Maps.newHashMap(); - final Map genericColumnCache = Maps.newHashMap(); - final Map complexColumnCache = Maps.newHashMap(); - final Map objectColumnCache = Maps.newHashMap(); + currentOffset++; + } - final GenericColumn timestamps = index.getTimeColumn().getGenericColumn(); + @Override + public boolean withinBounds() + { + return currentOffset < rowCount; + } - return Sequences.withBaggage( - Sequences.map( - Sequences.simple(gran.iterable(interval.getStartMillis(), interval.getEndMillis())), - new Function() - { - private int currRow = 0; + @Override + public Offset clone() + { + return new NoFilterOffset(currentOffset, rowCount); + } - @Override - public Cursor apply(final Long input) - { - final long timeStart = Math.max(interval.getStartMillis(), input); - while (currRow < timestamps.length() && timestamps.getLongSingleValueRow(currRow) < timeStart) { - ++currRow; - } - - return new Cursor() - { - private final DateTime myBucket = gran.toDateTime(input); - private final long nextBucket = Math.min(gran.next(myBucket.getMillis()), interval.getEndMillis()); - private final int initRow = currRow; - - @Override - public DateTime getTime() - { - return myBucket; - } - - @Override - public void advance() - { - if (Thread.interrupted()) { - throw new QueryInterruptedException(); - } - ++currRow; - } - - @Override - public void advanceTo(int offset) - { - currRow += offset; - } - - @Override - public boolean isDone() - { - return currRow >= timestamps.length() || timestamps.getLongSingleValueRow(currRow) >= nextBucket; - } - - @Override - public void reset() - { - currRow = initRow; - } - - @Override - public TimestampColumnSelector makeTimestampColumnSelector() - { - return new TimestampColumnSelector() - { - @Override - public long getTimestamp() - { - return timestamps.getLongSingleValueRow(currRow); - } - }; - } - - @Override - public DimensionSelector makeDimensionSelector(String dimension) - { - final String dimensionName = dimension.toLowerCase(); - - DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimensionName); - final Column columnDesc = index.getColumn(dimensionName); - - if (cachedColumn == null && columnDesc != null) { - cachedColumn = columnDesc.getDictionaryEncoding(); - dictionaryColumnCache.put(dimensionName, cachedColumn); - } - - final DictionaryEncodedColumn column = cachedColumn; - - if (column == null) { - return null; - } else if (columnDesc.getCapabilities().hasMultipleValues()) { - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - return column.getMultiValueRow(currRow); - } - - @Override - public int getValueCardinality() - { - return column.getCardinality(); - } - - @Override - public String lookupName(int id) - { - final String retVal = column.lookupName(id); - return retVal == null ? "" : retVal; - } - - @Override - public int lookupId(String name) - { - return column.lookupId(name); - } - }; - } else { - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - // using an anonymous class is faster than creating a class that stores a copy of the value - return new IndexedInts() - { - @Override - public int size() - { - return 1; - } - - @Override - public int get(int index) - { - return column.getSingleValueRow(currRow); - } - - @Override - public Iterator iterator() - { - return Iterators.singletonIterator(column.getSingleValueRow(currRow)); - } - }; - } - - @Override - public int getValueCardinality() - { - return column.getCardinality(); - } - - @Override - public String lookupName(int id) - { - return column.lookupName(id); - } - - @Override - public int lookupId(String name) - { - return column.lookupId(name); - } - }; - } - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - final String metricName = columnName.toLowerCase(); - GenericColumn cachedMetricVals = genericColumnCache.get(metricName); - - if (cachedMetricVals == null) { - Column holder = index.getColumn(metricName); - if (holder != null && holder.getCapabilities().getType() == ValueType.FLOAT) { - cachedMetricVals = holder.getGenericColumn(); - genericColumnCache.put(metricName, cachedMetricVals); - } - } - - if (cachedMetricVals == null) { - return new FloatColumnSelector() - { - @Override - public float get() - { - return 0.0f; - } - }; - } - - final GenericColumn metricVals = cachedMetricVals; - return new FloatColumnSelector() - { - @Override - public float get() - { - return metricVals.getFloatSingleValueRow(currRow); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - final String columnName = column.toLowerCase(); - - Object cachedColumnVals = objectColumnCache.get(columnName); - - if (cachedColumnVals == null) { - Column holder = index.getColumn(columnName); - - if (holder != null) { - final ValueType type = holder.getCapabilities().getType(); - - if (holder.getCapabilities().isDictionaryEncoded()) { - cachedColumnVals = holder.getDictionaryEncoding(); - } else if (type == ValueType.COMPLEX) { - cachedColumnVals = holder.getComplexColumn(); - } else { - cachedColumnVals = holder.getGenericColumn(); - } - } - - if (cachedColumnVals != null) { - objectColumnCache.put(columnName, cachedColumnVals); - } - } - - if (cachedColumnVals == null) { - return null; - } - - if (cachedColumnVals instanceof GenericColumn) { - final GenericColumn columnVals = (GenericColumn) cachedColumnVals; - final ValueType type = columnVals.getType(); - - if (columnVals.hasMultipleValues()) { - throw new UnsupportedOperationException( - "makeObjectColumnSelector does not support multivalued GenericColumns" - ); - } - - if (type == ValueType.FLOAT) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Float.TYPE; - } - - @Override - public Float get() - { - return columnVals.getFloatSingleValueRow(currRow); - } - }; - } - if (type == ValueType.LONG) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Long.TYPE; - } - - @Override - public Long get() - { - return columnVals.getLongSingleValueRow(currRow); - } - }; - } - if (type == ValueType.STRING) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return String.class; - } - - @Override - public String get() - { - return columnVals.getStringSingleValueRow(currRow); - } - }; - } - } - - if (cachedColumnVals instanceof DictionaryEncodedColumn) { - final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals; - if (columnVals.hasMultipleValues()) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - final IndexedInts multiValueRow = columnVals.getMultiValueRow(currRow); - if (multiValueRow.size() == 0) { - return null; - } else if (multiValueRow.size() == 1) { - return columnVals.lookupName(multiValueRow.get(0)); - } else { - final String[] strings = new String[multiValueRow.size()]; - for (int i = 0 ; i < multiValueRow.size() ; i++) { - strings[i] = columnVals.lookupName(multiValueRow.get(i)); - } - return strings; - } - } - }; - } else { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return String.class; - } - - @Override - public String get() - { - return columnVals.lookupName(columnVals.getSingleValueRow(currRow)); - } - }; - } - } - - final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals; - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return columnVals.getClazz(); - } - - @Override - public Object get() - { - return columnVals.getRowValue(currRow); - } - }; - } - }; - } - } - ), - new Closeable() - { - @Override - public void close() throws IOException - { - CloseQuietly.close(timestamps); - for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) { - CloseQuietly.close(column); - } - for (GenericColumn column : genericColumnCache.values()) { - CloseQuietly.close(column); - } - for (ComplexColumn complexColumn : complexColumnCache.values()) { - CloseQuietly.close(complexColumn); - } - for (Object column : objectColumnCache.values()) { - if (column instanceof Closeable) { - CloseQuietly.close((Closeable) column); - } - } - } - } - ); + @Override + public int getOffset() + { + return currentOffset; } } }