Modified sequence limit to accept longs and added test for long limits

This commit is contained in:
Justin Borromeo 2019-03-27 17:38:20 -07:00
parent 1df50de321
commit 231a72e7d9
4 changed files with 32 additions and 18 deletions

View File

@ -30,11 +30,11 @@ import java.io.IOException;
final class LimitedSequence<T> extends YieldingSequenceBase<T> final class LimitedSequence<T> extends YieldingSequenceBase<T>
{ {
private final Sequence<T> baseSequence; private final Sequence<T> baseSequence;
private final int limit; private final long limit;
LimitedSequence( LimitedSequence(
Sequence<T> baseSequence, Sequence<T> baseSequence,
int limit long limit
) )
{ {
Preconditions.checkNotNull(baseSequence); Preconditions.checkNotNull(baseSequence);
@ -106,7 +106,7 @@ final class LimitedSequence<T> extends YieldingSequenceBase<T>
private class LimitedYieldingAccumulator<OutType, T> extends DelegatingYieldingAccumulator<OutType, T> private class LimitedYieldingAccumulator<OutType, T> extends DelegatingYieldingAccumulator<OutType, T>
{ {
int count; long count;
boolean interruptYield = false; boolean interruptYield = false;
LimitedYieldingAccumulator(YieldingAccumulator<OutType, T> accumulator) LimitedYieldingAccumulator(YieldingAccumulator<OutType, T> accumulator)

View File

@ -82,7 +82,7 @@ public interface Sequence<T>
return accumulate(new ArrayList<>(), Accumulators.list()); return accumulate(new ArrayList<>(), Accumulators.list());
} }
default Sequence<T> limit(int limit) default Sequence<T> limit(long limit)
{ {
return new LimitedSequence<>(this, limit); return new LimitedSequence<>(this, limit);
} }

View File

@ -207,6 +207,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
{ {
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
if (scanQuery.getLimit() > Integer.MAX_VALUE) {
throw new UOE("Limit of %,d rows not supported for priority queue strategy of time-ordering scan results",
scanQuery.getLimit()
);
}
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE) // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
int limit = Math.toIntExact(scanQuery.getLimit()); int limit = Math.toIntExact(scanQuery.getLimit());
@ -282,7 +288,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp // (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group // (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence // (5) Join all the results into a single sequence
return Sequences.concat( return Sequences.concat(
Sequences.map( Sequences.map(
Sequences.simple(groupedRunners), Sequences.simple(groupedRunners),
@ -303,7 +308,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
) )
) )
).limit( ).limit(
Math.toIntExact(((ScanQuery) (queryPlus.getQuery())).getLimit()) ((ScanQuery) (queryPlus.getQuery())).getLimit()
); );
} }

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
@ -83,7 +84,7 @@ public class ScanQueryRunnerFactoryTest
{ {
List<Integer> numsElements = ImmutableList.of(0, 10, 100); List<Integer> numsElements = ImmutableList.of(0, 10, 100);
List<Integer> batchSizes = ImmutableList.of(1, 100); List<Integer> batchSizes = ImmutableList.of(1, 100);
List<Long> limits = ImmutableList.of(3L, 1000L); List<Long> limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE);
List<ScanQuery.ResultFormat> resultFormats = ImmutableList.of( List<ScanQuery.ResultFormat> resultFormats = ImmutableList.of(
ScanQuery.ResultFormat.RESULT_FORMAT_LIST, ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST
@ -125,17 +126,25 @@ public class ScanQueryRunnerFactoryTest
return retVal; return retVal;
}); });
Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs); Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
List<ScanResultValue> output = try {
factory.priorityQueueSortAndLimit( List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
inputSequence, inputSequence,
query, query,
ImmutableList.of(new SegmentDescriptor(new Interval( ImmutableList.of(new SegmentDescriptor(new Interval(
DateTimes.of("2010-01-01"), DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1) DateTimes.of("2019-01-01").plusHours(1)
), "1", 0)) ), "1", 0))
).toList(); ).toList();
if (query.getLimit() > Integer.MAX_VALUE) {
validateSortedOutput(output, expectedEventTimestamps); Assert.fail("Unsupported exception should have been thrown due to high limit");
}
validateSortedOutput(output, expectedEventTimestamps);
}
catch (UOE e) {
if (query.getLimit() <= Integer.MAX_VALUE) {
Assert.fail("Unsupported operation exception should not have been thrown here");
}
}
} }
@Test @Test