From 4b832d9561af3ca19d8aacaba778725733b10910 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 5 May 2009 02:03:44 +0000 Subject: [PATCH] 273791 reworked java-6 workaround. Better recreation of selectset git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@203 7e9141cc-0065-0410-87d8-b60c137991c4 --- .../eclipse/jetty/io/nio/SelectorManager.java | 108 +++-- .../jetty/server/AbstractConnector.java | 6 +- .../eclipse/jetty/server/HttpConnection.java | 22 - .../eclipse/jetty/server/LocalConnector.java | 1 - .../server/nio/SelectChannelConnector.java | 10 +- .../eclipse/jetty/server/AsyncStressTest.java | 379 ++++++++++++++++++ 6 files changed, 474 insertions(+), 52 deletions(-) create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/AsyncStressTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index a146089258e..8657f1d413f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -246,6 +246,35 @@ public abstract class SelectorManager extends AbstractLifeCycle { Log.warn(ex); } + + /* ------------------------------------------------------------------------------- */ + public void dump() + { + for (final SelectSet set :_selectSet) + { + Thread selecting = set._selecting; + Log.info("SelectSet "+set._setID+" : "+selecting); + if (selecting!=null) + { + StackTraceElement[] trace =selecting.getStackTrace(); + if (trace!=null) + { + for (StackTraceElement e : trace) + { + Log.info("\tat "+e.toString()); + } + } + } + + set.addChange(new ChangeTask(){ + public void run() + { + set.dump(); + } + }); + } + } + /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ @@ -260,7 +289,7 @@ public abstract class SelectorManager extends AbstractLifeCycle private transient Selector _selector; private transient int _setID; private transient int _jvmBug; - private volatile boolean _selecting; + private volatile Thread _selecting; /* ------------------------------------------------------------ */ SelectSet(int acceptorID) throws Exception @@ -288,9 +317,14 @@ public abstract class SelectorManager extends AbstractLifeCycle } /* ------------------------------------------------------------ */ - public void addChange(SocketChannel channel, Object att) + public void addChange(SelectableChannel channel, Object att) { - addChange(new AttachedSocketChannel(channel,att)); + if (att==null) + addChange(channel); + else if (att instanceof EndPoint) + addChange(att); + else + addChange(new ChangeSelectableChannel(channel,att)); } /* ------------------------------------------------------------ */ @@ -309,13 +343,13 @@ public abstract class SelectorManager extends AbstractLifeCycle { try { + _selecting=Thread.currentThread(); List changes; final Selector selector; synchronized (_changes) { changes=_changes[_change]; _change=_change==0?1:0; - _selecting=true; selector=_selector; } @@ -327,6 +361,7 @@ public abstract class SelectorManager extends AbstractLifeCycle try { Object o = changes.get(i); + if (o instanceof EndPoint) { // Update the operations for a key. @@ -337,21 +372,21 @@ public abstract class SelectorManager extends AbstractLifeCycle { dispatch((Runnable)o); } - else if (o instanceof AttachedSocketChannel) + else if (o instanceof ChangeSelectableChannel) { // finish accepting/connecting this connection - final AttachedSocketChannel asc = (AttachedSocketChannel)o; - final SocketChannel channel=asc._channel; + final ChangeSelectableChannel asc = (ChangeSelectableChannel)o; + final SelectableChannel channel=asc._channel; final Object att = asc._attachment; - if (channel.isConnected()) + if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) { SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att); - SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); + SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key); key.attach(endpoint); endpoint.schedule(); } - else + else if (channel.isOpen()) { channel.register(selector,SelectionKey.OP_CONNECT,att); } @@ -367,7 +402,7 @@ public abstract class SelectorManager extends AbstractLifeCycle key.attach(endpoint); endpoint.schedule(); } - else + else if (channel.isOpen()) { channel.register(selector,SelectionKey.OP_CONNECT,null); } @@ -377,6 +412,10 @@ public abstract class SelectorManager extends AbstractLifeCycle ServerSocketChannel channel = (ServerSocketChannel)o; channel.register(getSelector(),SelectionKey.OP_ACCEPT); } + else if (o instanceof ChangeTask) + { + ((ChangeTask)o).run(); + } else throw new IllegalArgumentException(o.toString()); } @@ -424,10 +463,10 @@ public abstract class SelectorManager extends AbstractLifeCycle // Look for JVM bugs // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 - if (selected==0 && (now-before)16) + if (_jvmBug>64) { synchronized (this) { @@ -452,14 +491,14 @@ public abstract class SelectorManager extends AbstractLifeCycle if (attachment==null) addChange(channel); else - addChange(attachment); + addChange(channel,attachment); } _selector.close(); _selector=new_selector; return; } } - else if (_jvmBug>8) + else if (_jvmBug>32) { // Cancel keys with 0 interested ops if (_jvmBug0) @@ -480,7 +519,6 @@ public abstract class SelectorManager extends AbstractLifeCycle } return; } - } else _jvmBug=0; @@ -599,7 +637,7 @@ public abstract class SelectorManager extends AbstractLifeCycle // Everything always handled selector.selectedKeys().clear(); - + // tick over the timers _idleTimeout.tick(now); _timeout.tick(now); @@ -610,7 +648,7 @@ public abstract class SelectorManager extends AbstractLifeCycle } finally { - _selecting=false; + _selecting=null; } } @@ -667,7 +705,7 @@ public abstract class SelectorManager extends AbstractLifeCycle while(selecting) { wakeup(); - selecting=_selecting; + selecting=_selecting!=null; } ArrayList keys=new ArrayList(_selector.keys()); @@ -694,11 +732,11 @@ public abstract class SelectorManager extends AbstractLifeCycle synchronized (this) { - selecting=_selecting; + selecting=_selecting!=null; while(selecting) { wakeup(); - selecting=_selecting; + selecting=_selecting!=null; } _idleTimeout.cancelAll(); @@ -715,21 +753,41 @@ public abstract class SelectorManager extends AbstractLifeCycle _selector=null; } } + + public void dump() + { + synchronized (System.err) + { + Selector selector=_selector; + Log.info("SelectSet "+_setID+" "+selector.keys().size()); + for (SelectionKey key: selector.keys()) + { + if (key.isValid()) + Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment()); + else + Log.info(key.channel()+" - - "+key.attachment()); + } + } + } } /* ------------------------------------------------------------ */ - private static class AttachedSocketChannel + private static class ChangeSelectableChannel { - final SocketChannel _channel; + final SelectableChannel _channel; final Object _attachment; - public AttachedSocketChannel(SocketChannel channel, Object attachment) + public ChangeSelectableChannel(SelectableChannel channel, Object attachment) { super(); _channel = channel; _attachment = attachment; } + } - + /* ------------------------------------------------------------ */ + private interface ChangeTask + { + public void run(); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 76593e6d4dd..041d1cd1c3f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -74,6 +74,8 @@ public abstract class AbstractConnector extends AbstractBuffers implements Conne Object _statsLock = new Object(); transient long _statsStartedAt=-1; + + // TODO use concurrents for these! transient int _requests; transient int _connections; // total number of connections made to server @@ -922,6 +924,8 @@ public abstract class AbstractConnector extends AbstractBuffers implements Conne { long duration=System.currentTimeMillis()-connection.getTimeStamp(); int requests=connection.getRequests(); + + synchronized(_statsLock) { _requests+=requests; @@ -943,8 +947,6 @@ public abstract class AbstractConnector extends AbstractBuffers implements Conne } } - if (connection!=null) - connection.destroy(); } /* ------------------------------------------------------------ */ diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 588d5b61b11..26dd1ec00c3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -150,28 +150,6 @@ public class HttpConnection implements Connection _generator.setSendServerVersion(server.getSendServerVersion()); _server = server; } - - /* ------------------------------------------------------------ */ - public void destroy() - { - synchronized(this) - { - while(_handling) - Thread.yield(); - - if (_parser!=null) - _parser.reset(true); - - if (_generator!=null) - _generator.reset(true); - - if (_requestFields!=null) - _requestFields.destroy(); - - if (_responseFields!=null) - _responseFields.destroy(); - } - } /* ------------------------------------------------------------ */ /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 1eed8de27d3..cb845e26835 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -190,7 +190,6 @@ public class LocalConnector extends AbstractConnector if (!_keepOpen) { connectionClosed(connection); - connection.destroy(); connection=null; } synchronized (this) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index 5aa162c69e8..99684ba376c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -84,9 +84,8 @@ public class SelectChannelConnector extends AbstractNIOConnector return getThreadPool().dispatch(task); } - protected void endPointClosed(SelectChannelEndPoint endpoint) + protected void endPointClosed(final SelectChannelEndPoint endpoint) { - // TODO handle max connections and low resources connectionClosed((HttpConnection)endpoint.getConnection()); } @@ -317,4 +316,11 @@ public class SelectChannelConnector extends AbstractNIOConnector } }; } + + /* ------------------------------------------------------------------------------- */ + public void dump() + { + Log.info("channel "+_acceptChannel+(_acceptChannel.isOpen()?" is open":" is closed")); + _manager.dump(); + } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncStressTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncStressTest.java new file mode 100644 index 00000000000..ec0b546eed5 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncStressTest.java @@ -0,0 +1,379 @@ +// ======================================================================== +// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.Socket; +import java.net.URL; +import java.net.URLConnection; +import java.util.Date; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import junit.framework.TestCase; + +import org.eclipse.jetty.continuation.ContinuationEvent; +import org.eclipse.jetty.continuation.ContinuationListener; +import org.eclipse.jetty.server.handler.HandlerWrapper; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.thread.QueuedThreadPool; + +public class AsyncStressTest extends TestCase +{ + protected Server _server = new Server(); + protected SuspendHandler _handler = new SuspendHandler(); + protected SelectChannelConnector _connector; + protected InetAddress _addr; + protected int _port; + protected Random _random = new Random(); + protected int[] _loops; + protected QueuedThreadPool _threads=new QueuedThreadPool(); + protected boolean _stress; + + private static int STRESS_THREADS=500; + + protected void setUp() throws Exception + { + _stress= Boolean.getBoolean("STRESS"); + _threads.setMaxThreads(50); + _server.setThreadPool(_threads); + _connector = new SelectChannelConnector(); + _server.setConnectors(new Connector[]{ _connector }); + _server.setHandler(_handler); + _server.start(); + _port=_connector.getLocalPort(); + _addr=Inet4Address.getLocalHost(); + } + + protected void tearDown() throws Exception + { + _server.stop(); + } + + final static String[][] __tests = + { + {"/path","NORMAL"}, + {"/path?sleep=","SLEPT"}, + {"/path?suspend=","TIMEOUT"}, + {"/path?suspend=1000&resume=","RESUMED"}, + {"/path?suspend=1000&complete=","COMPLETED"}, + }; + + + public void doPaths(String name) throws Exception + { + for (int i=0;i<__tests.length;i++) + { + int timeout = _random.nextInt(200)+1; + String uri=__tests[i][0].replace("",Integer.toString(timeout)); + + long start=System.currentTimeMillis(); + Socket socket = new Socket(_addr,_port); + socket.setSoTimeout(30000); + String request = "GET "+uri+" HTTP/1.0\r\n\r\n"; + socket.getOutputStream().write(request.getBytes()); + socket.getOutputStream().flush(); + String response = IO.toString(socket.getInputStream()); + socket.close(); + long end=System.currentTimeMillis(); + + response=response.substring(response.indexOf("\r\n\r\n")+4); + + String test=name+"-"+i+" "+uri+" "+__tests[i][1]; + assertEquals(test,__tests[i][1],response); + if (!response.equals("NORMAL")) + { + long duration=end-start; + assertTrue(test+" "+duration,duration+50>=timeout); + } + + } + } + + public void doLoops(int thread, String name, int loops) throws Exception + { + try + { + for (int i=0;imax) + max=l; + total+=l; + if (l==loops) + finished++; + } + } + + Log.info("min/ave/max/target="+min+"/"+(total/threads)+"/"+max+"/"+loops+" errors/finished/loops="+errors+"/"+finished+"/"+threads+" idle/threads="+(_threads.getIdleThreads()+_connector.getAcceptors())+"/"+_threads.getThreads()); + if ((finished+errors)==threads) + break; + } + + for (int i=0;i0) + { + byte[] buf=new byte[read_before]; + request.getInputStream().read(buf); + } + else if (read_before<0) + { + InputStream in = request.getInputStream(); + int b=in.read(); + while(b!=-1) + b=in.read(); + } + + if (suspend_for>=0) + { + if (suspend_for>0) + base_request.setAsyncTimeout(suspend_for); + base_request.addEventListener(__asyncListener); + base_request.startAsync(); + } + else if (sleep_for>=0) + { + try + { + Thread.sleep(sleep_for); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + response.setStatus(200); + response.getOutputStream().print("SLEPT"); + base_request.setHandled(true); + return; + } + else + { + response.setStatus(200); + response.getOutputStream().print("NORMAL"); + base_request.setHandled(true); + return; + } + + final AsyncContext asyncContext = base_request.getAsyncContext(); + + + if (complete_after>0) + { + TimerTask complete = new TimerTask() + { + public void run() + { + try + { + response.setStatus(200); + response.getOutputStream().print("COMPLETED"); + base_request.setHandled(true); + asyncContext.complete(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + }; + synchronized (_timer) + { + _timer.schedule(complete,complete_after); + } + } + else if (complete_after==0) + { + response.setStatus(200); + response.getOutputStream().print("COMPLETED"); + base_request.setHandled(true); + asyncContext.complete(); + } + + if (resume_after>0) + { + TimerTask resume = new TimerTask() + { + public void run() + { + asyncContext.dispatch(); + } + }; + synchronized (_timer) + { + _timer.schedule(resume,resume_after); + } + } + else if (resume_after==0) + { + asyncContext.dispatch(); + } + } + else if (request.getAttribute("TIMEOUT")!=null) + { + response.setStatus(200); + response.getOutputStream().print("TIMEOUT"); + base_request.setHandled(true); + } + else + { + response.setStatus(200); + response.getOutputStream().print("RESUMED"); + base_request.setHandled(true); + } + } + } + + + private static ContinuationListener __asyncListener = + new ContinuationListener() + { + public void onComplete(ContinuationEvent event) throws IOException + { + } + + public void onTimeout(ContinuationEvent event) throws IOException + { + event.getRequest().setAttribute("TIMEOUT",Boolean.TRUE); + ((Request)event.getRequest()).getAsyncContext().dispatch(); + } + + }; +}