improve javadoc and testing + generously comment the surprising implementation

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2022-12-08 15:32:31 +01:00
parent d3a0aa281b
commit 150cad510f
2 changed files with 70 additions and 7 deletions

View File

@ -49,12 +49,26 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
/**
* {@inheritDoc}
* <p>The write completes when the {@link Content.Chunk} returned by {@link #read()}
* that wraps {@code byteBuffer} is released.</p>
* <p>The write completes:</p>
* <ul>
* <li>immediately with a failure when this instance is closed or already in error</li>
* <li>successfully when the {@link Content.Chunk} returned by {@link #read()} is released</li>
* <li>successfully just before the {@link Content.Chunk} is returned if the latter {@link Content.Chunk#hasRemaining() has no remaining byte}</li>
* </ul>
*/
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
// Since the contract is that the callback has to be succeeded when
// the chunk returned by read() is released, and since it is not
// possible to create chunks with no remaining byte, when the byte
// buffer is empty we need to replace it with EOF / EMPTY and cannot
// be notified about the release of the latter two.
// This is why read() succeeds the callback if it has no remaining
// byte, meaning it is either EOF or EMPTY. The callback is succeeded
// once and only once, but that happens either during read() if the
// byte buffer is empty or during Chunk.release() if it contains at
// least one byte.
Content.Chunk chunk;
if (byteBuffer.hasRemaining())
chunk = Content.Chunk.from(byteBuffer, last, callback::succeeded);
@ -68,6 +82,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
* write is complete.</p>
* <p>The callback completes:</p>
* <ul>
* <li>immediately with a failure when this instance is closed or already in error</li>
* <li>immediately with a failure when the written chunk is an instance of {@link Content.Chunk.Error}</li>
* <li>successfully when the {@link Content.Chunk} returned by {@link #read()} is released</li>
* <li>successfully just before the {@link Content.Chunk} is returned if the latter {@link Content.Chunk#hasRemaining() has no remaining byte}</li>
@ -78,6 +93,10 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
*/
public void write(Content.Chunk chunk, Callback callback)
{
// Non-empty, chunks have to be wrapped to bind the succeeding
// of the callback to the release of the chunk. Empty chunks
// cannot be wrapped, so the callback is succeeded in read()
// for them.
Content.Chunk c;
if (chunk.isTerminal())
{
@ -99,7 +118,9 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
}
/**
* The callback is only ever going to be succeeded if the chunk is terminal.
* The callback is stored to be failed in case fail() is called
* or succeeded if and only if the chunk is terminal, as non-terminal
* chunks have to bind the succeeding of the callback to their release.
*/
private void offer(Content.Chunk chunk, Callback callback)
{

View File

@ -37,16 +37,15 @@ import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AsyncContentSourceTest
public class AsyncContentTest
{
// TODO make an OutputStreamContentSource version of this test
// TODO add a test to actually read some content!
@Test
public void testOfferInvokesDemandCallback() throws Exception
public void testWriteInvokesDemandCallback() throws Exception
{
try (AsyncContent async = new AsyncContent())
{
@ -241,6 +240,49 @@ public class AsyncContentSourceTest
callback.assertNoFailureNoSuccess();
}
@Test
public void testFailFailsCallbacks()
{
try (AsyncContent async = new AsyncContent())
{
AssertingCallback callback1 = new AssertingCallback();
async.write(false, ByteBuffer.wrap(new byte[1]), callback1);
AssertingCallback callback2 = new AssertingCallback();
async.write(false, ByteBuffer.wrap(new byte[2]), callback2);
AssertingCallback callback3 = new AssertingCallback();
async.write(false, ByteBuffer.wrap(new byte[3]), callback3);
Content.Chunk chunk = async.read();
callback1.assertNoFailureNoSuccess();
assertThat(chunk.getByteBuffer().remaining(), is(1));
assertThat(chunk.release(), is(true));
callback1.assertNoFailureWithSuccesses(1);
Exception error1 = new Exception("test1");
async.fail(error1);
chunk = async.read();
assertSame(error1, ((Content.Chunk.Error)chunk).getCause());
callback2.assertSingleFailureSameInstanceNoSuccess(error1);
callback3.assertSingleFailureSameInstanceNoSuccess(error1);
}
}
@Test
public void testWriteAfterFailImmediatelyFailsCallback()
{
try (AsyncContent async = new AsyncContent())
{
Exception error = new Exception("test1");
async.fail(error);
AssertingCallback callback = new AssertingCallback();
async.write(false, ByteBuffer.wrap(new byte[1]), callback);
callback.assertSingleFailureSameInstanceNoSuccess(error);
}
}
private static class AssertingCallback implements Callback
{
private final AtomicInteger successCounter = new AtomicInteger();