#8993: Disallow creating new chunks with an empty ByteBuffer to make Chunk.isTerminal() contract stricter

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2022-12-01 15:58:09 +01:00
parent 45d0b68850
commit 42186bff18
10 changed files with 428 additions and 85 deletions

View File

@ -354,6 +354,38 @@ public class ResponseNotifier
private class ContentSource implements Content.Source
{
private static final Content.Chunk ALREADY_READ_CHUNK = new Content.Chunk()
{
@Override
public ByteBuffer getByteBuffer()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isLast()
{
throw new UnsupportedOperationException();
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
throw new UnsupportedOperationException();
}
@Override
public String toString()
{
return "ALREADY_READ_CHUNK";
}
};
private final int index;
private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>();
private volatile Content.Chunk chunk;
@ -368,8 +400,8 @@ public class ResponseNotifier
Content.Chunk currentChunk = this.chunk;
if (LOG.isDebugEnabled())
LOG.debug("Registering content in multiplexed content source #{} that contains {}", index, currentChunk);
if (currentChunk == null || currentChunk == AlreadyReadChunk.INSTANCE)
this.chunk = Content.Chunk.slice(chunk);
if (currentChunk == null || currentChunk == ALREADY_READ_CHUNK)
this.chunk = chunk.slice();
else if (!currentChunk.isLast())
throw new IllegalStateException("Cannot overwrite chunk");
onDemandCallback();
@ -396,7 +428,7 @@ public class ResponseNotifier
@Override
public Content.Chunk read()
{
if (chunk == AlreadyReadChunk.INSTANCE)
if (chunk == ALREADY_READ_CHUNK)
{
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} already read current chunk", index);
@ -405,7 +437,7 @@ public class ResponseNotifier
Content.Chunk result = chunk;
if (result != null && !result.isTerminal())
chunk = AlreadyReadChunk.INSTANCE;
chunk = ALREADY_READ_CHUNK;
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} reading current chunk {}", index, result);
return result;
@ -419,7 +451,7 @@ public class ResponseNotifier
Content.Chunk currentChunk = this.chunk;
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} demand while current chunk is {}", index, currentChunk);
if (currentChunk == null || currentChunk == AlreadyReadChunk.INSTANCE)
if (currentChunk == null || currentChunk == ALREADY_READ_CHUNK)
registerDemand();
else
onDemandCallback();
@ -440,44 +472,5 @@ public class ResponseNotifier
registerFailure(failure);
}
}
private static final class AlreadyReadChunk implements Content.Chunk
{
static final AlreadyReadChunk INSTANCE = new AlreadyReadChunk();
private AlreadyReadChunk()
{
}
@Override
public ByteBuffer getByteBuffer()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isLast()
{
throw new UnsupportedOperationException();
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
throw new UnsupportedOperationException();
}
@Override
public String toString()
{
return getClass().getSimpleName();
}
}
}
}

View File

