mirror of https://github.com/apache/druid.git
For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475)
Co-authored-by: imply-cheddar <86940447+imply-cheddar@users.noreply.github.com>
This commit is contained in:
parent
2fe053c5cb
commit
e7e49ec9c8
|
@ -78,7 +78,23 @@ public class CombiningSequence<T> implements Sequence<T>
|
|||
final Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);
|
||||
|
||||
try {
|
||||
return makeYielder(baseYielder, combiningAccumulator, false);
|
||||
// If the yielder is already done at this point, that means that it ran through all of the inputs
|
||||
// without hitting a yield(), i.e. it's effectively just a single accumulate() call. As such we just
|
||||
// return a done yielder with the correct accumulated value.
|
||||
if (baseYielder.isDone()) {
|
||||
if (combiningAccumulator.accumulatedSomething()) {
|
||||
combiningAccumulator.accumulateLastValue();
|
||||
}
|
||||
// If we yielded, then the expectation is that we get a Yielder with the yielded value, followed by a done
|
||||
// yielder. This will happen if we fall through to the normal makeYielder. If the accumulator did not yield
|
||||
// then the code expects a single Yielder that returns whatever was left over from the accumulation on the
|
||||
// get() call.
|
||||
if (!combiningAccumulator.yielded()) {
|
||||
return Yielders.done(combiningAccumulator.getRetVal(), baseYielder);
|
||||
}
|
||||
}
|
||||
|
||||
return makeYielder(baseYielder, combiningAccumulator);
|
||||
}
|
||||
catch (Throwable t1) {
|
||||
try {
|
||||
|
@ -94,52 +110,37 @@ public class CombiningSequence<T> implements Sequence<T>
|
|||
|
||||
private <OutType> Yielder<OutType> makeYielder(
|
||||
final Yielder<T> yielder,
|
||||
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
|
||||
boolean finalValue
|
||||
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator
|
||||
)
|
||||
{
|
||||
final Yielder<T> finalYielder;
|
||||
final OutType retVal;
|
||||
final boolean finalFinalValue;
|
||||
|
||||
if (!yielder.isDone()) {
|
||||
retVal = combiningAccumulator.getRetVal();
|
||||
finalYielder = null;
|
||||
finalFinalValue = false;
|
||||
} else {
|
||||
if (!finalValue && combiningAccumulator.accumulatedSomething()) {
|
||||
combiningAccumulator.accumulateLastValue();
|
||||
retVal = combiningAccumulator.getRetVal();
|
||||
finalFinalValue = true;
|
||||
|
||||
if (!combiningAccumulator.yielded()) {
|
||||
return Yielders.done(retVal, yielder);
|
||||
} else {
|
||||
finalYielder = Yielders.done(null, yielder);
|
||||
}
|
||||
} else {
|
||||
return Yielders.done(combiningAccumulator.getRetVal(), yielder);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return new Yielder<OutType>()
|
||||
{
|
||||
private Yielder<T> myYielder = yielder;
|
||||
private CombiningYieldingAccumulator<OutType, T> accum = combiningAccumulator;
|
||||
|
||||
@Override
|
||||
public OutType get()
|
||||
{
|
||||
return retVal;
|
||||
return accum.getRetVal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Yielder<OutType> next(OutType initValue)
|
||||
{
|
||||
combiningAccumulator.reset();
|
||||
return makeYielder(
|
||||
finalYielder == null ? yielder.next(yielder.get()) : finalYielder,
|
||||
combiningAccumulator,
|
||||
finalFinalValue
|
||||
);
|
||||
accum.reset();
|
||||
if (myYielder.isDone()) {
|
||||
return Yielders.done(null, myYielder);
|
||||
}
|
||||
|
||||
myYielder = myYielder.next(myYielder.get());
|
||||
if (myYielder.isDone() && accum.accumulatedSomething()) {
|
||||
accum.accumulateLastValue();
|
||||
if (!accum.yielded()) {
|
||||
return Yielders.done(accum.getRetVal(), myYielder);
|
||||
}
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -151,7 +152,7 @@ public class CombiningSequence<T> implements Sequence<T>
|
|||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
yielder.close();
|
||||
myYielder.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -66,7 +66,19 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
|
|||
final IterType iterator = maker.make();
|
||||
|
||||
try {
|
||||
return makeYielder(initValue, accumulator, iterator);
|
||||
OutType retVal = initValue;
|
||||
while (!accumulator.yielded() && iterator.hasNext()) {
|
||||
retVal = accumulator.accumulate(retVal, iterator.next());
|
||||
}
|
||||
|
||||
if (!accumulator.yielded()) {
|
||||
return Yielders.done(
|
||||
retVal,
|
||||
(Closeable) () -> maker.cleanup(iterator)
|
||||
);
|
||||
}
|
||||
|
||||
return makeYielder(retVal, accumulator, iterator);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
try {
|
||||
|
@ -80,47 +92,34 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
|
|||
}
|
||||
|
||||
private <OutType> Yielder<OutType> makeYielder(
|
||||
final OutType initValue,
|
||||
final OutType retValue,
|
||||
final YieldingAccumulator<OutType, T> accumulator,
|
||||
final IterType iter
|
||||
)
|
||||
{
|
||||
OutType retVal = initValue;
|
||||
while (!accumulator.yielded() && iter.hasNext()) {
|
||||
retVal = accumulator.accumulate(retVal, iter.next());
|
||||
}
|
||||
|
||||
if (!accumulator.yielded()) {
|
||||
return Yielders.done(
|
||||
retVal,
|
||||
(Closeable) () -> maker.cleanup(iter)
|
||||
);
|
||||
}
|
||||
|
||||
final OutType finalRetVal = retVal;
|
||||
return new Yielder<OutType>()
|
||||
{
|
||||
OutType retVal = retValue;
|
||||
|
||||
@Override
|
||||
public OutType get()
|
||||
{
|
||||
return finalRetVal;
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Yielder<OutType> next(OutType initValue)
|
||||
{
|
||||
accumulator.reset();
|
||||
try {
|
||||
return makeYielder(initValue, accumulator, iter);
|
||||
retVal = initValue;
|
||||
while (!accumulator.yielded() && iter.hasNext()) {
|
||||
retVal = accumulator.accumulate(retVal, iter.next());
|
||||
}
|
||||
catch (Throwable t) {
|
||||
try {
|
||||
maker.cleanup(iter);
|
||||
}
|
||||
catch (Exception e) {
|
||||
t.addSuppressed(e);
|
||||
}
|
||||
throw t;
|
||||
|
||||
if (accumulator.yielded()) {
|
||||
return this;
|
||||
} else {
|
||||
return Yielders.done(retVal, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ import java.io.IOException;
|
|||
|
||||
final class WrappingYielder<OutType> implements Yielder<OutType>
|
||||
{
|
||||
private final Yielder<OutType> baseYielder;
|
||||
private Yielder<OutType> baseYielder;
|
||||
private final SequenceWrapper wrapper;
|
||||
|
||||
WrappingYielder(Yielder<OutType> baseYielder, SequenceWrapper wrapper)
|
||||
|
@ -50,7 +50,8 @@ final class WrappingYielder<OutType> implements Yielder<OutType>
|
|||
@Override
|
||||
public Yielder<OutType> get()
|
||||
{
|
||||
return new WrappingYielder<>(baseYielder.next(initValue), wrapper);
|
||||
baseYielder = baseYielder.next(initValue);
|
||||
return WrappingYielder.this;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -26,9 +26,13 @@ import java.io.Closeable;
|
|||
* necessarily good at this job, but it works. I think.
|
||||
*
|
||||
* Essentially, you can think of a Yielder as a linked list of items where the Yielder gives you access to the current
|
||||
* head via get() and it will give you another Yielder representing the next item in the chain via next(). A Yielder
|
||||
* that isDone() may return anything from both get() and next(), there is no contract and depending on those return
|
||||
* values will likely lead to bugs.
|
||||
* head via get() and it will give you another Yielder representing the next item in the chain via next(). When using
|
||||
* a yielder object, a call to yield() on the yielding accumulator will result in a new Yielder being returned whose
|
||||
* get() method will return the return value of the accumulator from the call that called yield().
|
||||
*
|
||||
* When a call to next() exhausts the underlying data stream without having a yield() call, various implementations
|
||||
* of Sequences and Yielders assume that they will receive a Yielder where isDone() is true and get() will return the
|
||||
* accumulated value up until that point.
|
||||
*
|
||||
* Once next is called, there is no guarantee and no requirement that references to old Yielder objects will continue
|
||||
* to obey the contract.
|
||||
|
@ -60,9 +64,8 @@ public interface Yielder<T> extends Closeable
|
|||
Yielder<T> next(T initValue);
|
||||
|
||||
/**
|
||||
* Returns true if this is the last Yielder in the chain. A Yielder that isDone() may return anything
|
||||
* from both get() and next(), there is no contract and depending on those return values will likely lead to bugs.
|
||||
* It will probably break your code to call next() on a Yielder that is done and expect something good from it.
|
||||
* Returns true if this is the last Yielder in the chain. Review the class level javadoc for an understanding
|
||||
* of the contract for other methods when isDone() is true.
|
||||
*
|
||||
* Once next() is called on this Yielder object, all further operations on this object are undefined.
|
||||
*
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.google.common.collect.Iterators;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.guava.ExplodingSequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
|
@ -254,6 +255,8 @@ public class CombiningSequenceTest
|
|||
int limit
|
||||
) throws Exception
|
||||
{
|
||||
final String prefix = StringUtils.format("yieldEvery[%d], limit[%d]", yieldEvery, limit);
|
||||
|
||||
// Test that closing works too
|
||||
final CountDownLatch closed = new CountDownLatch(1);
|
||||
final Closeable closeable = closed::countDown;
|
||||
|
@ -276,7 +279,7 @@ public class CombiningSequenceTest
|
|||
|
||||
List<Pair<Integer, Integer>> merged = seq.toList();
|
||||
|
||||
Assert.assertEquals(expected, merged);
|
||||
Assert.assertEquals(prefix, expected, merged);
|
||||
|
||||
Yielder<Pair<Integer, Integer>> yielder = seq.toYielder(
|
||||
null,
|
||||
|
@ -318,16 +321,17 @@ public class CombiningSequenceTest
|
|||
}
|
||||
);
|
||||
|
||||
int i = 0;
|
||||
if (expectedVals.hasNext()) {
|
||||
while (!yielder.isDone()) {
|
||||
final Pair<Integer, Integer> expectedVal = expectedVals.next();
|
||||
final Pair<Integer, Integer> actual = yielder.get();
|
||||
Assert.assertEquals(expectedVal, actual);
|
||||
Assert.assertEquals(StringUtils.format("%s, i[%s]", prefix, i++), expectedVal, actual);
|
||||
yielder = yielder.next(actual);
|
||||
}
|
||||
}
|
||||
Assert.assertTrue(yielder.isDone());
|
||||
Assert.assertFalse(expectedVals.hasNext());
|
||||
Assert.assertTrue(prefix, yielder.isDone());
|
||||
Assert.assertFalse(prefix, expectedVals.hasNext());
|
||||
yielder.close();
|
||||
|
||||
Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));
|
||||
|
|
Loading…
Reference in New Issue