diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
index 0ef54e2063d..932802c8df2 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
@@ -49,12 +49,26 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
/**
* {@inheritDoc}
- *
The write completes when the {@link Content.Chunk} returned by {@link #read()}
- * that wraps {@code byteBuffer} is released.
+ * The write completes:
+ *
+ * - immediately with a failure when this instance is closed or already in error
+ * - successfully when the {@link Content.Chunk} returned by {@link #read()} is released
+ * - successfully just before the {@link Content.Chunk} is returned if the latter {@link Content.Chunk#hasRemaining() has no remaining byte}
+ *
*/
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
+ // Since the contract is that the callback has to be succeeded when
+ // the chunk returned by read() is released, and since it is not
+ // possible to create chunks with no remaining byte, when the byte
+ // buffer is empty we need to replace it with EOF / EMPTY and cannot
+ // be notified about the release of the latter two.
+ // This is why read() succeeds the callback if it has no remaining
+ // byte, meaning it is either EOF or EMPTY. The callback is succeeded
+ // once and only once, but that happens either during read() if the
+ // byte buffer is empty or during Chunk.release() if it contains at
+ // least one byte.
Content.Chunk chunk;
if (byteBuffer.hasRemaining())
chunk = Content.Chunk.from(byteBuffer, last, callback::succeeded);
@@ -68,6 +82,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
* write is complete.
* The callback completes:
*
+ * - immediately with a failure when this instance is closed or already in error
* - immediately with a failure when the written chunk is an instance of {@link Content.Chunk.Error}
* - successfully when the {@link Content.Chunk} returned by {@link #read()} is released
* - successfully just before the {@link Content.Chunk} is returned if the latter {@link Content.Chunk#hasRemaining() has no remaining byte}
@@ -78,6 +93,10 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
*/
public void write(Content.Chunk chunk, Callback callback)
{
+ // Non-empty, chunks have to be wrapped to bind the succeeding
+ // of the callback to the release of the chunk. Empty chunks
+ // cannot be wrapped, so the callback is succeeded in read()
+ // for them.
Content.Chunk c;
if (chunk.isTerminal())
{
@@ -99,7 +118,9 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
}
/**
- * The callback is only ever going to be succeeded if the chunk is terminal.
+ * The callback is stored to be failed in case fail() is called
+ * or succeeded if and only if the chunk is terminal, as non-terminal
+ * chunks have to bind the succeeding of the callback to their release.
*/
private void offer(Content.Chunk chunk, Callback callback)
{
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncContentSourceTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncContentTest.java
similarity index 85%
rename from jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncContentSourceTest.java
rename to jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncContentTest.java
index 9ee07354371..854ec495880 100644
--- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncContentSourceTest.java
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncContentTest.java
@@ -37,16 +37,15 @@ import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class AsyncContentSourceTest
+public class AsyncContentTest
{
// TODO make an OutputStreamContentSource version of this test
- // TODO add a test to actually read some content!
-
@Test
- public void testOfferInvokesDemandCallback() throws Exception
+ public void testWriteInvokesDemandCallback() throws Exception
{
try (AsyncContent async = new AsyncContent())
{
@@ -241,6 +240,49 @@ public class AsyncContentSourceTest
callback.assertNoFailureNoSuccess();
}
+ @Test
+ public void testFailFailsCallbacks()
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ AssertingCallback callback1 = new AssertingCallback();
+ async.write(false, ByteBuffer.wrap(new byte[1]), callback1);
+ AssertingCallback callback2 = new AssertingCallback();
+ async.write(false, ByteBuffer.wrap(new byte[2]), callback2);
+ AssertingCallback callback3 = new AssertingCallback();
+ async.write(false, ByteBuffer.wrap(new byte[3]), callback3);
+
+ Content.Chunk chunk = async.read();
+ callback1.assertNoFailureNoSuccess();
+ assertThat(chunk.getByteBuffer().remaining(), is(1));
+ assertThat(chunk.release(), is(true));
+ callback1.assertNoFailureWithSuccesses(1);
+
+ Exception error1 = new Exception("test1");
+ async.fail(error1);
+
+ chunk = async.read();
+ assertSame(error1, ((Content.Chunk.Error)chunk).getCause());
+
+ callback2.assertSingleFailureSameInstanceNoSuccess(error1);
+ callback3.assertSingleFailureSameInstanceNoSuccess(error1);
+ }
+ }
+
+ @Test
+ public void testWriteAfterFailImmediatelyFailsCallback()
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ Exception error = new Exception("test1");
+ async.fail(error);
+
+ AssertingCallback callback = new AssertingCallback();
+ async.write(false, ByteBuffer.wrap(new byte[1]), callback);
+ callback.assertSingleFailureSameInstanceNoSuccess(error);
+ }
+ }
+
private static class AssertingCallback implements Callback
{
private final AtomicInteger successCounter = new AtomicInteger();