From b5d377ab2647caed8fa55cd9fb050bec3a359e47 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 25 Apr 2012 11:19:43 +1000 Subject: [PATCH] 377587 ConnectHandler write will block on partial write --- .../jetty/server/handler/ConnectHandler.java | 97 ++++++++++--------- .../handler/AbstractConnectHandlerTest.java | 16 ++- .../server/handler/ConnectHandlerTest.java | 2 + 3 files changed, 67 insertions(+), 48 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 704f38d189d..cef2c5cc978 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -43,7 +43,7 @@ import org.eclipse.jetty.util.thread.ThreadPool; */ public class ConnectHandler extends HandlerWrapper { - private final Logger _logger = Log.getLogger(getClass().getName()); + private static final Logger LOG = Log.getLogger(ConnectHandler.class); private final SelectorManager _selectorManager = new Manager(); private volatile int _connectTimeout = 5000; private volatile int _writeTimeout = 30000; @@ -171,15 +171,15 @@ public class ConnectHandler extends HandlerWrapper { if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod())) { - _logger.debug("CONNECT request for {}", request.getRequestURI()); + LOG.debug("CONNECT request for {}", request.getRequestURI()); try { handleConnect(baseRequest, request, response, request.getRequestURI()); } catch(Exception e) { - _logger.warn("ConnectHandler "+baseRequest.getUri()+" "+ e); - _logger.debug(e); + LOG.warn("ConnectHandler "+baseRequest.getUri()+" "+ e); + LOG.debug(e); } } else @@ -217,7 +217,7 @@ public class ConnectHandler extends HandlerWrapper if (!validateDestination(host)) { - _logger.info("ProxyHandler: Forbidden destination " + host); + LOG.info("ProxyHandler: Forbidden destination " + host); response.setStatus(HttpServletResponse.SC_FORBIDDEN); baseRequest.setHandled(true); return; @@ -326,22 +326,22 @@ public class ConnectHandler extends HandlerWrapper try { // Connect to remote server - _logger.debug("Establishing connection to {}:{}", host, port); + LOG.debug("Establishing connection to {}:{}", host, port); channel.socket().setTcpNoDelay(true); channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout()); - _logger.debug("Established connection to {}:{}", host, port); + LOG.debug("Established connection to {}:{}", host, port); return channel; } catch (IOException x) { - _logger.debug("Failed to establish connection to " + host + ":" + port, x); + LOG.debug("Failed to establish connection to " + host + ":" + port, x); try { channel.close(); } catch (IOException xx) { - _logger.ignore(xx); + LOG.ignore(xx); } throw x; } @@ -357,7 +357,7 @@ public class ConnectHandler extends HandlerWrapper // so that Jetty understands that it has to upgrade the connection request.setAttribute("org.eclipse.jetty.io.Connection", connection); response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); - _logger.debug("Upgraded connection to {}", connection); + LOG.debug("Upgraded connection to {}", connection); } private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException @@ -396,24 +396,27 @@ public class ConnectHandler extends HandlerWrapper return 0; int length = buffer.length(); - StringBuilder builder = new StringBuilder(); - int written = endPoint.flush(buffer); - builder.append(written); - buffer.compact(); - if (!endPoint.isBlocking()) + final StringBuilder debug = LOG.isDebugEnabled()?new StringBuilder():null; + int flushed = endPoint.flush(buffer); + if (debug!=null) + debug.append(flushed); + + // Loop until all written + while (buffer.length()>0 && !endPoint.isOutputShutdown()) { - while (buffer.space() == 0) + if (!endPoint.isBlocking()) { boolean ready = endPoint.blockWritable(getWriteTimeout()); if (!ready) throw new IOException("Write timeout"); - - written = endPoint.flush(buffer); - builder.append("+").append(written); - buffer.compact(); } + flushed = endPoint.flush(buffer); + if (debug!=null) + debug.append("+").append(flushed); } - _logger.debug("Written {}/{} bytes {}", builder, length, endPoint); + + LOG.debug("Written {}/{} bytes {}", debug, length, endPoint); + buffer.compact(); return length; } @@ -466,7 +469,7 @@ public class ConnectHandler extends HandlerWrapper public class ProxyToServerConnection implements AsyncConnection { private final CountDownLatch _ready = new CountDownLatch(1); - private final Buffer _buffer = new IndirectNIOBuffer(1024); + private final Buffer _buffer = new IndirectNIOBuffer(4096); private final ConcurrentMap _context; private volatile Buffer _data; private volatile ClientToProxyConnection _toClient; @@ -490,7 +493,7 @@ public class ConnectHandler extends HandlerWrapper public Connection handle() throws IOException { - _logger.debug("{}: begin reading from server", this); + LOG.debug("{}: begin reading from server", this); try { writeData(); @@ -501,7 +504,7 @@ public class ConnectHandler extends HandlerWrapper if (read == -1) { - _logger.debug("{}: server closed connection {}", this, _endPoint); + LOG.debug("{}: server closed connection {}", this, _endPoint); if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) closeClient(); @@ -514,32 +517,32 @@ public class ConnectHandler extends HandlerWrapper if (read == 0) break; - _logger.debug("{}: read from server {} bytes {}", this, read, _endPoint); + LOG.debug("{}: read from server {} bytes {}", this, read, _endPoint); int written = write(_toClient._endPoint, _buffer, _context); - _logger.debug("{}: written to {} {} bytes", this, _toClient, written); + LOG.debug("{}: written to {} {} bytes", this, _toClient, written); } return this; } catch (ClosedChannelException x) { - _logger.debug(x); + LOG.debug(x); throw x; } catch (IOException x) { - _logger.warn(this + ": unexpected exception", x); + LOG.warn(this + ": unexpected exception", x); close(); throw x; } catch (RuntimeException x) { - _logger.warn(this + ": unexpected exception", x); + LOG.warn(this + ": unexpected exception", x); close(); throw x; } finally { - _logger.debug("{}: end reading from server", this); + LOG.debug("{}: end reading from server", this); } } @@ -560,7 +563,7 @@ public class ConnectHandler extends HandlerWrapper try { int written = write(_endPoint, _data, _context); - _logger.debug("{}: written to server {} bytes", this, written); + LOG.debug("{}: written to server {} bytes", this, written); } finally { @@ -645,7 +648,7 @@ public class ConnectHandler extends HandlerWrapper } catch (IOException x) { - _logger.debug(this + ": unexpected exception closing the client", x); + LOG.debug(this + ": unexpected exception closing the client", x); } try @@ -654,7 +657,7 @@ public class ConnectHandler extends HandlerWrapper } catch (IOException x) { - _logger.debug(this + ": unexpected exception closing the server", x); + LOG.debug(this + ": unexpected exception closing the server", x); } } @@ -672,7 +675,7 @@ public class ConnectHandler extends HandlerWrapper } catch(Exception e) { - _logger.debug(e); + LOG.debug(e); close(); } } @@ -680,7 +683,7 @@ public class ConnectHandler extends HandlerWrapper public class ClientToProxyConnection implements AsyncConnection { - private final Buffer _buffer = new IndirectNIOBuffer(1024); + private final Buffer _buffer = new IndirectNIOBuffer(4096); private final ConcurrentMap _context; private final SocketChannel _channel; private final EndPoint _endPoint; @@ -707,14 +710,14 @@ public class ConnectHandler extends HandlerWrapper public Connection handle() throws IOException { - _logger.debug("{}: begin reading from client", this); + LOG.debug("{}: begin reading from client", this); try { if (_firstTime) { _firstTime = false; register(_channel, _toServer); - _logger.debug("{}: registered channel {} with connection {}", this, _channel, _toServer); + LOG.debug("{}: registered channel {} with connection {}", this, _channel, _toServer); } while (true) @@ -723,7 +726,7 @@ public class ConnectHandler extends HandlerWrapper if (read == -1) { - _logger.debug("{}: client closed connection {}", this, _endPoint); + LOG.debug("{}: client closed connection {}", this, _endPoint); if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) closeServer(); @@ -736,33 +739,33 @@ public class ConnectHandler extends HandlerWrapper if (read == 0) break; - _logger.debug("{}: read from client {} bytes {}", this, read, _endPoint); + LOG.debug("{}: read from client {} bytes {}", this, read, _endPoint); int written = write(_toServer._endPoint, _buffer, _context); - _logger.debug("{}: written to {} {} bytes", this, _toServer, written); + LOG.debug("{}: written to {} {} bytes", this, _toServer, written); } return this; } catch (ClosedChannelException x) { - _logger.debug(x); + LOG.debug(x); closeServer(); throw x; } catch (IOException x) { - _logger.warn(this + ": unexpected exception", x); + LOG.warn(this + ": unexpected exception", x); close(); throw x; } catch (RuntimeException x) { - _logger.warn(this + ": unexpected exception", x); + LOG.warn(this + ": unexpected exception", x); close(); throw x; } finally { - _logger.debug("{}: end reading from client", this); + LOG.debug("{}: end reading from client", this); } } @@ -813,7 +816,7 @@ public class ConnectHandler extends HandlerWrapper } catch (IOException x) { - _logger.debug(this + ": unexpected exception closing the client", x); + LOG.debug(this + ": unexpected exception closing the client", x); } try @@ -822,7 +825,7 @@ public class ConnectHandler extends HandlerWrapper } catch (IOException x) { - _logger.debug(this + ": unexpected exception closing the server", x); + LOG.debug(this + ": unexpected exception closing the server", x); } } @@ -839,7 +842,7 @@ public class ConnectHandler extends HandlerWrapper } catch(Exception e) { - _logger.debug(e); + LOG.debug(e); close(); } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/AbstractConnectHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/AbstractConnectHandlerTest.java index 345e0ac13db..eb07c09aade 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/AbstractConnectHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/AbstractConnectHandlerTest.java @@ -89,11 +89,12 @@ public abstract class AbstractConnectHandlerTest headers.put(headerName.toLowerCase(), headerValue.toLowerCase()); } - StringBuilder body = new StringBuilder(); + StringBuilder body; if (headers.containsKey("content-length")) { int readLen = 0; int length = Integer.parseInt(headers.get("content-length")); + body=new StringBuilder(length); try { for (int i = 0; i < length; ++i) @@ -101,7 +102,9 @@ public abstract class AbstractConnectHandlerTest char c = (char)reader.read(); body.append(c); readLen++; + } + } catch (SocketTimeoutException e) { @@ -111,6 +114,7 @@ public abstract class AbstractConnectHandlerTest } else if ("chunked".equals(headers.get("transfer-encoding"))) { + body = new StringBuilder(64*1024); while ((line = reader.readLine()) != null) { if ("0".equals(line)) @@ -120,6 +124,15 @@ public abstract class AbstractConnectHandlerTest break; } + try + { + Thread.sleep(5); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + int length = Integer.parseInt(line, 16); for (int i = 0; i < length; ++i) { @@ -130,6 +143,7 @@ public abstract class AbstractConnectHandlerTest assertEquals("", line); } } + else throw new IllegalStateException(); return new Response(code, headers, body.toString().trim()); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java index d84453b03a0..da1f5c57075 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java @@ -19,6 +19,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.toolchain.test.OS; +import org.eclipse.jetty.util.log.Log; import org.junit.BeforeClass; import org.junit.Test; @@ -354,6 +355,7 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest @Test public void testCONNECTAndPOSTWithBigBody() throws Exception { + // Log.getLogger(ConnectHandler.class).setDebugEnabled(true); String hostPort = "localhost:" + serverConnector.getLocalPort(); String request = "" + "CONNECT " + hostPort + " HTTP/1.1\r\n" +