Ensure buffers are returned to pool by MessageInputStream

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-05-21 10:59:12 +10:00
parent 087f486b44
commit ea51ba43c9
1 changed files with 24 additions and 8 deletions

View File

@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -83,7 +84,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
{ {
this.timeoutMs = timeoutMs; this.timeoutMs = timeoutMs;
this.session = session; this.session = session;
this.bufferPool = (session instanceof WebSocketSession) ? ((WebSocketSession)session).getBufferPool() : null; this.bufferPool = (session instanceof WebSocketSession) ? ((WebSocketSession)session).getBufferPool() : new NullByteBufferPool();
} }
@Override @Override
@ -155,8 +156,21 @@ public class MessageInputStream extends InputStream implements MessageAppender
if (remainingContent) if (remainingContent)
LOG.warn("MessageInputStream closed without fully consuming content {}", session); LOG.warn("MessageInputStream closed without fully consuming content {}", session);
state = State.CLOSED;
// Release any buffers taken from the pool.
if (activeBuffer != null && activeBuffer != EOF)
{
bufferPool.release(activeBuffer);
activeBuffer = null;
}
for (ByteBuffer buffer : buffers)
{
bufferPool.release(buffer);
}
buffers.clear(); buffers.clear();
state = State.CLOSED;
buffers.add(EOF); buffers.add(EOF);
} }
} }
@ -209,6 +223,13 @@ public class MessageInputStream extends InputStream implements MessageAppender
// grab a fresh buffer // grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining()) while (activeBuffer == null || !activeBuffer.hasRemaining())
{ {
if (activeBuffer != null)
{
// All content consumed, release back to pool.
bufferPool.release(activeBuffer);
activeBuffer = null;
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Waiting {} ms to read", timeoutMs); LOG.debug("Waiting {} ms to read", timeoutMs);
@ -309,11 +330,6 @@ public class MessageInputStream extends InputStream implements MessageAppender
private ByteBuffer acquire(int capacity, boolean direct) private ByteBuffer acquire(int capacity, boolean direct)
{ {
ByteBuffer buffer; return bufferPool.acquire(capacity, direct);
if (bufferPool != null)
buffer = bufferPool.acquire(capacity, direct);
else
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
return buffer;
} }
} }