414972 - HttpClient may read bytes with pre-tunnelled connection.

Now the receiver checks whether the connection is closed, and returns
immediately if it is without "stealing" the bytes to the tunnelled
connection.
This commit is contained in:
Simone Bordet 2013-08-13 16:00:54 +02:00
parent 9a7b0f5c10
commit 759c7096b2
3 changed files with 44 additions and 26 deletions

View File

@ -53,7 +53,7 @@ public class HttpConnection extends AbstractConnection implements Connection
private final HttpSender sender; private final HttpSender sender;
private final HttpReceiver receiver; private final HttpReceiver receiver;
private long idleTimeout; private long idleTimeout;
private volatile boolean closed; private boolean closed;
public HttpConnection(HttpClient client, EndPoint endPoint, HttpDestination destination) public HttpConnection(HttpClient client, EndPoint endPoint, HttpDestination destination)
{ {
@ -88,15 +88,9 @@ public class HttpConnection extends AbstractConnection implements Connection
super.onClose(); super.onClose();
} }
@Override protected boolean isClosed()
public void fillInterested()
{ {
// This is necessary when "upgrading" the connection for example after proxied return closed;
// CONNECT requests, because the old connection will read the CONNECT response
// and then set the read interest, while the new connection attached to the same
// EndPoint also will set the read interest, causing a ReadPendingException.
if (!closed)
super.fillInterested();
} }
@Override @Override

View File

@ -428,6 +428,19 @@ public class HttpDestination implements Destination, Closeable, Dumpable
getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause)); getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
} }
protected void tunnelSucceeded(Connection connection, Promise<Connection> promise)
{
// Wrap the connection with TLS
Connection tunnel = client.tunnel(connection);
promise.succeeded(tunnel);
}
protected void tunnelFailed(Connection connection, Promise<Connection> promise, Throwable failure)
{
promise.failed(failure);
connection.close();
}
@Override @Override
public String dump() public String dump()
{ {
@ -516,22 +529,18 @@ public class HttpDestination implements Destination, Closeable, Dumpable
{ {
if (result.isFailed()) if (result.isFailed())
{ {
failed(result.getFailure()); tunnelFailed(connection, delegate, result.getFailure());
connection.close();
} }
else else
{ {
Response response = result.getResponse(); Response response = result.getResponse();
if (response.getStatus() == 200) if (response.getStatus() == 200)
{ {
// Wrap the connection with TLS tunnelSucceeded(connection, delegate);
Connection tunnel = client.tunnel(connection);
delegate.succeeded(tunnel);
} }
else else
{ {
failed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response)); tunnelFailed(connection, delegate, new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
connection.close();
} }
} }
} }

View File

@ -67,6 +67,14 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
try try
{ {
while (true) while (true)
{
// Connection may be closed in a parser callback
if (connection.isClosed())
{
LOG.debug("{} closed", connection);
break;
}
else
{ {
int read = endPoint.fill(buffer); int read = endPoint.fill(buffer);
LOG.debug("Read {} bytes from {}", read, connection); LOG.debug("Read {} bytes from {}", read, connection);
@ -86,6 +94,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
} }
} }
} }
}
catch (EofException x) catch (EofException x)
{ {
LOG.ignore(x); LOG.ignore(x);
@ -415,6 +424,12 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
return updated; return updated;
} }
@Override
public String toString()
{
return String.format("%s@%x on %s", getClass().getSimpleName(), hashCode(), connection);
}
private enum State private enum State
{ {
IDLE, RECEIVE, FAILURE IDLE, RECEIVE, FAILURE