Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

This commit is contained in:
Jesse McConnell 2012-08-11 11:40:48 -05:00
commit 01f4bb2623
12 changed files with 141 additions and 55 deletions

View File

@ -192,7 +192,6 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
@Test
@Override
@Ignore("Not working")
public void testWriteBlocked() throws Exception
{
super.testWriteBlocked();

View File

@ -18,10 +18,14 @@ import java.net.Socket;
import java.nio.channels.AsynchronousCloseException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.print.attribute.standard.MediaSize.ISO;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -37,7 +41,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
*/
public abstract class AbstractConnector extends AggregateLifeCycle implements Connector, Dumpable
{
protected final Logger logger = Log.getLogger(getClass());
private final Logger LOG = Log.getLogger(getClass());
// Order is important on server side, so we use a LinkedHashMap
private final Map<String, ConnectionFactory> factories = new LinkedHashMap<>();
private final Statistics _stats = new ConnectorStatistics();
@ -47,6 +51,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
private final ScheduledExecutorService _scheduler;
private final ByteBufferPool _byteBufferPool;
private final Thread[] _acceptors;
private volatile CountDownLatch _stopping;
private volatile String _name;
private volatile long _idleTimeout = 200000;
private volatile ConnectionFactory defaultConnectionFactory;
@ -90,7 +95,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
if (acceptors<=0)
acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 4);
if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
logger.warn("Acceptors should be <= 2*availableProcessors: " + this);
LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
_acceptors = new Thread[acceptors];
}
@ -161,29 +166,38 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
{
super.doStart();
_stopping=new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
getExecutor().execute(new Acceptor(i));
logger.info("Started {}", this);
LOG.info("Started {}", this);
}
protected void interruptAcceptors()
{
for (Thread thread : _acceptors)
{
if (thread != null)
thread.interrupt();
}
}
@Override
protected void doStop() throws Exception
{
for (Thread thread : _acceptors)
{
if (thread != null)
{
thread.interrupt();
// Tell the acceptors we are stopping
interruptAcceptors();
// If we have a stop timeout
long stopTimeout = getStopTimeout();
if (stopTimeout > 0)
thread.join(stopTimeout);
}
}
if (stopTimeout > 0 && _stopping!=null)
_stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
_stopping=null;
super.doStop();
logger.info("Stopped {}", this);
LOG.info("Stopped {}", this);
}
public void join() throws InterruptedException
@ -200,6 +214,16 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
/* ------------------------------------------------------------ */
/**
* @return Is the connector accepting new connections
*/
protected boolean isAccepting()
{
return isRunning();
}
public ConnectionFactory getConnectionFactory(String protocol)
{
synchronized (factories)
@ -264,7 +288,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
{
Thread current = Thread.currentThread();
String name = current.getName();
current.setName(name + " Acceptor" + _acceptor + " " + AbstractConnector.this);
current.setName(name + "-acceptor-" + _acceptor + "-" + AbstractConnector.this);
synchronized (AbstractConnector.this)
{
@ -273,20 +297,18 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
try
{
while (isRunning())
while (isAccepting())
{
try
{
accept(_acceptor);
}
catch (AsynchronousCloseException | InterruptedException e)
{
logger.ignore(e);
break;
}
catch (Throwable e)
{
logger.warn(e);
if (isAccepting())
LOG.warn(e);
else
LOG.debug(e);
}
}
}
@ -298,6 +320,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
{
_acceptors[_acceptor] = null;
}
_stopping.countDown();
}
}
}

View File

@ -92,8 +92,15 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme
}
@Override
public void close() throws IOException
public void close()
{
interruptAcceptors();
}
@Override
protected boolean isAccepting()
{
return super.isAccepting() && isOpen();
}
@Override

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Logger;
*/
public class HttpConnection extends AbstractConnection
{
public static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();

View File

@ -27,10 +27,13 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class LocalConnector extends AbstractConnector
{
private static final Logger LOG = Log.getLogger(LocalConnector.class);
private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
public LocalConnector(Server server)
@ -120,7 +123,7 @@ public class LocalConnector extends AbstractConnector
*/
public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
{
logger.debug("getResponses");
LOG.debug("getResponses");
LocalEndPoint endp = new LocalEndPoint();
endp.setInput(requestsBuffer);
_connects.add(endp);
@ -145,7 +148,7 @@ public class LocalConnector extends AbstractConnector
@Override
protected void accept(int acceptorID) throws IOException, InterruptedException
{
logger.debug("accepting {}", acceptorID);
LOG.debug("accepting {}", acceptorID);
LocalEndPoint endp = _connects.take();
Connection connection = getDefaultConnectionFactory().newConnection(null, endp, null);
endp.setConnection(connection);
@ -215,7 +218,7 @@ public class LocalConnector extends AbstractConnector
}
catch(Exception e)
{
logger.warn(e);
LOG.warn(e);
}
}
}
@ -237,7 +240,7 @@ public class LocalConnector extends AbstractConnector
}
catch(Exception e)
{
logger.warn(e);
LOG.warn(e);
}
}
}

View File

