273791 reworked java-6 workaround. Better recreation of selectset

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@203 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-05-05 02:03:44 +00:00
parent b35e984317
commit 4b832d9561
6 changed files with 474 additions and 52 deletions

View File

@ -246,6 +246,35 @@ public abstract class SelectorManager extends AbstractLifeCycle
{
Log.warn(ex);
}
/* ------------------------------------------------------------------------------- */
public void dump()
{
for (final SelectSet set :_selectSet)
{
Thread selecting = set._selecting;
Log.info("SelectSet "+set._setID+" : "+selecting);
if (selecting!=null)
{
StackTraceElement[] trace =selecting.getStackTrace();
if (trace!=null)
{
for (StackTraceElement e : trace)
{
Log.info("\tat "+e.toString());
}
}
}
set.addChange(new ChangeTask(){
public void run()
{
set.dump();
}
});
}
}
/* ------------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------------- */
@ -260,7 +289,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
private transient Selector _selector;
private transient int _setID;
private transient int _jvmBug;
private volatile boolean _selecting;
private volatile Thread _selecting;
/* ------------------------------------------------------------ */
SelectSet(int acceptorID) throws Exception
@ -288,9 +317,14 @@ public abstract class SelectorManager extends AbstractLifeCycle
}
/* ------------------------------------------------------------ */
public void addChange(SocketChannel channel, Object att)
public void addChange(SelectableChannel channel, Object att)
{
addChange(new AttachedSocketChannel(channel,att));
if (att==null)
addChange(channel);
else if (att instanceof EndPoint)
addChange(att);
else
addChange(new ChangeSelectableChannel(channel,att));
}
/* ------------------------------------------------------------ */
@ -309,13 +343,13 @@ public abstract class SelectorManager extends AbstractLifeCycle
{
try
{
_selecting=Thread.currentThread();
List<?> changes;
final Selector selector;
synchronized (_changes)
{
changes=_changes[_change];
_change=_change==0?1:0;
_selecting=true;
selector=_selector;
}
@ -327,6 +361,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
try
{
Object o = changes.get(i);
if (o instanceof EndPoint)
{
// Update the operations for a key.
@ -337,21 +372,21 @@ public abstract class SelectorManager extends AbstractLifeCycle
{
dispatch((Runnable)o);
}
else if (o instanceof AttachedSocketChannel)
else if (o instanceof ChangeSelectableChannel)
{
// finish accepting/connecting this connection
final AttachedSocketChannel asc = (AttachedSocketChannel)o;
final SocketChannel channel=asc._channel;
final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
final SelectableChannel channel=asc._channel;
final Object att = asc._attachment;
if (channel.isConnected())
if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
{
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
key.attach(endpoint);
endpoint.schedule();
}
else
else if (channel.isOpen())
{
channel.register(selector,SelectionKey.OP_CONNECT,att);
}
@ -367,7 +402,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
key.attach(endpoint);
endpoint.schedule();
}
else
else if (channel.isOpen())
{
channel.register(selector,SelectionKey.OP_CONNECT,null);
}
@ -377,6 +412,10 @@ public abstract class SelectorManager extends AbstractLifeCycle
ServerSocketChannel channel = (ServerSocketChannel)o;
channel.register(getSelector(),SelectionKey.OP_ACCEPT);
}
else if (o instanceof ChangeTask)
{
((ChangeTask)o).run();
}
else
throw new IllegalArgumentException(o.toString());
}
@ -424,10 +463,10 @@ public abstract class SelectorManager extends AbstractLifeCycle
// Look for JVM bugs
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
if (selected==0 && (now-before)<wait/2)
if (selected==0 && (now-before)<(wait/2))
{
_jvmBug++;
if (_jvmBug>16)
if (_jvmBug>64)
{
synchronized (this)
{
@ -452,14 +491,14 @@ public abstract class SelectorManager extends AbstractLifeCycle
if (attachment==null)
addChange(channel);
else
addChange(attachment);
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
return;
}
}
else if (_jvmBug>8)
else if (_jvmBug>32)
{
// Cancel keys with 0 interested ops
if (_jvmBug0)
@ -480,7 +519,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
}
return;
}
}
else
_jvmBug=0;
@ -599,7 +637,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
// Everything always handled
selector.selectedKeys().clear();
// tick over the timers
_idleTimeout.tick(now);
_timeout.tick(now);
@ -610,7 +648,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
}
finally
{
_selecting=false;
_selecting=null;
}
}
@ -667,7 +705,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
while(selecting)
{
wakeup();
selecting=_selecting;
selecting=_selecting!=null;
}
ArrayList<SelectionKey> keys=new ArrayList<SelectionKey>(_selector.keys());
@ -694,11 +732,11 @@ public abstract class SelectorManager extends AbstractLifeCycle
synchronized (this)
{
selecting=_selecting;
selecting=_selecting!=null;
while(selecting)
{
wakeup();
selecting=_selecting;
selecting=_selecting!=null;
}
_idleTimeout.cancelAll();
@ -715,21 +753,41 @@ public abstract class SelectorManager extends AbstractLifeCycle
_selector=null;
}
}
public void dump()
{
synchronized (System.err)
{
Selector selector=_selector;
Log.info("SelectSet "+_setID+" "+selector.keys().size());
for (SelectionKey key: selector.keys())
{
if (key.isValid())
Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
else
Log.info(key.channel()+" - - "+key.attachment());
}
}
}
}
/* ------------------------------------------------------------ */
private static class AttachedSocketChannel
private static class ChangeSelectableChannel
{
final SocketChannel _channel;
final SelectableChannel _channel;
final Object _attachment;
public AttachedSocketChannel(SocketChannel channel, Object attachment)
public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
{
super();
_channel = channel;
_attachment = attachment;
}
}
/* ------------------------------------------------------------ */
private interface ChangeTask
{
public void run();
}
}

