404468 Ported jetty-http-spi to Jetty-9

This commit is contained in:
Greg Wilkins 2013-07-26 11:40:12 +10:00
parent 50a67eac6a
commit c76de0b584
7 changed files with 191 additions and 144 deletions

View File

@ -2,7 +2,7 @@
<parent>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-project</artifactId>
<version>9.0.0-SNAPSHOT</version>
<version>9.0.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jetty-http-spi</artifactId>

View File

@ -0,0 +1,142 @@
package org.eclipse.jetty.http.spi;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool;
public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool
{
private static final Logger LOG = Log.getLogger(DelegatingThreadPool.class);
private Executor _executor; // memory barrier provided by start/stop semantics
public DelegatingThreadPool(Executor executor)
{
_executor=executor;
addBean(_executor);
}
/* ------------------------------------------------------------ */
public Executor getExecutor()
{
return _executor;
}
/* ------------------------------------------------------------ */
public void setExecutor(Executor executor)
{
if (isRunning())
throw new IllegalStateException(getState());
updateBean(_executor,executor);
_executor=executor;
}
/* ------------------------------------------------------------ */
@Override
public void execute(Runnable job)
{
_executor.execute(job);
}
/* ------------------------------------------------------------ */
@Override
public boolean dispatch(Runnable job)
{
final Executor executor=_executor;
if (executor instanceof ThreadPool)
return ((ThreadPool)executor).dispatch(job);
try
{
_executor.execute(job);
return true;
}
catch(RejectedExecutionException e)
{
LOG.warn(e);
return false;
}
}
/* ------------------------------------------------------------ */
@Override
public int getIdleThreads()
{
final Executor executor=_executor;
if (executor instanceof ThreadPool)
return ((ThreadPool)executor).getIdleThreads();
if (executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
return tpe.getPoolSize() - tpe.getActiveCount();
}
return -1;
}
/* ------------------------------------------------------------ */
@Override
public int getThreads()
{
final Executor executor=_executor;
if (executor instanceof ThreadPool)
return ((ThreadPool)executor).getThreads();
if (executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
return tpe.getPoolSize();
}
return -1;
}
/* ------------------------------------------------------------ */
@Override
public boolean isLowOnThreads()
{
final Executor executor=_executor;
if (executor instanceof ThreadPool)
return ((ThreadPool)executor).isLowOnThreads();
if (executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
// getActiveCount() locks the thread pool, so execute it last
return tpe.getPoolSize() == tpe.getMaximumPoolSize() &&
tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount();
}
return false;
}
/* ------------------------------------------------------------ */
@Override
public void join() throws InterruptedException
{
final Executor executor=_executor;
if (executor instanceof ThreadPool)
((ThreadPool)executor).join();
else if (executor instanceof ExecutorService)
((ExecutorService)executor).awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
else
throw new IllegalStateException();
}
/* ------------------------------------------------------------ */
@Override
protected void doStop() throws Exception
{
super.doStop();
if (!(_executor instanceof LifeCycle) && (_executor instanceof ExecutorService))
((ExecutorService)_executor).shutdownNow();
}
}

View File

