377587 ConnectHandler write will block on partial write

This commit is contained in:
Greg Wilkins 2012-04-25 11:19:43 +10:00
parent c4416e2ed7
commit b5d377ab26
3 changed files with 67 additions and 48 deletions

View File

@ -43,7 +43,7 @@ import org.eclipse.jetty.util.thread.ThreadPool;
*/ */
public class ConnectHandler extends HandlerWrapper public class ConnectHandler extends HandlerWrapper
{ {
private final Logger _logger = Log.getLogger(getClass().getName()); private static final Logger LOG = Log.getLogger(ConnectHandler.class);
private final SelectorManager _selectorManager = new Manager(); private final SelectorManager _selectorManager = new Manager();
private volatile int _connectTimeout = 5000; private volatile int _connectTimeout = 5000;
private volatile int _writeTimeout = 30000; private volatile int _writeTimeout = 30000;
@ -171,15 +171,15 @@ public class ConnectHandler extends HandlerWrapper
{ {
if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod())) if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
{ {
_logger.debug("CONNECT request for {}", request.getRequestURI()); LOG.debug("CONNECT request for {}", request.getRequestURI());
try try
{ {
handleConnect(baseRequest, request, response, request.getRequestURI()); handleConnect(baseRequest, request, response, request.getRequestURI());
} }
catch(Exception e) catch(Exception e)
{ {
_logger.warn("ConnectHandler "+baseRequest.getUri()+" "+ e); LOG.warn("ConnectHandler "+baseRequest.getUri()+" "+ e);
_logger.debug(e); LOG.debug(e);
} }
} }
else else
@ -217,7 +217,7 @@ public class ConnectHandler extends HandlerWrapper
if (!validateDestination(host)) if (!validateDestination(host))
{ {
_logger.info("ProxyHandler: Forbidden destination " + host); LOG.info("ProxyHandler: Forbidden destination " + host);
response.setStatus(HttpServletResponse.SC_FORBIDDEN); response.setStatus(HttpServletResponse.SC_FORBIDDEN);
baseRequest.setHandled(true); baseRequest.setHandled(true);
return; return;
@ -326,22 +326,22 @@ public class ConnectHandler extends HandlerWrapper
try try
{ {
// Connect to remote server // Connect to remote server
_logger.debug("Establishing connection to {}:{}", host, port); LOG.debug("Establishing connection to {}:{}", host, port);
channel.socket().setTcpNoDelay(true); channel.socket().setTcpNoDelay(true);
channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout()); channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
_logger.debug("Established connection to {}:{}", host, port); LOG.debug("Established connection to {}:{}", host, port);
return channel; return channel;
} }
catch (IOException x) catch (IOException x)
{ {
_logger.debug("Failed to establish connection to " + host + ":" + port, x); LOG.debug("Failed to establish connection to " + host + ":" + port, x);
try try
{ {
channel.close(); channel.close();
} }
catch (IOException xx) catch (IOException xx)
{ {
_logger.ignore(xx); LOG.ignore(xx);
} }
throw x; throw x;
} }
@ -357,7 +357,7 @@ public class ConnectHandler extends HandlerWrapper
// so that Jetty understands that it has to upgrade the connection // so that Jetty understands that it has to upgrade the connection
request.setAttribute("org.eclipse.jetty.io.Connection", connection); request.setAttribute("org.eclipse.jetty.io.Connection", connection);
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
_logger.debug("Upgraded connection to {}", connection); LOG.debug("Upgraded connection to {}", connection);
} }
private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
@ -396,24 +396,27 @@ public class ConnectHandler extends HandlerWrapper
return 0; return 0;
int length = buffer.length(); int length = buffer.length();
StringBuilder builder = new StringBuilder(); final StringBuilder debug = LOG.isDebugEnabled()?new StringBuilder():null;
int written = endPoint.flush(buffer); int flushed = endPoint.flush(buffer);
builder.append(written); if (debug!=null)
buffer.compact(); debug.append(flushed);
if (!endPoint.isBlocking())
// Loop until all written
while (buffer.length()>0 && !endPoint.isOutputShutdown())
{ {
while (buffer.space() == 0) if (!endPoint.isBlocking())
{ {
boolean ready = endPoint.blockWritable(getWriteTimeout()); boolean ready = endPoint.blockWritable(getWriteTimeout());
if (!ready) if (!ready)
throw new IOException("Write timeout"); throw new IOException("Write timeout");
written = endPoint.flush(buffer);
builder.append("+").append(written);
buffer.compact();
} }
flushed = endPoint.flush(buffer);
if (debug!=null)
debug.append("+").append(flushed);
} }
_logger.debug("Written {}/{} bytes {}", builder, length, endPoint);
LOG.debug("Written {}/{} bytes {}", debug, length, endPoint);
buffer.compact();
return length; return length;
} }
@ -466,7 +469,7 @@ public class ConnectHandler extends HandlerWrapper
public class ProxyToServerConnection implements AsyncConnection public class ProxyToServerConnection implements AsyncConnection
{ {
private final CountDownLatch _ready = new CountDownLatch(1); private final CountDownLatch _ready = new CountDownLatch(1);
private final Buffer _buffer = new IndirectNIOBuffer(1024); private final Buffer _buffer = new IndirectNIOBuffer(4096);
private final ConcurrentMap<String, Object> _context; private final ConcurrentMap<String, Object> _context;
private volatile Buffer _data; private volatile Buffer _data;
private volatile ClientToProxyConnection _toClient; private volatile ClientToProxyConnection _toClient;
@ -490,7 +493,7 @@ public class ConnectHandler extends HandlerWrapper
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
_logger.debug("{}: begin reading from server", this); LOG.debug("{}: begin reading from server", this);
try try
{ {
writeData(); writeData();
@ -501,7 +504,7 @@ public class ConnectHandler extends HandlerWrapper
if (read == -1) if (read == -1)
{ {
_logger.debug("{}: server closed connection {}", this, _endPoint); LOG.debug("{}: server closed connection {}", this, _endPoint);
if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
closeClient(); closeClient();
@ -514,32 +517,32 @@ public class ConnectHandler extends HandlerWrapper
if (read == 0) if (read == 0)
break; break;
_logger.debug("{}: read from server {} bytes {}", this, read, _endPoint); LOG.debug("{}: read from server {} bytes {}", this, read, _endPoint);
int written = write(_toClient._endPoint, _buffer, _context); int written = write(_toClient._endPoint, _buffer, _context);
_logger.debug("{}: written to {} {} bytes", this, _toClient, written); LOG.debug("{}: written to {} {} bytes", this, _toClient, written);
} }
return this; return this;
} }
catch (ClosedChannelException x) catch (ClosedChannelException x)
{ {
_logger.debug(x); LOG.debug(x);
throw x; throw x;
} }
catch (IOException x) catch (IOException x)
{ {
_logger.warn(this + ": unexpected exception", x); LOG.warn(this + ": unexpected exception", x);
close(); close();
throw x; throw x;
} }
catch (RuntimeException x) catch (RuntimeException x)
{ {
_logger.warn(this + ": unexpected exception", x); LOG.warn(this + ": unexpected exception", x);
close(); close();
throw x; throw x;
} }
finally finally
{ {
_logger.debug("{}: end reading from server", this); LOG.debug("{}: end reading from server", this);
} }
} }
@ -560,7 +563,7 @@ public class ConnectHandler extends HandlerWrapper
try try
{ {
int written = write(_endPoint, _data, _context); int written = write(_endPoint, _data, _context);
_logger.debug("{}: written to server {} bytes", this, written); LOG.debug("{}: written to server {} bytes", this, written);
} }
finally finally
{ {
@ -645,7 +648,7 @@ public class ConnectHandler extends HandlerWrapper
} }
catch (IOException x) catch (IOException x)
{ {
_logger.debug(this + ": unexpected exception closing the client", x); LOG.debug(this + ": unexpected exception closing the client", x);
} }
try try
@ -654,7 +657,7 @@ public class ConnectHandler extends HandlerWrapper
} }
catch (IOException x) catch (IOException x)
{ {
_logger.debug(this + ": unexpected exception closing the server", x); LOG.debug(this + ": unexpected exception closing the server", x);
} }
} }
@ -672,7 +675,7 @@ public class ConnectHandler extends HandlerWrapper
} }
catch(Exception e) catch(Exception e)
{ {
_logger.debug(e); LOG.debug(e);
close(); close();
} }
} }
@ -680,7 +683,7 @@ public class ConnectHandler extends HandlerWrapper
public class ClientToProxyConnection implements AsyncConnection public class ClientToProxyConnection implements AsyncConnection
{ {
private final Buffer _buffer = new IndirectNIOBuffer(1024); private final Buffer _buffer = new IndirectNIOBuffer(4096);
private final ConcurrentMap<String, Object> _context; private final ConcurrentMap<String, Object> _context;
private final SocketChannel _channel; private final SocketChannel _channel;
private final EndPoint _endPoint; private final EndPoint _endPoint;
@ -707,14 +710,14 @@ public class ConnectHandler extends HandlerWrapper
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
_logger.debug("{}: begin reading from client", this); LOG.debug("{}: begin reading from client", this);
try try
{ {
if (_firstTime) if (_firstTime)
{ {
_firstTime = false; _firstTime = false;
register(_channel, _toServer); register(_channel, _toServer);
_logger.debug("{}: registered channel {} with connection {}", this, _channel, _toServer); LOG.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
} }
while (true) while (true)
@ -723,7 +726,7 @@ public class ConnectHandler extends HandlerWrapper
if (read == -1) if (read == -1)
{ {
_logger.debug("{}: client closed connection {}", this, _endPoint); LOG.debug("{}: client closed connection {}", this, _endPoint);
if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
closeServer(); closeServer();
@ -736,33 +739,33 @@ public class ConnectHandler extends HandlerWrapper
if (read == 0) if (read == 0)
break; break;
_logger.debug("{}: read from client {} bytes {}", this, read, _endPoint); LOG.debug("{}: read from client {} bytes {}", this, read, _endPoint);
int written = write(_toServer._endPoint, _buffer, _context); int written = write(_toServer._endPoint, _buffer, _context);
_logger.debug("{}: written to {} {} bytes", this, _toServer, written); LOG.debug("{}: written to {} {} bytes", this, _toServer, written);
} }
return this; return this;
} }
catch (ClosedChannelException x) catch (ClosedChannelException x)
{ {
_logger.debug(x); LOG.debug(x);
closeServer(); closeServer();
throw x; throw x;
} }
catch (IOException x) catch (IOException x)
{ {
_logger.warn(this + ": unexpected exception", x); LOG.warn(this + ": unexpected exception", x);
close(); close();
throw x; throw x;
} }
catch (RuntimeException x) catch (RuntimeException x)
{ {
_logger.warn(this + ": unexpected exception", x); LOG.warn(this + ": unexpected exception", x);
close(); close();
throw x; throw x;
} }
finally finally
{ {
_logger.debug("{}: end reading from client", this); LOG.debug("{}: end reading from client", this);
} }
} }
@ -813,7 +816,7 @@ public class ConnectHandler extends HandlerWrapper
} }
catch (IOException x) catch (IOException x)
{ {
_logger.debug(this + ": unexpected exception closing the client", x); LOG.debug(this + ": unexpected exception closing the client", x);
} }
try try
@ -822,7 +825,7 @@ public class ConnectHandler extends HandlerWrapper
} }
catch (IOException x) catch (IOException x)
{ {
_logger.debug(this + ": unexpected exception closing the server", x); LOG.debug(this + ": unexpected exception closing the server", x);
} }
} }
@ -839,7 +842,7 @@ public class ConnectHandler extends HandlerWrapper
} }
catch(Exception e) catch(Exception e)
{ {
_logger.debug(e); LOG.debug(e);
close(); close();
} }
} }

