CombiningSequence: Delay making next yielder on creation until it is actually asked for. (#2892)

This fixes the behavior of limited combining sequences (otherwise limit = 1 would
actually yield 2 elements).
This commit is contained in:
Gian Merlino 2016-04-29 11:12:58 -07:00 committed by Xavier Léauté
parent 2203a812bc
commit 488d12d592
2 changed files with 55 additions and 25 deletions

View File

@ -81,7 +81,7 @@ public class CombiningSequence<T> implements Sequence<T>
}
public <OutType> Yielder<OutType> makeYielder(
Yielder<T> yielder,
final Yielder<T> yielder,
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
boolean finalValue
)
@ -92,7 +92,7 @@ public class CombiningSequence<T> implements Sequence<T>
if(!yielder.isDone()) {
retVal = combiningAccumulator.getRetVal();
finalYielder = yielder.next(yielder.get());
finalYielder = null;
finalFinalValue = false;
} else {
if(!finalValue && combiningAccumulator.accumulatedSomething()) {
@ -124,7 +124,11 @@ public class CombiningSequence<T> implements Sequence<T>
public Yielder<OutType> next(OutType initValue)
{
combiningAccumulator.reset();
return makeYielder(finalYielder, combiningAccumulator, finalFinalValue);
return makeYielder(
finalYielder == null ? yielder.next(yielder.get()) : finalYielder,
combiningAccumulator,
finalFinalValue
);
}
@Override
@ -136,7 +140,9 @@ public class CombiningSequence<T> implements Sequence<T>
@Override
public void close() throws IOException
{
finalYielder.close();
if (finalYielder != null) {
finalYielder.close();
}
}
};
}

View File

@ -20,6 +20,7 @@
package io.druid.common.guava;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -47,7 +48,7 @@ public class CombiningSequenceTest
@Parameterized.Parameters
public static Collection<Object[]> valuesToTry()
{
return Arrays.asList(new Object[][] {
return Arrays.asList(new Object[][]{
{1}, {2}, {3}, {4}, {5}, {1000}
});
}
@ -194,27 +195,48 @@ public class CombiningSequenceTest
private void testCombining(List<Pair<Integer, Integer>> pairs, List<Pair<Integer, Integer>> expected)
throws IOException
{
Sequence<Pair<Integer, Integer>> seq = new CombiningSequence<Pair<Integer, Integer>>(
Sequences.simple(pairs),
Ordering.natural().onResultOf(Pair.<Integer, Integer>lhsFn()),
new BinaryFn<Pair<Integer, Integer>, Pair<Integer, Integer>, Pair<Integer, Integer>>()
{
@Override
public Pair<Integer, Integer> apply(
Pair<Integer, Integer> lhs, Pair<Integer, Integer> rhs
)
{
if (lhs == null) {
return rhs;
}
for (int limit = 0; limit < expected.size() + 1; limit++) {
// limit = 0 doesn't work properly; it returns 1 element
final int expectedLimit = limit == 0 ? 1 : limit;
if (rhs == null) {
return lhs;
}
testCombining(
pairs,
Lists.newArrayList(Iterables.limit(expected, expectedLimit)),
limit
);
}
}
return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
}
}
private void testCombining(
List<Pair<Integer, Integer>> pairs,
List<Pair<Integer, Integer>> expected,
int limit
) throws IOException
{
Sequence<Pair<Integer, Integer>> seq = Sequences.limit(
new CombiningSequence<>(
Sequences.simple(pairs),
Ordering.natural().onResultOf(Pair.<Integer, Integer>lhsFn()),
new BinaryFn<Pair<Integer, Integer>, Pair<Integer, Integer>, Pair<Integer, Integer>>()
{
@Override
public Pair<Integer, Integer> apply(
Pair<Integer, Integer> lhs, Pair<Integer, Integer> rhs
)
{
if (lhs == null) {
return rhs;
}
if (rhs == null) {
return lhs;
}
return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
}
}
),
limit
);
List<Pair<Integer, Integer>> merged = Sequences.toList(seq, Lists.<Pair<Integer, Integer>>newArrayList());
@ -233,7 +255,9 @@ public class CombiningSequenceTest
)
{
count++;
if(count % yieldEvery == 0) yield();
if (count % yieldEvery == 0) {
yield();
}
return rhs;
}
}