Cleanup non-retainable `Retainable`s (#9159)

`Retainable`s that return false from `canRetain()` now are noops if `retain()` is called, which allows for a simpler calling convention.
`AsyncContent` has also been reworked to allocate less and be clearer in its use of `canRetain()`.
This commit is contained in:
Greg Wilkins 2023-01-13 20:37:16 +11:00 committed by GitHub
parent 46355c6110
commit 8b3bcc3a50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 165 additions and 209 deletions

View File

@ -112,8 +112,7 @@ public class InputStreamResponseListener extends Listener.Adapter
{
if (LOG.isDebugEnabled())
LOG.debug("Queueing chunk {}", chunk);
if (chunk.canRetain())
chunk.retain();
chunk.retain();
chunkCallbacks.add(new ChunkCallback(chunk, demander, response::abort));
l.signalAll();
return;

View File

@ -574,7 +574,7 @@ public abstract class HttpReceiver
return _chunk;
// Retain the input chunk because its ByteBuffer will be referenced by the Inflater.
if (retain && _chunk.canRetain())
if (retain)
_chunk.retain();
if (LOG.isDebugEnabled())
LOG.debug("decoding: {}", _chunk);

View File

@ -412,8 +412,7 @@ public class ResponseNotifier
if (chunk.hasRemaining())
chunk = Content.Chunk.asChunk(chunk.getByteBuffer().slice(), chunk.isLast(), chunk);
// Retain the slice because it is stored for later reads.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
this.chunk = chunk;
}
else if (!currentChunk.isLast())

View File

@ -99,8 +99,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver
if (this.chunk != null)
throw new IllegalStateException();
// Retain the chunk because it is stored for later reads.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
this.chunk = chunk;
responseContentAvailable();
}

View File

@ -192,8 +192,7 @@ public class HttpStreamOverFCGI implements HttpStream
public void onContent(Content.Chunk chunk)
{
// Retain the chunk because it is stored for later reads.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
_chunk = chunk;
}

View File

@ -284,8 +284,7 @@ public class MultiPartByteRanges extends CompletableFuture<MultiPartByteRanges.P
public void onPartContent(Content.Chunk chunk)
{
// Retain the chunk because it is stored for later use.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
partChunks.add(chunk);
}

View File

@ -444,8 +444,7 @@ public class MultiPartFormData extends CompletableFuture<MultiPartFormData.Parts
}
}
// Retain the chunk because it is stored for later use.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
partChunks.add(chunk);
}

View File

@ -43,22 +43,4 @@ public class Trailers implements Content.Chunk
{
return trailers;
}
@Override
public boolean canRetain()
{
return false;
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
return true;
}
}

View File

@ -690,8 +690,7 @@ public class MultiPartTest
public void onPartContent(Content.Chunk chunk)
{
// Retain the chunk because it is stored for later use.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
partContent.add(chunk);
}

View File

@ -432,24 +432,6 @@ public interface Stream
{
super(new DataFrame(streamId, BufferUtil.EMPTY_BUFFER, true));
}
@Override
public boolean canRetain()
{
return false;
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
return true;
}
}
}
}

View File

@ -411,24 +411,6 @@ public interface Stream
{
super(new DataFrame(BufferUtil.EMPTY_BUFFER, true));
}
@Override
public boolean canRetain()
{
return false;
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
return true;
}
}
}
}

View File

