292825 avoid ISE in filters and use listener to remove from queue
git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1009 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
parent
baff6ba4d2
commit
ace1d80fab
|
@ -10,6 +10,7 @@ jetty-7.0.1-SNAPSHOT
|
|||
+ 291543 make bin/*.sh scripts executable in distribution
|
||||
+ 291589 Update jetty-rewrite demo
|
||||
+ 292642 Fix errors in embedded Jetty examples
|
||||
+ 292825 Continuations ISE rather than ignore bad transitions
|
||||
+ JETTY-937 More JVM bug work arounds. Insert pause if all else fails
|
||||
+ JETTY-983 Send content-length with multipart ranges
|
||||
+ JETTY-1114 unsynchronised WebAppClassloader.getResource(String)
|
||||
|
@ -17,7 +18,6 @@ jetty-7.0.1-SNAPSHOT
|
|||
+ JETTY-1122 Handle multi-byte utf that causes buffer overflow
|
||||
+ JETTY-1125 TransparentProxy incorrectly configured for test webapp
|
||||
+ JETTY-1129 Filter control characters out of StdErrLog
|
||||
+ Continuations ISE rather than ignore bad transitions
|
||||
+ Fixed XSS issue in CookieDump demo servlet.
|
||||
+ Improved start.jar usage text for properties
|
||||
+ Promoted Jetty Centralized Logging from Sandbox
|
||||
|
|
|
@ -18,6 +18,7 @@ import java.util.HashSet;
|
|||
import java.util.Queue;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -35,8 +36,8 @@ import javax.servlet.http.HttpSessionBindingEvent;
|
|||
import javax.servlet.http.HttpSessionBindingListener;
|
||||
|
||||
import org.eclipse.jetty.continuation.Continuation;
|
||||
import org.eclipse.jetty.continuation.ContinuationListener;
|
||||
import org.eclipse.jetty.continuation.ContinuationSupport;
|
||||
import org.eclipse.jetty.util.ArrayQueue;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.thread.Timeout;
|
||||
|
||||
|
@ -133,6 +134,7 @@ public class DoSFilter implements Filter
|
|||
protected boolean _remotePort;
|
||||
protected Semaphore _passes;
|
||||
protected Queue<Continuation>[] _queue;
|
||||
protected ContinuationListener[] _listener;
|
||||
|
||||
protected int _maxRequestsPerSec;
|
||||
protected final ConcurrentHashMap<String, RateTracker> _rateTrackers=new ConcurrentHashMap<String, RateTracker>();
|
||||
|
@ -149,8 +151,23 @@ public class DoSFilter implements Filter
|
|||
_context = filterConfig.getServletContext();
|
||||
|
||||
_queue = new Queue[getMaxPriority() + 1];
|
||||
_listener = new ContinuationListener[getMaxPriority() + 1];
|
||||
for (int p = 0; p < _queue.length; p++)
|
||||
_queue[p] = new ArrayQueue<Continuation>();
|
||||
{
|
||||
_queue[p] = new ConcurrentLinkedQueue<Continuation>();
|
||||
|
||||
final int priority=p;
|
||||
_listener[p] = new ContinuationListener()
|
||||
{
|
||||
public void onComplete(Continuation continuation)
|
||||
{}
|
||||
|
||||
public void onTimeout(Continuation continuation)
|
||||
{
|
||||
_queue[priority].remove(continuation);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
int baseRateLimit = __DEFAULT_MAX_REQUESTS_PER_SEC;
|
||||
if (filterConfig.getInitParameter(MAX_REQUESTS_PER_S_INIT_PARAM) != null)
|
||||
|
@ -328,7 +345,7 @@ public class DoSFilter implements Filter
|
|||
if (!accepted)
|
||||
{
|
||||
// we were not accepted, so either we suspend to wait,or if we were woken up we insist or we fail
|
||||
final Continuation continuation = ContinuationSupport.getContinuation(request,response);
|
||||
final Continuation continuation = ContinuationSupport.getContinuation(request);
|
||||
|
||||
Boolean throttled = (Boolean)request.getAttribute(__THROTTLED);
|
||||
if (throttled!=Boolean.TRUE && _throttleMs>0)
|
||||
|
@ -341,6 +358,7 @@ public class DoSFilter implements Filter
|
|||
continuation.setTimeout(_throttleMs);
|
||||
continuation.suspend();
|
||||
|
||||
continuation.addContinuationListener(_listener[priority]);
|
||||
_queue[priority].add(continuation);
|
||||
return;
|
||||
}
|
||||
|
@ -375,17 +393,13 @@ public class DoSFilter implements Filter
|
|||
if (accepted)
|
||||
{
|
||||
// wake up the next highest priority request.
|
||||
synchronized (_queue)
|
||||
for (int p = _queue.length; p-- > 0;)
|
||||
{
|
||||
for (int p = _queue.length; p-- > 0;)
|
||||
Continuation continuation = _queue[p].poll();
|
||||
if (continuation != null && continuation.isSuspended())
|
||||
{
|
||||
Continuation continuation = _queue[p].poll();
|
||||
|
||||
if (continuation != null)
|
||||
{
|
||||
continuation.resume();
|
||||
break;
|
||||
}
|
||||
continuation.resume();
|
||||
break;
|
||||
}
|
||||
}
|
||||
_passes.release();
|
||||
|
|
|
@ -15,6 +15,7 @@ package org.eclipse.jetty.servlets;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -30,6 +31,7 @@ import javax.servlet.http.HttpServletResponse;
|
|||
import javax.servlet.http.HttpSession;
|
||||
|
||||
import org.eclipse.jetty.continuation.Continuation;
|
||||
import org.eclipse.jetty.continuation.ContinuationListener;
|
||||
import org.eclipse.jetty.continuation.ContinuationSupport;
|
||||
import org.eclipse.jetty.util.ArrayQueue;
|
||||
|
||||
|
@ -80,6 +82,7 @@ public class QoSFilter implements Filter
|
|||
long _suspendMs;
|
||||
Semaphore _passes;
|
||||
Queue<Continuation>[] _queue;
|
||||
ContinuationListener[] _listener;
|
||||
String _suspended="QoSFilter@"+this.hashCode();
|
||||
|
||||
public void init(FilterConfig filterConfig)
|
||||
|
@ -90,8 +93,23 @@ public class QoSFilter implements Filter
|
|||
if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
|
||||
max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
|
||||
_queue=new Queue[max_priority+1];
|
||||
_listener = new ContinuationListener[max_priority + 1];
|
||||
for (int p=0;p<_queue.length;p++)
|
||||
_queue[p]=new ArrayQueue<Continuation>();
|
||||
{
|
||||
_queue[p]=new ConcurrentLinkedQueue<Continuation>();
|
||||
|
||||
final int priority=p;
|
||||
_listener[p] = new ContinuationListener()
|
||||
{
|
||||
public void onComplete(Continuation continuation)
|
||||
{}
|
||||
|
||||
public void onTimeout(Continuation continuation)
|
||||
{
|
||||
_queue[priority].remove(continuation);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
int passes=__DEFAULT_PASSES;
|
||||
if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
|
||||
|
@ -125,12 +143,12 @@ public class QoSFilter implements Filter
|
|||
else
|
||||
{
|
||||
request.setAttribute(_suspended,Boolean.TRUE);
|
||||
Continuation continuation = ContinuationSupport.getContinuation(request,response);
|
||||
int priority = getPriority(request);
|
||||
Continuation continuation = ContinuationSupport.getContinuation(request);
|
||||
if (_suspendMs>0)
|
||||
continuation.setTimeout(_suspendMs);
|
||||
continuation.suspend();
|
||||
|
||||
int priority = getPriority(request);
|
||||
continuation.addContinuationListener(_listener[priority]);
|
||||
_queue[priority].add(continuation);
|
||||
return;
|
||||
}
|
||||
|
@ -160,7 +178,6 @@ public class QoSFilter implements Filter
|
|||
accepted = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (accepted)
|
||||
{
|
||||
|
@ -170,9 +187,6 @@ public class QoSFilter implements Filter
|
|||
{
|
||||
((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
catch(InterruptedException e)
|
||||
{
|
||||
|
@ -186,7 +200,7 @@ public class QoSFilter implements Filter
|
|||
for (int p=_queue.length;p-->0;)
|
||||
{
|
||||
Continuation continutaion=_queue[p].poll();
|
||||
if (continutaion!=null)
|
||||
if (continutaion!=null && continutaion.isSuspended())
|
||||
{
|
||||
continutaion.resume();
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue