Relay final value to yielder in CombineSequence (Fix for #2586)

This commit is contained in:
navis.ryu 2016-03-07 10:04:55 +09:00
parent 4fa08a1329
commit 4ff1620131
2 changed files with 85 additions and 4 deletions

View File

@ -19,7 +19,6 @@
package io.druid.common.guava;
import com.google.common.base.Function;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
@ -81,7 +80,7 @@ public class CombiningSequence<T> implements Sequence<T>
return makeYielder(baseYielder, combiningAccumulator, false);
}
public <OutType, T> Yielder<OutType> makeYielder(
public <OutType> Yielder<OutType> makeYielder(
Yielder<T> yielder,
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
boolean finalValue
@ -102,13 +101,13 @@ public class CombiningSequence<T> implements Sequence<T>
finalFinalValue = true;
if(!combiningAccumulator.yielded()) {
return Yielders.done(null, yielder);
return Yielders.done(retVal, yielder);
} else {
finalYielder = Yielders.done(null, yielder);
}
}
else {
return Yielders.done(null, yielder);
return Yielders.done(combiningAccumulator.getRetVal(), yielder);
}
}

View File

@ -20,6 +20,8 @@
package io.druid.common.guava;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -36,6 +38,7 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@ -271,4 +274,83 @@ public class CombiningSequenceTest
Assert.assertFalse(expectedVals.hasNext());
yielder.close();
}
@Test
public void testComplexSequence()
{
List<Integer> combined = Sequences.toList(getComplexSequence(), new ArrayList<Integer>());
Assert.assertEquals(8, Iterables.getOnlyElement(combined).intValue());
Yielder<Integer> yielder = getComplexSequence().toYielder(
null,
new YieldingAccumulator<Integer, Integer>()
{
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
yield();
return in;
}
}
);
List<Integer> combinedByYielder = new ArrayList<>();
while (!yielder.isDone()) {
combinedByYielder.add(yielder.get());
yielder = yielder.next(null);
}
Assert.assertEquals(8, Iterables.getOnlyElement(combinedByYielder).intValue());
}
private Sequence<Integer> getComplexSequence()
{
Ordering<Integer> alwaysSame = new Ordering<Integer>()
{
@Override
public int compare(Integer left, Integer right)
{
return 0;
}
};
BinaryFn<Integer, Integer, Integer> plus = new BinaryFn<Integer, Integer, Integer>()
{
@Override
public Integer apply(Integer arg1, Integer arg2)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
return arg1 + arg2;
}
};
return CombiningSequence.create(
Sequences.concat(
ImmutableList.<Sequence<Integer>>of(
CombiningSequence.create(
Sequences.simple(ImmutableList.<Integer>of(3))
,
alwaysSame,
plus
)
,
CombiningSequence.create(
Sequences.simple(ImmutableList.of(5))
,
alwaysSame,
plus
)
)
),
alwaysSame,
plus
);
}
}