460443 - Race condition releasing the response buffer.

Fixed by making sure that the response buffer is released at the
appropriate times.
This commit is contained in:
Simone Bordet 2015-02-20 14:16:28 +01:00
parent ddde0db339
commit b69b8dfe50
4 changed files with 116 additions and 45 deletions

View File

@ -37,8 +37,18 @@ public class HttpChannelOverHTTP extends HttpChannel
{
super(connection.getHttpDestination());
this.connection = connection;
this.sender = new HttpSenderOverHTTP(this);
this.receiver = new HttpReceiverOverHTTP(this);
this.sender = newHttpSender();
this.receiver = newHttpReceiver();
}
protected HttpSenderOverHTTP newHttpSender()
{
return new HttpSenderOverHTTP(this);
}
protected HttpReceiverOverHTTP newHttpReceiver()
{
return new HttpReceiverOverHTTP(this);
}
public HttpConnectionOverHTTP getHttpConnection()

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -47,7 +46,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
{
super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
this.delegate = new Delegate(destination);
this.channel = new HttpChannelOverHTTP(this);
this.channel = newHttpChannel();
}
protected HttpChannelOverHTTP newHttpChannel()
{
return new HttpChannelOverHTTP(this);
}
public HttpChannelOverHTTP getHttpChannel()

View File

@ -57,74 +57,82 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
return getHttpChannel().getHttpConnection();
}
protected ByteBuffer getResponseBuffer()
{
return buffer;
}
public void receive()
{
buffer = acquireBuffer();
process(buffer);
}
private ByteBuffer acquireBuffer()
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
process();
return bufferPool.acquire(client.getResponseBufferSize(), true);
}
private void process()
private void releaseBuffer(ByteBuffer buffer)
{
if (readAndParse())
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
// Don't linger the buffer around if we are idle.
buffer = null;
}
assert this.buffer == buffer;
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
this.buffer = null;
}
private boolean readAndParse()
private void process(ByteBuffer buffer)
{
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
ByteBuffer buffer = this.buffer;
while (true)
try
{
try
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
while (true)
{
// Connection may be closed in a parser callback.
if (connection.isClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed", connection);
return true;
releaseBuffer(buffer);
return;
}
if (!parse(buffer))
return false;
return;
int read = endPoint.fill(buffer);
// Avoid boxing of variable 'read'
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes from {}", read, endPoint);
LOG.debug("Read {} bytes {} from {}", read, BufferUtil.toDetailString(buffer), endPoint);
if (read > 0)
{
if (!parse(buffer))
return false;
return;
}
else if (read == 0)
{
releaseBuffer(buffer);
fillInterested();
return true;
return;
}
else
{
releaseBuffer(buffer);
shutdown();
return true;
return;
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
failAndClose(x);
return true;
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
releaseBuffer(buffer);
failAndClose(x);
}
}
@ -139,17 +147,20 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
// Must parse even if the buffer is fully consumed, to allow the
// parser to advance from asynchronous content to response complete.
if (parser.parseNext(buffer))
{
// If the parser returns true, we need to differentiate two cases:
// A) the response is completed, so the parser is in START state;
// B) the content is handled asynchronously, so the parser is in CONTENT state.
return parser.isStart();
}
return true;
boolean handle = parser.parseNext(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
if (!handle)
return true;
// If the parser returns true, we need to differentiate two cases:
// A) the response is completed, so the parser is in START state;
// B) the content is handled asynchronously, so the parser is in CONTENT state.
return parser.isStart();
}
private void fillInterested()
protected void fillInterested()
{
getHttpChannel().getHttpConnection().fillInterested();
}
@ -233,7 +244,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (LOG.isDebugEnabled())
LOG.debug("Content consumed asynchronously, resuming processing");
process();
process(getResponseBuffer());
}
public void abort(Throwable x)

View File

@ -200,4 +200,50 @@ public class HttpReceiverOverHTTPTest
Assert.assertTrue(e.getCause() instanceof HttpResponseException);
}
}
@Test
public void test_FillInterested_RacingWith_BufferRelease() throws Exception
{
connection = new HttpConnectionOverHTTP(endPoint, destination)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
{
return new HttpChannelOverHTTP(this)
{
@Override
protected HttpReceiverOverHTTP newHttpReceiver()
{
return new HttpReceiverOverHTTP(this)
{
@Override
protected void fillInterested()
{
// Verify that the buffer has been released
// before fillInterested() is called.
Assert.assertNull(getResponseBuffer());
// Fill the endpoint so receive is called again.
endPoint.setInput("X");
super.fillInterested();
}
};
}
};
}
};
// Partial response to trigger the call to fillInterested().
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-Length: 1\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
}
}