@ -335,8 +335,7 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
public void onData(Data data)
{
// Retain the data because it is stored for later reads.
if (data.canRetain())
data.retain();
data.retain();
if (!dataRef.compareAndSet(null, data))
throw new IllegalStateException();

View File

@ -436,11 +436,50 @@ public class Content
/**
* <p>An empty, non-last, chunk.</p>
*/
Chunk EMPTY = ByteBufferChunk.EMPTY;
Chunk EMPTY = new Chunk()
{
@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}
@Override
public boolean isLast()
{
return false;
}
@Override
public String toString()
{
return "EMPTY";
}
};
/**
* <p>An empty, last, chunk.</p>
*/
Content.Chunk EOF = ByteBufferChunk.EOF;
Content.Chunk EOF = new Chunk()
{
@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}
@Override
public boolean isLast()
{
return true;
}
@Override
public String toString()
{
return "EOF";
}
};
/**
* <p>Creates a Chunk with the given ByteBuffer.</p>
@ -657,24 +696,6 @@ public class Content
return true;
}
@Override
public boolean canRetain()
{
return false;
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
return true;
}
@Override
public String toString()
{

View File

@ -27,30 +27,36 @@ import java.util.concurrent.atomic.AtomicInteger;
public interface Retainable
{
/**
* <p>Returns whether this resource can be retained, that is whether {@link #retain()}
* can be called safely.</p>
* <p>Implementations may decide that special resources are not retainable (for example,
* {@code static} constants) so calling {@link #retain()} is not safe because it may throw.</p>
* <p>Calling {@link #release()} on those special resources is typically allowed, and
* it is a no-operation.</p>
* <p>Returns whether this resource is referenced counted by calls to {@link #retain()}
* and {@link #release()}.</p>
* <p>Implementations may decide that special resources are not not referenced counted (for example,
* {@code static} constants) so calling {@link #retain()} is a no-operation, and
* calling {@link #release()} on those special resources is a no-operation that always returns true.</p>
*
* @return whether it is safe to call {@link #retain()}
* @return true if calls to {@link #retain()} are reference counted.
*/
boolean canRetain();
default boolean canRetain()
{
return false;
}
/**
* <p>Retains this resource, incrementing the reference count.</p>
* <p>Retains this resource, potentially incrementing a reference count if there are resources that will be released.</p>
*/
void retain();
default void retain()
{
}
/**
* <p>Releases this resource, decrementing the reference count.</p>
* <p>This method returns {@code true} when the reference count goes to zero,
* {@code false} otherwise.</p>
* <p>Releases this resource, potentially decrementing a reference count (if any).</p>
*
* @return whether the invocation of this method decremented the reference count to zero
* @return {@code true} when the reference count goes to zero or if there was no reference count,
* {@code false} otherwise.
*/
boolean release();
default boolean release()
{
return true;
}
/**
* A wrapper of {@link Retainable} instances.

View File

@ -23,6 +23,9 @@ import java.util.Objects;
import java.util.Queue;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.internal.ByteBufferChunk;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
@ -37,10 +40,18 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class AsyncContent implements Content.Sink, Content.Source, Closeable
{
private static final int UNDETERMINED_LENGTH = -2;
private static final AsyncChunk ASYNC_EOF = new AsyncChunk(true, BufferUtil.EMPTY_BUFFER, Callback.NOOP)
{
@Override
public String toString()
{
return "ASYNC_EOF";
}
};
private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final SerializedInvoker invoker = new SerializedInvoker();
private final Queue<ChunkCallback> chunks = new ArrayDeque<>();
private final Queue<AsyncChunk> chunks = new ArrayDeque<>();
private Content.Chunk.Error errorChunk;
private boolean readClosed;
private boolean writeClosed;
@ -52,20 +63,15 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
* <p>The write completes:</p>
* <ul>
* <li>immediately with a failure when this instance is closed or already in error</li>
* <li>successfully when the {@link Content.Chunk} returned by {@link #read()} is released</li>
* <li>successfully when a non empty {@link Content.Chunk} returned by {@link #read()} is released</li>
* <li>successfully just before the {@link Content.Chunk} is returned by {@link #read()},
* if the chunk {@link Content.Chunk#canRetain() cannot be retained}</li>
* for any empty chunk {@link Content.Chunk}.</li>
* </ul>
*/
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
Content.Chunk chunk;
if (byteBuffer.hasRemaining())
chunk = Content.Chunk.from(byteBuffer, last, callback::succeeded);
else
chunk = last ? Content.Chunk.EOF : Content.Chunk.EMPTY;
offer(chunk, callback);
offer(new AsyncChunk(last, byteBuffer, callback));
}
/**
@ -73,14 +79,8 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
* or succeeded if and only if the chunk is terminal, as non-terminal
* chunks have to bind the succeeding of the callback to their release.
*/
private void offer(Content.Chunk chunk, Callback callback)
private void offer(AsyncChunk chunk)
{
if (chunk instanceof Content.Chunk.Error)
{
callback.failed(new IllegalArgumentException("Cannot not write Chunk.Error instances, call fail(Throwable) instead"));
return;
}
Throwable failure = null;
boolean wasEmpty = false;
try (AutoLock ignored = lock.lock())
@ -98,17 +98,21 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
wasEmpty = chunks.isEmpty();
// No need to retain the chunk, because it's created internally
// from a ByteBuffer and it will be released by the caller of read().
chunks.offer(new ChunkCallback(chunk, callback));
chunks.offer(chunk);
if (chunk.isLast())
{
writeClosed = true;
if (length == UNDETERMINED_LENGTH)
length = chunks.stream().mapToLong(cc -> cc.chunk().remaining()).sum();
{
length = 0;
for (AsyncChunk c : chunks)
length += c.remaining();
}
}
}
}
if (failure != null)
callback.failed(failure);
chunk.failed(failure);
if (wasEmpty)
invoker.run(this::invokeDemandCallback);
}
@ -128,7 +132,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
// Special case for a last empty chunk that may not be read.
if (writeClosed && chunks.size() == 1)
{
Content.Chunk chunk = chunks.peek().chunk();
AsyncChunk chunk = chunks.peek();
if (chunk.isLast() && !chunk.hasRemaining())
return;
}
@ -144,7 +148,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
@Override
public void close()
{
offer(Content.Chunk.EOF, Callback.NOOP);
offer(ASYNC_EOF);
}
public boolean isClosed()
@ -167,7 +171,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
@Override
public Content.Chunk read()
{
ChunkCallback current;
AsyncChunk current;
try (AutoLock.WithCondition condition = lock.lock())
{
if (length == UNDETERMINED_LENGTH)
@ -181,13 +185,18 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
return errorChunk;
return null;
}
readClosed = current.chunk().isLast();
readClosed = current.isLast();
if (chunks.isEmpty())
condition.signal();
}
if (!current.chunk().canRetain())
current.callback().succeeded();
return current.chunk();
// If the chunk is reference counted, the callback is succeeded when it is released.
if (current.canRetain())
return current;
// If the chunk is not reference counted, we can succeed it now and return a chunk with a noop release.
current.succeeded();
return current.isLast() ? Content.Chunk.EOF : Content.Chunk.EMPTY;
}
@Override
@ -232,7 +241,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
@Override
public void fail(Throwable failure)
{
List<ChunkCallback> drained;
List<AsyncChunk> drained;
try (AutoLock.WithCondition condition = lock.lock())
{
if (readClosed)
@ -244,7 +253,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
chunks.clear();
condition.signal();
}
drained.forEach(cc -> cc.callback().failed(failure));
drained.forEach(ac -> ac.failed(failure));
invoker.run(this::invokeDemandCallback);
}
@ -256,7 +265,52 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
}
}
private record ChunkCallback(Content.Chunk chunk, Callback callback)
private static class AsyncChunk extends ByteBufferChunk implements Callback
{
private final Callback callback;
private final Retainable.ReferenceCounter referenceCounter;
public AsyncChunk(boolean last, ByteBuffer byteBuffer, Callback callback)
{
super(byteBuffer.hasRemaining() ? byteBuffer : BufferUtil.EMPTY_BUFFER, last);
this.callback = callback;
referenceCounter = getByteBuffer() == BufferUtil.EMPTY_BUFFER ? null : new ReferenceCounter();
}
@Override
public boolean canRetain()
{
return referenceCounter != null;
}
@Override
public void retain()
{
if (canRetain())
referenceCounter.retain();
}
@Override
public boolean release()
{
if (!canRetain())
return true;
boolean released = referenceCounter.release();
if (released)
succeeded();
return released;
}
@Override
public void succeeded()
{
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
}
}

