From 2daf72ff1e51b27db893f8fe128debc135fc6928 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 13 Apr 2010 16:13:57 +0000 Subject: [PATCH] 297104 use server threadpool git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1508 7e9141cc-0065-0410-87d8-b60c137991c4 --- .../java/org/eclipse/jetty/server/Server.java | 16 +- .../jetty/server/handler/ProxyHandler.java | 219 +++++++++++------- .../handler/ProxyHandlerConnectTest.java | 1 + 3 files changed, 142 insertions(+), 94 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 1708c157cd6..344f4463ef2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -197,7 +197,13 @@ public class Server extends HandlerWrapper implements Attributes HttpGenerator.setServerVersion(_version); MultiException mex=new MultiException(); - Iterator itor = _dependentBeans.iterator(); + if (_threadPool==null) + { + QueuedThreadPool tp=new QueuedThreadPool(); + setThreadPool(tp); + } + + Iterator itor = _dependentBeans.iterator(); while (itor.hasNext()) { try @@ -209,12 +215,6 @@ public class Server extends HandlerWrapper implements Attributes catch (Throwable e) {mex.add(e);} } - if (_threadPool==null) - { - QueuedThreadPool tp=new QueuedThreadPool(); - setThreadPool(tp); - } - if (_sessionIdManager!=null) _sessionIdManager.start(); @@ -298,7 +298,7 @@ public class Server extends HandlerWrapper implements Attributes if (!_dependentBeans.isEmpty()) { - ListIterator itor = _dependentBeans.listIterator(_dependentBeans.size()); + ListIterator itor = _dependentBeans.listIterator(_dependentBeans.size()); while (itor.hasPrevious()) { try diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java index 37cf0c438b9..a3c2708ff01 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java @@ -21,21 +21,25 @@ import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.nio.SelectorManager; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ThreadPool; /** * @version $Revision$ $Date$ */ public class ProxyHandler extends AbstractHandler { - private final Logger logger = Log.getLogger(getClass().getName()); - private final SelectorManager selectorManager = new Manager(); - private final String serverAddress; - private volatile int connectTimeout = 5000; - private volatile int writeTimeout = 30000; - private volatile QueuedThreadPool threadPool; + private final Logger _logger = Log.getLogger(getClass().getName()); + private final SelectorManager _selectorManager = new Manager(); + private final String _serverAddress; + private volatile int _connectTimeout = 5000; + private volatile int _writeTimeout = 30000; + private volatile ThreadPool _threadPool; + private volatile ThreadPool _privateThreadPool; public ProxyHandler() { @@ -44,39 +48,72 @@ public class ProxyHandler extends AbstractHandler public ProxyHandler(String serverAddress) { - this.serverAddress = serverAddress; + _serverAddress = serverAddress; } public int getConnectTimeout() { - return connectTimeout; + return _connectTimeout; } public void setConnectTimeout(int connectTimeout) { - this.connectTimeout = connectTimeout; + _connectTimeout = connectTimeout; } public int getWriteTimeout() { - return writeTimeout; + return _writeTimeout; } public void setWriteTimeout(int writeTimeout) { - this.writeTimeout = writeTimeout; + _writeTimeout = writeTimeout; + } + + @Override + public void setServer(Server server) + { + super.setServer(server); + + server.getContainer().update(this,null,_selectorManager,"selectManager"); + + if (_privateThreadPool!=null) + server.getContainer().update(this,null,_privateThreadPool,"threadpool",true); + else + _threadPool=server.getThreadPool(); + } + + /** Get the threadPool. + * @return the threadPool + */ + public ThreadPool getThreadPool() + { + return _threadPool; + } + + /** Set the threadPool. + * @param threadPool the threadPool to set + */ + public void setThreadPool(ThreadPool threadPool) + { + if (getServer()!=null) + getServer().getContainer().update(this,_privateThreadPool,threadPool,"threadpool",true); + _threadPool=_privateThreadPool=threadPool; } @Override protected void doStart() throws Exception { super.doStart(); - // TODO: configure threadPool - threadPool = new QueuedThreadPool(); - threadPool.start(); + + if (_threadPool==null) + _threadPool=getServer().getThreadPool(); + if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning()) + ((LifeCycle)_threadPool).start(); - selectorManager.start(); - threadPool.dispatch(new Runnable() + _selectorManager.start(); + _threadPool.dispatch(new Runnable() { public void run() { @@ -84,11 +121,11 @@ public class ProxyHandler extends AbstractHandler { try { - selectorManager.doSelect(0); + _selectorManager.doSelect(0); } catch (IOException x) { - logger.warn("Unexpected exception", x); + _logger.warn("Unexpected exception", x); } } } @@ -98,11 +135,11 @@ public class ProxyHandler extends AbstractHandler @Override protected void doStop() throws Exception { - selectorManager.stop(); + _selectorManager.stop(); - QueuedThreadPool threadPool = this.threadPool; - if (threadPool != null) - threadPool.stop(); + ThreadPool threadPool = _threadPool; + if (threadPool != null && threadPool instanceof LifeCycle) + ((LifeCycle)threadPool).stop(); super.doStop(); } @@ -111,15 +148,15 @@ public class ProxyHandler extends AbstractHandler { if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod())) { - logger.info("CONNECT request for {}", request.getRequestURI(), null); + _logger.info("CONNECT request for {}", request.getRequestURI(), null); handle(request, response, request.getRequestURI()); } else { - logger.info("Plain request for {}", serverAddress, null); - if (serverAddress == null) + _logger.info("Plain request for {}", _serverAddress, null); + if (_serverAddress == null) throw new ServletException("Parameter 'serverAddress' cannot be null"); - handle(request, response, serverAddress); + handle(request, response, _serverAddress); } } @@ -196,22 +233,32 @@ public class ProxyHandler extends AbstractHandler // Setup connections, before registering the channel to avoid races // where the server sends data before the connections are set up - ProxyToServerConnection proxyToServer = new ProxyToServerConnection(secure, buffer); - ClientToProxyConnection clientToProxy = new ClientToProxyConnection(channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp()); + ProxyToServerConnection proxyToServer = newProxyToServerConnection(secure, buffer); + ClientToProxyConnection clientToProxy = newClientToProxyConnection(channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp()); clientToProxy.setConnection(proxyToServer); proxyToServer.setConnection(clientToProxy); upgradeConnection(request, response, clientToProxy); } + protected ClientToProxyConnection newClientToProxyConnection(SocketChannel channel, EndPoint endPoint, long timeStamp) + { + return new ClientToProxyConnection(channel, endPoint, timeStamp); + } + + protected ProxyToServerConnection newProxyToServerConnection(boolean secure, IndirectNIOBuffer buffer) + { + return new ProxyToServerConnection(secure, buffer); + } + protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException { - logger.info("Establishing connection to {}:{}", host, port); + _logger.info("Establishing connection to {}:{}", host, port); // Connect to remote server SocketChannel channel = SocketChannel.open(); channel.socket().setTcpNoDelay(true); channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout()); - logger.info("Established connection to {}:{}", host, port); + _logger.info("Established connection to {}:{}", host, port); return channel; } @@ -225,13 +272,13 @@ public class ProxyHandler extends AbstractHandler // so that Jetty understands that it has to upgrade the connection request.setAttribute("org.eclipse.jetty.io.Connection", connection); response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); - logger.info("Upgraded connection to {}", connection, null); + _logger.info("Upgraded connection to {}", connection, null); } private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException { - selectorManager.register(channel, proxyToServer); - proxyToServer.waitReady(connectTimeout); + _selectorManager.register(channel, proxyToServer); + proxyToServer.waitReady(_connectTimeout); } /** @@ -263,7 +310,7 @@ public class ProxyHandler extends AbstractHandler buffer.compact(); } } - logger.info("Written {}/{} bytes " + endPoint, builder, length); + _logger.info("Written {}/{} bytes " + endPoint, builder, length); } private class Manager extends SelectorManager @@ -279,7 +326,7 @@ public class ProxyHandler extends AbstractHandler protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException { ProxyToServerConnection proxyToServer = (ProxyToServerConnection)selectionKey.attachment(); - if (proxyToServer.secure) + if (proxyToServer._secure) { throw new UnsupportedOperationException(); // return new SslSelectChannelEndPoint(???, channel, selectSet, selectionKey, sslContext.createSSLEngine(address.host, address.port)); @@ -309,7 +356,7 @@ public class ProxyHandler extends AbstractHandler @Override public boolean dispatch(Runnable task) { - return threadPool.dispatch(task); + return _threadPool.dispatch(task); } @Override @@ -323,70 +370,70 @@ public class ProxyHandler extends AbstractHandler } } - private class ProxyToServerConnection implements Connection + public class ProxyToServerConnection implements Connection { - private final CountDownLatch ready = new CountDownLatch(1); - private final Buffer buffer = new IndirectNIOBuffer(1024); - private final boolean secure; - private volatile Buffer data; - private volatile ClientToProxyConnection connection; - private volatile long timestamp; - private volatile SelectChannelEndPoint endPoint; + private final CountDownLatch _ready = new CountDownLatch(1); + private final Buffer _buffer = new IndirectNIOBuffer(1024); + private final boolean _secure; + private volatile Buffer _data; + private volatile ClientToProxyConnection _toClient; + private volatile long _timestamp; + private volatile SelectChannelEndPoint _endPoint; public ProxyToServerConnection(boolean secure, Buffer data) { - this.secure = secure; - this.data = data; + _secure = secure; + _data = data; } public Connection handle() throws IOException { - logger.info("ProxyToServer: handle entered"); - if (data != null) + _logger.info("ProxyToServer: handle entered"); + if (_data != null) { - write(endPoint, data); - data = null; + write(_endPoint, _data); + _data = null; } while (true) { - int read = endPoint.fill(buffer); + int read = _endPoint.fill(_buffer); if (read == -1) { - logger.info("ProxyToServer: closed connection {}", endPoint, null); - connection.close(); + _logger.info("ProxyToServer: closed connection {}", _endPoint, null); + _toClient.close(); break; } if (read == 0) break; - logger.info("ProxyToServer: read {} bytes {}", read, endPoint); - write(connection.endPoint, buffer); + _logger.info("ProxyToServer: read {} bytes {}", read, _endPoint); + write(_toClient._endPoint, _buffer); } - logger.info("ProxyToServer: handle exited"); + _logger.info("ProxyToServer: handle exited"); return this; } public void setConnection(ClientToProxyConnection connection) { - this.connection = connection; + _toClient = connection; } public long getTimeStamp() { - return timestamp; + return _timestamp; } public void setTimeStamp(long timestamp) { - this.timestamp = timestamp; + _timestamp = timestamp; } public void setEndPoint(SelectChannelEndPoint endpoint) { - this.endPoint = endpoint; + _endPoint = endpoint; } public boolean isIdle() @@ -401,14 +448,14 @@ public class ProxyHandler extends AbstractHandler public void ready() { - ready.countDown(); + _ready.countDown(); } public void waitReady(long timeout) throws IOException { try { - ready.await(timeout, TimeUnit.MILLISECONDS); + _ready.await(timeout, TimeUnit.MILLISECONDS); } catch (final InterruptedException x) { @@ -418,60 +465,60 @@ public class ProxyHandler extends AbstractHandler public void close() throws IOException { - endPoint.close(); + _endPoint.close(); } } - private class ClientToProxyConnection implements Connection + public class ClientToProxyConnection implements Connection { - private final Buffer buffer = new IndirectNIOBuffer(1024); - private final SocketChannel channel; - private final EndPoint endPoint; - private final long timestamp; - private volatile ProxyToServerConnection connection; - private boolean firstTime = true; + private final Buffer _buffer = new IndirectNIOBuffer(1024); + private final SocketChannel _channel; + private final EndPoint _endPoint; + private final long _timestamp; + private volatile ProxyToServerConnection _toServer; + private boolean _firstTime = true; public ClientToProxyConnection(SocketChannel channel, EndPoint endPoint, long timestamp) { - this.channel = channel; - this.endPoint = endPoint; - this.timestamp = timestamp; + _channel = channel; + _endPoint = endPoint; + _timestamp = timestamp; } public Connection handle() throws IOException { - logger.info("ClientToProxy: handle entered"); + _logger.info("ClientToProxy: handle entered"); - if (firstTime) + if (_firstTime) { - firstTime = false; - register(channel, connection); + _firstTime = false; + register(_channel, _toServer); } while (true) { - int read = endPoint.fill(buffer); + int read = _endPoint.fill(_buffer); if (read == -1) { - logger.info("ClientToProxy: closed connection {}", endPoint, null); - connection.close(); + _logger.info("ClientToProxy: closed connection {}", _endPoint, null); + _toServer.close(); break; } if (read == 0) break; - logger.info("ClientToProxy: read {} bytes {}", read, endPoint); - write(connection.endPoint, buffer); + _logger.info("ClientToProxy: read {} bytes {}", read, _endPoint); + write(_toServer._endPoint, _buffer); } - logger.info("ClientToProxy: handle exited"); + _logger.info("ClientToProxy: handle exited"); return this; } public long getTimeStamp() { - return timestamp; + return _timestamp; } public boolean isIdle() @@ -486,12 +533,12 @@ public class ProxyHandler extends AbstractHandler public void setConnection(ProxyToServerConnection connection) { - this.connection = connection; + _toServer = connection; } public void close() throws IOException { - endPoint.close(); + _endPoint.close(); } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ProxyHandlerConnectTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ProxyHandlerConnectTest.java index 3f40fc03016..7842278d2cc 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ProxyHandlerConnectTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ProxyHandlerConnectTest.java @@ -22,6 +22,7 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.util.log.Log; /** * @version $Revision$ $Date$