diff --git a/jetty-http-spi/pom.xml b/jetty-http-spi/pom.xml index 42439750c09..263c91fcc12 100644 --- a/jetty-http-spi/pom.xml +++ b/jetty-http-spi/pom.xml @@ -2,7 +2,7 @@ org.eclipse.jetty jetty-project - 9.0.0-SNAPSHOT + 9.0.5-SNAPSHOT 4.0.0 jetty-http-spi diff --git a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/DelegatingThreadPool.java b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/DelegatingThreadPool.java new file mode 100644 index 00000000000..245ca1db341 --- /dev/null +++ b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/DelegatingThreadPool.java @@ -0,0 +1,142 @@ +package org.eclipse.jetty.http.spi; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ThreadPool; + +public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool +{ + private static final Logger LOG = Log.getLogger(DelegatingThreadPool.class); + + private Executor _executor; // memory barrier provided by start/stop semantics + + public DelegatingThreadPool(Executor executor) + { + _executor=executor; + addBean(_executor); + } + + /* ------------------------------------------------------------ */ + public Executor getExecutor() + { + return _executor; + } + + /* ------------------------------------------------------------ */ + public void setExecutor(Executor executor) + { + if (isRunning()) + throw new IllegalStateException(getState()); + updateBean(_executor,executor); + _executor=executor; + } + + /* ------------------------------------------------------------ */ + @Override + public void execute(Runnable job) + { + _executor.execute(job); + } + + + /* ------------------------------------------------------------ */ + @Override + public boolean dispatch(Runnable job) + { + final Executor executor=_executor; + if (executor instanceof ThreadPool) + return ((ThreadPool)executor).dispatch(job); + + try + { + _executor.execute(job); + return true; + } + catch(RejectedExecutionException e) + { + LOG.warn(e); + return false; + } + } + + /* ------------------------------------------------------------ */ + @Override + public int getIdleThreads() + { + final Executor executor=_executor; + if (executor instanceof ThreadPool) + return ((ThreadPool)executor).getIdleThreads(); + + if (executor instanceof ThreadPoolExecutor) + { + final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor; + return tpe.getPoolSize() - tpe.getActiveCount(); + } + return -1; + } + + /* ------------------------------------------------------------ */ + @Override + public int getThreads() + { + final Executor executor=_executor; + if (executor instanceof ThreadPool) + return ((ThreadPool)executor).getThreads(); + + if (executor instanceof ThreadPoolExecutor) + { + final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor; + return tpe.getPoolSize(); + } + return -1; + } + + /* ------------------------------------------------------------ */ + @Override + public boolean isLowOnThreads() + { + final Executor executor=_executor; + if (executor instanceof ThreadPool) + return ((ThreadPool)executor).isLowOnThreads(); + + if (executor instanceof ThreadPoolExecutor) + { + final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor; + // getActiveCount() locks the thread pool, so execute it last + return tpe.getPoolSize() == tpe.getMaximumPoolSize() && + tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount(); + } + return false; + } + + /* ------------------------------------------------------------ */ + @Override + public void join() throws InterruptedException + { + final Executor executor=_executor; + if (executor instanceof ThreadPool) + ((ThreadPool)executor).join(); + else if (executor instanceof ExecutorService) + ((ExecutorService)executor).awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + else + throw new IllegalStateException(); + } + + /* ------------------------------------------------------------ */ + @Override + protected void doStop() throws Exception + { + super.doStop(); + if (!(_executor instanceof LifeCycle) && (_executor instanceof ExecutorService)) + ((ExecutorService)_executor).shutdownNow(); + } + +} diff --git a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServer.java b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServer.java index f5af9720d5e..22e6b3d576c 100644 --- a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServer.java +++ b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServer.java @@ -20,18 +20,26 @@ package org.eclipse.jetty.http.spi; import com.sun.net.httpserver.HttpContext; import com.sun.net.httpserver.HttpHandler; + import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.ContextHandlerCollection; -import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; + + + +import org.eclipse.jetty.util.thread.ThreadPool; + import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; @@ -68,10 +76,10 @@ public class JettyHttpServer extends com.sun.net.httpserver.HttpServer public void bind(InetSocketAddress addr, int backlog) throws IOException { // check if there is already a connector listening - Connector[] connectors = _server.getConnectors(); + Collection connectors = _server.getBeans(NetworkConnector.class); if (connectors != null) { - for (Connector connector : connectors) + for (NetworkConnector connector : connectors) { if (connector.getPort() == addr.getPort()) { if (LOG.isDebugEnabled()) LOG.debug("server already bound to port " + addr.getPort() + ", no need to rebind"); @@ -86,8 +94,7 @@ public class JettyHttpServer extends com.sun.net.httpserver.HttpServer this._addr = addr; if (LOG.isDebugEnabled()) LOG.debug("binding server to port " + addr.getPort()); - SelectChannelConnector connector = new SelectChannelConnector(); - connector.setAcceptors(1); + ServerConnector connector = new ServerConnector(_server); connector.setPort(addr.getPort()); connector.setHost(addr.getHostName()); _server.addConnector(connector); @@ -121,19 +128,20 @@ public class JettyHttpServer extends com.sun.net.httpserver.HttpServer { if (executor == null) throw new IllegalArgumentException("missing required 'executor' argument"); - - if (!(executor instanceof ThreadPoolExecutor)) - throw new IllegalArgumentException("only java.util.concurrent.ThreadPoolExecutor instances are allowed, got: " + executor.getClass().getName()); - - if (LOG.isDebugEnabled()) LOG.debug("using ThreadPoolExecutor for server thread pool"); - this._executor = (ThreadPoolExecutor) executor; - _server.setThreadPool(new ThreadPoolExecutorAdapter(_executor)); + ThreadPool threadPool = _server.getThreadPool(); + if (threadPool instanceof DelegatingThreadPool) + ((DelegatingThreadPool)_server.getThreadPool()).setExecutor(executor); + else + throw new UnsupportedOperationException("!DelegatingThreadPool"); } @Override public Executor getExecutor() { - return _executor; + ThreadPool threadPool = _server.getThreadPool(); + if (threadPool instanceof DelegatingThreadPool) + return ((DelegatingThreadPool)_server.getThreadPool()).getExecutor(); + return threadPool; } @Override diff --git a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServerProvider.java b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServerProvider.java index b96d9a2b543..410ef2a992b 100644 --- a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServerProvider.java +++ b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpServerProvider.java @@ -26,6 +26,9 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ThreadPool; import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.HttpsServer; @@ -53,11 +56,12 @@ public class JettyHttpServerProvider extends HttpServerProvider if (server == null) { - server = new Server(); - - HandlerCollection handlerCollection = new HandlerCollection(); - handlerCollection.setHandlers(new Handler[] {new ContextHandlerCollection(), new DefaultHandler()}); - server.setHandler(handlerCollection); + ThreadPool threadPool = new DelegatingThreadPool(new QueuedThreadPool()); + server = new Server(threadPool); + + HandlerCollection handlerCollection = new HandlerCollection(); + handlerCollection.setHandlers(new Handler[] {new ContextHandlerCollection(), new DefaultHandler()}); + server.setHandler(handlerCollection); shared = false; } diff --git a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/ThreadPoolExecutorAdapter.java b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/ThreadPoolExecutorAdapter.java deleted file mode 100644 index aaffe8e43e8..00000000000 --- a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/ThreadPoolExecutorAdapter.java +++ /dev/null @@ -1,124 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.http.spi; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.util.component.AbstractLifeCycle; -import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; - -import org.eclipse.jetty.util.thread.ThreadPool; - -/** - * Jetty {@link ThreadPool} that bridges requests to a {@link ThreadPoolExecutor}. - */ -public class ThreadPoolExecutorAdapter extends AbstractLifeCycle implements ThreadPool -{ - private static final Logger LOG = Log.getLogger(ThreadPoolExecutorAdapter.class); - - - private ThreadPoolExecutor executor; - - public ThreadPoolExecutorAdapter(ThreadPoolExecutor executor) - { - this.executor = executor; - } - - public boolean dispatch(Runnable job) - { - try - { - executor.execute(job); - return true; - } - catch(RejectedExecutionException e) - { - LOG.warn(e); - return false; - } - } - - public int getIdleThreads() - { - return executor.getPoolSize()-executor.getActiveCount(); - } - - public int getThreads() - { - return executor.getPoolSize(); - } - - public boolean isLowOnThreads() - { - return executor.getActiveCount()>=executor.getMaximumPoolSize(); - } - - public void join() throws InterruptedException - { - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } - - - - public boolean isFailed() - { - return false; - } - - public boolean isRunning() - { - return !executor.isTerminated() && !executor.isTerminating(); - } - - public boolean isStarted() - { - return !executor.isTerminated() && !executor.isTerminating(); - } - - public boolean isStarting() - { - return false; - } - - public boolean isStopped() - { - return executor.isTerminated(); - } - - public boolean isStopping() - { - return executor.isTerminating(); - } - - protected void doStart() throws Exception - { - if (executor.isTerminated() || executor.isTerminating() || executor.isShutdown()) - throw new IllegalStateException("Cannot restart"); - } - - protected void doStop() throws Exception - { - executor.shutdown(); - if (!executor.awaitTermination(60, TimeUnit.SECONDS)) - executor.shutdownNow(); - } - -} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index eadb2f413c9..8e815d9acdd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -98,6 +98,23 @@ public class ServerConnector extends AbstractNetworkConnector { this(server,null,null,null,0,0,new HttpConnectionFactory()); } + + /* ------------------------------------------------------------ */ + /** HTTP Server Connection. + *

Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.

+ * @param server The {@link Server} this connector will accept connection for. + * @param acceptors + * the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections. + * @param selectors + * the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress. + */ + public ServerConnector( + @Name("server") Server server, + @Name("acceptors") int acceptors, + @Name("selectors") int selectors) + { + this(server,null,null,null,acceptors,selectors,new HttpConnectionFactory()); + } /* ------------------------------------------------------------ */ /** Generic Server Connection with default configuration. diff --git a/pom.xml b/pom.xml index aebaa96269e..2869f6c0372 100644 --- a/pom.xml +++ b/pom.xml @@ -451,10 +451,10 @@ jetty-distribution jetty-runner jetty-monitor + jetty-http-spi jetty-overlay-deployer