Add HistoricalCursor.getReadableOffset() to access unwrapped offset in selectors (#4633)

* Add HistoricalCursor.getReadableOffset() to access unwrapped offset in selectors, when the 'main' offset if FilteredOffset (fixes #4628)

* Stack overflow test
This commit is contained in:
Roman Leventov 2017-08-04 22:51:48 +03:00 committed by Gian Merlino
parent 8c8759da03
commit 7a005088d9
5 changed files with 89 additions and 20 deletions

View File

@ -162,7 +162,7 @@ public final class FilteredOffset extends Offset
@Override
public boolean matches()
{
int currentOffset = holder.getOffset().getOffset();
int currentOffset = holder.getReadableOffset().getOffset();
while (iterOffset > currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
@ -174,6 +174,7 @@ public final class FilteredOffset extends Offset
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("holder", holder);
inspector.visit("offset", holder.getReadableOffset());
inspector.visit("iter", iter);
}
};
@ -185,7 +186,7 @@ public final class FilteredOffset extends Offset
@Override
public boolean matches()
{
int currentOffset = holder.getOffset().getOffset();
int currentOffset = holder.getReadableOffset().getOffset();
while (iterOffset < currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
@ -197,6 +198,7 @@ public final class FilteredOffset extends Offset
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("holder", holder);
inspector.visit("offset", holder.getReadableOffset());
inspector.visit("iter", iter);
}
};

View File

@ -47,6 +47,7 @@ import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
import io.druid.segment.data.ReadableOffset;
import io.druid.segment.filter.AndFilter;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
@ -447,6 +448,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return cursorOffset;
}
@Override
public ReadableOffset getReadableOffset()
{
return cursorOffset;
}
@Override
public DateTime getTime()
{
@ -561,7 +568,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public float get()
{
return metricVals.getFloatSingleValueRow(cursorOffset.getOffset());
return metricVals.getFloatSingleValueRow(getReadableOffset().getOffset());
}
@Override
@ -574,7 +581,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", cursorOffset);
inspector.visit("cursorOffset", getReadableOffset());
}
};
}
@ -607,14 +614,14 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public double get()
{
return metricVals.getDoubleSingleValueRow(cursorOffset.getOffset());
return metricVals.getDoubleSingleValueRow(getReadableOffset().getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", cursorOffset);
inspector.visit("cursorOffset", getReadableOffset());
}
};
}
@ -647,14 +654,14 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public long get()
{
return metricVals.getLongSingleValueRow(cursorOffset.getOffset());
return metricVals.getLongSingleValueRow(getReadableOffset().getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", cursorOffset);
inspector.visit("cursorOffset", getReadableOffset());
}
};
}
@ -716,7 +723,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(cursorOffset.getOffset());
return columnVals.getFloatSingleValueRow(getReadableOffset().getOffset());
}
};
}
@ -732,7 +739,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public Double get()
{
return columnVals.getDoubleSingleValueRow(cursorOffset.getOffset());
return columnVals.getDoubleSingleValueRow(getReadableOffset().getOffset());
}
};
}
@ -748,7 +755,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(cursorOffset.getOffset());
return columnVals.getLongSingleValueRow(getReadableOffset().getOffset());
}
};
}
@ -764,7 +771,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public String get()
{
return columnVals.getStringSingleValueRow(cursorOffset.getOffset());
return columnVals.getStringSingleValueRow(getReadableOffset().getOffset());
}
};
}
@ -784,7 +791,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public Object get()
{
final IndexedInts multiValueRow = columnVals.getMultiValueRow(cursorOffset.getOffset());
int currentOffset = getReadableOffset().getOffset();
final IndexedInts multiValueRow = columnVals.getMultiValueRow(currentOffset);
if (multiValueRow.size() == 0) {
return null;
} else if (multiValueRow.size() == 1) {
@ -810,7 +818,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
int currentOffset = getReadableOffset().getOffset();
return columnVals.lookupName(columnVals.getSingleValueRow(currentOffset));
}
};
}
@ -828,7 +837,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public Object get()
{
return columnVals.getRowValue(cursorOffset.getOffset());
return columnVals.getRowValue(getReadableOffset().getOffset());
}
};
}
@ -873,11 +882,19 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
} else {
return new QueryableIndexBaseCursor<FilteredOffset>()
{
private Offset baseOffset;
{
cursorOffset = new FilteredOffset(this, descending, postFilter, bitmapIndexSelector);
reset();
}
@Override
public ReadableOffset getReadableOffset()
{
return baseOffset;
}
@Override
public void advance()
{
@ -896,7 +913,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void reset()
{
cursorOffset.reset(initOffset.clone());
baseOffset = initOffset.clone();
cursorOffset.reset(baseOffset);
}
};
}

View File

@ -158,7 +158,7 @@ public class SimpleDictionaryEncodedColumn
@Override
public IndexedInts getRow()
{
return multiValueColumn.get(offsetHolder.getOffset().getOffset());
return multiValueColumn.get(offsetHolder.getReadableOffset().getOffset());
}
@Override
@ -184,7 +184,7 @@ public class SimpleDictionaryEncodedColumn
{
inspector.visit("multiValueColumn", multiValueColumn);
inspector.visit("offsetHolder", offsetHolder);
inspector.visit("offset", offsetHolder.getOffset());
inspector.visit("offset", offsetHolder.getReadableOffset());
inspector.visit("extractionFn", extractionFn);
}
}
@ -202,7 +202,7 @@ public class SimpleDictionaryEncodedColumn
@Override
public int getRowValue()
{
return column.get(offsetHolder.getOffset().getOffset());
return column.get(offsetHolder.getReadableOffset().getOffset());
}
@Override
@ -274,7 +274,7 @@ public class SimpleDictionaryEncodedColumn
{
inspector.visit("column", column);
inspector.visit("offsetHolder", offsetHolder);
inspector.visit("offset", offsetHolder.getOffset());
inspector.visit("offset", offsetHolder.getReadableOffset());
inspector.visit("extractionFn", extractionFn);
}
}