View File

@ -74,6 +74,8 @@ public abstract class AbstractConnector extends AbstractBuffers implements Conne
Object _statsLock = new Object();
transient long _statsStartedAt=-1;
// TODO use concurrents for these!
transient int _requests;
transient int _connections; // total number of connections made to server
@ -922,6 +924,8 @@ public abstract class AbstractConnector extends AbstractBuffers implements Conne
{
long duration=System.currentTimeMillis()-connection.getTimeStamp();
int requests=connection.getRequests();
synchronized(_statsLock)
{
_requests+=requests;
@ -943,8 +947,6 @@ public abstract class AbstractConnector extends AbstractBuffers implements Conne
}
}
if (connection!=null)
connection.destroy();
}
/* ------------------------------------------------------------ */

View File

@ -150,28 +150,6 @@ public class HttpConnection implements Connection
_generator.setSendServerVersion(server.getSendServerVersion());
_server = server;
}
/* ------------------------------------------------------------ */
public void destroy()
{
synchronized(this)
{
while(_handling)
Thread.yield();
if (_parser!=null)
_parser.reset(true);
if (_generator!=null)
_generator.reset(true);
if (_requestFields!=null)
_requestFields.destroy();
if (_responseFields!=null)
_responseFields.destroy();
}
}
/* ------------------------------------------------------------ */
/**

View File

@ -190,7 +190,6 @@ public class LocalConnector extends AbstractConnector
if (!_keepOpen)
{
connectionClosed(connection);
connection.destroy();
connection=null;
}
synchronized (this)

View File

@ -84,9 +84,8 @@ public class SelectChannelConnector extends AbstractNIOConnector
return getThreadPool().dispatch(task);
}
protected void endPointClosed(SelectChannelEndPoint endpoint)
protected void endPointClosed(final SelectChannelEndPoint endpoint)
{
// TODO handle max connections and low resources
connectionClosed((HttpConnection)endpoint.getConnection());
}
@ -317,4 +316,11 @@ public class SelectChannelConnector extends AbstractNIOConnector
}
};
}
/* ------------------------------------------------------------------------------- */
public void dump()
{
Log.info("channel "+_acceptChannel+(_acceptChannel.isOpen()?" is open":" is closed"));
_manager.dump();
}
}

