420359 - Support 0 acceptors for ServerConnector

This commit is contained in:
Greg Wilkins 2013-11-04 16:16:54 +11:00
parent f567bddad9
commit 74272663e6
7 changed files with 139 additions and 33 deletions

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
public class ExampleServer
{
@ -44,7 +45,6 @@ public class ExampleServer
handlers.setHandlers(new Handler[]{context,new DefaultHandler()});
server.setHandler(handlers);
server.start();
server.join();
}

View File

@ -164,6 +164,22 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
selector.submit(selector.new Accept(channel));
}
/**
* <p>Registers a channel to select accept operations</p>
*
* @param channel the channel to register
*/
public void acceptor(final ServerSocketChannel server)
{
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Acceptor(server));
}
protected void accepted(SocketChannel channel) throws IOException
{
throw new UnsupportedOperationException();
}
@Override
protected void doStart() throws Exception
{
@ -544,6 +560,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
processConnect(key, (Connect)attachment);
}
else if (key.isAcceptable())
{
processAccept(key);
}
else
{
throw new IllegalStateException();
@ -588,6 +608,23 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
private void processAccept(SelectionKey key)
{
ServerSocketChannel server = (ServerSocketChannel)key.channel();
try
{
SocketChannel channel;
while ((channel=server.accept())!=null)
{
accepted(channel);
}
}
catch (Throwable x)
{
LOG.warn("Accept failed",x);
}
}
private void closeNoExceptions(Closeable closeable)
{
try
@ -723,6 +760,31 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
private class Acceptor implements Runnable
{
private final ServerSocketChannel _channel;
public Acceptor(ServerSocketChannel channel)
{
this._channel = channel;
}
@Override
public void run()
{
try
{
SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
LOG.debug("{} acceptor={}",this,key);
}
catch (Throwable x)
{
closeNoExceptions(_channel);
LOG.warn(x);
}
}
}
private class Accept implements Runnable
{
private final SocketChannel _channel;

View File

@ -189,7 +189,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
for (ConnectionFactory factory:factories)
addConnectionFactory(factory);
if (acceptors<=0)
if (acceptors<0)
acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2);
if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);

View File

@ -47,22 +47,22 @@ public class LocalConnector extends AbstractConnector
public LocalConnector(Server server)
{
this(server, null, null, null, 0, new HttpConnectionFactory());
this(server, null, null, null, -1, new HttpConnectionFactory());
}
public LocalConnector(Server server, SslContextFactory sslContextFactory)
{
this(server, null, null, null, 0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
this(server, null, null, null, -1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
}
public LocalConnector(Server server, ConnectionFactory connectionFactory)
{
this(server, null, null, null, 0, connectionFactory);
this(server, null, null, null, -1, connectionFactory);
}
public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
{
this(server, null, null, null, 0, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
}
@Override

View File

@ -61,6 +61,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
/* ------------------------------------------------------------ */
/** Jetty HTTP Servlet Server.
@ -300,6 +301,24 @@ public class Server extends HandlerWrapper implements Attributes
HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION);
MultiException mex=new MultiException();
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class);
int max=pool==null?-1:pool.getMaxThreads();
int needed=1;
if (mex.size()==0)
{
for (Connector connector : _connectors)
{
if (connector instanceof AbstractConnector)
needed+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector)
needed+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}
if (max>0 && needed>=max)
throw new IllegalStateException("Insufficient max threads in ThreadPool: max="+max+" < needed="+needed);
try
{
super.doStart();
@ -309,20 +328,18 @@ public class Server extends HandlerWrapper implements Attributes
mex.add(e);
}
if (mex.size()==0)
{
for (Connector _connector : _connectors)
// start connectors last
for (Connector connector : _connectors)
{
try
{
_connector.start();
connector.start();
}
catch(Throwable e)
{
mex.add(e);
}
}
}
if (isDumpAfterStart())
dumpStdErr();

View File

@ -96,7 +96,7 @@ public class ServerConnector extends AbstractNetworkConnector
public ServerConnector(
@Name("server") Server server)
{
this(server,null,null,null,0,0,new HttpConnectionFactory());
this(server,null,null,null,-1,-1,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
@ -104,9 +104,10 @@ public class ServerConnector extends AbstractNetworkConnector
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param acceptors
* the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections.
* the number of acceptor threads to use, or -1 for a default value. Acceptors accept new TCP/IP connections. If 0, then
* the selector threads are used to accept connections.
* @param selectors
* the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress.
* the number of selector threads, or -1 for a default value. Selectors notice and schedule established connection that can make IO progress.
*/
public ServerConnector(
@Name("server") Server server,
@ -126,7 +127,7 @@ public class ServerConnector extends AbstractNetworkConnector
@Name("server") Server server,
@Name("factories") ConnectionFactory... factories)
{
this(server,null,null,null,0,0,factories);
this(server,null,null,null,-1,-1,factories);
}
/* ------------------------------------------------------------ */
@ -140,7 +141,7 @@ public class ServerConnector extends AbstractNetworkConnector
@Name("server") Server server,
@Name("sslContextFactory") SslContextFactory sslContextFactory)
{
this(server,null,null,null,0,0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
this(server,null,null,null,-1,-1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
}
/* ------------------------------------------------------------ */
@ -150,9 +151,10 @@ public class ServerConnector extends AbstractNetworkConnector
* @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the
* list of HTTP Connection Factory.
* @param acceptors
* the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections.
* the number of acceptor threads to use, or -1 for a default value. Acceptors accept new TCP/IP connections. If 0, then
* the selector threads are used to accept connections.
* @param selectors
* the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress.
* the number of selector threads, or -1 for a default value. Selectors notice and schedule established connection that can make IO progress.
*/
public ServerConnector(
@Name("server") Server server,
@ -175,7 +177,7 @@ public class ServerConnector extends AbstractNetworkConnector
@Name("sslContextFactory") SslContextFactory sslContextFactory,
@Name("factories") ConnectionFactory... factories)
{
this(server,null,null,null,0,0,AbstractConnectionFactory.getFactories(sslContextFactory,factories));
this(server,null,null,null,-1,-1,AbstractConnectionFactory.getFactories(sslContextFactory,factories));
}
/** Generic Server Connection.
@ -189,9 +191,10 @@ public class ServerConnector extends AbstractNetworkConnector
* @param bufferPool
* A ByteBuffer pool used to allocate buffers. If null then create a private pool with default configuration.
* @param acceptors
* the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections.
* the number of acceptor threads to use, or -1 for a default value. Acceptors accept new TCP/IP connections. If 0, then
* the selector threads are used to accept connections.
* @param selectors
* the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress.
* the number of selector threads, or -1 for a default value. Selectors notice and schedule established connection that can make IO progress.
* @param factories
* Zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
@ -205,10 +208,22 @@ public class ServerConnector extends AbstractNetworkConnector
@Name("factories") ConnectionFactory... factories)
{
super(server,executor,scheduler,bufferPool,acceptors,factories);
_manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors > 0 ? selectors : Runtime.getRuntime().availableProcessors());
_manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors >= 0 ? selectors : Runtime.getRuntime().availableProcessors());
addBean(_manager, true);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
if (getAcceptors()==0)
{
_acceptChannel.configureBlocking(false);
_manager.acceptor(_acceptChannel);
}
}
@Override
public boolean isOpen()
{
@ -319,12 +334,17 @@ public class ServerConnector extends AbstractNetworkConnector
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
accepted(channel);
}
}
private void accepted(SocketChannel channel) throws IOException
{
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
_manager.accept(channel);
}
}
protected void configure(Socket socket)
{
@ -426,6 +446,12 @@ public class ServerConnector extends AbstractNetworkConnector
super(executor, scheduler, selectors);
}
@Override
protected void accepted(SocketChannel channel) throws IOException
{
ServerConnector.this.accepted(channel);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{

View File

@ -28,6 +28,7 @@ public class SelectChannelServerTest extends HttpServerTestBase
@Before
public void init() throws Exception
{
startServer(new ServerConnector(_server,1,1));
// Run this test with 0 acceptors. Other tests already check the acceptors >0
startServer(new ServerConnector(_server,0,1));
}
}