Fixed bug introduced by replacing deque with list

This commit is contained in:
Justin Borromeo 2019-02-22 14:03:22 -08:00
parent 023538d831
commit 3b923dac9c
1 changed files with 14 additions and 16 deletions

View File

@ -20,23 +20,19 @@
package org.apache.druid.query.scan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
@ -48,8 +44,10 @@ import org.apache.druid.segment.Segment;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -170,8 +168,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
int limit = Math.toIntExact(scanQuery.getLimit());
// Comparator ordering is reversed since polling from the queue returns elements in reversed order
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator.reversed());
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
Yielder<ScanResultValue> yielder = inputSequence.toYielder(
null,
@ -198,27 +195,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
yielder = yielder.next(null);
}
// Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
// will be maintained
final List<ScanResultValue> sortedElements = new ArrayList<>(q.size());
// Need to convert to a Deque because Priority Queue's iterator doesn't guarantee that the sorted order
// will be maintained. Deque was chosen over list because its addFirst is O(1).
final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
while (q.size() != 0) {
sortedElements.add(q.poll());
// addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first.
sortedElements.addFirst(q.poll());
}
return new BaseSequence(
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIteratorIterator>()
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
{
@Override
public ScanBatchedIteratorIterator make()
public ScanBatchedIterator make()
{
return new ScanBatchedIteratorIterator(
return new ScanBatchedIterator(
sortedElements.iterator(),
scanQuery.getBatchSize()
);
}
@Override
public void cleanup(ScanBatchedIteratorIterator iterFromMake)
public void cleanup(ScanBatchedIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
@ -265,12 +263,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
* ScanResultValue will be used to populate the column section.
*/
@VisibleForTesting
static class ScanBatchedIteratorIterator implements CloseableIterator<ScanResultValue>
static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
{
private final Iterator<ScanResultValue> itr;
private final int batchSize;
public ScanBatchedIteratorIterator(Iterator<ScanResultValue> iterator, int batchSize)
public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
{
this.itr = iterator;
this.batchSize = batchSize;