View File

@ -0,0 +1,379 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InputStream;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLConnection;
import java.util.Date;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import junit.framework.TestCase;
import org.eclipse.jetty.continuation.ContinuationEvent;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
public class AsyncStressTest extends TestCase
{
protected Server _server = new Server();
protected SuspendHandler _handler = new SuspendHandler();
protected SelectChannelConnector _connector;
protected InetAddress _addr;
protected int _port;
protected Random _random = new Random();
protected int[] _loops;
protected QueuedThreadPool _threads=new QueuedThreadPool();
protected boolean _stress;
private static int STRESS_THREADS=500;
protected void setUp() throws Exception
{
_stress= Boolean.getBoolean("STRESS");
_threads.setMaxThreads(50);
_server.setThreadPool(_threads);
_connector = new SelectChannelConnector();
_server.setConnectors(new Connector[]{ _connector });
_server.setHandler(_handler);
_server.start();
_port=_connector.getLocalPort();
_addr=Inet4Address.getLocalHost();
}
protected void tearDown() throws Exception
{
_server.stop();
}
final static String[][] __tests =
{
{"/path","NORMAL"},
{"/path?sleep=<TIMEOUT>","SLEPT"},
{"/path?suspend=<TIMEOUT>","TIMEOUT"},
{"/path?suspend=1000&resume=<TIMEOUT>","RESUMED"},
{"/path?suspend=1000&complete=<TIMEOUT>","COMPLETED"},
};
public void doPaths(String name) throws Exception
{
for (int i=0;i<__tests.length;i++)
{
int timeout = _random.nextInt(200)+1;
String uri=__tests[i][0].replace("<TIMEOUT>",Integer.toString(timeout));
long start=System.currentTimeMillis();
Socket socket = new Socket(_addr,_port);
socket.setSoTimeout(30000);
String request = "GET "+uri+" HTTP/1.0\r\n\r\n";
socket.getOutputStream().write(request.getBytes());
socket.getOutputStream().flush();
String response = IO.toString(socket.getInputStream());
socket.close();
long end=System.currentTimeMillis();
response=response.substring(response.indexOf("\r\n\r\n")+4);
String test=name+"-"+i+" "+uri+" "+__tests[i][1];
assertEquals(test,__tests[i][1],response);
if (!response.equals("NORMAL"))
{
long duration=end-start;
assertTrue(test+" "+duration,duration+50>=timeout);
}
}
}
public void doLoops(int thread, String name, int loops) throws Exception
{
try
{
for (int i=0;i<loops;i++)
{
_loops[thread]=i;
doPaths(name+"-"+i);
Thread.sleep(_random.nextInt(100));
}
_loops[thread]=loops;
}
catch(Exception e)
{
System.err.println(e);
_connector.dump();
_loops[thread]=-_loops[thread];
throw e;
}
}
public void doThreads(int threads,final int loops) throws Throwable
{
final Throwable[] throwable=new Throwable[threads];
final Thread[] thread=new Thread[threads];
for (int i=0;i<threads;i++)
{
final int id=i;
final String name = "T"+i;
thread[i]=new Thread()
{
public void run()
{
try
{
doLoops(id,name,loops);
}
catch(Throwable th)
{
th.printStackTrace();
throwable[id]=th;
}
finally
{
}
}
};
}
_loops=new int[threads];
for (int i=0;i<threads;i++)
thread[i].start();
while(true)
{
Thread.sleep(1000L);
int finished=0;
int errors=0;
int min=loops;
int max=0;
int total=0;
for (int i=0;i<threads;i++)
{
int l=_loops[i];
if (l<0)
{
errors++;
total-=l;
}
else
{
if (l<min)
min=l;
if (l>max)
max=l;
total+=l;
if (l==loops)
finished++;
}
}
Log.info("min/ave/max/target="+min+"/"+(total/threads)+"/"+max+"/"+loops+" errors/finished/loops="+errors+"/"+finished+"/"+threads+" idle/threads="+(_threads.getIdleThreads()+_connector.getAcceptors())+"/"+_threads.getThreads());
if ((finished+errors)==threads)
break;
}
for (int i=0;i<threads;i++)
thread[i].join();
for (int i=0;i<threads;i++)
if (throwable[i]!=null)
throw throwable[i];
}
public void testAsync() throws Throwable
{
if (_stress)
{
System.err.println("STRESS! "+STRESS_THREADS);
doThreads(STRESS_THREADS,200);
}
else
doThreads(100,10);
Thread.sleep(1000);
}
private static class SuspendHandler extends HandlerWrapper
{
private Timer _timer;
public SuspendHandler()
{
_timer=new Timer();
}
public void handle(String target, final HttpServletRequest request, final HttpServletResponse response) throws IOException, ServletException
{
final Request base_request = (request instanceof Request)?((Request)request):HttpConnection.getCurrentConnection().getRequest();
int read_before=0;
long sleep_for=-1;
long suspend_for=-1;
long resume_after=-1;
long complete_after=-1;
if (request.getParameter("read")!=null)
read_before=Integer.parseInt(request.getParameter("read"));
if (request.getParameter("sleep")!=null)
sleep_for=Integer.parseInt(request.getParameter("sleep"));
if (request.getParameter("suspend")!=null)
suspend_for=Integer.parseInt(request.getParameter("suspend"));
if (request.getParameter("resume")!=null)
resume_after=Integer.parseInt(request.getParameter("resume"));
if (request.getParameter("complete")!=null)
complete_after=Integer.parseInt(request.getParameter("complete"));
if (DispatcherType.REQUEST.equals(base_request.getDispatcherType()))
{
if (read_before>0)
{
byte[] buf=new byte[read_before];
request.getInputStream().read(buf);
}
else if (read_before<0)
{
InputStream in = request.getInputStream();
int b=in.read();
while(b!=-1)
b=in.read();
}
if (suspend_for>=0)
{
if (suspend_for>0)
base_request.setAsyncTimeout(suspend_for);
base_request.addEventListener(__asyncListener);
base_request.startAsync();
}
else if (sleep_for>=0)
{
try
{
Thread.sleep(sleep_for);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
response.setStatus(200);
response.getOutputStream().print("SLEPT");
base_request.setHandled(true);
return;
}
else
{
response.setStatus(200);
response.getOutputStream().print("NORMAL");
base_request.setHandled(true);
return;
}
final AsyncContext asyncContext = base_request.getAsyncContext();
if (complete_after>0)
{
TimerTask complete = new TimerTask()
{
public void run()
{
try
{
response.setStatus(200);
response.getOutputStream().print("COMPLETED");
base_request.setHandled(true);
asyncContext.complete();
}
catch(Exception e)
{
e.printStackTrace();
}
}
};
synchronized (_timer)
{
_timer.schedule(complete,complete_after);
}
}
else if (complete_after==0)
{
response.setStatus(200);
response.getOutputStream().print("COMPLETED");
base_request.setHandled(true);
asyncContext.complete();
}
if (resume_after>0)
{
TimerTask resume = new TimerTask()
{
public void run()
{
asyncContext.dispatch();
}
};
synchronized (_timer)
{
_timer.schedule(resume,resume_after);
}
}
else if (resume_after==0)
{
asyncContext.dispatch();
}
}
else if (request.getAttribute("TIMEOUT")!=null)
{
response.setStatus(200);
response.getOutputStream().print("TIMEOUT");
base_request.setHandled(true);
}
else
{
response.setStatus(200);
response.getOutputStream().print("RESUMED");
base_request.setHandled(true);
}
}
}
private static ContinuationListener __asyncListener =
new ContinuationListener()
{
public void onComplete(ContinuationEvent event) throws IOException
{
}
public void onTimeout(ContinuationEvent event) throws IOException
{
event.getRequest().setAttribute("TIMEOUT",Boolean.TRUE);
((Request)event.getRequest()).getAsyncContext().dispatch();
}
};
}