mirror of https://github.com/apache/druid.git
Fixes a mistake introduced in #16533 which can result in CursorGranularizer incorrectly trying to get values from a selector after calling cursor.advance because of a missing check for cursor.isDone
This commit is contained in:
parent
4123f2ca90
commit
2e5d30993e
|
@ -147,14 +147,18 @@ public class CursorGranularizer
|
||||||
if (descending) {
|
if (descending) {
|
||||||
while (currentTime >= currentBucketEnd && !cursor.isDone()) {
|
while (currentTime >= currentBucketEnd && !cursor.isDone()) {
|
||||||
cursor.advance();
|
cursor.advance();
|
||||||
|
if (!cursor.isDone()) {
|
||||||
currentTime = timeSelector.getLong();
|
currentTime = timeSelector.getLong();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
while (currentTime < currentBucketStart && !cursor.isDone()) {
|
while (currentTime < currentBucketStart && !cursor.isDone()) {
|
||||||
cursor.advance();
|
cursor.advance();
|
||||||
|
if (!cursor.isDone()) {
|
||||||
currentTime = timeSelector.getLong();
|
currentTime = timeSelector.getLong();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return currentBucketStart <= currentTime && currentTime < currentBucketEnd;
|
return currentBucketStart <= currentTime && currentTime < currentBucketEnd;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,11 @@ import com.google.common.collect.ImmutableList;
|
||||||
import org.apache.druid.data.input.ListBasedInputRow;
|
import org.apache.druid.data.input.ListBasedInputRow;
|
||||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||||
import org.apache.druid.java.util.common.guava.Sequence;
|
import org.apache.druid.java.util.common.guava.Sequence;
|
||||||
import org.apache.druid.java.util.common.guava.Sequences;
|
import org.apache.druid.java.util.common.guava.Sequences;
|
||||||
|
import org.apache.druid.query.filter.EqualityFilter;
|
||||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||||
import org.apache.druid.segment.ColumnValueSelector;
|
import org.apache.druid.segment.ColumnValueSelector;
|
||||||
import org.apache.druid.segment.Cursor;
|
import org.apache.druid.segment.Cursor;
|
||||||
|
@ -65,8 +67,11 @@ public class CursorGranularizerTest extends InitializedNullHandlingTest
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException
|
public void setup() throws IOException
|
||||||
{
|
{
|
||||||
final RowSignature signature = RowSignature.builder().add("x", ColumnType.STRING).build();
|
final RowSignature signature = RowSignature.builder()
|
||||||
final List<String> dims = ImmutableList.of("x");
|
.add("x", ColumnType.STRING)
|
||||||
|
.add("y", ColumnType.STRING)
|
||||||
|
.build();
|
||||||
|
final List<String> dims = ImmutableList.of("x", "y");
|
||||||
final IncrementalIndexSchema schema =
|
final IncrementalIndexSchema schema =
|
||||||
IncrementalIndexSchema.builder()
|
IncrementalIndexSchema.builder()
|
||||||
.withDimensionsSpec(DimensionsSpec.builder().useSchemaDiscovery(true).build())
|
.withDimensionsSpec(DimensionsSpec.builder().useSchemaDiscovery(true).build())
|
||||||
|
@ -81,79 +86,79 @@ public class CursorGranularizerTest extends InitializedNullHandlingTest
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T00:00Z"),
|
DateTimes.of("2024-01-01T00:00Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("a")
|
ImmutableList.of("a", "1")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T00:01Z"),
|
DateTimes.of("2024-01-01T00:01Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("b")
|
ImmutableList.of("b", "2")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T00:02Z"),
|
DateTimes.of("2024-01-01T00:02Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("c")
|
ImmutableList.of("c", "1")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T00:03Z"),
|
DateTimes.of("2024-01-01T00:03Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("d")
|
ImmutableList.of("d", "2")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T01:00Z"),
|
DateTimes.of("2024-01-01T01:00Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("e")
|
ImmutableList.of("e", "1")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T01:01Z"),
|
DateTimes.of("2024-01-01T01:01Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("f")
|
ImmutableList.of("f", "2")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T03:04Z"),
|
DateTimes.of("2024-01-01T03:04Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("g")
|
ImmutableList.of("g", "1")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T03:05Z"),
|
DateTimes.of("2024-01-01T03:05Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("h")
|
ImmutableList.of("h", "2")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T03:15Z"),
|
DateTimes.of("2024-01-01T03:15Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("i")
|
ImmutableList.of("i", "1")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T05:03Z"),
|
DateTimes.of("2024-01-01T05:03Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("j")
|
ImmutableList.of("j", "2")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T06:00Z"),
|
DateTimes.of("2024-01-01T06:00Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("k")
|
ImmutableList.of("k", "1")
|
||||||
),
|
),
|
||||||
new ListBasedInputRow(
|
new ListBasedInputRow(
|
||||||
signature,
|
signature,
|
||||||
DateTimes.of("2024-01-01T09:01Z"),
|
DateTimes.of("2024-01-01T09:01Z"),
|
||||||
dims,
|
dims,
|
||||||
ImmutableList.of("l")
|
ImmutableList.of("l", "2")
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
.tmpDir(temporaryFolder.newFolder());
|
.tmpDir(temporaryFolder.newFolder());
|
||||||
|
|
||||||
final QueryableIndex index = bob.buildMMappedIndex();
|
final QueryableIndex index = bob.buildMMappedIndex(Intervals.of("2024-01-01T00:00Z/2024-01-02T00:00Z"));
|
||||||
interval = index.getDataInterval();
|
interval = index.getDataInterval();
|
||||||
cursorFactory = new QueryableIndexCursorFactory(index);
|
cursorFactory = new QueryableIndexCursorFactory(index);
|
||||||
timeBoundaryInspector = QueryableIndexTimeBoundaryInspector.create(index);
|
timeBoundaryInspector = QueryableIndexTimeBoundaryInspector.create(index);
|
||||||
|
@ -261,4 +266,102 @@ public class CursorGranularizerTest extends InitializedNullHandlingTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGranularizeFiltered()
|
||||||
|
{
|
||||||
|
final CursorBuildSpec filtered = CursorBuildSpec.builder()
|
||||||
|
.setFilter(new EqualityFilter("y", ColumnType.STRING, "1", null))
|
||||||
|
.build();
|
||||||
|
try (CursorHolder cursorHolder = cursorFactory.makeCursorHolder(filtered)) {
|
||||||
|
final Cursor cursor = cursorHolder.asCursor();
|
||||||
|
CursorGranularizer granularizer = CursorGranularizer.create(
|
||||||
|
cursor,
|
||||||
|
timeBoundaryInspector,
|
||||||
|
Order.ASCENDING,
|
||||||
|
Granularities.HOUR,
|
||||||
|
interval
|
||||||
|
);
|
||||||
|
|
||||||
|
final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory();
|
||||||
|
final ColumnValueSelector xSelector = selectorFactory.makeColumnValueSelector("x");
|
||||||
|
final Sequence<List<String>> theSequence =
|
||||||
|
Sequences.simple(granularizer.getBucketIterable())
|
||||||
|
.map(bucketInterval -> {
|
||||||
|
List<String> bucket = new ArrayList<>();
|
||||||
|
if (!granularizer.advanceToBucket(bucketInterval)) {
|
||||||
|
return bucket;
|
||||||
|
}
|
||||||
|
while (!cursor.isDone()) {
|
||||||
|
bucket.add((String) xSelector.getObject());
|
||||||
|
if (!granularizer.advanceCursorWithinBucket()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bucket;
|
||||||
|
});
|
||||||
|
|
||||||
|
List<List<String>> granularized = theSequence.toList();
|
||||||
|
Assert.assertEquals(
|
||||||
|
ImmutableList.of(
|
||||||
|
ImmutableList.of("a", "c"),
|
||||||
|
ImmutableList.of("e"),
|
||||||
|
ImmutableList.of(),
|
||||||
|
ImmutableList.of("g", "i"),
|
||||||
|
ImmutableList.of(),
|
||||||
|
ImmutableList.of(),
|
||||||
|
ImmutableList.of("k"),
|
||||||
|
ImmutableList.of(),
|
||||||
|
ImmutableList.of(),
|
||||||
|
ImmutableList.of()
|
||||||
|
),
|
||||||
|
granularized
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGranularizeFilteredClippedAndPartialOverlap()
|
||||||
|
{
|
||||||
|
final CursorBuildSpec filtered = CursorBuildSpec.builder()
|
||||||
|
.setFilter(new EqualityFilter("y", ColumnType.STRING, "1", null))
|
||||||
|
.build();
|
||||||
|
try (CursorHolder cursorHolder = cursorFactory.makeCursorHolder(filtered)) {
|
||||||
|
final Cursor cursor = cursorHolder.asCursor();
|
||||||
|
CursorGranularizer granularizer = CursorGranularizer.create(
|
||||||
|
cursor,
|
||||||
|
timeBoundaryInspector,
|
||||||
|
Order.ASCENDING,
|
||||||
|
Granularities.HOUR,
|
||||||
|
Intervals.of("2024-01-01T08:00Z/2024-01-03T00:00Z")
|
||||||
|
);
|
||||||
|
|
||||||
|
final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory();
|
||||||
|
final ColumnValueSelector xSelector = selectorFactory.makeColumnValueSelector("x");
|
||||||
|
final Sequence<List<String>> theSequence =
|
||||||
|
Sequences.simple(granularizer.getBucketIterable())
|
||||||
|
.map(bucketInterval -> {
|
||||||
|
List<String> bucket = new ArrayList<>();
|
||||||
|
if (!granularizer.advanceToBucket(bucketInterval)) {
|
||||||
|
return bucket;
|
||||||
|
}
|
||||||
|
while (!cursor.isDone()) {
|
||||||
|
bucket.add((String) xSelector.getObject());
|
||||||
|
if (!granularizer.advanceCursorWithinBucket()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bucket;
|
||||||
|
});
|
||||||
|
|
||||||
|
List<List<String>> granularized = theSequence.toList();
|
||||||
|
Assert.assertEquals(
|
||||||
|
ImmutableList.of(
|
||||||
|
ImmutableList.of(),
|
||||||
|
ImmutableList.of()
|
||||||
|
),
|
||||||
|
granularized
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.druid.segment.transform.TransformSpec;
|
||||||
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
|
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
|
||||||
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
|
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -235,6 +236,11 @@ public class IndexBuilder
|
||||||
}
|
}
|
||||||
|
|
||||||
public File buildMMappedIndexFile()
|
public File buildMMappedIndexFile()
|
||||||
|
{
|
||||||
|
return buildMMappedIndexFile(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public File buildMMappedIndexFile(@Nullable Interval dataInterval)
|
||||||
{
|
{
|
||||||
Preconditions.checkNotNull(indexMerger, "indexMerger");
|
Preconditions.checkNotNull(indexMerger, "indexMerger");
|
||||||
Preconditions.checkNotNull(tmpDir, "tmpDir");
|
Preconditions.checkNotNull(tmpDir, "tmpDir");
|
||||||
|
@ -244,6 +250,7 @@ public class IndexBuilder
|
||||||
indexIO.loadIndex(
|
indexIO.loadIndex(
|
||||||
indexMerger.persist(
|
indexMerger.persist(
|
||||||
incrementalIndex,
|
incrementalIndex,
|
||||||
|
dataInterval == null ? incrementalIndex.getInterval() : dataInterval,
|
||||||
new File(
|
new File(
|
||||||
tmpDir,
|
tmpDir,
|
||||||
StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))
|
StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))
|
||||||
|
@ -276,7 +283,17 @@ public class IndexBuilder
|
||||||
public QueryableIndex buildMMappedIndex()
|
public QueryableIndex buildMMappedIndex()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
return indexIO.loadIndex(buildMMappedIndexFile());
|
return indexIO.loadIndex(buildMMappedIndexFile(null));
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueryableIndex buildMMappedIndex(Interval dataInterval)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return indexIO.loadIndex(buildMMappedIndexFile(dataInterval));
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
|
Loading…
Reference in New Issue