Merged branch 'master' into 'jetty-9.1'.

This commit is contained in:
Simone Bordet 2013-08-13 16:25:04 +02:00
commit 6019a37064
3 changed files with 51 additions and 28 deletions

View File

@ -236,6 +236,18 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause)); getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
} }
protected void tunnelSucceeded(Connection connection, Promise<Connection> promise)
{
// Wrap the connection with TLS
promise.succeeded(client.getTransport().tunnel(connection));
}
protected void tunnelFailed(Connection connection, Promise<Connection> promise, Throwable failure)
{
promise.failed(failure);
connection.close();
}
@Override @Override
public String dump() public String dump()
{ {
@ -321,22 +333,18 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
{ {
if (result.isFailed()) if (result.isFailed())
{ {
failed(result.getFailure()); tunnelFailed(connection, delegate, result.getFailure());
connection.close();
} }
else else
{ {
Response response = result.getResponse(); Response response = result.getResponse();
if (response.getStatus() == 200) if (response.getStatus() == 200)
{ {
// Wrap the connection with TLS tunnelSucceeded(connection, delegate);
Connection tunnel = client.getTransport().tunnel(connection);
delegate.succeeded(tunnel);
} }
else else
{ {
failed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response)); tunnelFailed(connection, delegate, new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
connection.close();
} }
} }
} }

View File

@ -36,8 +36,8 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
private final Delegate delegate; private final Delegate delegate;
private final HttpChannelOverHTTP channel; private final HttpChannelOverHTTP channel;
private volatile boolean closed; private boolean closed;
private volatile long idleTimeout; private long idleTimeout;
public HttpConnectionOverHTTP(HttpClient client, EndPoint endPoint, HttpDestination destination) public HttpConnectionOverHTTP(HttpClient client, EndPoint endPoint, HttpDestination destination)
{ {
@ -81,15 +81,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
super.onClose(); super.onClose();
} }
@Override protected boolean isClosed()
public void fillInterested()
{ {
// This is necessary when "upgrading" the connection for example after proxied return closed;
// CONNECT requests, because the old connection will read the CONNECT response
// and then set the read interest, while the new connection attached to the same
// EndPoint also will set the read interest, causing a ReadPendingException.
if (!closed)
super.fillInterested();
} }
@Override @Override

View File

@ -50,9 +50,15 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
return (HttpChannelOverHTTP)super.getHttpChannel(); return (HttpChannelOverHTTP)super.getHttpChannel();
} }
private HttpConnectionOverHTTP getHttpConnection()
{
return getHttpChannel().getHttpConnection();
}
public void receive() public void receive()
{ {
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint(); HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
HttpClient client = getHttpDestination().getHttpClient(); HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool(); ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true); ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
@ -60,21 +66,30 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{ {
while (true) while (true)
{ {
int read = endPoint.fill(buffer); // Connection may be closed in a parser callback
LOG.debug("Read {} bytes from {}", read, endPoint); if (connection.isClosed())
if (read > 0)
{ {
parse(buffer); LOG.debug("{} closed", connection);
}
else if (read == 0)
{
fillInterested();
break; break;
} }
else else
{ {
shutdown(); int read = endPoint.fill(buffer);
break; LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
parse(buffer);
}
else if (read == 0)
{
fillInterested();
break;
}
else
{
shutdown();
break;
}
} }
} }
} }
@ -218,4 +233,10 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (responseFailure(failure)) if (responseFailure(failure))
getHttpChannel().getHttpConnection().close(); getHttpChannel().getHttpConnection().close();
} }
@Override
public String toString()
{
return String.format("%s@%x on %s", getClass().getSimpleName(), hashCode(), getHttpConnection());
}
} }