fixed race with expired async request

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@229 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-05-19 07:40:07 +00:00
parent 19080861ad
commit c236945c53
3 changed files with 290 additions and 231 deletions

View File

@ -1,4 +1,5 @@
jetty-7.0.0.M2-SNAPSHOT
jetty-7.0.0.M3-SNAPSHOT
+ fixed race with expired async listeners
jetty-7.0.0.M2 18 May 2009
+ JETTY-937 Work around Sun JVM bugs
@ -64,7 +65,7 @@ jetty-7.0.0.M1 22 April 2009
+ Reworked JMX for new layout
+ JETTY-983 DefaultServlet generates accept-ranges for cached/gzip content
+ 273011 JETTY-980 JETTY-992 Security / Directory Listing XSS present
+ 271536 Add supprot to IO for quietly closing Readers / Writers
+ 271536 Add support to IO for quietly closing Readers / Writers
+ 273101 Fix DefaultServletTest XSS test case
+ 273153 Test for Nested references in DispatchServlet

View File

@ -70,42 +70,66 @@ public class AsyncRequest implements AsyncContext, Continuation
private boolean _keepWrappers;
private long _timeoutMs;
private AsyncEventState _event;
// private StringBuilder _history = new StringBuilder();
/* ------------------------------------------------------------ */
protected AsyncRequest()
{
_state=__IDLE;
_initial=true;
// _history.append(super.toString());
// _history.append('\n');
}
/* ------------------------------------------------------------ */
protected void setConnection(final HttpConnection connection)
{
_connection=connection;
synchronized(this)
{
_connection=connection;
// _history.append(connection.toString());
// _history.append('\n');
}
}
/* ------------------------------------------------------------ */
public void addContinuationListener(ContinuationListener listener)
{
_listeners=LazyList.add(_listeners,listener);
synchronized(this)
{
_listeners=LazyList.add(_listeners,listener);
// _history.append('L');
}
}
/* ------------------------------------------------------------ */
public void setAsyncTimeout(long ms)
{
_timeoutMs=ms;
synchronized(this)
{
_timeoutMs=ms;
// _history.append('T');
// _history.append(ms);
}
}
/* ------------------------------------------------------------ */
public long getAsyncTimeout()
{
return _timeoutMs;
synchronized(this)
{
return _timeoutMs;
}
}
/* ------------------------------------------------------------ */
public AsyncEventState getAsyncEventState()
{
return _event;
synchronized(this)
{
return _event;
}
}
/* ------------------------------------------------------------ */
@ -114,7 +138,11 @@ public class AsyncRequest implements AsyncContext, Continuation
*/
public void keepWrappers()
{
_keepWrappers=true;
synchronized(this)
{
// _history.append('W');
_keepWrappers=true;
}
}
/* ------------------------------------------------------------ */
@ -123,7 +151,10 @@ public class AsyncRequest implements AsyncContext, Continuation
*/
public boolean wrappersKept()
{
return _keepWrappers;
synchronized(this)
{
return _keepWrappers;
}
}
/* ------------------------------------------------------------ */
@ -163,7 +194,10 @@ public class AsyncRequest implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
public String toString()
{
return getStatusString();
synchronized (this)
{
return super.toString()+"@"+getStatusString();
}
}
/* ------------------------------------------------------------ */
@ -183,7 +217,9 @@ public class AsyncRequest implements AsyncContext, Continuation
(_state==__UNCOMPLETED)?"UNCOMPLETED":
(_state==__COMPLETE)?"COMPLETE":
("UNKNOWN?"+_state))+
(_initial?",initial":"");
(_initial?",initial":"")+
(_resumed?",resumed":"")+
(_expired?",expired":"");
}
}
@ -195,6 +231,9 @@ public class AsyncRequest implements AsyncContext, Continuation
{
synchronized (this)
{
// _history.append('H');
// _history.append(_connection.getRequest().getUri().toString());
// _history.append(':');
_keepWrappers=false;
switch(_state)
@ -218,7 +257,8 @@ public class AsyncRequest implements AsyncContext, Continuation
return false;
case __SUSPENDED:
cancelTimeout();
return false;
case __UNSUSPENDING:
_state=__REDISPATCHED;
return true;
@ -239,6 +279,7 @@ public class AsyncRequest implements AsyncContext, Continuation
{
synchronized (this)
{
// _history.append('S');
_resumed=false;
_expired=false;
@ -288,6 +329,7 @@ public class AsyncRequest implements AsyncContext, Continuation
{
synchronized (this)
{
// _history.append('U');
switch(_state)
{
case __REDISPATCHED:
@ -337,6 +379,7 @@ public class AsyncRequest implements AsyncContext, Continuation
boolean dispatch=false;
synchronized (this)
{
// _history.append('D');
switch(_state)
{
case __REDISPATCHED:
@ -354,7 +397,7 @@ public class AsyncRequest implements AsyncContext, Continuation
return;
case __SUSPENDED:
dispatch=true;
dispatch=!_expired;
_state=__UNSUSPENDING;
_resumed=true;
break;
@ -377,12 +420,15 @@ public class AsyncRequest implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
protected void expired()
{
Object listeners=null;
synchronized (this)
{
// _history.append('E');
switch(_state)
{
case __SUSPENDING:
case __SUSPENDED:
listeners=_listeners;
break;
default:
return;
@ -390,13 +436,18 @@ public class AsyncRequest implements AsyncContext, Continuation
_expired=true;
}
if (_listeners!=null)
if (listeners!=null)
{
for(int i=0;i<LazyList.size(_listeners);i++)
for(int i=0;i<LazyList.size(listeners);i++)
{
try
{
ContinuationListener listener=((ContinuationListener)LazyList.get(_listeners,i));
// synchronized (this)
// {
// _history.append('l');
// _history.append(i);
// }
ContinuationListener listener=((ContinuationListener)LazyList.get(listeners,i));
listener.onTimeout(_event);
}
catch(Exception e)
@ -408,17 +459,16 @@ public class AsyncRequest implements AsyncContext, Continuation
synchronized (this)
{
// _history.append('e');
switch(_state)
{
case __SUSPENDING:
case __SUSPENDED:
dispatch();
return;
default:
return;
}
}
scheduleDispatch();
}
/* ------------------------------------------------------------ */
@ -431,6 +481,7 @@ public class AsyncRequest implements AsyncContext, Continuation
boolean dispatch=false;
synchronized (this)
{
// _history.append('C');
switch(_state)
{
case __IDLE:
@ -450,7 +501,7 @@ public class AsyncRequest implements AsyncContext, Continuation
case __SUSPENDED:
_state=__COMPLETING;
dispatch=true;
dispatch=!_expired;
break;
default:
@ -472,12 +523,15 @@ public class AsyncRequest implements AsyncContext, Continuation
*/
protected void doComplete()
{
Object listeners=null;
synchronized (this)
{
// _history.append("c");
switch(_state)
{
case __UNCOMPLETED:
_state=__COMPLETE;
listeners=_listeners;
break;
default:
@ -485,13 +539,18 @@ public class AsyncRequest implements AsyncContext, Continuation
}
}
if (_listeners!=null)
if (listeners!=null)
{
for(int i=0;i<LazyList.size(_listeners);i++)
for(int i=0;i<LazyList.size(listeners);i++)
{
try
{
((ContinuationListener)LazyList.get(_listeners,i)).onComplete(_event);
// synchronized (this)
// {
// _history.append('l');
// _history.append(i);
// }
((ContinuationListener)LazyList.get(listeners,i)).onComplete(_event);
}
catch(Exception e)
{
@ -506,6 +565,7 @@ public class AsyncRequest implements AsyncContext, Continuation
{
synchronized (this)
{
// _history.append("r\n");
switch(_state)
{
case __DISPATCHED:
@ -529,6 +589,7 @@ public class AsyncRequest implements AsyncContext, Continuation
{
synchronized (this)
{
// _history.append("X");
_state=__COMPLETE;
_initial = false;
cancelTimeout();
@ -600,35 +661,47 @@ public class AsyncRequest implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
public boolean isCompleting()
{
return _state==__COMPLETING;
synchronized (this)
{
return _state==__COMPLETING;
}
}
/* ------------------------------------------------------------ */
boolean isUncompleted()
{
return _state==__UNCOMPLETED;
synchronized (this)
{
return _state==__UNCOMPLETED;
}
}
/* ------------------------------------------------------------ */
public boolean isComplete()
{
return _state==__COMPLETE;
synchronized (this)
{
return _state==__COMPLETE;
}
}
/* ------------------------------------------------------------ */
public boolean isAsyncStarted()
{
switch(_state)
synchronized (this)
{
case __SUSPENDING:
case __REDISPATCHING:
case __UNSUSPENDING:
case __SUSPENDED:
return true;
default:
return false;
switch(_state)
{
case __SUSPENDING:
case __REDISPATCHING:
case __UNSUSPENDING:
case __SUSPENDED:
return true;
default:
return false;
}
}
}
@ -636,14 +709,17 @@ public class AsyncRequest implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
public boolean isAsync()
{
switch(_state)
synchronized (this)
{
case __IDLE:
case __DISPATCHED:
return false;
default:
return true;
switch(_state)
{
case __IDLE:
case __DISPATCHED:
return false;
default:
return true;
}
}
}
@ -681,20 +757,26 @@ public class AsyncRequest implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
public void start(Runnable run)
{
((Context)_event.getServletContext()).getContextHandler().handle(run);
final AsyncEventState event=_event;
if (event!=null)
((Context)event.getServletContext()).getContextHandler().handle(run);
}
/* ------------------------------------------------------------ */
public boolean hasOriginalRequestAndResponse()
{
return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
synchronized (this)
{
return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
}
}
/* ------------------------------------------------------------ */
public ContextHandler getContextHandler()
{
if (_event!=null)
return ((Context)_event.getServletContext()).getContextHandler();
final AsyncEventState event=_event;
if (event!=null)
return ((Context)event.getServletContext()).getContextHandler();
return null;
}
@ -705,7 +787,10 @@ public class AsyncRequest implements AsyncContext, Continuation
*/
public boolean isResumed()
{
return _resumed;
synchronized (this)
{
return _resumed;
}
}
/* ------------------------------------------------------------ */
/**
@ -713,7 +798,10 @@ public class AsyncRequest implements AsyncContext, Continuation
*/
public boolean isExpired()
{
return _expired;
synchronized (this)
{
return _expired;
}
}
/* ------------------------------------------------------------ */
@ -818,4 +906,13 @@ public class AsyncRequest implements AsyncContext, Continuation
return _path;
}
}
public String getHistory()
{
// synchronized (this)
// {
// return _history.toString();
// }
return null;
}
}

View File

@ -24,6 +24,7 @@ import java.util.Date;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CyclicBarrier;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -71,150 +72,102 @@ public class AsyncStressTest extends TestCase
_server.stop();
}
final static String[][] __tests =
final static String[][] __paths =
{
{"/path","NORMAL"},
{"/path?sleep=<TIMEOUT>","SLEPT"},
{"/path?suspend=<TIMEOUT>","TIMEOUT"},
{"/path?suspend=1000&resume=<TIMEOUT>","RESUMED"},
{"/path?suspend=1000&complete=<TIMEOUT>","COMPLETED"},
{"/path/info","NORMAL"},
{"/path?sleep=<PERIOD>","SLEPT"},
{"/path?suspend=<PERIOD>","TIMEOUT"},
{"/path?suspend=60000&resume=<PERIOD>","RESUMED"},
{"/path?suspend=60000&complete=<PERIOD>","COMPLETED"},
};
public void doPaths(String name) throws Exception
public void doConnections(int connections,final int loops) throws Throwable
{
for (int i=0;i<__tests.length;i++)
Socket[] socket = new Socket[connections];
int [][] path = new int[connections][loops];
for (int i=0;i<connections;i++)
{
int timeout = _random.nextInt(200)+1;
String uri=__tests[i][0].replace("<TIMEOUT>",Integer.toString(timeout));
long start=System.currentTimeMillis();
Socket socket = new Socket(_addr,_port);
socket.setSoTimeout(30000);
String request = "GET "+uri+" HTTP/1.0\r\n\r\n";
socket.getOutputStream().write(request.getBytes());
socket.getOutputStream().flush();
String response = IO.toString(socket.getInputStream());
socket.close();
long end=System.currentTimeMillis();
response=response.substring(response.indexOf("\r\n\r\n")+4);
String test=name+"-"+i+" "+uri+" "+__tests[i][1];
assertEquals(test,__tests[i][1],response);
if (!response.equals("NORMAL"))
socket[i] = new Socket(_addr,_port);
socket[i].setSoTimeout(30000);
if (i%80==0)
System.err.println();
System.err.print('+');
}
System.err.println();
Log.info("Bound "+connections);
for (int l=0;l<loops;l++)
{
for (int i=0;i<connections;i++)
{
long duration=end-start;
assertTrue(test+" "+duration,duration+50>=timeout);
int p=path[i][l]=_random.nextInt(__paths.length);
int period = _random.nextInt(490)+10;
String uri=__paths[p][0].replace("<PERIOD>",Integer.toString(period));
long start=System.currentTimeMillis();
String request =
"GET "+uri+" HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"start: "+start+"\r\n"+
"result: "+__paths[p][1]+"\r\n"+
((l+1<loops)?"":"Connection: close\r\n")+
"\r\n";
socket[i].getOutputStream().write(request.getBytes("UTF-8"));
socket[i].getOutputStream().flush();
}
}
}
public void doLoops(int thread, String name, int loops) throws Exception
{
try
{
for (int i=0;i<loops;i++)
{
_loops[thread]=i;
doPaths(name+"-"+i);
Thread.sleep(10+_random.nextInt(10)*_random.nextInt(10));
}
_loops[thread]=loops;
}
catch(Exception e)
{
System.err.println(e);
_connector.dump();
_loops[thread]=-_loops[thread];
throw e;
}
}
public void doThreads(int threads,final int loops) throws Throwable
{
final Throwable[] throwable=new Throwable[threads];
final Thread[] thread=new Thread[threads];
for (int i=0;i<threads;i++)
{
final int id=i;
final String name = "T"+i;
thread[i]=new Thread()
{
public void run()
{
try
{
doLoops(id,name,loops);
}
catch(Throwable th)
{
th.printStackTrace();
throwable[id]=th;
}
finally
{
}
}
};
if (l%80==0)
System.err.println();
System.err.print('.');
Thread.sleep(_random.nextInt(590)+10);
}
_loops=new int[threads];
for (int i=0;i<threads;i++)
thread[i].start();
System.err.println();
Log.info("Sent "+(loops*__paths.length)+" requests");
while(true)
String[] results=new String[connections];
for (int i=0;i<connections;i++)
{
Thread.sleep(1000L);
int finished=0;
int errors=0;
int min=loops;
int max=0;
int total=0;
for (int i=0;i<threads;i++)
{
int l=_loops[i];
if (l<0)
{
errors++;
total-=l;
}
else
{
if (l<min)
min=l;
if (l>max)
max=l;
total+=l;
if (l==loops)
finished++;
}
}
Log.info("min/ave/max/target="+min+"/"+(total/threads)+"/"+max+"/"+loops+" errors/finished/loops="+errors+"/"+finished+"/"+threads+" idle/threads="+(_threads.getIdleThreads()+_connector.getAcceptors())+"/"+_threads.getThreads());
if ((finished+errors)==threads)
break;
results[i]=IO.toString(socket[i].getInputStream(),"UTF-8");
if (i%80==0)
System.err.println();
System.err.print('-');
}
System.err.println();
Log.info("Read "+connections+" connections");
for (int i=0;i<connections;i++)
{
int offset=0;
String result=results[i];
for (int l=0;l<loops;l++)
{
String expect = __paths[path[i][l]][1];
expect=expect+" "+expect;
offset=result.indexOf("200 OK",offset)+6;
offset=result.indexOf("\r\n\r\n",offset)+4;
int end=result.indexOf("\n",offset);
String r=result.substring(offset,end).trim();
assertEquals(i+","+l,expect,r);
offset=end;
}
}
for (int i=0;i<threads;i++)
thread[i].join();
for (int i=0;i<threads;i++)
if (throwable[i]!=null)
throw throwable[i];
}
public void testAsync() throws Throwable
{
if (_stress)
{
System.err.println("STRESS! "+STRESS_THREADS);
doThreads(STRESS_THREADS,200);
System.err.println("STRESS!");
doConnections(1600,240);
}
else
doThreads(100,10);
Thread.sleep(1000);
{
doConnections(160,80);
}
}
private static class SuspendHandler extends HandlerWrapper
@ -234,6 +187,8 @@ public class AsyncStressTest extends TestCase
long resume_after=-1;
long complete_after=-1;
final String uri=baseRequest.getUri().toString();
if (request.getParameter("read")!=null)
read_before=Integer.parseInt(request.getParameter("read"));
if (request.getParameter("sleep")!=null)
@ -265,7 +220,67 @@ public class AsyncStressTest extends TestCase
if (suspend_for>0)
baseRequest.setAsyncTimeout(suspend_for);
baseRequest.addEventListener(__asyncListener);
baseRequest.startAsync();
final AsyncContext asyncContext = baseRequest.startAsync();
if (complete_after>0)
{
TimerTask complete = new TimerTask()
{
public void run()
{
try
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED " + request.getHeader("result"));
baseRequest.setHandled(true);
asyncContext.complete();
}
catch(Exception e)
{
Request br=(Request)asyncContext.getRequest();
System.err.println("\n"+e.toString());
System.err.println(baseRequest+"=="+br);
System.err.println(uri+"=="+br.getUri());
System.err.println(asyncContext+"=="+br.getAsyncRequest());
System.err.println(((AsyncRequest)asyncContext).getHistory());
Log.warn(e);
System.exit(1);
}
}
};
synchronized (_timer)
{
_timer.schedule(complete,complete_after);
}
}
else if (complete_after==0)
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED "+request.getHeader("result"));
baseRequest.setHandled(true);
asyncContext.complete();
}
else if (resume_after>0)
{
TimerTask resume = new TimerTask()
{
public void run()
{
asyncContext.dispatch();
}
};
synchronized (_timer)
{
_timer.schedule(resume,resume_after);
}
}
else if (resume_after==0)
{
asyncContext.dispatch();
}
}
else if (sleep_for>=0)
{
@ -278,82 +293,28 @@ public class AsyncStressTest extends TestCase
e.printStackTrace();
}
response.setStatus(200);
response.getOutputStream().print("SLEPT");
response.getOutputStream().println("SLEPT "+request.getHeader("result"));
baseRequest.setHandled(true);
return;
}
else
{
response.setStatus(200);
response.getOutputStream().print("NORMAL");
response.getOutputStream().println("NORMAL "+request.getHeader("result"));
baseRequest.setHandled(true);
return;
}
final AsyncContext asyncContext = baseRequest.getAsyncContext();
if (complete_after>0)
{
TimerTask complete = new TimerTask()
{
public void run()
{
try
{
response.setStatus(200);
response.getOutputStream().print("COMPLETED");
baseRequest.setHandled(true);
asyncContext.complete();
}
catch(Exception e)
{
e.printStackTrace();
}
}
};
synchronized (_timer)
{
_timer.schedule(complete,complete_after);
}
}
else if (complete_after==0)
{
response.setStatus(200);
response.getOutputStream().print("COMPLETED");
baseRequest.setHandled(true);
asyncContext.complete();
}
if (resume_after>0)
{
TimerTask resume = new TimerTask()
{
public void run()
{
asyncContext.dispatch();
}
};
synchronized (_timer)
{
_timer.schedule(resume,resume_after);
}
}
else if (resume_after==0)
{
asyncContext.dispatch();
}
}
else if (request.getAttribute("TIMEOUT")!=null)
{
response.setStatus(200);
response.getOutputStream().print("TIMEOUT");
response.getOutputStream().println("TIMEOUT "+request.getHeader("result"));
baseRequest.setHandled(true);
}
else
{
response.setStatus(200);
response.getOutputStream().print("RESUMED");
response.getOutputStream().println("RESUMED "+request.getHeader("result"));
baseRequest.setHandled(true);
}
}