Remove buffer from pool on write failure (#11951)

* Experiment with removable buffer from pool
* Changed remove return to be release boolean
* Fixes to avoid double release
* Tracking ByteBufferPool handles remove
* Adding assert on _channelState.isLockHeldByCurrentThread()

---------

Co-authored-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
Greg Wilkins 2024-06-28 09:37:09 +10:00 committed by GitHub
parent f89e3b8aee
commit 5e8cc2243e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 222 additions and 61 deletions

View File

@ -20,13 +20,13 @@ import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
@ -205,24 +205,49 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
// No bucket, return non-pooled.
if (bucket == null)
return newRetainableByteBuffer(size, direct, null);
return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct));
bucket.recordAcquire();
// Try to acquire a pooled entry.
Pool.Entry<RetainableByteBuffer> entry = bucket.getPool().acquire();
if (entry != null)
if (entry == null)
{
bucket.recordPooled();
RetainableByteBuffer buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
ByteBuffer buffer = BufferUtil.allocate(bucket.getCapacity(), direct);
return new ReservedBuffer(buffer, bucket);
}
return newRetainableByteBuffer(bucket.getCapacity(), direct, buffer -> reserve(bucket, buffer));
bucket.recordPooled();
RetainableByteBuffer buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
}
private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
@Override
public boolean removeAndRelease(RetainableByteBuffer buffer)
{
RetainableByteBuffer actual = buffer;
while (actual instanceof RetainableByteBuffer.Wrapper wrapper)
actual = wrapper.getWrapped();
if (actual instanceof ReservedBuffer reservedBuffer)
{
// remove the actual reserved buffer, but release the wrapped buffer
reservedBuffer.remove();
return buffer.release();
}
if (actual instanceof Buffer poolBuffer)
{
// remove the actual pool buffer, but release the wrapped buffer
poolBuffer.remove();
return buffer.release();
}
return ByteBufferPool.super.removeAndRelease(buffer);
}
private void reserve(RetainedBucket bucket, ByteBuffer byteBuffer)
{
bucket.recordRelease();
@ -235,12 +260,11 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
}
// Add the buffer to the new entry.
ByteBuffer byteBuffer = buffer.getByteBuffer();
BufferUtil.reset(byteBuffer);
Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry));
Buffer pooledBuffer = new Buffer(byteBuffer, bucket, entry);
if (entry.enable(pooledBuffer, false))
{
checkMaxMemory(bucket, buffer.isDirect());
checkMaxMemory(bucket, byteBuffer.isDirect());
return;
}
@ -270,6 +294,13 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
entry.remove();
}
private boolean remove(RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> entry)
{
// Cannot release, discard this buffer.
bucket.recordRemove();
return entry.remove();
}
private void checkMaxMemory(RetainedBucket bucket, boolean direct)
{
long max = direct ? _maxDirectMemory : _maxHeapMemory;
@ -309,14 +340,6 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
}
}
private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer> releaser)
{
ByteBuffer buffer = BufferUtil.allocate(capacity, direct);
Buffer retainableByteBuffer = new Buffer(buffer, releaser);
retainableByteBuffer.acquire();
return retainableByteBuffer;
}
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
RetainedBucket bucket = bucketFor(capacity, direct);
@ -581,15 +604,45 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
}
}
private static class Buffer extends AbstractRetainableByteBuffer
private class ReservedBuffer extends AbstractRetainableByteBuffer
{
private final Consumer<RetainableByteBuffer> _releaser;
private int _usages;
private final RetainedBucket _bucket;
private final AtomicBoolean _removed = new AtomicBoolean();
private Buffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
private ReservedBuffer(ByteBuffer buffer, RetainedBucket bucket)
{
super(buffer);
this._releaser = releaser;
_bucket = Objects.requireNonNull(bucket);
acquire();
}
@Override
public boolean release()
{
boolean released = super.release();
if (released && _removed.compareAndSet(false, true))
reserve(_bucket, getByteBuffer());
return released;
}
boolean remove()
{
// Buffer never added to pool, so just prevent future reservation
return _removed.compareAndSet(false, true);
}
}
private class Buffer extends AbstractRetainableByteBuffer
{
private final RetainedBucket _bucket;
private final Pool.Entry<RetainableByteBuffer> _entry;
private int _usages;
private Buffer(ByteBuffer buffer, RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> entry)
{
super(buffer);
_bucket = Objects.requireNonNull(bucket);
_entry = Objects.requireNonNull(entry);
}
@Override
@ -597,13 +650,15 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
{
boolean released = super.release();
if (released)
{
if (_releaser != null)
_releaser.accept(this);
}
ArrayByteBufferPool.this.release(_bucket, _entry);
return released;
}
boolean remove()
{
return ArrayByteBufferPool.this.remove(_bucket, _entry);
}
private int use()
{
if (++_usages < 0)

View File

@ -58,6 +58,20 @@ public interface ByteBufferPool
*/
RetainableByteBuffer acquire(int size, boolean direct);
/**
* {@link RetainableByteBuffer#release() Release} the buffer in a way that will remove it from any pool that it may be in.
* If the buffer is not in a pool, calling this method is equivalent to calling {@link RetainableByteBuffer#release()}.
* Calling this method satisfies any contract that requires a call to {@link RetainableByteBuffer#release()}.
* @return {@code true} if a call to {@link RetainableByteBuffer#release()} would have returned {@code true}.
* @see RetainableByteBuffer#release()
* @deprecated This API is experimental and may be removed in future releases
*/
@Deprecated
default boolean removeAndRelease(RetainableByteBuffer buffer)
{
return buffer != null && buffer.release();
}
/**
* <p>Removes all {@link RetainableByteBuffer#isRetained() non-retained}
* pooled instances from this pool.</p>

View File

@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -444,4 +445,43 @@ public class ArrayByteBufferPoolTest
assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE));
assertThat(compoundPool.getSecondaryPool().size(), is(0));
}
@Test
public void testRemoveAndRelease()
{
ArrayByteBufferPool pool = new ArrayByteBufferPool();
RetainableByteBuffer reserved0 = pool.acquire(1024, false);
RetainableByteBuffer reserved1 = pool.acquire(1024, false);
RetainableByteBuffer acquired0 = pool.acquire(1024, false);
acquired0.release();
acquired0 = pool.acquire(1024, false);
RetainableByteBuffer acquired1 = pool.acquire(1024, false);
acquired1.release();
acquired1 = pool.acquire(1024, false);
RetainableByteBuffer retained0 = pool.acquire(1024, false);
retained0.release();
retained0 = pool.acquire(1024, false);
retained0.retain();
RetainableByteBuffer retained1 = pool.acquire(1024, false);
retained1.release();
retained1 = pool.acquire(1024, false);
retained1.retain();
assertTrue(pool.removeAndRelease(reserved1));
assertTrue(pool.removeAndRelease(acquired1));
assertFalse(pool.removeAndRelease(retained1));
assertTrue(retained1.release());
assertThat(pool.getHeapByteBufferCount(), is(2L));
assertTrue(reserved0.release());
assertThat(pool.getHeapByteBufferCount(), is(3L));
assertTrue(acquired0.release());
assertThat(pool.getHeapByteBufferCount(), is(3L));
assertFalse(retained0.release());
assertTrue(retained0.release());
assertThat(pool.getHeapByteBufferCount(), is(3L));
}
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.eclipse.jetty.http.HttpException;
@ -39,14 +40,15 @@ import org.eclipse.jetty.http.MimeTypes.Type;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http.QuotedQualityCSV;
import org.eclipse.jetty.io.ByteBufferOutputStream;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -198,7 +200,8 @@ public class ErrorHandler implements Request.Handler
int bufferSize = request.getConnectionMetaData().getHttpConfiguration().getOutputBufferSize();
bufferSize = Math.min(8192, bufferSize); // TODO ?
RetainableByteBuffer buffer = request.getComponents().getByteBufferPool().acquire(bufferSize, false);
ByteBufferPool byteBufferPool = request.getComponents().getByteBufferPool();
RetainableByteBuffer buffer = byteBufferPool.acquire(bufferSize, false);
try
{
@ -251,13 +254,14 @@ public class ErrorHandler implements Request.Handler
}
response.getHeaders().put(type.getContentTypeField(charset));
response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, buffer));
response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, byteBufferPool, buffer));
return true;
}
catch (Throwable x)
{
buffer.release();
if (buffer != null)
byteBufferPool.removeAndRelease(buffer);
throw x;
}
}
@ -579,20 +583,33 @@ public class ErrorHandler implements Request.Handler
* when calling {@link Response#write(boolean, ByteBuffer, Callback)} to wrap the passed in {@link Callback}
* so that the {@link RetainableByteBuffer} used can be released.
*/
private static class WriteErrorCallback extends Callback.Nested
private static class WriteErrorCallback implements Callback
{
private final Retainable _retainable;
private final AtomicReference<Callback> _callback;
private final ByteBufferPool _pool;
private final RetainableByteBuffer _buffer;
public WriteErrorCallback(Callback callback, Retainable retainable)
public WriteErrorCallback(Callback callback, ByteBufferPool pool, RetainableByteBuffer retainable)
{
super(callback);
_retainable = retainable;
_callback = new AtomicReference<>(callback);
_pool = pool;
_buffer = retainable;
}
@Override
public void completed()
public void succeeded()
{
_retainable.release();
Callback callback = _callback.getAndSet(null);
if (callback != null)
ExceptionUtil.callAndThen(_buffer::release, callback::succeeded);
}
@Override
public void failed(Throwable x)
{
Callback callback = _callback.getAndSet(null);
if (callback != null)
ExceptionUtil.callAndThen(x, t -> _pool.removeAndRelease(_buffer), callback::failed);
}
}
}

View File

@ -135,6 +135,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private long _written;
private long _flushed;
private long _firstByteNanoTime = -1;
private ByteBufferPool _pool;
private RetainableByteBuffer _aggregate;
private int _bufferSize;
private int _commitSize;
@ -222,7 +223,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_state = State.CLOSED;
closedCallback = _closedCallback;
_closedCallback = null;
releaseBuffer();
lockedReleaseBuffer(failure != null);
wake = updateApiState(failure);
}
else if (_state == State.CLOSE)
@ -444,7 +445,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try (AutoLock l = _channelState.lock())
{
_state = State.CLOSED;
releaseBuffer();
lockedReleaseBuffer(failure != null);
}
}
@ -576,25 +577,36 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
try (AutoLock l = _channelState.lock())
{
return acquireBuffer().getByteBuffer();
return lockedAcquireBuffer().getByteBuffer();
}
}
private RetainableByteBuffer acquireBuffer()
private RetainableByteBuffer lockedAcquireBuffer()
{
assert _channelState.isLockHeldByCurrentThread();
boolean useOutputDirectByteBuffers = _servletChannel.getConnectionMetaData().getHttpConfiguration().isUseOutputDirectByteBuffers();
ByteBufferPool pool = _servletChannel.getRequest().getComponents().getByteBufferPool();
if (_aggregate == null)
_aggregate = pool.acquire(getBufferSize(), useOutputDirectByteBuffers);
{
_pool = _servletChannel.getRequest().getComponents().getByteBufferPool();
_aggregate = _pool.acquire(getBufferSize(), useOutputDirectByteBuffers);
}
return _aggregate;
}
private void releaseBuffer()
private void lockedReleaseBuffer(boolean failure)
{
assert _channelState.isLockHeldByCurrentThread();
if (_aggregate != null)
{
_aggregate.release();
if (failure && _pool != null)
_pool.removeAndRelease(_aggregate);
else
_aggregate.release();
_aggregate = null;
_pool = null;
}
}
@ -757,7 +769,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Should we aggregate?
if (aggregate)
{
acquireBuffer();
lockedAcquireBuffer();
int filled = BufferUtil.fill(_aggregate.getByteBuffer(), b, off, len);
// return if we are not complete, not full and filled all the content
@ -962,7 +974,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
_written = written;
acquireBuffer();
lockedAcquireBuffer();
BufferUtil.append(_aggregate.getByteBuffer(), (byte)b);
}
@ -1265,6 +1277,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
try (AutoLock l = _channelState.lock())
{
lockedReleaseBuffer(_state != State.CLOSED);
_state = State.OPEN;
_apiState = ApiState.BLOCKING;
_softClose = true; // Stay closed until next request
@ -1273,7 +1286,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
_written = 0;
_writeListener = null;
_onError = null;

View File

@ -227,6 +227,11 @@ public class ServletChannelState
return _lock.lock();
}
boolean isLockHeldByCurrentThread()
{
return _lock.isHeldByCurrentThread();
}
public State getState()
{
try (AutoLock ignored = lock())

View File

@ -158,6 +158,11 @@ public class HttpChannelState
return _lock.lock();
}
boolean isLockHeldByCurrentThread()
{
return _lock.isHeldByCurrentThread();
}
public State getState()
{
try (AutoLock l = lock())

View File

@ -32,6 +32,7 @@ import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.WriteListener;
import org.eclipse.jetty.http.content.HttpContent;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IOResources;
import org.eclipse.jetty.io.RetainableByteBuffer;
@ -191,6 +192,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private long _written;
private long _flushed;
private long _firstByteNanoTime = -1;
private ByteBufferPool _pool;
private RetainableByteBuffer _aggregate;
private int _bufferSize;
private int _commitSize;
@ -292,7 +294,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_state = State.CLOSED;
closedCallback = _closedCallback;
_closedCallback = null;
releaseBuffer();
lockedReleaseBuffer(failure != null);
wake = updateApiState(failure);
}
else if (_state == State.CLOSE)
@ -513,7 +515,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
try (AutoLock l = _channelState.lock())
{
_state = State.CLOSED;
releaseBuffer();
lockedReleaseBuffer(failure != null);
}
}
@ -642,23 +644,34 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
try (AutoLock l = _channelState.lock())
{
return acquireBuffer().getByteBuffer();
return lockedAcquireBuffer().getByteBuffer();
}
}
private RetainableByteBuffer acquireBuffer()
private RetainableByteBuffer lockedAcquireBuffer()
{
assert _channelState.isLockHeldByCurrentThread();
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers());
{
_pool = _channel.getByteBufferPool();
_aggregate = _pool.acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers());
}
return _aggregate;
}
private void releaseBuffer()
private void lockedReleaseBuffer(boolean failure)
{
assert _channelState.isLockHeldByCurrentThread();
if (_aggregate != null)
{
_aggregate.release();
if (failure && _pool != null)
_pool.removeAndRelease(_aggregate);
else
_aggregate.release();
_aggregate = null;
_pool = null;
}
}
@ -821,7 +834,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Should we aggregate?
if (aggregate)
{
acquireBuffer();
lockedAcquireBuffer();
int filled = BufferUtil.fill(_aggregate.getByteBuffer(), b, off, len);
// return if we are not complete, not full and filled all the content
@ -1026,7 +1039,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
_written = written;
acquireBuffer();
lockedAcquireBuffer();
BufferUtil.append(_aggregate.getByteBuffer(), (byte)b);
}
@ -1431,6 +1444,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
try (AutoLock l = _channelState.lock())
{
lockedReleaseBuffer(_state != State.CLOSED);
_state = State.OPEN;
_apiState = ApiState.BLOCKING;
_softClose = true; // Stay closed until next request
@ -1440,7 +1454,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
_written = 0;
_writeListener = null;
_onError = null;