Issue #6728 - QUIC and HTTP/3
- Fixed buffer release to avoid leaks. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
aadc86d36a
commit
85a13cfc20
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.http3.api;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
|
@ -258,8 +259,8 @@ public interface Stream
|
|||
|
||||
public Data(DataFrame frame, Runnable complete)
|
||||
{
|
||||
this.frame = frame;
|
||||
this.complete = complete;
|
||||
this.frame = Objects.requireNonNull(frame);
|
||||
this.complete = Objects.requireNonNull(complete);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -113,7 +113,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
processDataDemand();
|
||||
if (!parserDataMode)
|
||||
{
|
||||
if (buffer.hasRemaining())
|
||||
if (buffer != null && buffer.hasRemaining())
|
||||
processNonDataFrames();
|
||||
else
|
||||
fillInterested();
|
||||
|
@ -124,10 +124,15 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
{
|
||||
try
|
||||
{
|
||||
tryAcquireBuffer();
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (parseAndFill(true) == MessageParser.Result.NO_FRAME)
|
||||
break;
|
||||
{
|
||||
tryReleaseBuffer(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: we should also exit if the connection was closed due to errors.
|
||||
// There is not yet a isClosed() primitive though.
|
||||
|
@ -138,7 +143,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
// However, the last frame may have
|
||||
// caused a write that we need to flush.
|
||||
getEndPoint().getQuicSession().flush();
|
||||
break;
|
||||
tryReleaseBuffer(false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (parserDataMode)
|
||||
|
@ -161,12 +167,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
fillInterested();
|
||||
}
|
||||
}
|
||||
break;
|
||||
tryReleaseBuffer(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
tryReleaseBuffer(true);
|
||||
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
|
||||
getEndPoint().close(error, x);
|
||||
// Notify the application that a failure happened.
|
||||
|
@ -183,6 +191,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("reading data on {}", this);
|
||||
|
||||
tryAcquireBuffer();
|
||||
|
||||
switch (parseAndFill(false))
|
||||
{
|
||||
case FRAME:
|
||||
|
@ -191,12 +201,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
dataFrame = null;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("read data {} on {}", frame, this);
|
||||
if (frame == null)
|
||||
return null;
|
||||
|
||||
buffer.retain();
|
||||
|
||||
return new Stream.Data(frame, buffer::release);
|
||||
return new Stream.Data(frame, this::completeReadData);
|
||||
}
|
||||
case MODE_SWITCH:
|
||||
{
|
||||
|
@ -205,12 +211,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
dataLast = true;
|
||||
parserDataMode = false;
|
||||
parser.setDataMode(false);
|
||||
tryReleaseBuffer(false);
|
||||
return null;
|
||||
}
|
||||
case NO_FRAME:
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("read no data on {}", this);
|
||||
tryReleaseBuffer(false);
|
||||
return null;
|
||||
}
|
||||
default:
|
||||
|
@ -222,12 +230,20 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
catch (Throwable x)
|
||||
{
|
||||
cancelDemand();
|
||||
tryReleaseBuffer(true);
|
||||
getEndPoint().close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
|
||||
// Rethrow so the application has a chance to handle it.
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
||||
private void completeReadData()
|
||||
{
|
||||
buffer.release();
|
||||
if (!buffer.isRetained())
|
||||
tryReleaseBuffer(false);
|
||||
}
|
||||
|
||||
public void demand()
|
||||
{
|
||||
boolean hasData;
|
||||
|
@ -314,16 +330,39 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
}
|
||||
}
|
||||
|
||||
public MessageParser.Result parseAndFill(boolean setFillInterest)
|
||||
private void tryAcquireBuffer()
|
||||
{
|
||||
if (buffer == null)
|
||||
{
|
||||
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("acquired {}", buffer);
|
||||
}
|
||||
}
|
||||
|
||||
private void tryReleaseBuffer(boolean force)
|
||||
{
|
||||
if (buffer != null)
|
||||
{
|
||||
if (buffer.hasRemaining() && force)
|
||||
buffer.clear();
|
||||
if (!buffer.hasRemaining())
|
||||
{
|
||||
buffer.release();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("released {}", buffer);
|
||||
buffer = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private MessageParser.Result parseAndFill(boolean setFillInterest)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, buffer);
|
||||
|
||||
if (buffer == null)
|
||||
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
|
||||
|
||||
setNoData(false);
|
||||
|
||||
while (true)
|
||||
|
@ -359,29 +398,17 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
|
||||
if (filled == 0)
|
||||
{
|
||||
buffer.release();
|
||||
buffer = null;
|
||||
setNoData(true);
|
||||
if (setFillInterest)
|
||||
fillInterested();
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer.release();
|
||||
buffer = null;
|
||||
break;
|
||||
}
|
||||
return MessageParser.Result.NO_FRAME;
|
||||
}
|
||||
return MessageParser.Result.NO_FRAME;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("parse+fill failure on {}", this, x);
|
||||
if (buffer != null)
|
||||
buffer.release();
|
||||
buffer = null;
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,15 +202,15 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
|
||||
private Runnable receiveAndProcess()
|
||||
{
|
||||
boolean interested = isFillInterested();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("receiveAndProcess() fillInterested={}", interested);
|
||||
if (interested)
|
||||
return null;
|
||||
|
||||
ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
|
||||
try
|
||||
{
|
||||
boolean interested = isFillInterested();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("receiveAndProcess() fillInterested={}", interested);
|
||||
if (interested)
|
||||
return null;
|
||||
|
||||
ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
|
||||
while (true)
|
||||
{
|
||||
BufferUtil.clear(cipherBuffer);
|
||||
|
@ -266,7 +266,10 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("processing creation task {} on {}", task, session);
|
||||
if (task != null)
|
||||
{
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
return task;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -282,13 +285,17 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("produced session task {} on {}", task, this);
|
||||
if (task != null)
|
||||
{
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
return task;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("receiveAndProcess() failure", x);
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
// TODO: close?
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -492,6 +492,8 @@ public abstract class QuicSession extends ContainerLifeCycle
|
|||
Action action = connectionClosed ? Action.SUCCEEDED : Action.IDLE;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("connection draining={} closed={}, action={} on {}", quicheConnection.isDraining(), connectionClosed, action, QuicSession.this);
|
||||
if (action == Action.IDLE)
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
return action;
|
||||
}
|
||||
BufferUtil.flipToFlush(cipherBuffer, pos);
|
||||
|
|
|
@ -161,7 +161,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
|||
System.lineSeparator(), scenario.server.dump(),
|
||||
System.lineSeparator(), scenario.client.dump());
|
||||
testThread.interrupt();
|
||||
}, (long)iterations * factor, TimeUnit.MILLISECONDS);
|
||||
}, Math.max(5000, (long)iterations * factor), TimeUnit.MILLISECONDS);
|
||||
|
||||
long begin = System.nanoTime();
|
||||
for (int i = 0; i < iterations; ++i)
|
||||
|
|
Loading…
Reference in New Issue