View File

@ -20,8 +20,16 @@
package io.druid.segment.historical;
import io.druid.segment.data.Offset;
import io.druid.segment.data.ReadableOffset;
public interface OffsetHolder
{
Offset getOffset();
/**
* Should return the same, or a "view" of the same offset as {@link #getOffset()}. The difference is that smaller
* interface allows to return unwrapped underlying offset sometimes, e. g. {@link
* io.druid.segment.FilteredOffset#baseOffset}, instead of the wrapper {@link io.druid.segment.FilteredOffset}.
*/
ReadableOffset getReadableOffset();
}

View File

@ -80,6 +80,7 @@ import io.druid.query.extraction.RegexDimExtractionFn;
import io.druid.query.extraction.StrlenExtractionFn;
import io.druid.query.extraction.TimeFormatExtractionFn;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.ExtractionDimFilter;
import io.druid.query.filter.SelectorDimFilter;
@ -5481,4 +5482,44 @@ public class TopNQueryRunnerTest
assertExpectedResults(expectedResults, query);
}
}
@Test
public void testFullOnTopNBoundFilterAndLongSumMetric()
{
// this tests the stack overflow issue from https://github.com/druid-io/druid/issues/4628
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension, "Market")
.filters(new BoundDimFilter(
QueryRunnerTestHelper.indexMetric,
"0",
"46.64980229268867",
true,
true,
false,
null,
StringComparators.NUMERIC
))
.metric("Count")
.threshold(5)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.asList(new LongSumAggregatorFactory("Count", "qualityLong"))
)
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(Arrays.asList())
)
);
final Sequence<Result<TopNResultValue>> retval = runWithMerge(query);
List<Result<TopNResultValue>> results = Sequences.toList(retval, Lists.<Result<TopNResultValue>>newArrayList());
assertExpectedResults(expectedResults, query);
}
}