297104 use server threadpool

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1508 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2010-04-13 16:13:57 +00:00
parent 8c91c6c166
commit 2daf72ff1e
3 changed files with 142 additions and 94 deletions

View File

@ -197,7 +197,13 @@ public class Server extends HandlerWrapper implements Attributes
HttpGenerator.setServerVersion(_version); HttpGenerator.setServerVersion(_version);
MultiException mex=new MultiException(); MultiException mex=new MultiException();
Iterator itor = _dependentBeans.iterator(); if (_threadPool==null)
{
QueuedThreadPool tp=new QueuedThreadPool();
setThreadPool(tp);
}
Iterator<Object> itor = _dependentBeans.iterator();
while (itor.hasNext()) while (itor.hasNext())
{ {
try try
@ -209,12 +215,6 @@ public class Server extends HandlerWrapper implements Attributes
catch (Throwable e) {mex.add(e);} catch (Throwable e) {mex.add(e);}
} }
if (_threadPool==null)
{
QueuedThreadPool tp=new QueuedThreadPool();
setThreadPool(tp);
}
if (_sessionIdManager!=null) if (_sessionIdManager!=null)
_sessionIdManager.start(); _sessionIdManager.start();
@ -298,7 +298,7 @@ public class Server extends HandlerWrapper implements Attributes
if (!_dependentBeans.isEmpty()) if (!_dependentBeans.isEmpty())
{ {
ListIterator itor = _dependentBeans.listIterator(_dependentBeans.size()); ListIterator<Object> itor = _dependentBeans.listIterator(_dependentBeans.size());
while (itor.hasPrevious()) while (itor.hasPrevious())
{ {
try try

View File

@ -21,21 +21,25 @@ import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager; import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
/** /**
* @version $Revision$ $Date$ * @version $Revision$ $Date$
*/ */
public class ProxyHandler extends AbstractHandler public class ProxyHandler extends AbstractHandler
{ {
private final Logger logger = Log.getLogger(getClass().getName()); private final Logger _logger = Log.getLogger(getClass().getName());
private final SelectorManager selectorManager = new Manager(); private final SelectorManager _selectorManager = new Manager();
private final String serverAddress; private final String _serverAddress;
private volatile int connectTimeout = 5000; private volatile int _connectTimeout = 5000;
private volatile int writeTimeout = 30000; private volatile int _writeTimeout = 30000;
private volatile QueuedThreadPool threadPool; private volatile ThreadPool _threadPool;
private volatile ThreadPool _privateThreadPool;
public ProxyHandler() public ProxyHandler()
{ {
@ -44,39 +48,72 @@ public class ProxyHandler extends AbstractHandler
public ProxyHandler(String serverAddress) public ProxyHandler(String serverAddress)
{ {
this.serverAddress = serverAddress; _serverAddress = serverAddress;
} }
public int getConnectTimeout() public int getConnectTimeout()
{ {
return connectTimeout; return _connectTimeout;
} }
public void setConnectTimeout(int connectTimeout) public void setConnectTimeout(int connectTimeout)
{ {
this.connectTimeout = connectTimeout; _connectTimeout = connectTimeout;
} }
public int getWriteTimeout() public int getWriteTimeout()
{ {
return writeTimeout; return _writeTimeout;
} }
public void setWriteTimeout(int writeTimeout) public void setWriteTimeout(int writeTimeout)
{ {
this.writeTimeout = writeTimeout; _writeTimeout = writeTimeout;
}
@Override
public void setServer(Server server)
{
super.setServer(server);
server.getContainer().update(this,null,_selectorManager,"selectManager");
if (_privateThreadPool!=null)
server.getContainer().update(this,null,_privateThreadPool,"threadpool",true);
else
_threadPool=server.getThreadPool();
}
/** Get the threadPool.
* @return the threadPool
*/
public ThreadPool getThreadPool()
{
return _threadPool;
}
/** Set the threadPool.
* @param threadPool the threadPool to set
*/
public void setThreadPool(ThreadPool threadPool)
{
if (getServer()!=null)
getServer().getContainer().update(this,_privateThreadPool,threadPool,"threadpool",true);
_threadPool=_privateThreadPool=threadPool;
} }
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
super.doStart(); super.doStart();
// TODO: configure threadPool
threadPool = new QueuedThreadPool();
threadPool.start();
selectorManager.start(); if (_threadPool==null)
threadPool.dispatch(new Runnable() _threadPool=getServer().getThreadPool();
if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
((LifeCycle)_threadPool).start();
_selectorManager.start();
_threadPool.dispatch(new Runnable()
{ {
public void run() public void run()
{ {
@ -84,11 +121,11 @@ public class ProxyHandler extends AbstractHandler
{ {
try try
{ {
selectorManager.doSelect(0); _selectorManager.doSelect(0);
} }
catch (IOException x) catch (IOException x)
{ {
logger.warn("Unexpected exception", x); _logger.warn("Unexpected exception", x);
} }
} }
} }
@ -98,11 +135,11 @@ public class ProxyHandler extends AbstractHandler
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
selectorManager.stop(); _selectorManager.stop();
QueuedThreadPool threadPool = this.threadPool; ThreadPool threadPool = _threadPool;
if (threadPool != null) if (threadPool != null && threadPool instanceof LifeCycle)
threadPool.stop(); ((LifeCycle)threadPool).stop();
super.doStop(); super.doStop();
} }
@ -111,15 +148,15 @@ public class ProxyHandler extends AbstractHandler
{ {
if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod())) if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
{ {
logger.info("CONNECT request for {}", request.getRequestURI(), null); _logger.info("CONNECT request for {}", request.getRequestURI(), null);
handle(request, response, request.getRequestURI()); handle(request, response, request.getRequestURI());
} }
else else
{ {
logger.info("Plain request for {}", serverAddress, null); _logger.info("Plain request for {}", _serverAddress, null);
if (serverAddress == null) if (_serverAddress == null)
throw new ServletException("Parameter 'serverAddress' cannot be null"); throw new ServletException("Parameter 'serverAddress' cannot be null");
handle(request, response, serverAddress); handle(request, response, _serverAddress);
} }
} }
@ -196,22 +233,32 @@ public class ProxyHandler extends AbstractHandler
// Setup connections, before registering the channel to avoid races // Setup connections, before registering the channel to avoid races
// where the server sends data before the connections are set up // where the server sends data before the connections are set up
ProxyToServerConnection proxyToServer = new ProxyToServerConnection(secure, buffer); ProxyToServerConnection proxyToServer = newProxyToServerConnection(secure, buffer);
ClientToProxyConnection clientToProxy = new ClientToProxyConnection(channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp()); ClientToProxyConnection clientToProxy = newClientToProxyConnection(channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
clientToProxy.setConnection(proxyToServer); clientToProxy.setConnection(proxyToServer);
proxyToServer.setConnection(clientToProxy); proxyToServer.setConnection(clientToProxy);
upgradeConnection(request, response, clientToProxy); upgradeConnection(request, response, clientToProxy);
} }
protected ClientToProxyConnection newClientToProxyConnection(SocketChannel channel, EndPoint endPoint, long timeStamp)
{
return new ClientToProxyConnection(channel, endPoint, timeStamp);
}
protected ProxyToServerConnection newProxyToServerConnection(boolean secure, IndirectNIOBuffer buffer)
{
return new ProxyToServerConnection(secure, buffer);
}
protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
{ {
logger.info("Establishing connection to {}:{}", host, port); _logger.info("Establishing connection to {}:{}", host, port);
// Connect to remote server // Connect to remote server
SocketChannel channel = SocketChannel.open(); SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true); channel.socket().setTcpNoDelay(true);
channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout()); channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
logger.info("Established connection to {}:{}", host, port); _logger.info("Established connection to {}:{}", host, port);
return channel; return channel;
} }
@ -225,13 +272,13 @@ public class ProxyHandler extends AbstractHandler
// so that Jetty understands that it has to upgrade the connection // so that Jetty understands that it has to upgrade the connection
request.setAttribute("org.eclipse.jetty.io.Connection", connection); request.setAttribute("org.eclipse.jetty.io.Connection", connection);
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
logger.info("Upgraded connection to {}", connection, null); _logger.info("Upgraded connection to {}", connection, null);
} }
private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
{ {
selectorManager.register(channel, proxyToServer); _selectorManager.register(channel, proxyToServer);
proxyToServer.waitReady(connectTimeout); proxyToServer.waitReady(_connectTimeout);
} }
/** /**
@ -263,7 +310,7 @@ public class ProxyHandler extends AbstractHandler
buffer.compact(); buffer.compact();
} }
} }
logger.info("Written {}/{} bytes " + endPoint, builder, length); _logger.info("Written {}/{} bytes " + endPoint, builder, length);
} }
private class Manager extends SelectorManager private class Manager extends SelectorManager
@ -279,7 +326,7 @@ public class ProxyHandler extends AbstractHandler
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
{ {
ProxyToServerConnection proxyToServer = (ProxyToServerConnection)selectionKey.attachment(); ProxyToServerConnection proxyToServer = (ProxyToServerConnection)selectionKey.attachment();
if (proxyToServer.secure) if (proxyToServer._secure)
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
// return new SslSelectChannelEndPoint(???, channel, selectSet, selectionKey, sslContext.createSSLEngine(address.host, address.port)); // return new SslSelectChannelEndPoint(???, channel, selectSet, selectionKey, sslContext.createSSLEngine(address.host, address.port));
@ -309,7 +356,7 @@ public class ProxyHandler extends AbstractHandler
@Override @Override
public boolean dispatch(Runnable task) public boolean dispatch(Runnable task)
{ {
return threadPool.dispatch(task); return _threadPool.dispatch(task);
} }
@Override @Override
@ -323,70 +370,70 @@ public class ProxyHandler extends AbstractHandler
} }
} }
private class ProxyToServerConnection implements Connection public class ProxyToServerConnection implements Connection
{ {
private final CountDownLatch ready = new CountDownLatch(1); private final CountDownLatch _ready = new CountDownLatch(1);
private final Buffer buffer = new IndirectNIOBuffer(1024); private final Buffer _buffer = new IndirectNIOBuffer(1024);
private final boolean secure; private final boolean _secure;
private volatile Buffer data; private volatile Buffer _data;
private volatile ClientToProxyConnection connection; private volatile ClientToProxyConnection _toClient;
private volatile long timestamp; private volatile long _timestamp;
private volatile SelectChannelEndPoint endPoint; private volatile SelectChannelEndPoint _endPoint;
public ProxyToServerConnection(boolean secure, Buffer data) public ProxyToServerConnection(boolean secure, Buffer data)
{ {
this.secure = secure; _secure = secure;
this.data = data; _data = data;
} }
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
logger.info("ProxyToServer: handle entered"); _logger.info("ProxyToServer: handle entered");
if (data != null) if (_data != null)
{ {
write(endPoint, data); write(_endPoint, _data);
data = null; _data = null;
} }
while (true) while (true)
{ {
int read = endPoint.fill(buffer); int read = _endPoint.fill(_buffer);
if (read == -1) if (read == -1)
{ {
logger.info("ProxyToServer: closed connection {}", endPoint, null); _logger.info("ProxyToServer: closed connection {}", _endPoint, null);
connection.close(); _toClient.close();
break; break;
} }
if (read == 0) if (read == 0)
break; break;
logger.info("ProxyToServer: read {} bytes {}", read, endPoint); _logger.info("ProxyToServer: read {} bytes {}", read, _endPoint);
write(connection.endPoint, buffer); write(_toClient._endPoint, _buffer);
} }
logger.info("ProxyToServer: handle exited"); _logger.info("ProxyToServer: handle exited");
return this; return this;
} }
public void setConnection(ClientToProxyConnection connection) public void setConnection(ClientToProxyConnection connection)
{ {
this.connection = connection; _toClient = connection;
} }
public long getTimeStamp() public long getTimeStamp()
{ {
return timestamp; return _timestamp;
} }
public void setTimeStamp(long timestamp) public void setTimeStamp(long timestamp)
{ {
this.timestamp = timestamp; _timestamp = timestamp;
} }
public void setEndPoint(SelectChannelEndPoint endpoint) public void setEndPoint(SelectChannelEndPoint endpoint)
{ {
this.endPoint = endpoint; _endPoint = endpoint;
} }
public boolean isIdle() public boolean isIdle()
@ -401,14 +448,14 @@ public class ProxyHandler extends AbstractHandler
public void ready() public void ready()
{ {
ready.countDown(); _ready.countDown();
} }
public void waitReady(long timeout) throws IOException public void waitReady(long timeout) throws IOException
{ {
try try
{ {
ready.await(timeout, TimeUnit.MILLISECONDS); _ready.await(timeout, TimeUnit.MILLISECONDS);
} }
catch (final InterruptedException x) catch (final InterruptedException x)
{ {
@ -418,60 +465,60 @@ public class ProxyHandler extends AbstractHandler
public void close() throws IOException public void close() throws IOException
{ {
endPoint.close(); _endPoint.close();
} }
} }
private class ClientToProxyConnection implements Connection public class ClientToProxyConnection implements Connection
{ {
private final Buffer buffer = new IndirectNIOBuffer(1024); private final Buffer _buffer = new IndirectNIOBuffer(1024);
private final SocketChannel channel; private final SocketChannel _channel;
private final EndPoint endPoint; private final EndPoint _endPoint;
private final long timestamp; private final long _timestamp;
private volatile ProxyToServerConnection connection; private volatile ProxyToServerConnection _toServer;
private boolean firstTime = true; private boolean _firstTime = true;
public ClientToProxyConnection(SocketChannel channel, EndPoint endPoint, long timestamp) public ClientToProxyConnection(SocketChannel channel, EndPoint endPoint, long timestamp)
{ {
this.channel = channel; _channel = channel;
this.endPoint = endPoint; _endPoint = endPoint;
this.timestamp = timestamp; _timestamp = timestamp;
} }
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
logger.info("ClientToProxy: handle entered"); _logger.info("ClientToProxy: handle entered");
if (firstTime) if (_firstTime)
{ {
firstTime = false; _firstTime = false;
register(channel, connection); register(_channel, _toServer);
} }
while (true) while (true)
{ {
int read = endPoint.fill(buffer); int read = _endPoint.fill(_buffer);
if (read == -1) if (read == -1)
{ {
logger.info("ClientToProxy: closed connection {}", endPoint, null); _logger.info("ClientToProxy: closed connection {}", _endPoint, null);
connection.close(); _toServer.close();
break; break;
} }
if (read == 0) if (read == 0)
break; break;
logger.info("ClientToProxy: read {} bytes {}", read, endPoint); _logger.info("ClientToProxy: read {} bytes {}", read, _endPoint);
write(connection.endPoint, buffer); write(_toServer._endPoint, _buffer);
} }
logger.info("ClientToProxy: handle exited"); _logger.info("ClientToProxy: handle exited");
return this; return this;
} }
public long getTimeStamp() public long getTimeStamp()
{ {
return timestamp; return _timestamp;
} }
public boolean isIdle() public boolean isIdle()
@ -486,12 +533,12 @@ public class ProxyHandler extends AbstractHandler
public void setConnection(ProxyToServerConnection connection) public void setConnection(ProxyToServerConnection connection)
{ {
this.connection = connection; _toServer = connection;
} }
public void close() throws IOException public void close() throws IOException
{ {
endPoint.close(); _endPoint.close();
} }
} }
} }

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.log.Log;
/** /**
* @version $Revision$ $Date$ * @version $Revision$ $Date$