View File

@ -48,8 +48,7 @@ public class ContentSinkSubscriber implements Flow.Subscriber<Content.Chunk>
public void onNext(Content.Chunk chunk)
{
// Retain the chunk because the write may not complete immediately.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
sink.write(chunk.isLast(), chunk.getByteBuffer(), Callback.from(() -> succeeded(chunk), x -> failed(chunk, x)));
}

View File

@ -24,23 +24,6 @@ import org.eclipse.jetty.util.BufferUtil;
public abstract class ByteBufferChunk implements Content.Chunk
{
public static final ByteBufferChunk EMPTY = new ByteBufferChunk(BufferUtil.EMPTY_BUFFER, false)
{
@Override
public String toString()
{
return "%s[EMPTY]".formatted(ByteBufferChunk.class.getSimpleName());
}
};
public static final ByteBufferChunk EOF = new ByteBufferChunk(BufferUtil.EMPTY_BUFFER, true)
{
@Override
public String toString()
{
return "%s[EOF]".formatted(ByteBufferChunk.class.getSimpleName());
}
};
private final ByteBuffer byteBuffer;
private final boolean last;
@ -62,24 +45,6 @@ public abstract class ByteBufferChunk implements Content.Chunk
return last;
}
@Override
public boolean canRetain()
{
return false;
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
return true;
}
@Override
public String toString()
{

View File

@ -525,8 +525,7 @@ public class ContentSourceTest
private void add(Content.Chunk chunk)
{
// Retain the chunk because it is stored for later use.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
_chunks.add(chunk);
}

View File

@ -688,8 +688,7 @@ public abstract class ProxyHandler extends Handler.Abstract
if (LOG.isDebugEnabled())
LOG.debug("{} S2P received content {}", requestId(clientToProxyRequest), BufferUtil.toDetailString(serverToProxyContent));
if (serverToProxyChunk.canRetain())
serverToProxyChunk.retain();
serverToProxyChunk.retain();
Callback callback = new Callback()
{
@Override

View File

@ -165,7 +165,7 @@ public class GzipRequest extends Request.Wrapper
return Content.Chunk.EOF;
// Retain the input chunk because its ByteBuffer will be referenced by the Inflater.
if (retain && _chunk.canRetain())
if (retain)
_chunk.retain();
ByteBuffer decodedBuffer = _decoder.decode(_chunk);

View File

@ -46,24 +46,6 @@ public class MockHttpStream implements HttpStream
{
return false;
}
@Override
public boolean canRetain()
{
return false;
}
@Override
public void retain()
{
throw new UnsupportedOperationException();
}
@Override
public boolean release()
{
return true;
}
};
private final long _nanoTime = NanoTime.now();
private final AtomicReference<Content.Chunk> _content = new AtomicReference<>();
@ -106,8 +88,7 @@ public class MockHttpStream implements HttpStream
private Runnable addContent(Content.Chunk chunk)
{
if (chunk.canRetain())
chunk.retain();
chunk.retain();
chunk = _content.getAndSet(chunk);
if (chunk == DEMAND)
return _channel.onContentAvailable();

View File

@ -154,8 +154,7 @@ public class HttpClientDemandTest extends AbstractTest
public void onContent(Response response, Content.Chunk chunk, Runnable demander)
{
// Store the chunk and don't demand.
if (chunk.canRetain())
chunk.retain();
chunk.retain();
contentQueue.offer(chunk);
demanderQueue.offer(demander);
}
@ -244,8 +243,7 @@ public class HttpClientDemandTest extends AbstractTest
client.newRequest(newURI(transport))
.onResponseContentAsync((response, chunk, demander) ->
{
if (chunk.canRetain())
chunk.retain();
chunk.retain();
chunkRef.set(chunk);
try
{

View File

@ -439,8 +439,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void onContent(Response serverResponse, Content.Chunk chunk, Runnable demander)
{
if (chunk.canRetain())
chunk.retain();
chunk.retain();
Callback callback = Callback.from(chunk::release, Callback.from(demander, serverResponse::abort));
try
{

View File

@ -216,8 +216,7 @@ public class ProxyServlet extends AbstractProxyServlet
content.get(buffer);
offset = 0;
}
if (chunk.canRetain())
chunk.retain();
chunk.retain();
Callback callback = Callback.from(chunk::release, Callback.from(demander, proxyResponse::abort));
onResponseContent(request, response, proxyResponse, buffer, offset, length, callback);
}

View File

@ -439,8 +439,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void onContent(Response serverResponse, Content.Chunk chunk, Runnable demander)
{
if (chunk.canRetain())
chunk.retain();
chunk.retain();
Callback callback = Callback.from(chunk::release, Callback.from(demander, serverResponse::abort));
try
{

View File

@ -216,8 +216,7 @@ public class ProxyServlet extends AbstractProxyServlet
content.get(buffer);
offset = 0;
}
if (chunk.canRetain())
chunk.retain();
chunk.retain();
Callback callback = Callback.from(chunk::release, Callback.from(demander, proxyResponse::abort));
onResponseContent(request, response, proxyResponse, buffer, offset, length, callback);
}