mirror of https://github.com/apache/druid.git
Fix early return from YieldingSequenceBase#accumulate. (#9293)
Fixes #9291.
This commit is contained in:
parent
6b44d4aa80
commit
07a91f9022
|
@ -47,9 +47,7 @@ final class LimitedSequence<T> extends YieldingSequenceBase<T>
|
||||||
@Override
|
@Override
|
||||||
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
||||||
{
|
{
|
||||||
final LimitedYieldingAccumulator<OutType, T> limitedAccumulator = new LimitedYieldingAccumulator<>(
|
final LimitedYieldingAccumulator<OutType> limitedAccumulator = new LimitedYieldingAccumulator<>(accumulator);
|
||||||
accumulator
|
|
||||||
);
|
|
||||||
final Yielder<OutType> subYielder = baseSequence.toYielder(initValue, limitedAccumulator);
|
final Yielder<OutType> subYielder = baseSequence.toYielder(initValue, limitedAccumulator);
|
||||||
return new LimitedYielder<>(subYielder, limitedAccumulator);
|
return new LimitedYielder<>(subYielder, limitedAccumulator);
|
||||||
}
|
}
|
||||||
|
@ -57,11 +55,11 @@ final class LimitedSequence<T> extends YieldingSequenceBase<T>
|
||||||
private class LimitedYielder<OutType> implements Yielder<OutType>
|
private class LimitedYielder<OutType> implements Yielder<OutType>
|
||||||
{
|
{
|
||||||
private final Yielder<OutType> subYielder;
|
private final Yielder<OutType> subYielder;
|
||||||
private final LimitedYieldingAccumulator<OutType, T> limitedAccumulator;
|
private final LimitedYieldingAccumulator<OutType> limitedAccumulator;
|
||||||
|
|
||||||
LimitedYielder(
|
LimitedYielder(
|
||||||
Yielder<OutType> subYielder,
|
Yielder<OutType> subYielder,
|
||||||
LimitedYieldingAccumulator<OutType, T> limitedAccumulator
|
LimitedYieldingAccumulator<OutType> limitedAccumulator
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.subYielder = subYielder;
|
this.subYielder = subYielder;
|
||||||
|
@ -104,7 +102,7 @@ final class LimitedSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class LimitedYieldingAccumulator<OutType, T> extends DelegatingYieldingAccumulator<OutType, T>
|
private class LimitedYieldingAccumulator<OutType> extends DelegatingYieldingAccumulator<OutType, T>
|
||||||
{
|
{
|
||||||
long count;
|
long count;
|
||||||
boolean interruptYield = false;
|
boolean interruptYield = false;
|
||||||
|
|
|
@ -32,6 +32,9 @@ public abstract class YieldingSequenceBase<T> implements Sequence<T>
|
||||||
Yielder<OutType> yielder = toYielder(initValue, YieldingAccumulators.fromAccumulator(accumulator));
|
Yielder<OutType> yielder = toYielder(initValue, YieldingAccumulators.fromAccumulator(accumulator));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
while (!yielder.isDone()) {
|
||||||
|
yielder = yielder.next(yielder.get());
|
||||||
|
}
|
||||||
return yielder.get();
|
return yielder.get();
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
|
|
@ -25,45 +25,100 @@ import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
public class LimitedSequenceTest
|
public class LimitedSequenceTest
|
||||||
{
|
{
|
||||||
|
private static final List<Integer> NUMS = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSanityAccumulate() throws Exception
|
public void testSanityAccumulate() throws Exception
|
||||||
{
|
{
|
||||||
final List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
|
||||||
final int threshold = 5;
|
final int threshold = 5;
|
||||||
SequenceTestHelper.testAll(
|
SequenceTestHelper.testAll(
|
||||||
Sequences.simple(nums).limit(threshold),
|
Sequences.simple(NUMS).limit(threshold),
|
||||||
Lists.newArrayList(Iterables.limit(nums, threshold))
|
Lists.newArrayList(Iterables.limit(NUMS, threshold))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTwo() throws Exception
|
public void testTwo() throws Exception
|
||||||
{
|
{
|
||||||
final List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
|
||||||
final int threshold = 2;
|
final int threshold = 2;
|
||||||
|
|
||||||
SequenceTestHelper.testAll(
|
SequenceTestHelper.testAll(
|
||||||
Sequences.simple(nums).limit(threshold),
|
Sequences.simple(NUMS).limit(threshold),
|
||||||
Lists.newArrayList(Iterables.limit(nums, threshold))
|
Lists.newArrayList(Iterables.limit(NUMS, threshold))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOne() throws Exception
|
public void testOne() throws Exception
|
||||||
{
|
{
|
||||||
final List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
|
||||||
final int threshold = 1;
|
final int threshold = 1;
|
||||||
|
|
||||||
SequenceTestHelper.testAll(
|
SequenceTestHelper.testAll(
|
||||||
Sequences.simple(nums).limit(threshold),
|
Sequences.simple(NUMS).limit(threshold),
|
||||||
Lists.newArrayList(Iterables.limit(nums, threshold))
|
Lists.newArrayList(Iterables.limit(NUMS, threshold))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithYieldingSequence()
|
||||||
|
{
|
||||||
|
// Regression test for https://github.com/apache/druid/issues/9291.
|
||||||
|
|
||||||
|
// Create a Sequence whose Yielders will yield for each element, regardless of what the accumulator passed
|
||||||
|
// to "toYielder" does.
|
||||||
|
final BaseSequence<Integer, Iterator<Integer>> sequence = new BaseSequence<Integer, Iterator<Integer>>(
|
||||||
|
new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Iterator<Integer> make()
|
||||||
|
{
|
||||||
|
return NUMS.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup(Iterator<Integer> iterFromMake)
|
||||||
|
{
|
||||||
|
// Do nothing.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <OutType> Yielder<OutType> toYielder(
|
||||||
|
final OutType initValue,
|
||||||
|
final YieldingAccumulator<OutType, Integer> accumulator
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return super.toYielder(
|
||||||
|
initValue,
|
||||||
|
new DelegatingYieldingAccumulator<OutType, Integer>(accumulator)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public OutType accumulate(OutType accumulated, Integer in)
|
||||||
|
{
|
||||||
|
final OutType retVal = super.accumulate(accumulated, in);
|
||||||
|
yield();
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
final int threshold = 4;
|
||||||
|
|
||||||
|
// Can't use "testAll" because its "testYield" implementation depends on the underlying Sequence _not_ yielding.
|
||||||
|
SequenceTestHelper.testAccumulation(
|
||||||
|
"",
|
||||||
|
sequence.limit(threshold),
|
||||||
|
Lists.newArrayList(Iterables.limit(NUMS, threshold))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,15 +19,13 @@
|
||||||
|
|
||||||
package org.apache.druid.java.util.common.guava;
|
package org.apache.druid.java.util.common.guava;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
public class SequenceTestHelper
|
public class SequenceTestHelper
|
||||||
{
|
{
|
||||||
public static void testAll(Sequence<Integer> seq, List<Integer> nums) throws IOException
|
public static void testAll(Sequence<Integer> seq, List<Integer> nums) throws IOException
|
||||||
|
|
Loading…
Reference in New Issue