@ -20,18 +20,26 @@ package org.eclipse.jetty.http.spi;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpHandler;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
@ -68,10 +76,10 @@ public class JettyHttpServer extends com.sun.net.httpserver.HttpServer
public void bind(InetSocketAddress addr, int backlog) throws IOException
{
// check if there is already a connector listening
Connector[] connectors = _server.getConnectors();
Collection<NetworkConnector> connectors = _server.getBeans(NetworkConnector.class);
if (connectors != null)
{
for (Connector connector : connectors)
for (NetworkConnector connector : connectors)
{
if (connector.getPort() == addr.getPort()) {
if (LOG.isDebugEnabled()) LOG.debug("server already bound to port " + addr.getPort() + ", no need to rebind");
@ -86,8 +94,7 @@ public class JettyHttpServer extends com.sun.net.httpserver.HttpServer
this._addr = addr;
if (LOG.isDebugEnabled()) LOG.debug("binding server to port " + addr.getPort());
SelectChannelConnector connector = new SelectChannelConnector();
connector.setAcceptors(1);
ServerConnector connector = new ServerConnector(_server);
connector.setPort(addr.getPort());
connector.setHost(addr.getHostName());
_server.addConnector(connector);
@ -121,19 +128,20 @@ public class JettyHttpServer extends com.sun.net.httpserver.HttpServer
{
if (executor == null)
throw new IllegalArgumentException("missing required 'executor' argument");
if (!(executor instanceof ThreadPoolExecutor))
throw new IllegalArgumentException("only java.util.concurrent.ThreadPoolExecutor instances are allowed, got: " + executor.getClass().getName());
if (LOG.isDebugEnabled()) LOG.debug("using ThreadPoolExecutor for server thread pool");
this._executor = (ThreadPoolExecutor) executor;
_server.setThreadPool(new ThreadPoolExecutorAdapter(_executor));
ThreadPool threadPool = _server.getThreadPool();
if (threadPool instanceof DelegatingThreadPool)
((DelegatingThreadPool)_server.getThreadPool()).setExecutor(executor);
else
throw new UnsupportedOperationException("!DelegatingThreadPool");
}
@Override
public Executor getExecutor()
{
return _executor;
ThreadPool threadPool = _server.getThreadPool();
if (threadPool instanceof DelegatingThreadPool)
return ((DelegatingThreadPool)_server.getThreadPool()).getExecutor();
return threadPool;
}
@Override

View File

@ -26,6 +26,9 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsServer;
@ -53,11 +56,12 @@ public class JettyHttpServerProvider extends HttpServerProvider
if (server == null)
{
server = new Server();
ThreadPool threadPool = new DelegatingThreadPool(new QueuedThreadPool());
server = new Server(threadPool);
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] {new ContextHandlerCollection(), new DefaultHandler()});
server.setHandler(handlerCollection);
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] {new ContextHandlerCollection(), new DefaultHandler()});
server.setHandler(handlerCollection);
shared = false;
}

View File

@ -1,124 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.spi;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool;
/**
* Jetty {@link ThreadPool} that bridges requests to a {@link ThreadPoolExecutor}.
*/
public class ThreadPoolExecutorAdapter extends AbstractLifeCycle implements ThreadPool
{
private static final Logger LOG = Log.getLogger(ThreadPoolExecutorAdapter.class);
private ThreadPoolExecutor executor;
public ThreadPoolExecutorAdapter(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean dispatch(Runnable job)
{
try
{
executor.execute(job);
return true;
}
catch(RejectedExecutionException e)
{
LOG.warn(e);
return false;
}
}
public int getIdleThreads()
{
return executor.getPoolSize()-executor.getActiveCount();
}
public int getThreads()
{
return executor.getPoolSize();
}
public boolean isLowOnThreads()
{
return executor.getActiveCount()>=executor.getMaximumPoolSize();
}
public void join() throws InterruptedException
{
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public boolean isFailed()
{
return false;
}
public boolean isRunning()
{
return !executor.isTerminated() && !executor.isTerminating();
}
public boolean isStarted()
{
return !executor.isTerminated() && !executor.isTerminating();
}
public boolean isStarting()
{
return false;
}
public boolean isStopped()
{
return executor.isTerminated();
}
public boolean isStopping()
{
return executor.isTerminating();
}
protected void doStart() throws Exception
{
if (executor.isTerminated() || executor.isTerminating() || executor.isShutdown())
throw new IllegalStateException("Cannot restart");
}
protected void doStop() throws Exception
{
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS))
executor.shutdownNow();
}
}

View File

@ -99,6 +99,23 @@ public class ServerConnector extends AbstractNetworkConnector
this(server,null,null,null,0,0,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param acceptors
* the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections.
* @param selectors
* the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress.
*/
public ServerConnector(
@Name("server") Server server,
@Name("acceptors") int acceptors,
@Name("selectors") int selectors)
{
this(server,null,null,null,acceptors,selectors,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** Generic Server Connection with default configuration.
* <p>Construct a Server Connector with the passed Connection factories.</p>

View File

@ -451,10 +451,10 @@
<module>jetty-distribution</module>
<module>jetty-runner</module>
<module>jetty-monitor</module>
<module>jetty-http-spi</module>
<!-- modules that need fixed and added back, or simply dropped and not maintained
<module>jetty-rhttp</module>
<module>jetty-http-spi</module>
-->
<module>jetty-overlay-deployer</module>
</modules>