@ -216,7 +216,7 @@ public class MultiPartFormDataTest
public void testNoBody() throws Exception
{
MultiPartFormData formData = new MultiPartFormData("boundary");
formData.parse(Content.Chunk.from(ByteBuffer.allocate(0), true));
formData.parse(Content.Chunk.EOF);
formData.handle((parts, failure) ->
{

View File

@ -58,7 +58,7 @@ public class MultiPartTest
assertEquals(0, listener.events.size());
parser.parse(Content.Chunk.from(BufferUtil.EMPTY_BUFFER, true));
parser.parse(Content.Chunk.EOF);
assertEquals(1, listener.events.size());
assertEquals("complete", listener.events.poll());

View File

@ -425,8 +425,16 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (frame.isLast())
shutdownInput();
networkBuffer.retain();
StreamData data = new StreamData(frame, networkBuffer);
Stream.Data data;
if (!frame.getByteBuffer().hasRemaining() && frame.isLast())
{
data = Stream.Data.EOF;
}
else
{
networkBuffer.retain();
data = new StreamData(frame, networkBuffer);
}
Runnable existing = action.getAndSet(() ->
{

View File

@ -444,7 +444,9 @@ public class Content
*/
static Chunk from(ByteBuffer byteBuffer, boolean last)
{
return new ByteBufferChunk.WithReferenceCount(byteBuffer, last);
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.WithReferenceCount(byteBuffer, last);
return last ? EOF : EMPTY;
}
/**
@ -453,11 +455,15 @@ public class Content
*
* @param byteBuffer the ByteBuffer with the bytes of this Chunk
* @param last whether the Chunk is the last one
* @param releaser the code to run when this Chunk is released
* @return a new Chunk
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Runnable releaser)
{
return new ByteBufferChunk.ReleasedByRunnable(byteBuffer, last, Objects.requireNonNull(releaser));
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.ReleasedByRunnable(byteBuffer, last, Objects.requireNonNull(releaser));
releaser.run();
return last ? EOF : EMPTY;
}
/**
@ -471,7 +477,10 @@ public class Content
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Consumer<ByteBuffer> releaser)
{
return new ByteBufferChunk.ReleasedByConsumer(byteBuffer, last, Objects.requireNonNull(releaser));
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.ReleasedByConsumer(byteBuffer, last, Objects.requireNonNull(releaser));
releaser.accept(byteBuffer);
return last ? EOF : EMPTY;
}
/**
@ -486,7 +495,10 @@ public class Content
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Retainable retainable)
{
return new ByteBufferChunk.WithRetainable(byteBuffer, last, Objects.requireNonNull(retainable));
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.WithRetainable(byteBuffer, last, Objects.requireNonNull(retainable));
retainable.release();
return last ? EOF : EMPTY;
}
/**
@ -539,28 +551,6 @@ public class Content
return null;
}
/**
* <p>Creates a chunk that is a slice of the given chunk.</p>
* <p>
* The chunk slice has a byte buffer that is a slice of the original chunk's byte buffer, the last flag
* copied over and the original chunk used as the retainable of the chunk slice.
* Note that after this method returns, an extra Chunk instance refers to the same byte buffer,
* so the original chunk is retained and is used as the {@link Retainable} for the returned chunk.
* </p>
* <p>Passing a null chunk returns null.</p>
* <p>Passing a terminal chunk returns it.</p>
*
* @param chunk the chunk to slice.
* @return the chunk slice.
*/
static Chunk slice(Chunk chunk)
{
if (chunk == null || chunk.isTerminal())
return chunk;
chunk.retain();
return Chunk.from(chunk.getByteBuffer().slice(), chunk.isLast(), chunk);
}
/**
* @return the ByteBuffer of this Chunk
*/
@ -571,9 +561,38 @@ public class Content
*/
boolean isLast();
/**
* <p>Returns a new {@code Chunk} whose {@code ByteBuffer} is a slice of the
* {@code ByteBuffer} of the source {@code Chunk} unless the source
* {@link #hasRemaining() has no remaining byte} in which case:</p>
* <ul>
* <li>{@code this} is returned if it is an instance of {@link Error}</li>
* <li>{@link #EOF} is returned if {@link #isLast()} is {@code true}</li>
* <li>{@link #EMPTY} is returned if {@link #isLast()} is {@code false}</li>
* </ul>
* <p>If the source has remaining bytes, the returned {@code Chunk} retains
* the source {@code Chunk} and it is linked to it via
* {@link #from(ByteBuffer, boolean, Retainable)}.</p>
*
* @return a new {@code Chunk} retained from the source {@code Chunk} with a slice
* of the source {@code Chunk}'s {@code ByteBuffer}
*/
default Chunk slice()
{
if (isTerminal())
return this;
if (!hasRemaining())
return EMPTY;
retain();
return from(getByteBuffer().slice(), isLast(), this);
}
/**
* <p>Returns a new {@code Chunk} whose {@code ByteBuffer} is a slice, with the given
* position and limit, of the {@code ByteBuffer} of the source {@code Chunk}.</p>
* position and limit, of the {@code ByteBuffer} of the source {@code Chunk} unless the
* source is {@link #isTerminal() terminal} in which case {@code this} is returned, or
* if {@code position == limit} in which case {@link #EOF} or {@link #EMPTY} is
* returned depending on the value of {@code last}.</p>
* <p>The returned {@code Chunk} retains the source {@code Chunk} and it is linked
* to it via {@link #from(ByteBuffer, boolean, Retainable)}.</p>
*
@ -587,6 +606,8 @@ public class Content
{
if (isTerminal())
return this;
if (position == limit)
return last ? EOF : EMPTY;
ByteBuffer sourceBuffer = getByteBuffer();
int sourceLimit = sourceBuffer.limit();
sourceBuffer.limit(limit);
@ -649,20 +670,27 @@ public class Content
/**
* <p>Returns whether this Chunk is a <em>terminal</em> chunk.</p>
* <p>A terminal chunk is either an {@link Error error chunk},
* or a Chunk that {@link #isLast()} is true and has no remaining
* bytes.</p>
* <p>A terminal chunk is a Chunk that {@link #isLast()} is true
* and has no remaining bytes.</p>
* <p><em>Terminal</em> chunks cannot be lifecycled using the
* {@link Retainable} contract like other chunks: they always throw
* {@link UnsupportedOperationException} on {@link #retain()} and
* always return {@code true} on {@link #release()}. As such, they
* cannot contain a recyclable buffer and calling their
* {@link #release()} method isn't necessary, although harmless.
* </p>
*
* @return whether this Chunk is a terminal chunk
*/
default boolean isTerminal()
{
return this instanceof Error || isLast() && !hasRemaining();
return isLast() && !hasRemaining();
}
/**
* <p>A chunk that wraps a failure.</p>
* <p>Error Chunks are always last and have no bytes to read.</p>
* <p>Error Chunks are always last and have no bytes to read,
* as such they are <em>terminal</em> Chunks.</p>
*
* @see #from(Throwable)
*/

View File

@ -47,13 +47,61 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
private Runnable demandCallback;
private long length = UNDETERMINED_LENGTH;
/**
* {@inheritDoc}
* <p>The write completes when the {@link Content.Chunk} returned by {@link #read()}
* that wraps {@code byteBuffer} is released.</p>
*/
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
write(Content.Chunk.from(byteBuffer, last, callback::succeeded), callback);
Content.Chunk chunk;
if (byteBuffer.hasRemaining())
chunk = Content.Chunk.from(byteBuffer, last, callback::succeeded);
else
chunk = last ? Content.Chunk.EOF : Content.Chunk.EMPTY;
offer(chunk, callback);
}
/**
* <p>Writes the given {@link Content.Chunk}, notifying the {@link Callback} when the
* write is complete.</p>
* <p>The callback completes:</p>
* <ul>
* <li>immediately with a failure when the written chunk is an instance of {@link Content.Chunk.Error}</li>
* <li>successfully when the {@link Content.Chunk} returned by {@link #read()} is released</li>
* <li>successfully just before the {@link Content.Chunk} is returned if the latter {@link Content.Chunk#hasRemaining() has no remaining byte}</li>
* </ul>
*
* @param chunk the Content.Chunk to write
* @param callback the callback to notify when the write operation is complete
*/
public void write(Content.Chunk chunk, Callback callback)
{
Content.Chunk c;
if (chunk.isTerminal())
{
c = chunk;
}
else if (!chunk.hasRemaining())
{
c = Content.Chunk.EMPTY;
}
else
{
c = Content.Chunk.from(chunk.getByteBuffer(), chunk.isLast(), () ->
{
chunk.release();
callback.succeeded();
});
}
offer(c, callback);
}
/**
* The callback is only ever going to be succeeded if the chunk is terminal.
*/
private void offer(Content.Chunk chunk, Callback callback)
{
Throwable failure = null;
boolean wasEmpty = false;
@ -109,7 +157,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
if (writeClosed && chunks.size() == 1)
{
Content.Chunk chunk = chunks.peek().chunk();
if (chunk.isLast() && !chunk.hasRemaining())
if (chunk.isTerminal())
return;
}
l.await();
@ -166,6 +214,8 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
if (chunks.isEmpty())
l.signal();
}
if (!current.chunk().hasRemaining())
current.callback().succeeded();
return current.chunk();
}

View File

@ -73,7 +73,7 @@ public class ChunksContentSource implements Content.Source
if (last)
terminated = Content.Chunk.EOF;
}
return Content.Chunk.from(chunk.getByteBuffer().slice(), chunk.isLast(), chunk);
return chunk;
}
@Override

View File

@ -13,9 +13,14 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.content.AsyncContent;
@ -26,6 +31,8 @@ import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -125,4 +132,156 @@ public class AsyncContentSourceTest
assertThat(((Content.Chunk.Error)chunk).getCause(), sameInstance(error));
}
}
@Test
public void testChunkReleaseSucceedsWriteCallback()
{
try (AsyncContent async = new AsyncContent())
{
AtomicInteger successCounter = new AtomicInteger();
AtomicReference<Throwable> failureRef = new AtomicReference<>();
async.write(false, ByteBuffer.wrap(new byte[1]), Callback.from(successCounter::incrementAndGet, failureRef::set));
Content.Chunk chunk = async.read();
assertThat(successCounter.get(), is(0));
chunk.retain();
assertThat(chunk.release(), is(false));
assertThat(successCounter.get(), is(0));
assertThat(chunk.release(), is(true));
assertThat(successCounter.get(), is(1));
assertThat(failureRef.get(), is(nullValue()));
}
}
@Test
public void testEmptyChunkReadSucceedsWriteCallback()
{
try (AsyncContent async = new AsyncContent())
{
AtomicInteger successCounter = new AtomicInteger();
AtomicReference<Throwable> failureRef = new AtomicReference<>();
async.write(false, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
Content.Chunk chunk = async.read();
assertThat(successCounter.get(), is(1));
assertThat(chunk.isTerminal(), is(false));
assertThat(chunk.release(), is(true));
assertThat(successCounter.get(), is(1));
assertThat(failureRef.get(), is(nullValue()));
}
}
@Test
public void testLastEmptyChunkReadSucceedsWriteCallback()
{
try (AsyncContent async = new AsyncContent())
{
AtomicInteger successCounter = new AtomicInteger();
AtomicReference<Throwable> failureRef = new AtomicReference<>();
async.write(true, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
Content.Chunk chunk = async.read();
assertThat(successCounter.get(), is(1));
assertThat(chunk.isTerminal(), is(true));
assertThat(chunk.release(), is(true));
assertThat(successCounter.get(), is(1));
assertThat(failureRef.get(), is(nullValue()));
}
}
@Test
public void testWriteAndReadErrors()
{
try (AsyncContent async = new AsyncContent())
{
AssertingCallback callback = new AssertingCallback();
Exception error1 = new Exception("error1");
async.write(Content.Chunk.from(error1), callback);
callback.assertSingleFailureSameInstanceNoSuccess(error1);
Content.Chunk chunk = async.read();
assertThat(((Content.Chunk.Error)chunk).getCause(), sameInstance(error1));
chunk = async.read();
assertThat(((Content.Chunk.Error)chunk).getCause(), sameInstance(error1));
callback.assertNoFailureNoSuccess();
Exception error2 = new Exception("error2");
async.write(Content.Chunk.from(error2), callback);
callback.assertSingleFailureSameInstanceNoSuccess(error1);
async.write(Content.Chunk.from(ByteBuffer.wrap(new byte[1]), false), callback);
callback.assertSingleFailureSameInstanceNoSuccess(error1);
}
}
@Test
public void testCloseAfterWritingEof()
{
AssertingCallback callback = new AssertingCallback();
try (AsyncContent async = new AsyncContent())
{
async.write(Content.Chunk.EOF, callback);
callback.assertNoFailureNoSuccess();
Content.Chunk chunk = async.read();
assertThat(chunk.isTerminal(), is(true));
callback.assertNoFailureWithSuccesses(1);
chunk = async.read();
assertThat(chunk.isTerminal(), is(true));
callback.assertNoFailureWithSuccesses(0);
async.write(Content.Chunk.EOF, callback);
callback.assertSingleFailureIsInstanceNoSuccess(IOException.class);
}
callback.assertNoFailureNoSuccess();
}
private static class AssertingCallback implements Callback
{
private final AtomicInteger successCounter = new AtomicInteger();
private final List<Throwable> throwables = new CopyOnWriteArrayList<>();
@Override
public void succeeded()
{
successCounter.incrementAndGet();
}
@Override
public void failed(Throwable x)
{
throwables.add(x);
}
public void assertNoFailureNoSuccess()
{
assertThat(successCounter.get(), is(0));
assertThat(throwables.isEmpty(), is(true));
}
public void assertNoFailureWithSuccesses(int successCount)
{
assertThat(successCounter.getAndSet(0), is(successCount));
assertThat(throwables.isEmpty(), is(true));
}
public void assertSingleFailureSameInstanceNoSuccess(Throwable x)
{
assertThat(successCounter.get(), is(0));
assertThat(throwables.size(), is(1));
assertThat(throwables.remove(0), sameInstance(x));
}
public void assertSingleFailureIsInstanceNoSuccess(Class<? extends Throwable> clazz)
{
assertThat(successCounter.get(), is(0));
assertThat(throwables.size(), is(1));
assertThat(throwables.remove(0), instanceOf(clazz));
}
}
}

View File

@ -0,0 +1,80 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ContentTest
{
@Test
public void testFromEmptyByteBufferWithoutReleaser()
{
assertThrows(IllegalArgumentException.class, () -> Content.Chunk.from(ByteBuffer.wrap(new byte[0]), true));
assertThrows(IllegalArgumentException.class, () -> Content.Chunk.from(ByteBuffer.wrap(new byte[0]), false));
}
@Test
public void testFromEmptyByteBufferWithRunnableReleaser()
{
AtomicInteger counter1 = new AtomicInteger();
assertThat(Content.Chunk.from(ByteBuffer.wrap(new byte[0]), true, counter1::incrementAndGet), sameInstance(Content.Chunk.EOF));
assertThat(counter1.get(), is(1));
AtomicInteger counter2 = new AtomicInteger();
assertThat(Content.Chunk.from(ByteBuffer.wrap(new byte[0]), false, counter2::incrementAndGet), sameInstance(Content.Chunk.EMPTY));
assertThat(counter2.get(), is(1));
}
@Test
public void testFromEmptyByteBufferWithConsumerReleaser()
{
List<ByteBuffer> buffers = new ArrayList<>();
ByteBuffer buffer1 = ByteBuffer.wrap(new byte[0]);
assertThat(Content.Chunk.from(buffer1, true, buffers::add), sameInstance(Content.Chunk.EOF));
assertThat(buffers.size(), is(1));
assertThat(buffers.remove(0), sameInstance(buffer1));
ByteBuffer buffer2 = ByteBuffer.wrap(new byte[0]);
assertThat(Content.Chunk.from(buffer2, false, buffers::add), sameInstance(Content.Chunk.EMPTY));
assertThat(buffers.size(), is(1));
assertThat(buffers.remove(0), sameInstance(buffer2));
}
@Test
public void testFromEmptyByteBufferWithRetainableReleaser()
{
Retainable.ReferenceCounter referenceCounter1 = new Retainable.ReferenceCounter(2);
assertThat(referenceCounter1.isRetained(), is(true));
assertThat(Content.Chunk.from(ByteBuffer.wrap(new byte[0]), true, referenceCounter1), sameInstance(Content.Chunk.EOF));
assertThat(referenceCounter1.isRetained(), is(false));
assertThat(referenceCounter1.release(), is(true));
Retainable.ReferenceCounter referenceCounter2 = new Retainable.ReferenceCounter(2);
assertThat(referenceCounter2.isRetained(), is(true));
assertThat(Content.Chunk.from(ByteBuffer.wrap(new byte[0]), false, referenceCounter2), sameInstance(Content.Chunk.EMPTY));
assertThat(referenceCounter2.isRetained(), is(false));
assertThat(referenceCounter2.release(), is(true));
}
}

View File

@ -33,7 +33,32 @@ import org.eclipse.jetty.util.NanoTime;
public class MockHttpStream implements HttpStream
{
private static final Throwable SUCCEEDED = new Throwable();
private static final Content.Chunk DEMAND = Content.Chunk.from(BufferUtil.EMPTY_BUFFER, false);
private static final Content.Chunk DEMAND = new Content.Chunk()
{
@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}
@Override
public boolean isLast()
{
return false;
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
return true;
}
};
private final long _nanoTime = NanoTime.now();
private final AtomicReference<Content.Chunk> _content = new AtomicReference<>();
private final AtomicReference<Throwable> _complete = new AtomicReference<>();
@ -65,7 +90,7 @@ public class MockHttpStream implements HttpStream
public Runnable addContent(ByteBuffer buffer, boolean last)
{
return addContent((last && BufferUtil.isEmpty(buffer)) ? Content.Chunk.EOF : Content.Chunk.from(buffer, last));
return addContent(Content.Chunk.from(buffer, last));
}
public Runnable addContent(String content, boolean last)