From 74272663e6c2af40b81789a9401ed41a53a62596 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Mon, 4 Nov 2013 16:16:54 +1100 Subject: [PATCH] 420359 - Support 0 acceptors for ServerConnector --- .../eclipse/jetty/embedded/ExampleServer.java | 2 +- .../org/eclipse/jetty/io/SelectorManager.java | 62 +++++++++++++++++++ .../jetty/server/AbstractConnector.java | 2 +- .../eclipse/jetty/server/LocalConnector.java | 8 +-- .../java/org/eclipse/jetty/server/Server.java | 39 ++++++++---- .../eclipse/jetty/server/ServerConnector.java | 56 ++++++++++++----- .../jetty/server/SelectChannelServerTest.java | 3 +- 7 files changed, 139 insertions(+), 33 deletions(-) diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ExampleServer.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ExampleServer.java index 5142049cba1..c96d5b30934 100644 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ExampleServer.java +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ExampleServer.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.thread.QueuedThreadPool; public class ExampleServer { @@ -44,7 +45,6 @@ public class ExampleServer handlers.setHandlers(new Handler[]{context,new DefaultHandler()}); server.setHandler(handlers); - server.start(); server.join(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index b2cc0673f90..3d37a660ef0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -163,6 +163,22 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa final ManagedSelector selector = chooseSelector(); selector.submit(selector.new Accept(channel)); } + + /** + *

Registers a channel to select accept operations

+ * + * @param channel the channel to register + */ + public void acceptor(final ServerSocketChannel server) + { + final ManagedSelector selector = chooseSelector(); + selector.submit(selector.new Acceptor(server)); + } + + protected void accepted(SocketChannel channel) throws IOException + { + throw new UnsupportedOperationException(); + } @Override protected void doStart() throws Exception @@ -544,6 +560,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { processConnect(key, (Connect)attachment); } + else if (key.isAcceptable()) + { + processAccept(key); + } else { throw new IllegalStateException(); @@ -587,6 +607,23 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa connect.failed(x); } } + + private void processAccept(SelectionKey key) + { + ServerSocketChannel server = (ServerSocketChannel)key.channel(); + try + { + SocketChannel channel; + while ((channel=server.accept())!=null) + { + accepted(channel); + } + } + catch (Throwable x) + { + LOG.warn("Accept failed",x); + } + } private void closeNoExceptions(Closeable closeable) { @@ -723,6 +760,31 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } } + private class Acceptor implements Runnable + { + private final ServerSocketChannel _channel; + + public Acceptor(ServerSocketChannel channel) + { + this._channel = channel; + } + + @Override + public void run() + { + try + { + SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null); + LOG.debug("{} acceptor={}",this,key); + } + catch (Throwable x) + { + closeNoExceptions(_channel); + LOG.warn(x); + } + } + } + private class Accept implements Runnable { private final SocketChannel _channel; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 0747b01c52b..aeb54b472f5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -189,7 +189,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co for (ConnectionFactory factory:factories) addConnectionFactory(factory); - if (acceptors<=0) + if (acceptors<0) acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2); if (acceptors > 2 * Runtime.getRuntime().availableProcessors()) LOG.warn("Acceptors should be <= 2*availableProcessors: " + this); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index e336cb34ac8..3dc634b1cdf 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -47,22 +47,22 @@ public class LocalConnector extends AbstractConnector public LocalConnector(Server server) { - this(server, null, null, null, 0, new HttpConnectionFactory()); + this(server, null, null, null, -1, new HttpConnectionFactory()); } public LocalConnector(Server server, SslContextFactory sslContextFactory) { - this(server, null, null, null, 0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory())); + this(server, null, null, null, -1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory())); } public LocalConnector(Server server, ConnectionFactory connectionFactory) { - this(server, null, null, null, 0, connectionFactory); + this(server, null, null, null, -1, connectionFactory); } public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory) { - this(server, null, null, null, 0, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory)); + this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory)); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 40293b210b2..45563e61f98 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -61,6 +61,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ShutdownThread; import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; /* ------------------------------------------------------------ */ /** Jetty HTTP Servlet Server. @@ -300,6 +301,24 @@ public class Server extends HandlerWrapper implements Attributes HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); MultiException mex=new MultiException(); + // check size of thread pool + SizedThreadPool pool = getBean(SizedThreadPool.class); + int max=pool==null?-1:pool.getMaxThreads(); + int needed=1; + if (mex.size()==0) + { + for (Connector connector : _connectors) + { + if (connector instanceof AbstractConnector) + needed+=((AbstractConnector)connector).getAcceptors(); + if (connector instanceof ServerConnector) + needed+=((ServerConnector)connector).getSelectorManager().getSelectorCount(); + } + } + + if (max>0 && needed>=max) + throw new IllegalStateException("Insufficient max threads in ThreadPool: max="+max+" < needed="+needed); + try { super.doStart(); @@ -309,21 +328,19 @@ public class Server extends HandlerWrapper implements Attributes mex.add(e); } - if (mex.size()==0) + // start connectors last + for (Connector connector : _connectors) { - for (Connector _connector : _connectors) + try + { + connector.start(); + } + catch(Throwable e) { - try - { - _connector.start(); - } - catch (Throwable e) - { - mex.add(e); - } + mex.add(e); } } - + if (isDumpAfterStart()) dumpStdErr(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 7fedc13f425..c6d3977a96c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -96,7 +96,7 @@ public class ServerConnector extends AbstractNetworkConnector public ServerConnector( @Name("server") Server server) { - this(server,null,null,null,0,0,new HttpConnectionFactory()); + this(server,null,null,null,-1,-1,new HttpConnectionFactory()); } /* ------------------------------------------------------------ */ @@ -104,9 +104,10 @@ public class ServerConnector extends AbstractNetworkConnector *

Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.

