Reworked buffer releasing to ensure that it is always executed before

fillInterested() is called.
This is needed to avoid race conditions where fillInterested()
triggers a new thread entering onFillable() and acquiring a new buffer
while the previous thread is releasing the previous buffer.
This commit is contained in:
Simone Bordet 2015-01-02 18:22:39 +01:00
parent caa579ab4b
commit a85a74bbdc
2 changed files with 50 additions and 42 deletions

View File

@ -58,73 +58,79 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
}
public void receive()
{
buffer = acquireBuffer();
process(buffer);
}
private ByteBuffer acquireBuffer()
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
process();
return bufferPool.acquire(client.getResponseBufferSize(), true);
}
private void process()
private void releaseBuffer(ByteBuffer buffer)
{
if (readAndParse())
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
// Don't linger the buffer around if we are idle.
buffer = null;
}
assert this.buffer == buffer;
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
this.buffer = null;
}
private boolean readAndParse()
private void process(ByteBuffer buffer)
{
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
ByteBuffer buffer = this.buffer;
while (true)
try
{
try
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
boolean looping = false;
while (true)
{
// Connection may be closed in a parser callback.
if (connection.isClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed", connection);
return true;
releaseBuffer(buffer);
return;
}
if (!parse(buffer))
return false;
if (!looping && !parse(buffer))
return;
int read = endPoint.fill(buffer);
// Avoid boxing of variable 'read'
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
if (!parse(buffer))
return false;
return;
}
else if (read == 0)
{
releaseBuffer(buffer);
fillInterested();
return true;
return;
}
else
{
releaseBuffer(buffer);
shutdown();
return true;
return;
}
looping = true;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
failAndClose(x);
return true;
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
releaseBuffer(buffer);
failAndClose(x);
}
}
@ -139,14 +145,17 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
// Must parse even if the buffer is fully consumed, to allow the
// parser to advance from asynchronous content to response complete.
if (parser.parseNext(buffer))
{
// If the parser returns true, we need to differentiate two cases:
// A) the response is completed, so the parser is in START state;
// B) the content is handled asynchronously, so the parser is in CONTENT state.
return parser.isStart();
}
return true;
boolean handle = parser.parseNext(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} - {}", handle, parser);
if (!handle)
return true;
// If the parser returns true, we need to differentiate two cases:
// A) the response is completed, so the parser is in START state;
// B) the content is handled asynchronously, so the parser is in CONTENT state.
return parser.isStart();
}
private void fillInterested()
@ -230,7 +239,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (LOG.isDebugEnabled())
LOG.debug("Content consumed asynchronously, resuming processing");
process();
process(HttpReceiverOverHTTP.this.buffer);
}
public void abort(Throwable x)

View File

@ -109,7 +109,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo resource)
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
leaks.incrementAndGet();
}
@ -125,7 +125,6 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
leaks.incrementAndGet();
}
});