View File

@ -89,11 +89,12 @@ public abstract class AbstractConnectHandlerTest
headers.put(headerName.toLowerCase(), headerValue.toLowerCase()); headers.put(headerName.toLowerCase(), headerValue.toLowerCase());
} }
StringBuilder body = new StringBuilder(); StringBuilder body;
if (headers.containsKey("content-length")) if (headers.containsKey("content-length"))
{ {
int readLen = 0; int readLen = 0;
int length = Integer.parseInt(headers.get("content-length")); int length = Integer.parseInt(headers.get("content-length"));
body=new StringBuilder(length);
try try
{ {
for (int i = 0; i < length; ++i) for (int i = 0; i < length; ++i)
@ -101,7 +102,9 @@ public abstract class AbstractConnectHandlerTest
char c = (char)reader.read(); char c = (char)reader.read();
body.append(c); body.append(c);
readLen++; readLen++;
} }
} }
catch (SocketTimeoutException e) catch (SocketTimeoutException e)
{ {
@ -111,6 +114,7 @@ public abstract class AbstractConnectHandlerTest
} }
else if ("chunked".equals(headers.get("transfer-encoding"))) else if ("chunked".equals(headers.get("transfer-encoding")))
{ {
body = new StringBuilder(64*1024);
while ((line = reader.readLine()) != null) while ((line = reader.readLine()) != null)
{ {
if ("0".equals(line)) if ("0".equals(line))
@ -120,6 +124,15 @@ public abstract class AbstractConnectHandlerTest
break; break;
} }
try
{
Thread.sleep(5);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
int length = Integer.parseInt(line, 16); int length = Integer.parseInt(line, 16);
for (int i = 0; i < length; ++i) for (int i = 0; i < length; ++i)
{ {
@ -130,6 +143,7 @@ public abstract class AbstractConnectHandlerTest
assertEquals("", line); assertEquals("", line);
} }
} }
else throw new IllegalStateException();
return new Response(code, headers, body.toString().trim()); return new Response(code, headers, body.toString().trim());
} }

View File

@ -19,6 +19,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.log.Log;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -354,6 +355,7 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
@Test @Test
public void testCONNECTAndPOSTWithBigBody() throws Exception public void testCONNECTAndPOSTWithBigBody() throws Exception
{ {
// Log.getLogger(ConnectHandler.class).setDebugEnabled(true);
String hostPort = "localhost:" + serverConnector.getLocalPort(); String hostPort = "localhost:" + serverConnector.getLocalPort();
String request = "" + String request = "" +
"CONNECT " + hostPort + " HTTP/1.1\r\n" + "CONNECT " + hostPort + " HTTP/1.1\r\n" +