@ -32,9 +32,20 @@ public interface NetworkConnector extends Connector, AutoCloseable
/**
* <p>Performs the activities needed to close the network communication
* (for example, to stop accepting network connections).</p>
* Once a connector has been closed, it cannot be opened again without first
* calling {@link #stop()} and it will not be active again until a subsequent call to {@link #start()}
* @throws IOException if this connector cannot be closed
*/
void close() throws IOException;
@Override
void close();
/* ------------------------------------------------------------ */
/**
* A Connector may be opened and not started (to reserve a port)
* or closed and running (to allow graceful shutdown of existing connections)
* @return True if the connector is Open.
*/
boolean isOpen();
/**
* @return The hostname representing the interface to which

View File

@ -32,6 +32,8 @@ import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
/**
@ -39,6 +41,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
*/
public class SelectChannelConnector extends AbstractNetworkConnector
{
public static final Logger LOG = Log.getLogger(SelectChannelConnector.class);
private final SelectorManager _manager;
private volatile ServerSocketChannel _acceptChannel;
private volatile boolean _inheritChannel = false;
@ -85,6 +89,13 @@ public class SelectChannelConnector extends AbstractNetworkConnector
setDefaultConnectionFactory(new HttpServerConnectionFactory(this));
}
@Override
public boolean isOpen()
{
ServerSocketChannel channel = _acceptChannel;
return channel!=null && channel.isOpen();
}
/**
* @return whether this connector uses a channel inherited from the JVM.
* @see System#inheritedChannel()
@ -122,7 +133,7 @@ public class SelectChannelConnector extends AbstractNetworkConnector
if (channel instanceof ServerSocketChannel)
serverChannel = (ServerSocketChannel)channel;
else
logger.warn("Unable to use System.inheritedChannel() [{}]. Trying a new ServerSocketChannel at {}:{}", channel, getHost(), getPort());
LOG.warn("Unable to use System.inheritedChannel() [{}]. Trying a new ServerSocketChannel at {}:{}", channel, getHost(), getPort());
}
if (serverChannel == null)
@ -148,16 +159,26 @@ public class SelectChannelConnector extends AbstractNetworkConnector
}
@Override
public void close() throws IOException
public void close()
{
ServerSocketChannel serverChannel = _acceptChannel;
_acceptChannel = null;
super.close();
if (serverChannel != null)
{
removeBean(serverChannel);
if (serverChannel.isOpen())
{
try
{
serverChannel.close();
}
_acceptChannel = null;
catch (IOException e)
{
LOG.warn(e);
}
}
}
_localPort = -2;
}
@ -187,7 +208,7 @@ public class SelectChannelConnector extends AbstractNetworkConnector
}
catch (SocketException e)
{
logger.ignore(e);
LOG.ignore(e);
}
}
@ -321,4 +342,5 @@ public class SelectChannelConnector extends AbstractNetworkConnector
return SelectChannelConnector.this.newConnection(channel, endpoint, attachment);
}
}
}

View File

@ -321,7 +321,10 @@ public class Server extends HandlerWrapper implements Attributes
LOG.info("Graceful shutdown {}", graceful);
graceful.shutdown();
}
Thread.sleep(stopTimeout);
// TODO, wait for up to stopTimeout for connectors to have no more connections
// can currently only do this via statistics, which might not be turned on.
// should be able to count connections without stats.
}
for (Connector connector : _connectors)

View File

@ -7,7 +7,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class LocalHttpConnectorTest
public class LocalConnectorTest
{
private Server _server;
private LocalConnector _connector;

View File

@ -29,16 +29,21 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class SPDYServerConnector extends SelectChannelConnector
{
private static final Logger LOG = Log.getLogger(SPDYServerConnector.class);
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ServerSessionFrameListener listener;
private volatile int initialWindowSize;
@ -67,7 +72,7 @@ public class SPDYServerConnector extends SelectChannelConnector
protected void doStart() throws Exception
{
super.doStart();
logger.info("SPDY support is experimental. Please report feedback at jetty-dev@eclipse.org");
LOG.info("SPDY support is experimental. Please report feedback at jetty-dev@eclipse.org");
}
@Override

View File

@ -41,7 +41,7 @@ public abstract class AbstractLifeCycle implements LifeCycle
private final Object _lock = new Object();
private final int __FAILED = -1, __STOPPED = 0, __STARTING = 1, __STARTED = 2, __STOPPING = 3;
private volatile int _state = __STOPPED;
private long _stopTimeout = 0;
private long _stopTimeout = 30000;
protected void doStart() throws Exception
{

View File

@ -111,37 +111,50 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
super.doStop();
long start=System.currentTimeMillis();
// TODO: review the stop logic avoiding sleep(1), and eventually using Thread.interrupt() + thread.join()
// let jobs complete naturally for a while
while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (getStopTimeout()/2))
Thread.sleep(1);
// kill queued jobs and flush out idle jobs
long timeout=getStopTimeout();
BlockingQueue<Runnable> jobs = getQueue();
// If no stop timeout, clear job queue
if (timeout<=0)
jobs.clear();
// Fill job Q with noop jobs to wakeup idle
Runnable noop = new Runnable(){@Override public void run(){}};
for (int i=_threadsIdle.get();i-->0;)
for (int i=_threadsStarted.get();i-->0;)
jobs.offer(noop);
Thread.yield();
// try to jobs complete naturally for half our stop time
long stopby=System.currentTimeMillis()+timeout/2;
for (Thread thread : _threads)
{
long canwait =stopby-System.currentTimeMillis();
if (canwait>0)
thread.join(canwait);
}
// If we still have threads running, get a bit more aggressive
// interrupt remaining threads
if (_threadsStarted.get()>0)
for (Thread thread : _threads)
thread.interrupt();
// wait for remaining threads to die
while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < getStopTimeout())
// wait again for the other half of our stop time
stopby=System.currentTimeMillis()+timeout/2;
for (Thread thread : _threads)
{
Thread.sleep(1);
long canwait =stopby-System.currentTimeMillis();
if (canwait>0)
thread.join(canwait);
}
Thread.yield();
int size=_threads.size();
if (size>0)
{
LOG.warn("{} threads could not be stopped", size);
if (size==1 || LOG.isDebugEnabled())
if (size<=Runtime.getRuntime().availableProcessors() || LOG.isDebugEnabled())
{
for (Thread unstopped : _threads)
{