Sequence stuff is so dirty :(

Justin Borromeo 2019-02-22 13:30:08 -08:00
parent e1fc2955d3
commit 023538d831
3 changed files with 113 additions and 46 deletions

MergeSequence.java

@@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.PriorityQueue;
/**
+ * Used to perform an n-way merge on n ordered sequences
 */
public class MergeSequence<T> extends YieldingSequenceBase<T>
{
@@ -43,20 +44,18 @@ public class MergeSequence<T> extends YieldingSequenceBase<T>
this.baseSequences = (Sequence<? extends Sequence<T>>) baseSequences;
}
+ /*
+ Note: the yielder for MergeSequence returns elements from the priority queue in order of increasing priority.
+ This is due to the fact that PriorityQueue#remove() polls from the head of the queue which is, according to
+ the PriorityQueue javadoc, "the least element with respect to the specified ordering"
+ */
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
PriorityQueue<Yielder<T>> pQueue = new PriorityQueue<>(
32,
ordering.onResultOf(
- new Function<Yielder<T>, T>()
- {
-   @Override
-   public T apply(Yielder<T> input)
-   {
-     return input.get();
-   }
- }
+ (Function<Yielder<T>, T>) input -> input.get()
)
);
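The comment added above documents why MergeSequence emits results in ascending comparator order. A minimal standalone illustration of that PriorityQueue behavior (not part of this commit; plain JDK only, illustrative class name):

import java.util.Comparator;
import java.util.PriorityQueue;

// PriorityQueue#poll() always returns the least element with respect to the
// queue's comparator, which is the behavior the new MergeSequence note cites.
public class PriorityQueuePollOrder
{
  public static void main(String[] args)
  {
    PriorityQueue<Integer> queue = new PriorityQueue<>(Comparator.naturalOrder());
    queue.add(3);
    queue.add(1);
    queue.add(2);
    while (!queue.isEmpty()) {
      System.out.println(queue.poll()); // prints 1, 2, 3 (smallest first)
    }
  }
}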

ScanQueryQueryToolChest.java

@@ -20,16 +20,11 @@
package org.apache.druid.query.scan;
import com.fasterxml.jackson.core.type.TypeReference;
- import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
- import com.google.common.collect.Iterables;
import com.google.inject.Inject;
- import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
- import org.apache.druid.java.util.common.guava.Sequence;
- import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
@@ -38,17 +33,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulationFn;
- import java.io.IOException;
- import java.util.ArrayDeque;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.Deque;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.PriorityQueue;
public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
{
private static final TypeReference<ScanResultValue> TYPE_REFERENCE = new TypeReference<ScanResultValue>()

ScanQueryRunnerFactory.java

@@ -20,23 +20,24 @@
package org.apache.druid.query.scan;
import com.google.common.annotations.VisibleForTesting;
- import com.google.common.base.Function;
+ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.UOE;
+ import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
+ import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
- import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
@@ -45,11 +46,10 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.segment.Segment;
+ import javax.annotation.Nullable;
import java.io.IOException;
- import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
- import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -134,20 +134,27 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
query
- )).reverse() // TODO Figure out why this needs to be reversed
+ )).reverse() // This needs to be reversed because the priority queue inside MergeSequence polls its least element first
).limit(
Math.toIntExact(query.getLimit())
);
// Batch the scan result values
- } else {
+ return new ScanResultValueBatchingSequence(unbatched, query.getBatchSize());
+ } else if (query.getLimit() > scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
throw new UOE(
- "Time ordering for result set limit of %,d is not supported. Try lowering the "
- + "result set size to less than or equal to the time ordering limit of %,d.",
+ "Time ordering for query result set limit of %,d is not supported. Try lowering the result "
+ + "set size to less than or equal to the configurable time ordering limit of %,d rows.",
query.getLimit(),
scanQueryConfig.getMaxRowsQueuedForTimeOrdering()
);
}
+ throw new UOE(
+ "Time ordering for queries of %,d segments per historical is not supported. Try reducing the scope "
+ + "of the query to scan fewer segments than the configurable time ordering limit of %,d segments",
+ numSegments,
+ scanQueryConfig.getMaxSegmentsTimeOrderedInMemory()
+ );
};
}
@@ -157,13 +164,14 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
ScanQuery scanQuery
)
{
- Comparator<ScanResultValue> priorityQComparator =
-     new ScanResultValueTimestampComparator(scanQuery);
+ Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
int limit = Math.toIntExact(scanQuery.getLimit());
- PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
+ // Comparator ordering is reversed since polling from the queue returns elements in reversed order
+ PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator.reversed());
Yielder<ScanResultValue> yielder = inputSequence.toYielder(
null,
@@ -192,28 +200,25 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
// Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
// will be maintained
- final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
+ final List<ScanResultValue> sortedElements = new ArrayList<>(q.size());
while (q.size() != 0) {
- // We add at the front of the list because poll removes the tail of the queue.
- sortedElements.addFirst(q.poll());
+ sortedElements.add(q.poll());
}
+ // We can use an iterator here because all the results have been materialized for sorting
return new BaseSequence(
- new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
+ new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIteratorIterator>()
{
@Override
- public ScanBatchedIterator make()
+ public ScanBatchedIteratorIterator make()
{
- return new ScanBatchedIterator(
+ return new ScanBatchedIteratorIterator(
sortedElements.iterator(),
scanQuery.getBatchSize()
);
}
@Override
- public void cleanup(ScanBatchedIterator iterFromMake)
+ public void cleanup(ScanBatchedIteratorIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
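For context on the priorityQComparator.reversed() change above: a PriorityQueue polls its least element first, so draining the reversed-comparator queue with add() produces the same order the old code built by calling addFirst() on a Deque. A standalone sketch (not part of this commit; plain JDK only, illustrative class name):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

public class ReversedComparatorDrain
{
  public static void main(String[] args)
  {
    Comparator<Integer> cmp = Comparator.naturalOrder();

    // Old approach: natural-order queue, poll() returns smallest first,
    // addFirst() pushes each polled element in front of the previous ones.
    PriorityQueue<Integer> oldQueue = new PriorityQueue<>(cmp);
    // New approach: reversed comparator, poll() returns largest first,
    // add() simply appends in poll order.
    PriorityQueue<Integer> newQueue = new PriorityQueue<>(cmp.reversed());
    for (int value : new int[]{5, 1, 4, 2, 3}) {
      oldQueue.add(value);
      newQueue.add(value);
    }

    Deque<Integer> oldResult = new ArrayDeque<>();
    while (!oldQueue.isEmpty()) {
      oldResult.addFirst(oldQueue.poll());
    }
    List<Integer> newResult = new ArrayList<>();
    while (!newQueue.isEmpty()) {
      newResult.add(newQueue.poll());
    }

    // Both print [5, 4, 3, 2, 1], which is why the Deque could be dropped.
    System.out.println(oldResult);
    System.out.println(newResult);
  }
}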
@@ -260,12 +265,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
 * ScanResultValue will be used to populate the column section.
 */
@VisibleForTesting
- static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
+ static class ScanBatchedIteratorIterator implements CloseableIterator<ScanResultValue>
{
private final Iterator<ScanResultValue> itr;
private final int batchSize;
- public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
+ public ScanBatchedIteratorIterator(Iterator<ScanResultValue> iterator, int batchSize)
{
this.itr = iterator;
this.batchSize = batchSize;
@@ -297,4 +302,83 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return new ScanResultValue(null, columns, eventsToAdd);
}
}
+ @VisibleForTesting
+ static class ScanResultValueBatchingSequence extends YieldingSequenceBase<ScanResultValue>
+ {
+ Yielder<ScanResultValue> inputYielder;
+ int batchSize;
+ public ScanResultValueBatchingSequence(Sequence<ScanResultValue> inputSequence, int batchSize) {
+ this.inputYielder = inputSequence.toYielder(
+ null,
+ new YieldingAccumulator<ScanResultValue, ScanResultValue>()
+ {
+ @Override
+ public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
+ {
+ yield();
+ return in;
+ }
+ }
+ );
+ this.batchSize = batchSize;
+ }
+ @Override
+ @Nullable
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue, YieldingAccumulator<OutType, ScanResultValue> accumulator
+ )
+ {
+ return makeYielder(initValue, accumulator);
+ }
+ private <OutType> Yielder<OutType> makeYielder(
+ OutType initVal,
+ final YieldingAccumulator<OutType, ScanResultValue> accumulator
+ )
+ {
+ return new Yielder<OutType>()
+ {
+ @Override
+ public OutType get()
+ {
+ // Create new ScanResultValue from event map
+ List<Object> eventsToAdd = new ArrayList<>(batchSize);
+ List<String> columns = new ArrayList<>();
+ while (eventsToAdd.size() < batchSize && !inputYielder.isDone()) {
+ ScanResultValue srv = inputYielder.get();
+ // Only replace once using the columns from the first event
+ columns = columns.isEmpty() ? srv.getColumns() : columns;
+ eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
+ inputYielder = inputYielder.next(null);
+ }
+ try {
+ return (OutType) new ScanResultValue(null, columns, eventsToAdd);
+ } catch (ClassCastException e) {
+ return initVal;
+ }
+ }
+ @Override
+ public Yielder<OutType> next(OutType initValue)
+ {
+ accumulator.reset();
+ return makeYielder(initValue, accumulator);
+ }
+ @Override
+ public boolean isDone()
+ {
+ return inputYielder.isDone();
+ }
+ @Override
+ public void close()
+ {
+ }
+ };
+ }
+ }
}
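The get() method of the new ScanResultValueBatchingSequence gathers up to batchSize single-event ScanResultValues into one batched value. A standalone sketch of the same grouping logic using only plain collections (hypothetical names, not part of this commit, no Druid dependencies):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Groups a stream of single-element "rows" into lists of at most batchSize,
// mirroring how the batching yielder above fills eventsToAdd before returning.
public class BatchingSketch
{
  static <T> List<List<T>> batch(Iterator<T> rows, int batchSize)
  {
    List<List<T>> batches = new ArrayList<>();
    List<T> current = new ArrayList<>(batchSize);
    while (rows.hasNext()) {
      current.add(rows.next());
      if (current.size() == batchSize) {
        batches.add(current);
        current = new ArrayList<>(batchSize);
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // final partial batch, like the yielder's last get()
    }
    return batches;
  }

  public static void main(String[] args)
  {
    List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5);
    System.out.println(batch(rows.iterator(), 2)); // [[1, 2], [3, 4], [5]]
  }
}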