diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java index f1e98eee312..9fefc147a08 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.io.content; import java.util.Objects; import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.util.thread.SerializedInvoker; /** *

This abstract {@link Content.Source} wraps another {@link Content.Source} and implementers need only @@ -28,11 +29,12 @@ import org.eclipse.jetty.io.Content; */ public abstract class ContentSourceTransformer implements Content.Source { + private final SerializedInvoker invoker = new SerializedInvoker(); private final Content.Source rawSource; private Content.Chunk rawChunk; private Content.Chunk transformedChunk; - private boolean needsRawRead; - private Runnable demandCallback; + private volatile boolean needsRawRead; + private volatile Runnable demandCallback; public ContentSourceTransformer(Content.Source rawSource) { @@ -79,7 +81,10 @@ public abstract class ContentSourceTransformer implements Content.Source public void demand(Runnable demandCallback) { this.demandCallback = Objects.requireNonNull(demandCallback); - rawSource.demand(this::onRawAvailable); + if (needsRawRead) + rawSource.demand(() -> invoker.run(this::invokeDemandCallback)); + else + invoker.run(this::invokeDemandCallback); } @Override @@ -88,11 +93,12 @@ public abstract class ContentSourceTransformer implements Content.Source rawSource.fail(failure); } - private void onRawAvailable() + private void invokeDemandCallback() { Runnable demandCallback = this.demandCallback; this.demandCallback = null; - runDemandCallback(demandCallback); + if (demandCallback != null) + runDemandCallback(demandCallback); } private void runDemandCallback(Runnable demandCallback) diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTransformerTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTransformerTest.java index a2d73229cdc..a09a0e53dd4 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTransformerTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTransformerTest.java @@ -30,6 +30,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -138,7 +140,54 @@ public class ContentSourceTransformerTest } }); - assertTrue(expected.isEmpty()); + assertThat(expected, empty()); + } + + @Test + public void testDemandFirstWithoutLoopStallAfterTwoExpectedChunks() + { + AsyncContent source = new AsyncContent(); + source.write(Content.Chunk.from(UTF_8.encode("ONE NOOP two"), false), Callback.NOOP); + WordSplitLowCaseTransformer transformer = new WordSplitLowCaseTransformer(source); + + AtomicBoolean reEnter = new AtomicBoolean(); + Queue expected = new ArrayDeque<>(List.of("one", "two")); + transformer.demand(new Runnable() + { + @Override + public void run() + { + if (!reEnter.compareAndSet(false, true)) + throw new IllegalStateException(); + + Content.Chunk chunk = transformer.read(); + if (chunk != null) + assertEquals(expected.poll(), UTF_8.decode(chunk.getByteBuffer()).toString()); + + if (chunk == null || !chunk.isLast()) + transformer.demand(this); + + if (!reEnter.compareAndSet(true, false)) + throw new IllegalStateException(); + } + }); + + assertThat(expected, empty()); + + expected.offer("three"); + source.write(Content.Chunk.from(UTF_8.encode("three"), true), Callback.NOOP); + assertThat(expected, empty()); + + expected.offer("EOF"); + transformer.demand(() -> + { + Content.Chunk chunk = transformer.read(); + assertTrue(chunk.isLast()); + assertFalse(chunk.hasRemaining()); + expected.poll(); + }); + + assertThat(expected, empty()); } @Test