mirror of https://github.com/apache/druid.git
Merge pull request #834 from metamx/optimize-timestampchecking
skip timestamp checking if not required, remove duplicate code
This commit is contained in:
commit
7ef19009db
|
@ -154,16 +154,17 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
actualInterval = actualInterval.withEnd(dataInterval.getEnd());
|
||||
}
|
||||
|
||||
final Sequence<Cursor> sequence;
|
||||
final Offset offset;
|
||||
if (filter == null) {
|
||||
sequence = new NoFilterCursorSequenceBuilder(index, actualInterval, gran).build();
|
||||
offset = new NoFilterOffset(0, index.getNumRows());
|
||||
} else {
|
||||
Offset offset = new ConciseOffset(filter.goConcise(new ColumnSelectorBitmapIndexSelector(index)));
|
||||
|
||||
sequence = new CursorSequenceBuilder(index, actualInterval, gran, offset).build();
|
||||
offset = new ConciseOffset(filter.goConcise(new ColumnSelectorBitmapIndexSelector(index)));
|
||||
}
|
||||
|
||||
return Sequences.filter(sequence, Predicates.<Cursor>notNull());
|
||||
return Sequences.filter(
|
||||
new CursorSequenceBuilder(index, actualInterval, gran, offset).build(),
|
||||
Predicates.<Cursor>notNull()
|
||||
);
|
||||
}
|
||||
|
||||
private static class CursorSequenceBuilder
|
||||
|
@ -280,8 +281,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
final Column columnDesc = index.getColumn(dimensionName);
|
||||
|
||||
if (cachedColumn == null && columnDesc != null) {
|
||||
cachedColumn = columnDesc.getDictionaryEncoding();
|
||||
dictionaryColumnCache.put(dimensionName, cachedColumn);
|
||||
cachedColumn = columnDesc.getDictionaryEncoding();
|
||||
dictionaryColumnCache.put(dimensionName, cachedColumn);
|
||||
}
|
||||
|
||||
final DictionaryEncodedColumn column = cachedColumn;
|
||||
|
@ -514,7 +515,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
return columnVals.lookupName(multiValueRow.get(0));
|
||||
} else {
|
||||
final String[] strings = new String[multiValueRow.size()];
|
||||
for (int i = 0 ; i < multiValueRow.size() ; i++) {
|
||||
for (int i = 0; i < multiValueRow.size(); i++) {
|
||||
strings[i] = columnVals.lookupName(multiValueRow.get(i));
|
||||
}
|
||||
return strings;
|
||||
|
@ -575,7 +576,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
CloseQuietly.close(complexColumn);
|
||||
}
|
||||
for (Object column : objectColumnCache.values()) {
|
||||
if(column instanceof Closeable) {
|
||||
if (column instanceof Closeable) {
|
||||
CloseQuietly.close((Closeable) column);
|
||||
}
|
||||
}
|
||||
|
@ -590,6 +591,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
private final Offset baseOffset;
|
||||
private final GenericColumn timestamps;
|
||||
private final long threshold;
|
||||
private final boolean allWithinThreshold;
|
||||
|
||||
public TimestampCheckingOffset(
|
||||
Offset baseOffset,
|
||||
|
@ -600,6 +602,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
this.baseOffset = baseOffset;
|
||||
this.timestamps = timestamps;
|
||||
this.threshold = threshold;
|
||||
// checks if all the values are within the Threshold specified, skips timestamp lookups and checks if all values are within threshold.
|
||||
this.allWithinThreshold = timestamps.getLongSingleValueRow(timestamps.length() - 1) < threshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -617,7 +621,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
@Override
|
||||
public boolean withinBounds()
|
||||
{
|
||||
return baseOffset.withinBounds() && timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold;
|
||||
return baseOffset.withinBounds() && (allWithinThreshold
|
||||
|| timestamps.getLongSingleValueRow(baseOffset.getOffset()) < threshold);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -627,416 +632,39 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
}
|
||||
|
||||
private static class NoFilterCursorSequenceBuilder
|
||||
private static class NoFilterOffset implements Offset
|
||||
{
|
||||
private final ColumnSelector index;
|
||||
private final Interval interval;
|
||||
private final QueryGranularity gran;
|
||||
private final int rowCount;
|
||||
private volatile int currentOffset;
|
||||
|
||||
public NoFilterCursorSequenceBuilder(
|
||||
ColumnSelector index,
|
||||
Interval interval,
|
||||
QueryGranularity gran
|
||||
)
|
||||
NoFilterOffset(int currentOffset, int rowCount)
|
||||
{
|
||||
this.index = index;
|
||||
this.interval = interval;
|
||||
this.gran = gran;
|
||||
this.currentOffset = currentOffset;
|
||||
this.rowCount = rowCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* This produces iterators of Cursor objects that must be fully processed (until isDone() returns true) before the
|
||||
* next Cursor is processed. It is *not* safe to pass these cursors off to another thread for parallel processing
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public Sequence<Cursor> build()
|
||||
@Override
|
||||
public void increment()
|
||||
{
|
||||
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
|
||||
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
|
||||
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
|
||||
final Map<String, Object> objectColumnCache = Maps.newHashMap();
|
||||
currentOffset++;
|
||||
}
|
||||
|
||||
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
|
||||
@Override
|
||||
public boolean withinBounds()
|
||||
{
|
||||
return currentOffset < rowCount;
|
||||
}
|
||||
|
||||
return Sequences.withBaggage(
|
||||
Sequences.map(
|
||||
Sequences.simple(gran.iterable(interval.getStartMillis(), interval.getEndMillis())),
|
||||
new Function<Long, Cursor>()
|
||||
{
|
||||
private int currRow = 0;
|
||||
@Override
|
||||
public Offset clone()
|
||||
{
|
||||
return new NoFilterOffset(currentOffset, rowCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cursor apply(final Long input)
|
||||
{
|
||||
final long timeStart = Math.max(interval.getStartMillis(), input);
|
||||
while (currRow < timestamps.length() && timestamps.getLongSingleValueRow(currRow) < timeStart) {
|
||||
++currRow;
|
||||
}
|
||||
|
||||
return new Cursor()
|
||||
{
|
||||
private final DateTime myBucket = gran.toDateTime(input);
|
||||
private final long nextBucket = Math.min(gran.next(myBucket.getMillis()), interval.getEndMillis());
|
||||
private final int initRow = currRow;
|
||||
|
||||
@Override
|
||||
public DateTime getTime()
|
||||
{
|
||||
return myBucket;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void advance()
|
||||
{
|
||||
if (Thread.interrupted()) {
|
||||
throw new QueryInterruptedException();
|
||||
}
|
||||
++currRow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void advanceTo(int offset)
|
||||
{
|
||||
currRow += offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone()
|
||||
{
|
||||
return currRow >= timestamps.length() || timestamps.getLongSingleValueRow(currRow) >= nextBucket;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
currRow = initRow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimestampColumnSelector makeTimestampColumnSelector()
|
||||
{
|
||||
return new TimestampColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long getTimestamp()
|
||||
{
|
||||
return timestamps.getLongSingleValueRow(currRow);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(String dimension)
|
||||
{
|
||||
final String dimensionName = dimension.toLowerCase();
|
||||
|
||||
DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimensionName);
|
||||
final Column columnDesc = index.getColumn(dimensionName);
|
||||
|
||||
if (cachedColumn == null && columnDesc != null) {
|
||||
cachedColumn = columnDesc.getDictionaryEncoding();
|
||||
dictionaryColumnCache.put(dimensionName, cachedColumn);
|
||||
}
|
||||
|
||||
final DictionaryEncodedColumn column = cachedColumn;
|
||||
|
||||
if (column == null) {
|
||||
return null;
|
||||
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
return column.getMultiValueRow(currRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return column.getCardinality();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
final String retVal = column.lookupName(id);
|
||||
return retVal == null ? "" : retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
return column.lookupId(name);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
// using an anonymous class is faster than creating a class that stores a copy of the value
|
||||
return new IndexedInts()
|
||||
{
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
return column.getSingleValueRow(currRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Integer> iterator()
|
||||
{
|
||||
return Iterators.singletonIterator(column.getSingleValueRow(currRow));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return column.getCardinality();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
return column.lookupName(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
return column.lookupId(name);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(String columnName)
|
||||
{
|
||||
final String metricName = columnName.toLowerCase();
|
||||
GenericColumn cachedMetricVals = genericColumnCache.get(metricName);
|
||||
|
||||
if (cachedMetricVals == null) {
|
||||
Column holder = index.getColumn(metricName);
|
||||
if (holder != null && holder.getCapabilities().getType() == ValueType.FLOAT) {
|
||||
cachedMetricVals = holder.getGenericColumn();
|
||||
genericColumnCache.put(metricName, cachedMetricVals);
|
||||
}
|
||||
}
|
||||
|
||||
if (cachedMetricVals == null) {
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return 0.0f;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
final GenericColumn metricVals = cachedMetricVals;
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return metricVals.getFloatSingleValueRow(currRow);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String column)
|
||||
{
|
||||
final String columnName = column.toLowerCase();
|
||||
|
||||
Object cachedColumnVals = objectColumnCache.get(columnName);
|
||||
|
||||
if (cachedColumnVals == null) {
|
||||
Column holder = index.getColumn(columnName);
|
||||
|
||||
if (holder != null) {
|
||||
final ValueType type = holder.getCapabilities().getType();
|
||||
|
||||
if (holder.getCapabilities().isDictionaryEncoded()) {
|
||||
cachedColumnVals = holder.getDictionaryEncoding();
|
||||
} else if (type == ValueType.COMPLEX) {
|
||||
cachedColumnVals = holder.getComplexColumn();
|
||||
} else {
|
||||
cachedColumnVals = holder.getGenericColumn();
|
||||
}
|
||||
}
|
||||
|
||||
if (cachedColumnVals != null) {
|
||||
objectColumnCache.put(columnName, cachedColumnVals);
|
||||
}
|
||||
}
|
||||
|
||||
if (cachedColumnVals == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (cachedColumnVals instanceof GenericColumn) {
|
||||
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
|
||||
final ValueType type = columnVals.getType();
|
||||
|
||||
if (columnVals.hasMultipleValues()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"makeObjectColumnSelector does not support multivalued GenericColumns"
|
||||
);
|
||||
}
|
||||
|
||||
if (type == ValueType.FLOAT) {
|
||||
return new ObjectColumnSelector<Float>()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Float.TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Float get()
|
||||
{
|
||||
return columnVals.getFloatSingleValueRow(currRow);
|
||||
}
|
||||
};
|
||||
}
|
||||
if (type == ValueType.LONG) {
|
||||
return new ObjectColumnSelector<Long>()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Long.TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long get()
|
||||
{
|
||||
return columnVals.getLongSingleValueRow(currRow);
|
||||
}
|
||||
};
|
||||
}
|
||||
if (type == ValueType.STRING) {
|
||||
return new ObjectColumnSelector<String>()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return String.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get()
|
||||
{
|
||||
return columnVals.getStringSingleValueRow(currRow);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
|
||||
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
|
||||
if (columnVals.hasMultipleValues()) {
|
||||
return new ObjectColumnSelector<Object>()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
final IndexedInts multiValueRow = columnVals.getMultiValueRow(currRow);
|
||||
if (multiValueRow.size() == 0) {
|
||||
return null;
|
||||
} else if (multiValueRow.size() == 1) {
|
||||
return columnVals.lookupName(multiValueRow.get(0));
|
||||
} else {
|
||||
final String[] strings = new String[multiValueRow.size()];
|
||||
for (int i = 0 ; i < multiValueRow.size() ; i++) {
|
||||
strings[i] = columnVals.lookupName(multiValueRow.get(i));
|
||||
}
|
||||
return strings;
|
||||
}
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new ObjectColumnSelector<String>()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return String.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get()
|
||||
{
|
||||
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
final ComplexColumn columnVals = (ComplexColumn) cachedColumnVals;
|
||||
return new ObjectColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return columnVals.getClazz();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return columnVals.getRowValue(currRow);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
),
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
CloseQuietly.close(timestamps);
|
||||
for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) {
|
||||
CloseQuietly.close(column);
|
||||
}
|
||||
for (GenericColumn column : genericColumnCache.values()) {
|
||||
CloseQuietly.close(column);
|
||||
}
|
||||
for (ComplexColumn complexColumn : complexColumnCache.values()) {
|
||||
CloseQuietly.close(complexColumn);
|
||||
}
|
||||
for (Object column : objectColumnCache.values()) {
|
||||
if (column instanceof Closeable) {
|
||||
CloseQuietly.close((Closeable) column);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
@Override
|
||||
public int getOffset()
|
||||
{
|
||||
return currentOffset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue