jetty-9 more work on stop/close. Turned on statistics all of the time, but still not enough. more work needed.

This commit is contained in:
Greg Wilkins 2012-08-12 11:24:24 +10:00
parent 73a89427b1
commit c84b496330
16 changed files with 203 additions and 57 deletions

View File

@ -29,6 +29,7 @@ import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -301,6 +302,85 @@ public class IOTest
} }
} }
@Test
public void testServerChannelInterrupt() throws Exception
{
final ServerSocketChannel connector = ServerSocketChannel.open();
connector.configureBlocking(true);
connector.socket().bind(null);
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
client.setSoTimeout(2000);
client.setSoLinger(false, -1);
Socket server = connector.accept().socket();
server.setSoTimeout(2000);
server.setSoLinger(false, -1);
// Write from client to server
client.getOutputStream().write(1);
// Server reads
assertEquals(1, server.getInputStream().read());
// Write from server to client
server.getOutputStream().write(1);
// Client reads
assertEquals(1, client.getInputStream().read());
// block a thread in accept
final CountDownLatch alatch=new CountDownLatch(2);
Thread acceptor = new Thread()
{
@Override
public void run()
{
try
{
alatch.countDown();
connector.accept();
}
catch (Throwable e)
{
}
finally
{
alatch.countDown();
}
}
};
acceptor.start();
while (alatch.getCount()==2)
Thread.sleep(10);
// interrupt the acceptor
acceptor.interrupt();
// wait for acceptor to exit
assertTrue(alatch.await(10,TimeUnit.SECONDS));
// connector is closed
assertFalse(connector.isOpen());
// but connection is still open
assertFalse(client.isClosed());
assertFalse(server.isClosed());
// Write from client to server
client.getOutputStream().write(42);
// Server reads
assertEquals(42, server.getInputStream().read());
// Write from server to client
server.getOutputStream().write(43);
// Client reads
assertEquals(43, client.getInputStream().read());
client.close();
}
@Test @Test
public void testReset() throws Exception public void testReset() throws Exception
{ {

View File

@ -86,11 +86,13 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
_sslContextFactory = sslContextFactory; _sslContextFactory = sslContextFactory;
addBean(_server,false); addBean(_server,false);
addBean(_executor,false); addBean(_executor);
if (executor==null)
unmanage(_executor);
addBean(_scheduler,scheduler==null); addBean(_scheduler,scheduler==null);
addBean(_byteBufferPool,pool==null); addBean(_byteBufferPool,pool==null);
addBean(_sslContextFactory); addBean(_sslContextFactory);
addBean(_stats,false); addBean(_stats,true);
if (acceptors<=0) if (acceptors<=0)
acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 4); acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 4);

View File

@ -94,6 +94,7 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme
@Override @Override
public void close() public void close()
{ {
// Interrupting is often sufficient to close the channel
interruptAcceptors(); interruptAcceptors();
} }

View File

@ -13,14 +13,21 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.server.Connector.Statistics; import org.eclipse.jetty.server.Connector.Statistics;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.statistic.CounterStatistic; import org.eclipse.jetty.util.statistic.CounterStatistic;
import org.eclipse.jetty.util.statistic.SampleStatistic; import org.eclipse.jetty.util.statistic.SampleStatistic;
class ConnectorStatistics extends AbstractLifeCycle implements Statistics class ConnectorStatistics extends AbstractLifeCycle implements Statistics, Dumpable
{ {
private final AtomicLong _startMillis = new AtomicLong(-1L); private final AtomicLong _startMillis = new AtomicLong(-1L);
private final CounterStatistic _connectionStats = new CounterStatistic(); private final CounterStatistic _connectionStats = new CounterStatistic();
@ -172,4 +179,19 @@ class ConnectorStatistics extends AbstractLifeCycle implements Statistics
_connectionDurationStats.set(duration); _connectionDurationStats.set(duration);
} }
} }
@Override
@ManagedOperation("dump thread state")
public String dump()
{
return AggregateLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
AggregateLifeCycle.dumpObject(out,this);
AggregateLifeCycle.dump(out,indent,Arrays.asList(new String[]{"connections="+_connectionStats,"duration="+_connectionDurationStats,"in="+_messagesIn,"out="+_messagesOut}));
}
} }

View File

@ -163,10 +163,13 @@ public class SelectChannelConnector extends AbstractNetworkConnector
{ {
ServerSocketChannel serverChannel = _acceptChannel; ServerSocketChannel serverChannel = _acceptChannel;
_acceptChannel = null; _acceptChannel = null;
super.close();
if (serverChannel != null) if (serverChannel != null)
{ {
removeBean(serverChannel); removeBean(serverChannel);
// If the interrupt did not close it, we should close it
if (serverChannel.isOpen()) if (serverChannel.isOpen())
{ {
try try
@ -179,6 +182,7 @@ public class SelectChannelConnector extends AbstractNetworkConnector
} }
} }
} }
// super.close();
_localPort = -2; _localPort = -2;
} }

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -34,6 +35,7 @@ import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.component.Container; import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.component.Destroyable; import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -304,29 +306,48 @@ public class Server extends HandlerWrapper implements Attributes
MultiException mex=new MultiException(); MultiException mex=new MultiException();
long stopTimeout = getStopTimeout();
if (stopTimeout>0) // First close the network connectors to stop accepting new connections
for (Connector connector : _connectors)
{ {
for (Connector connector : _connectors) if (connector instanceof NetworkConnector)
{ ((NetworkConnector)connector).close();
LOG.info("Graceful shutdown {}", connector);
if (connector instanceof NetworkConnector)
((NetworkConnector)connector).close();
}
Handler[] contexts = getChildHandlersByClass(Graceful.class);
for (Handler context : contexts)
{
Graceful graceful = (Graceful)context;
LOG.info("Graceful shutdown {}", graceful);
graceful.shutdown();
}
// TODO, wait for up to stopTimeout for connectors to have no more connections
// can currently only do this via statistics, which might not be turned on.
// should be able to count connections without stats.
} }
// Then tell the contexts that we are shutting down
Handler[] contexts = getChildHandlersByClass(Graceful.class);
for (Handler context : contexts)
{
Graceful graceful = (Graceful)context;
graceful.shutdown();
}
// Shall we gracefully wait for zero connections?
long stopTimeout = getStopTimeout();
if (stopTimeout>0 && LOG.isDebugEnabled()) // TODO disabled unless debg for now
{
long stop_by=System.currentTimeMillis()+stopTimeout;
LOG.info("Graceful shutdown {} by ",this,new Date(stop_by));
// TODO Need to be able to set the maxIdleTime on each individual connection
for (Connector connector : _connectors)
{
// TODO this is not good enough
if (connector instanceof AbstractConnector)
((AbstractConnector)connector).setIdleTimeout(1);
}
for (Connector connector : _connectors)
{
while (connector.getStatistics().isRunning() && connector.getStatistics().getConnectionsOpen()>0 && System.currentTimeMillis()<stop_by)
{
System.err.println(((Dumpable)connector).dump());
Thread.sleep(100);
}
}
}
// Now stop the connectors (this will close existing connections)
for (Connector connector : _connectors) for (Connector connector : _connectors)
{ {
try try
@ -339,6 +360,7 @@ public class Server extends HandlerWrapper implements Attributes
} }
} }
// And finall stop everything else
try try
{ {
super.doStop(); super.doStop();

View File

@ -49,8 +49,7 @@ public abstract class ConnectorCloseTestBase extends HttpServerTestFixture
@Test @Test
public void testCloseBetweenRequests() throws Exception public void testCloseBetweenRequests() throws Exception
{ {
int maxLength = 32; final int requestCount = 32;
int requestCount = iterations(maxLength);
final CountDownLatch latch = new CountDownLatch(requestCount); final CountDownLatch latch = new CountDownLatch(requestCount);
configureServer(new HelloWorldHandler()); configureServer(new HelloWorldHandler());
@ -85,29 +84,30 @@ public abstract class ConnectorCloseTestBase extends HttpServerTestFixture
Thread runner = new Thread(reader); Thread runner = new Thread(reader);
runner.start(); runner.start();
for (int pipeline = 1; pipeline < maxLength; pipeline++) for (int pipeline = 1; pipeline <= requestCount; pipeline++)
{ {
if (pipeline == maxLength / 2) if (pipeline == requestCount / 2)
_connector.close();
String request = "";
for (int i = 0; i < pipeline; i++)
{ {
request += // wait for at least 1 request to have been received
"GET /data?writes=1&block=16&id="+i+" HTTP/1.1\r\n"+ if (latch.getCount()==requestCount)
Thread.sleep(1);
_connector.close();
}
String request =
"GET /data?writes=1&block=16&id="+pipeline+" HTTP/1.1\r\n"+
"host: "+HOST+":"+_connector.getLocalPort()+"\r\n"+ "host: "+HOST+":"+_connector.getLocalPort()+"\r\n"+
"user-agent: testharness/1.0 (blah foo/bar)\r\n"+ "user-agent: testharness/1.0 (blah foo/bar)\r\n"+
"accept-encoding: nothing\r\n"+ "accept-encoding: nothing\r\n"+
"cookie: aaa=1234567890\r\n"+ "cookie: aaa=1234567890\r\n"+
"\r\n"; "\r\n";
}
os.write(request.getBytes()); os.write(request.getBytes());
os.flush(); os.flush();
Thread.sleep(25); Thread.sleep(25);
} }
latch.await(30, TimeUnit.SECONDS); assertTrue(latch.await(5, TimeUnit.SECONDS));
reader.setDone(); reader.setDone();
runner.join(); runner.join();
@ -115,8 +115,6 @@ public abstract class ConnectorCloseTestBase extends HttpServerTestFixture
finally finally
{ {
client.close(); client.close();
assertEquals(requestCount, requestCount - latch.getCount());
} }
} }

View File

@ -30,7 +30,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.HandlerWrapper; import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.toolchain.test.Stress; import org.eclipse.jetty.toolchain.test.Stress;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -40,8 +42,8 @@ public class HttpServerTestFixture
protected static final int LOOPS=Stress.isEnabled()?250:50; protected static final int LOOPS=Stress.isEnabled()?250:50;
protected static final String HOST="localhost"; protected static final String HOST="localhost";
protected static Server _server; protected Server _server;
protected static NetworkConnector _connector; protected NetworkConnector _connector;
protected String _scheme="http"; protected String _scheme="http";
protected Socket newSocket(String host,int port) throws Exception protected Socket newSocket(String host,int port) throws Exception
@ -53,13 +55,13 @@ public class HttpServerTestFixture
return socket; return socket;
} }
@BeforeClass @Before
public static void before() public void before()
{ {
_server = new Server(); _server = new Server();
} }
protected static void startServer(NetworkConnector connector) throws Exception protected void startServer(NetworkConnector connector) throws Exception
{ {
_connector = connector; _connector = connector;
_server.addConnector(_connector); _server.addConnector(_connector);
@ -67,11 +69,12 @@ public class HttpServerTestFixture
_server.start(); _server.start();
} }
@AfterClass @After
public static void stopServer() throws Exception public void stopServer() throws Exception
{ {
_server.stop(); _server.stop();
_server.join(); _server.join();
_server.setConnectors(new Connector[]{});
} }
protected void configureServer(Handler handler) throws Exception protected void configureServer(Handler handler) throws Exception

View File

@ -25,7 +25,6 @@ public class SelectChannelConnectorCloseTest extends ConnectorCloseTestBase
@Before @Before
public void init() throws Exception public void init() throws Exception
{ {
System.setProperty("org.eclipse.jetty.util.log.DEBUG","true");
startServer(new SelectChannelConnector(_server)); startServer(new SelectChannelConnector(_server));
} }

View File

@ -13,15 +13,15 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import org.junit.BeforeClass; import org.junit.Before;
/** /**
* HttpServer Tester. * HttpServer Tester.
*/ */
public class SelectChannelServerTest extends HttpServerTestBase public class SelectChannelServerTest extends HttpServerTestBase
{ {
@BeforeClass @Before
public static void init() throws Exception public void init() throws Exception
{ {
startServer(new SelectChannelConnector(_server)); startServer(new SelectChannelConnector(_server));
} }

View File

@ -22,14 +22,14 @@ import java.net.Socket;
import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.server.session.SessionHandler;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
public class SelectChannelTimeoutTest extends ConnectorTimeoutTest public class SelectChannelTimeoutTest extends ConnectorTimeoutTest
{ {
@Before
@BeforeClass public void init() throws Exception
public static void init() throws Exception
{ {
SelectChannelConnector connector = new SelectChannelConnector(_server); SelectChannelConnector connector = new SelectChannelConnector(_server);
connector.setIdleTimeout(MAX_IDLE_TIME); // 250 msec max idle connector.setIdleTimeout(MAX_IDLE_TIME); // 250 msec max idle

View File

@ -52,7 +52,6 @@ public class StatisticsHandlerTest
_connector = new LocalConnector(_server); _connector = new LocalConnector(_server);
_server.addConnector(_connector); _server.addConnector(_connector);
_connector.getStatistics().start();
_latchHandler = new LatchHandler(); _latchHandler = new LatchHandler();
_statsHandler = new StatisticsHandler(); _statsHandler = new StatisticsHandler();

View File

@ -27,6 +27,7 @@ import javax.net.ssl.TrustManagerFactory;
import org.eclipse.jetty.server.HttpServerTestBase; import org.eclipse.jetty.server.HttpServerTestBase;
import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -47,8 +48,8 @@ public class SelectChannelServerSslTest extends HttpServerTestBase
return __sslContext.getSocketFactory().createSocket(host,port); return __sslContext.getSocketFactory().createSocket(host,port);
} }
@BeforeClass @Before
public static void init() throws Exception public void init() throws Exception
{ {
String keystorePath = System.getProperty("basedir",".") + "/src/test/resources/keystore"; String keystorePath = System.getProperty("basedir",".") + "/src/test/resources/keystore";
SslContextFactory sslContextFactory = new SslContextFactory(); SslContextFactory sslContextFactory = new SslContextFactory();

View File

@ -23,6 +23,7 @@ import javax.net.ssl.TrustManagerFactory;
import org.eclipse.jetty.server.ConnectorTimeoutTest; import org.eclipse.jetty.server.ConnectorTimeoutTest;
import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
public class SslSelectChannelTimeoutTest extends ConnectorTimeoutTest public class SslSelectChannelTimeoutTest extends ConnectorTimeoutTest
@ -35,8 +36,8 @@ public class SslSelectChannelTimeoutTest extends ConnectorTimeoutTest
return __sslContext.getSocketFactory().createSocket(host,port); return __sslContext.getSocketFactory().createSocket(host,port);
} }
@BeforeClass @Before
public static void init() throws Exception public void init() throws Exception
{ {
String keystorePath = System.getProperty("basedir",".") + "/src/test/resources/keystore"; String keystorePath = System.getProperty("basedir",".") + "/src/test/resources/keystore";
SslContextFactory sslContextFactory = new SslContextFactory(); SslContextFactory sslContextFactory = new SslContextFactory();

View File

@ -109,4 +109,11 @@ public class CounterStatistic
{ {
return _total.get(); return _total.get();
} }
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%x{c=%d,m=%d,t=%d}",this.getClass().getSimpleName(),hashCode(),_curr.get(),_max.get(),_total.get());
}
} }

View File

@ -101,4 +101,11 @@ public class SampleStatistic
{ {
return Math.sqrt(getVariance()); return Math.sqrt(getVariance());
} }
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%x{c=%d,m=%d,t=%d,v100=%d}",this.getClass().getSimpleName(),hashCode(),_count.get(),_max.get(),_total.get(),_totalVariance100.get());
}
} }