* @param server The {@link Server} this connector will accept connection for. * @param acceptors - * the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections. + * the number of acceptor threads to use, or -1 for a default value. Acceptors accept new TCP/IP connections. If 0, then + * the selector threads are used to accept connections. * @param selectors - * the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress. + * the number of selector threads, or -1 for a default value. Selectors notice and schedule established connection that can make IO progress. */ public ServerConnector( @Name("server") Server server, @@ -126,7 +127,7 @@ public class ServerConnector extends AbstractNetworkConnector @Name("server") Server server, @Name("factories") ConnectionFactory... factories) { - this(server,null,null,null,0,0,factories); + this(server,null,null,null,-1,-1,factories); } /* ------------------------------------------------------------ */ @@ -140,7 +141,7 @@ public class ServerConnector extends AbstractNetworkConnector @Name("server") Server server, @Name("sslContextFactory") SslContextFactory sslContextFactory) { - this(server,null,null,null,0,0,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory())); + this(server,null,null,null,-1,-1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory())); } /* ------------------------------------------------------------ */ @@ -150,9 +151,10 @@ public class ServerConnector extends AbstractNetworkConnector * @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the * list of HTTP Connection Factory. * @param acceptors - * the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections. + * the number of acceptor threads to use, or -1 for a default value. Acceptors accept new TCP/IP connections. If 0, then + * the selector threads are used to accept connections. * @param selectors - * the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress. + * the number of selector threads, or -1 for a default value. Selectors notice and schedule established connection that can make IO progress. */ public ServerConnector( @Name("server") Server server, @@ -175,7 +177,7 @@ public class ServerConnector extends AbstractNetworkConnector @Name("sslContextFactory") SslContextFactory sslContextFactory, @Name("factories") ConnectionFactory... factories) { - this(server,null,null,null,0,0,AbstractConnectionFactory.getFactories(sslContextFactory,factories)); + this(server,null,null,null,-1,-1,AbstractConnectionFactory.getFactories(sslContextFactory,factories)); } /** Generic Server Connection. @@ -189,9 +191,10 @@ public class ServerConnector extends AbstractNetworkConnector * @param bufferPool * A ByteBuffer pool used to allocate buffers. If null then create a private pool with default configuration. * @param acceptors - * the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections. + * the number of acceptor threads to use, or -1 for a default value. Acceptors accept new TCP/IP connections. If 0, then + * the selector threads are used to accept connections. * @param selectors - * the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress. + * the number of selector threads, or -1 for a default value. Selectors notice and schedule established connection that can make IO progress. * @param factories * Zero or more {@link ConnectionFactory} instances used to create and configure connections. */ @@ -205,10 +208,22 @@ public class ServerConnector extends AbstractNetworkConnector @Name("factories") ConnectionFactory... factories) { super(server,executor,scheduler,bufferPool,acceptors,factories); - _manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors > 0 ? selectors : Runtime.getRuntime().availableProcessors()); + _manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors >= 0 ? selectors : Runtime.getRuntime().availableProcessors()); addBean(_manager, true); } + @Override + protected void doStart() throws Exception + { + super.doStart(); + + if (getAcceptors()==0) + { + _acceptChannel.configureBlocking(false); + _manager.acceptor(_acceptChannel); + } + } + @Override public boolean isOpen() { @@ -319,12 +334,17 @@ public class ServerConnector extends AbstractNetworkConnector if (serverChannel != null && serverChannel.isOpen()) { SocketChannel channel = serverChannel.accept(); - channel.configureBlocking(false); - Socket socket = channel.socket(); - configure(socket); - _manager.accept(channel); + accepted(channel); } } + + private void accepted(SocketChannel channel) throws IOException + { + channel.configureBlocking(false); + Socket socket = channel.socket(); + configure(socket); + _manager.accept(channel); + } protected void configure(Socket socket) { @@ -426,6 +446,12 @@ public class ServerConnector extends AbstractNetworkConnector super(executor, scheduler, selectors); } + @Override + protected void accepted(SocketChannel channel) throws IOException + { + ServerConnector.this.accepted(channel); + } + @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelServerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelServerTest.java index 396a60d907a..b23c355d399 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelServerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelServerTest.java @@ -28,6 +28,7 @@ public class SelectChannelServerTest extends HttpServerTestBase @Before public void init() throws Exception { - startServer(new ServerConnector(_server,1,1)); + // Run this test with 0 acceptors. Other tests already check the acceptors >0 + startServer(new ServerConnector(_server,0,1)); } }