From e7e49ec9c857641efe1548f438e2d203248de7ea Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 27 Apr 2022 10:52:20 -0700 Subject: [PATCH] For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475) Co-authored-by: imply-cheddar <86940447+imply-cheddar@users.noreply.github.com> --- .../druid/common/guava/CombiningSequence.java | 73 ++++++++++--------- .../java/util/common/guava/BaseSequence.java | 51 +++++++------ .../util/common/guava/WrappingYielder.java | 5 +- .../druid/java/util/common/guava/Yielder.java | 15 ++-- .../common/guava/CombiningSequenceTest.java | 12 ++- 5 files changed, 82 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java index b779fc29d35..de33963ff42 100644 --- a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java +++ b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java @@ -78,7 +78,23 @@ public class CombiningSequence implements Sequence final Yielder baseYielder = baseSequence.toYielder(null, combiningAccumulator); try { - return makeYielder(baseYielder, combiningAccumulator, false); + // If the yielder is already done at this point, that means that it ran through all of the inputs + // without hitting a yield(), i.e. it's effectively just a single accumulate() call. As such we just + // return a done yielder with the correct accumulated value. + if (baseYielder.isDone()) { + if (combiningAccumulator.accumulatedSomething()) { + combiningAccumulator.accumulateLastValue(); + } + // If we yielded, then the expectation is that we get a Yielder with the yielded value, followed by a done + // yielder. This will happen if we fall through to the normal makeYielder. If the accumulator did not yield + // then the code expects a single Yielder that returns whatever was left over from the accumulation on the + // get() call. + if (!combiningAccumulator.yielded()) { + return Yielders.done(combiningAccumulator.getRetVal(), baseYielder); + } + } + + return makeYielder(baseYielder, combiningAccumulator); } catch (Throwable t1) { try { @@ -94,52 +110,37 @@ public class CombiningSequence implements Sequence private Yielder makeYielder( final Yielder yielder, - final CombiningYieldingAccumulator combiningAccumulator, - boolean finalValue + final CombiningYieldingAccumulator combiningAccumulator ) { - final Yielder finalYielder; - final OutType retVal; - final boolean finalFinalValue; - - if (!yielder.isDone()) { - retVal = combiningAccumulator.getRetVal(); - finalYielder = null; - finalFinalValue = false; - } else { - if (!finalValue && combiningAccumulator.accumulatedSomething()) { - combiningAccumulator.accumulateLastValue(); - retVal = combiningAccumulator.getRetVal(); - finalFinalValue = true; - - if (!combiningAccumulator.yielded()) { - return Yielders.done(retVal, yielder); - } else { - finalYielder = Yielders.done(null, yielder); - } - } else { - return Yielders.done(combiningAccumulator.getRetVal(), yielder); - } - } - - return new Yielder() { + private Yielder myYielder = yielder; + private CombiningYieldingAccumulator accum = combiningAccumulator; + @Override public OutType get() { - return retVal; + return accum.getRetVal(); } @Override public Yielder next(OutType initValue) { - combiningAccumulator.reset(); - return makeYielder( - finalYielder == null ? yielder.next(yielder.get()) : finalYielder, - combiningAccumulator, - finalFinalValue - ); + accum.reset(); + if (myYielder.isDone()) { + return Yielders.done(null, myYielder); + } + + myYielder = myYielder.next(myYielder.get()); + if (myYielder.isDone() && accum.accumulatedSomething()) { + accum.accumulateLastValue(); + if (!accum.yielded()) { + return Yielders.done(accum.getRetVal(), myYielder); + } + } + + return this; } @Override @@ -151,7 +152,7 @@ public class CombiningSequence implements Sequence @Override public void close() throws IOException { - yielder.close(); + myYielder.close(); } }; } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java index 2ef6f430773..bb849fa6598 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java @@ -66,7 +66,19 @@ public class BaseSequence> implements Sequence maker.cleanup(iterator) + ); + } + + return makeYielder(retVal, accumulator, iterator); } catch (Throwable t) { try { @@ -80,47 +92,34 @@ public class BaseSequence> implements Sequence Yielder makeYielder( - final OutType initValue, + final OutType retValue, final YieldingAccumulator accumulator, final IterType iter ) { - OutType retVal = initValue; - while (!accumulator.yielded() && iter.hasNext()) { - retVal = accumulator.accumulate(retVal, iter.next()); - } - - if (!accumulator.yielded()) { - return Yielders.done( - retVal, - (Closeable) () -> maker.cleanup(iter) - ); - } - - final OutType finalRetVal = retVal; return new Yielder() { + OutType retVal = retValue; + @Override public OutType get() { - return finalRetVal; + return retVal; } @Override public Yielder next(OutType initValue) { accumulator.reset(); - try { - return makeYielder(initValue, accumulator, iter); + retVal = initValue; + while (!accumulator.yielded() && iter.hasNext()) { + retVal = accumulator.accumulate(retVal, iter.next()); } - catch (Throwable t) { - try { - maker.cleanup(iter); - } - catch (Exception e) { - t.addSuppressed(e); - } - throw t; + + if (accumulator.yielded()) { + return this; + } else { + return Yielders.done(retVal, this); } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java index bb80fb23728..fe52ce7fbb3 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java @@ -26,7 +26,7 @@ import java.io.IOException; final class WrappingYielder implements Yielder { - private final Yielder baseYielder; + private Yielder baseYielder; private final SequenceWrapper wrapper; WrappingYielder(Yielder baseYielder, SequenceWrapper wrapper) @@ -50,7 +50,8 @@ final class WrappingYielder implements Yielder @Override public Yielder get() { - return new WrappingYielder<>(baseYielder.next(initValue), wrapper); + baseYielder = baseYielder.next(initValue); + return WrappingYielder.this; } }); } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java index 6a684cdd0c8..b51ff5423a0 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java @@ -26,9 +26,13 @@ import java.io.Closeable; * necessarily good at this job, but it works. I think. * * Essentially, you can think of a Yielder as a linked list of items where the Yielder gives you access to the current - * head via get() and it will give you another Yielder representing the next item in the chain via next(). A Yielder - * that isDone() may return anything from both get() and next(), there is no contract and depending on those return - * values will likely lead to bugs. + * head via get() and it will give you another Yielder representing the next item in the chain via next(). When using + * a yielder object, a call to yield() on the yielding accumulator will result in a new Yielder being returned whose + * get() method will return the return value of the accumulator from the call that called yield(). + * + * When a call to next() exhausts the underlying data stream without having a yield() call, various implementations + * of Sequences and Yielders assume that they will receive a Yielder where isDone() is true and get() will return the + * accumulated value up until that point. * * Once next is called, there is no guarantee and no requirement that references to old Yielder objects will continue * to obey the contract. @@ -60,9 +64,8 @@ public interface Yielder extends Closeable Yielder next(T initValue); /** - * Returns true if this is the last Yielder in the chain. A Yielder that isDone() may return anything - * from both get() and next(), there is no contract and depending on those return values will likely lead to bugs. - * It will probably break your code to call next() on a Yielder that is done and expect something good from it. + * Returns true if this is the last Yielder in the chain. Review the class level javadoc for an understanding + * of the contract for other methods when isDone() is true. * * Once next() is called on this Yielder object, all further operations on this object are undefined. * diff --git a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java index b8872f08d38..7c97e364828 100644 --- a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java +++ b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.ExplodingSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -254,6 +255,8 @@ public class CombiningSequenceTest int limit ) throws Exception { + final String prefix = StringUtils.format("yieldEvery[%d], limit[%d]", yieldEvery, limit); + // Test that closing works too final CountDownLatch closed = new CountDownLatch(1); final Closeable closeable = closed::countDown; @@ -276,7 +279,7 @@ public class CombiningSequenceTest List> merged = seq.toList(); - Assert.assertEquals(expected, merged); + Assert.assertEquals(prefix, expected, merged); Yielder> yielder = seq.toYielder( null, @@ -318,16 +321,17 @@ public class CombiningSequenceTest } ); + int i = 0; if (expectedVals.hasNext()) { while (!yielder.isDone()) { final Pair expectedVal = expectedVals.next(); final Pair actual = yielder.get(); - Assert.assertEquals(expectedVal, actual); + Assert.assertEquals(StringUtils.format("%s, i[%s]", prefix, i++), expectedVal, actual); yielder = yielder.next(actual); } } - Assert.assertTrue(yielder.isDone()); - Assert.assertFalse(expectedVals.hasNext()); + Assert.assertTrue(prefix, yielder.isDone()); + Assert.assertFalse(prefix, expectedVals.hasNext()); yielder.close(); Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));