fix ContentSourceTransformer demand() to serialize callback invocations and delegate demand to the raw source only when needed

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2022-10-21 15:41:12 +02:00
parent 38289e46d4
commit 502cc77da9
2 changed files with 61 additions and 6 deletions

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.io.content;
import java.util.Objects;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
* <p>This abstract {@link Content.Source} wraps another {@link Content.Source} and implementers need only
@ -28,11 +29,12 @@ import org.eclipse.jetty.io.Content;
*/
public abstract class ContentSourceTransformer implements Content.Source
{
private final SerializedInvoker invoker = new SerializedInvoker();
private final Content.Source rawSource;
private Content.Chunk rawChunk;
private Content.Chunk transformedChunk;
private boolean needsRawRead;
private Runnable demandCallback;
private volatile boolean needsRawRead;
private volatile Runnable demandCallback;
public ContentSourceTransformer(Content.Source rawSource)
{
@ -79,7 +81,10 @@ public abstract class ContentSourceTransformer implements Content.Source
public void demand(Runnable demandCallback)
{
this.demandCallback = Objects.requireNonNull(demandCallback);
rawSource.demand(this::onRawAvailable);
if (needsRawRead)
rawSource.demand(() -> invoker.run(this::invokeDemandCallback));
else
invoker.run(this::invokeDemandCallback);
}
@Override
@ -88,11 +93,12 @@ public abstract class ContentSourceTransformer implements Content.Source
rawSource.fail(failure);
}
private void onRawAvailable()
private void invokeDemandCallback()
{
Runnable demandCallback = this.demandCallback;
this.demandCallback = null;
runDemandCallback(demandCallback);
if (demandCallback != null)
runDemandCallback(demandCallback);
}
private void runDemandCallback(Runnable demandCallback)

View File

@ -30,6 +30,8 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -138,7 +140,54 @@ public class ContentSourceTransformerTest
}
});
assertTrue(expected.isEmpty());
assertThat(expected, empty());
}
@Test
public void testDemandFirstWithoutLoopStallAfterTwoExpectedChunks()
{
AsyncContent source = new AsyncContent();
source.write(Content.Chunk.from(UTF_8.encode("ONE NOOP two"), false), Callback.NOOP);
WordSplitLowCaseTransformer transformer = new WordSplitLowCaseTransformer(source);
AtomicBoolean reEnter = new AtomicBoolean();
Queue<String> expected = new ArrayDeque<>(List.of("one", "two"));
transformer.demand(new Runnable()
{
@Override
public void run()
{
if (!reEnter.compareAndSet(false, true))
throw new IllegalStateException();
Content.Chunk chunk = transformer.read();
if (chunk != null)
assertEquals(expected.poll(), UTF_8.decode(chunk.getByteBuffer()).toString());
if (chunk == null || !chunk.isLast())
transformer.demand(this);
if (!reEnter.compareAndSet(true, false))
throw new IllegalStateException();
}
});
assertThat(expected, empty());
expected.offer("three");
source.write(Content.Chunk.from(UTF_8.encode("three"), true), Callback.NOOP);
assertThat(expected, empty());
expected.offer("EOF");
transformer.demand(() ->
{
Content.Chunk chunk = transformer.read();
assertTrue(chunk.isLast());
assertFalse(chunk.hasRemaining());
expected.poll();
});
assertThat(expected, empty());
}
@Test