Deleted class org.eclipse.jetty.util.thread.Timeout, replacing its

usages with o.e.j.u.t.Scheduler.
This commit is contained in:
Simone Bordet 2013-10-17 13:25:15 +02:00
parent c1832b29fb
commit 591cf9badf
3 changed files with 50 additions and 712 deletions

View File

@ -56,7 +56,8 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
/** /**
* Denial of Service filter * Denial of Service filter
@ -184,12 +185,9 @@ public class DoSFilter implements Filter
private ContinuationListener[] _listeners; private ContinuationListener[] _listeners;
private final ConcurrentHashMap<String, RateTracker> _rateTrackers = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, RateTracker> _rateTrackers = new ConcurrentHashMap<>();
private final List<String> _whitelist = new CopyOnWriteArrayList<>(); private final List<String> _whitelist = new CopyOnWriteArrayList<>();
private final Timeout _requestTimeoutQ = new Timeout(); private Scheduler _scheduler;
private final Timeout _trackerTimeoutQ = new Timeout();
private Thread _timerThread;
private volatile boolean _running;
public void init(FilterConfig filterConfig) public void init(FilterConfig filterConfig) throws ServletException
{ {
_context = filterConfig.getServletContext(); _context = filterConfig.getServletContext();
@ -275,47 +273,26 @@ public class DoSFilter implements Filter
parameter = filterConfig.getInitParameter(ENABLED_INIT_PARAM); parameter = filterConfig.getInitParameter(ENABLED_INIT_PARAM);
setEnabled(parameter == null || Boolean.parseBoolean(parameter)); setEnabled(parameter == null || Boolean.parseBoolean(parameter));
_requestTimeoutQ.setNow(); _scheduler = startScheduler();
_requestTimeoutQ.setDuration(_maxRequestMs);
_trackerTimeoutQ.setNow();
_trackerTimeoutQ.setDuration(_maxIdleTrackerMs);
_running = true;
_timerThread = (new Thread()
{
public void run()
{
try
{
while (_running)
{
long now = _requestTimeoutQ.setNow();
_requestTimeoutQ.tick();
_trackerTimeoutQ.setNow(now);
_trackerTimeoutQ.tick();
try
{
Thread.sleep(100);
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
}
finally
{
LOG.debug("DoSFilter timer exited");
}
}
});
_timerThread.start();
if (_context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM))) if (_context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
_context.setAttribute(filterConfig.getFilterName(), this); _context.setAttribute(filterConfig.getFilterName(), this);
} }
protected Scheduler startScheduler() throws ServletException
{
try
{
Scheduler result = new ScheduledExecutorScheduler();
result.start();
return result;
}
catch (Exception x)
{
throw new ServletException(x);
}
}
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException
{ {
doFilter((HttpServletRequest)request, (HttpServletResponse)response, filterChain); doFilter((HttpServletRequest)request, (HttpServletResponse)response, filterChain);
@ -329,8 +306,6 @@ public class DoSFilter implements Filter
return; return;
} }
final long now = _requestTimeoutQ.getNow();
// Look for the rate tracker for this request // Look for the rate tracker for this request
RateTracker tracker = (RateTracker)request.getAttribute(__TRACKER); RateTracker tracker = (RateTracker)request.getAttribute(__TRACKER);
@ -342,7 +317,7 @@ public class DoSFilter implements Filter
tracker = getRateTracker(request); tracker = getRateTracker(request);
// Calculate the rate and check it is over the allowed limit // Calculate the rate and check it is over the allowed limit
final boolean overRateLimit = tracker.isRateExceeded(now); final boolean overRateLimit = tracker.isRateExceeded(System.currentTimeMillis());
// pass it through if we are not currently over the rate limit // pass it through if we are not currently over the rate limit
if (!overRateLimit) if (!overRateLimit)
@ -466,23 +441,23 @@ public class DoSFilter implements Filter
protected void doFilterChain(FilterChain chain, final HttpServletRequest request, final HttpServletResponse response) throws IOException, ServletException protected void doFilterChain(FilterChain chain, final HttpServletRequest request, final HttpServletResponse response) throws IOException, ServletException
{ {
final Thread thread = Thread.currentThread(); final Thread thread = Thread.currentThread();
Runnable requestTimeout = new Runnable()
final Timeout.Task requestTimeout = new Timeout.Task()
{ {
public void expired() @Override
public void run()
{ {
closeConnection(request, response, thread); closeConnection(request, response, thread);
} }
}; };
Scheduler.Task task = _scheduler.schedule(requestTimeout, getMaxRequestMs(), TimeUnit.MILLISECONDS);
try try
{ {
_requestTimeoutQ.schedule(requestTimeout);
chain.doFilter(request, response); chain.doFilter(request, response);
} }
finally finally
{ {
requestTimeout.cancel(); task.cancel();
} }
} }
@ -573,14 +548,14 @@ public class DoSFilter implements Filter
} }
else else
{ {
if (_trackSessions && session != null && !session.isNew()) if (isTrackSessions() && session != null && !session.isNew())
{ {
loadId = session.getId(); loadId = session.getId();
type = USER_SESSION; type = USER_SESSION;
} }
else else
{ {
loadId = _remotePort ? (request.getRemoteAddr() + request.getRemotePort()) : request.getRemoteAddr(); loadId = isRemotePort() ? (request.getRemoteAddr() + request.getRemotePort()) : request.getRemoteAddr();
type = USER_IP; type = USER_IP;
} }
} }
@ -590,16 +565,17 @@ public class DoSFilter implements Filter
if (tracker == null) if (tracker == null)
{ {
boolean allowed = checkWhitelist(_whitelist, request.getRemoteAddr()); boolean allowed = checkWhitelist(_whitelist, request.getRemoteAddr());
tracker = allowed ? new FixedRateTracker(loadId, type, _maxRequestsPerSec) int maxRequestsPerSec = getMaxRequestsPerSec();
: new RateTracker(loadId, type, _maxRequestsPerSec); tracker = allowed ? new FixedRateTracker(loadId, type, maxRequestsPerSec)
: new RateTracker(loadId, type, maxRequestsPerSec);
RateTracker existing = _rateTrackers.putIfAbsent(loadId, tracker); RateTracker existing = _rateTrackers.putIfAbsent(loadId, tracker);
if (existing != null) if (existing != null)
tracker = existing; tracker = existing;
if (type == USER_IP) if (type == USER_IP)
{ {
// USER_IP expiration from _rateTrackers is handled by the _trackerTimeoutQ // USER_IP expiration from _rateTrackers is handled by the _scheduler
_trackerTimeoutQ.schedule(tracker); _scheduler.schedule(tracker, getMaxIdleTrackerMs(), TimeUnit.MILLISECONDS);
} }
else if (session != null) else if (session != null)
{ {
@ -722,14 +698,23 @@ public class DoSFilter implements Filter
public void destroy() public void destroy()
{ {
LOG.debug("Destroy {}",this); LOG.debug("Destroy {}",this);
_running = false; stopScheduler();
_timerThread.interrupt();
_requestTimeoutQ.cancelAll();
_trackerTimeoutQ.cancelAll();
_rateTrackers.clear(); _rateTrackers.clear();
_whitelist.clear(); _whitelist.clear();
} }
protected void stopScheduler()
{
try
{
_scheduler.stop();
}
catch (Exception x)
{
LOG.ignore(x);
}
}
/** /**
* Returns the user id, used to track this connection. * Returns the user id, used to track this connection.
* This SHOULD be overridden by subclasses. * This SHOULD be overridden by subclasses.
@ -1067,7 +1052,7 @@ public class DoSFilter implements Filter
* A RateTracker is associated with a connection, and stores request rate * A RateTracker is associated with a connection, and stores request rate
* data. * data.
*/ */
class RateTracker extends Timeout.Task implements HttpSessionBindingListener, HttpSessionActivationListener, Serializable class RateTracker implements Runnable, HttpSessionBindingListener, HttpSessionActivationListener, Serializable
{ {
private static final long serialVersionUID = 3534663738034577872L; private static final long serialVersionUID = 3534663738034577872L;
@ -1138,15 +1123,15 @@ public class DoSFilter implements Filter
LOG.warn("Unexpected session activation"); LOG.warn("Unexpected session activation");
} }
public void expired() @Override
public void run()
{ {
long now = _trackerTimeoutQ.getNow();
int latestIndex = _next == 0 ? (_timestamps.length - 1) : (_next - 1); int latestIndex = _next == 0 ? (_timestamps.length - 1) : (_next - 1);
long last = _timestamps[latestIndex]; long last = _timestamps[latestIndex];
boolean hasRecentRequest = last != 0 && (now - last) < 1000L; boolean hasRecentRequest = last != 0 && (System.currentTimeMillis() - last) < 1000L;
if (hasRecentRequest) if (hasRecentRequest)
reschedule(); _scheduler.schedule(this, getMaxIdleTrackerMs(), TimeUnit.MILLISECONDS);
else else
_rateTrackers.remove(_id); _rateTrackers.remove(_id);
} }

View File

@ -1,381 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
/** Timeout queue.
* This class implements a timeout queue for timers that are at least as likely to be cancelled as they are to expire.
* Unlike the util timeout class, the duration of the timeouts is shared by all scheduled tasks and if the duration
* is changed, this affects all scheduled tasks.
* <p>
* The nested class Task should be extended by users of this class to obtain call back notification of
* expires.
*/
@Deprecated
public class Timeout
{
private static final Logger LOG = Log.getLogger(Timeout.class);
private Object _lock;
private long _duration;
private volatile long _now=System.currentTimeMillis();
private Task _head=new Task();
/* ------------------------------------------------------------ */
public Timeout()
{
_lock=new Object();
_head._timeout=this;
}
/* ------------------------------------------------------------ */
public Timeout(Object lock)
{
_lock=lock;
_head._timeout=this;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the duration.
*/
public long getDuration()
{
return _duration;
}
/* ------------------------------------------------------------ */
/**
* @param duration The duration to set.
*/
public void setDuration(long duration)
{
_duration = duration;
}
/* ------------------------------------------------------------ */
public long setNow()
{
return _now=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
public long getNow()
{
return _now;
}
/* ------------------------------------------------------------ */
public void setNow(long now)
{
_now=now;
}
/* ------------------------------------------------------------ */
/** Get an expired tasks.
* This is called instead of {@link #tick()} to obtain the next
* expired Task, but without calling it's {@link Task#expire()} or
* {@link Task#expired()} methods.
*
* @return the next expired task or null.
*/
public Task expired()
{
synchronized (_lock)
{
long _expiry = _now-_duration;
if (_head._next!=_head)
{
Task task = _head._next;
if (task._timestamp>_expiry)
return null;
task.unlink();
task._expired=true;
return task;
}
return null;
}
}
/* ------------------------------------------------------------ */
public void tick()
{
final long expiry = _now-_duration;
Task task=null;
while (true)
{
try
{
synchronized (_lock)
{
task= _head._next;
if (task==_head || task._timestamp>expiry)
break;
task.unlink();
task._expired=true;
task.expire();
}
task.expired();
}
catch(Throwable th)
{
LOG.warn(Log.EXCEPTION,th);
}
}
}
/* ------------------------------------------------------------ */
public void tick(long now)
{
_now=now;
tick();
}
/* ------------------------------------------------------------ */
public void schedule(Task task)
{
schedule(task,0L);
}
/* ------------------------------------------------------------ */
/**
* @param task
* @param delay A delay in addition to the default duration of the timeout
*/
public void schedule(Task task,long delay)
{
synchronized (_lock)
{
if (task._timestamp!=0)
{
task.unlink();
task._timestamp=0;
}
task._timeout=this;
task._expired=false;
task._delay=delay;
task._timestamp = _now+delay;
Task last=_head._prev;
while (last!=_head)
{
if (last._timestamp <= task._timestamp)
break;
last=last._prev;
}
last.link(task);
}
}
/* ------------------------------------------------------------ */
public void cancelAll()
{
synchronized (_lock)
{
_head._next=_head._prev=_head;
}
}
/* ------------------------------------------------------------ */
public boolean isEmpty()
{
synchronized (_lock)
{
return _head._next==_head;
}
}
/* ------------------------------------------------------------ */
public long getTimeToNext()
{
synchronized (_lock)
{
if (_head._next==_head)
return -1;
long to_next = _duration+_head._next._timestamp-_now;
return to_next<0?0:to_next;
}
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
StringBuffer buf = new StringBuffer();
buf.append(super.toString());
Task task = _head._next;
while (task!=_head)
{
buf.append("-->");
buf.append(task);
task=task._next;
}
return buf.toString();
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/** Task.
* The base class for scheduled timeouts. This class should be
* extended to implement the expire() method, which is called if the
* timeout expires.
*
*
*
*/
public static class Task
{
Task _next;
Task _prev;
Timeout _timeout;
long _delay;
long _timestamp=0;
boolean _expired=false;
/* ------------------------------------------------------------ */
protected Task()
{
_next=_prev=this;
}
/* ------------------------------------------------------------ */
public long getTimestamp()
{
return _timestamp;
}
/* ------------------------------------------------------------ */
public long getAge()
{
final Timeout t = _timeout;
if (t!=null)
{
final long now=t._now;
if (now!=0 && _timestamp!=0)
return now-_timestamp;
}
return 0;
}
/* ------------------------------------------------------------ */
private void unlink()
{
_next._prev=_prev;
_prev._next=_next;
_next=_prev=this;
_expired=false;
}
/* ------------------------------------------------------------ */
private void link(Task task)
{
Task next_next = _next;
_next._prev=task;
_next=task;
_next._next=next_next;
_next._prev=this;
}
/* ------------------------------------------------------------ */
/** Schedule the task on the given timeout.
* The task exiry will be called after the timeout duration.
* @param timer
*/
public void schedule(Timeout timer)
{
timer.schedule(this);
}
/* ------------------------------------------------------------ */
/** Schedule the task on the given timeout.
* The task exiry will be called after the timeout duration.
* @param timer
*/
public void schedule(Timeout timer, long delay)
{
timer.schedule(this,delay);
}
/* ------------------------------------------------------------ */
/** Reschedule the task on the current timeout.
* The task timeout is rescheduled as if it had been cancelled and
* scheduled on the current timeout.
*/
public void reschedule()
{
Timeout timeout = _timeout;
if (timeout!=null)
timeout.schedule(this,_delay);
}
/* ------------------------------------------------------------ */
/** Cancel the task.
* Remove the task from the timeout.
*/
public void cancel()
{
Timeout timeout = _timeout;
if (timeout!=null)
{
synchronized (timeout._lock)
{
unlink();
_timestamp=0;
}
}
}
/* ------------------------------------------------------------ */
public boolean isExpired() { return _expired; }
/* ------------------------------------------------------------ */
public boolean isScheduled() { return _next!=this; }
/* ------------------------------------------------------------ */
/** Expire task.
* This method is called when the timeout expires. It is called
* in the scope of the synchronize block (on this) that sets
* the {@link #isExpired()} state to true.
* @see #expired() For an unsynchronized callback.
*/
protected void expire(){}
/* ------------------------------------------------------------ */
/** Expire task.
* This method is called when the timeout expires. It is called
* outside of any synchronization scope and may be delayed.
*
*/
public void expired(){}
}
}

View File

@ -1,266 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.junit.Before;
import org.junit.Test;
public class TimeoutTest
{
private boolean _stress=Boolean.getBoolean("STRESS");
Object lock = new Object();
Timeout timeout = new Timeout(null);
Timeout.Task[] tasks;
@Before
public void setUp() throws Exception
{
timeout=new Timeout(lock);
tasks= new Timeout.Task[10];
for (int i=0;i<tasks.length;i++)
{
tasks[i]=new Timeout.Task();
timeout.setNow(1000+i*100);
timeout.schedule(tasks[i]);
}
timeout.setNow(100);
}
/* ------------------------------------------------------------ */
@Test
public void testExpiry()
{
timeout.setDuration(200);
timeout.setNow(1500);
timeout.tick();
for (int i=0;i<tasks.length;i++)
{
assertEquals("isExpired "+i,i<4, tasks[i].isExpired());
}
}
/* ------------------------------------------------------------ */
@Test
public void testCancel()
{
timeout.setDuration(200);
timeout.setNow(1700);
for (int i=0;i<tasks.length;i++)
if ((i+1)%2==0)
tasks[i].cancel();
timeout.tick();
for (int i=0;i<tasks.length;i++)
{
assertEquals("isExpired "+i,i%2==0 && i<6, tasks[i].isExpired());
}
}
/* ------------------------------------------------------------ */
@Test
public void testTouch()
{
timeout.setDuration(200);
timeout.setNow(1350);
timeout.schedule(tasks[2]);
timeout.setNow(1500);
timeout.tick();
for (int i=0;i<tasks.length;i++)
{
assertEquals("isExpired "+i,i!=2 && i<4, tasks[i].isExpired());
}
timeout.setNow(1550);
timeout.tick();
for (int i=0;i<tasks.length;i++)
{
assertEquals("isExpired "+i, i<4, tasks[i].isExpired());
}
}
/* ------------------------------------------------------------ */
@Test
public void testDelay()
{
Timeout.Task task = new Timeout.Task();
timeout.setNow(1100);
timeout.schedule(task, 300);
timeout.setDuration(200);
timeout.setNow(1300);
timeout.tick();
assertEquals("delay", false, task.isExpired());
timeout.setNow(1500);
timeout.tick();
assertEquals("delay", false, task.isExpired());
timeout.setNow(1700);
timeout.tick();
assertEquals("delay", true, task.isExpired());
}
/* ------------------------------------------------------------ */
@Test
public void testStress() throws Exception
{
if ( !_stress )
return;
final int LOOP=250;
final AtomicBoolean running=new AtomicBoolean(true);
final AtomicIntegerArray count = new AtomicIntegerArray( 4 );
timeout.setNow(System.currentTimeMillis());
timeout.setDuration(500);
// Start a ticker thread that will tick over the timer frequently.
Thread ticker = new Thread()
{
@Override
public void run()
{
while (running.get())
{
try
{
// use lock.wait so we have a memory barrier and
// have no funny optimisation issues.
synchronized (lock)
{
lock.wait(30);
}
Thread.sleep(30);
timeout.tick(System.currentTimeMillis());
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
};
ticker.start();
// start lots of test threads
for (int i=0;i<LOOP;i++)
{
//
Thread th = new Thread()
{
@Override
public void run()
{
// count how many threads were started (should == LOOP)
int once = (int) 10 + count.incrementAndGet( 0 )%50;
// create a task for this thread
Timeout.Task task = new Timeout.Task()
{
@Override
public void expired()
{
// count the number of expires
count.incrementAndGet( 2 );
}
};
// this thread will loop and each loop with schedule a
// task with a delay on top of the timeouts duration
// mostly this thread will then cancel the task
// But once it will wait and the task will expire
// do the looping until we are stopped
int loop=0;
while (running.get())
{
try
{
long delay=1000;
long wait=100-once;
if (loop++==once)
{
// THIS loop is the one time we wait longer than the delay
count.incrementAndGet( 1 );
delay=200;
wait=1000;
}
timeout.schedule(task,delay);
// do the wait
Thread.sleep(wait);
// cancel task (which may have expired)
task.cancel();
}
catch(Exception e)
{
e.printStackTrace();
}
}
count.incrementAndGet(3);
}
};
th.start();
}
long start=System.currentTimeMillis();
// run test until all threads are started
while (count.get(0)<LOOP && (System.currentTimeMillis()-start)<20000)
Thread.sleep(50);
// run test until all expires initiated
while (count.get(1)<LOOP && (System.currentTimeMillis()-start)<20000)
Thread.sleep(50);
// run test until all expires initiated
while (count.get(2)<LOOP && (System.currentTimeMillis()-start)<20000)
Thread.sleep(50);
running.set(false);
// run test until all threads complete
while (count.get(3)<LOOP && (System.currentTimeMillis()-start)<20000)
Thread.sleep(50);
// check the counts
assertEquals("count threads", LOOP,count.get( 0 ));
assertEquals("count once waits",LOOP,count.get(1 ));
assertEquals("count expires",LOOP,count.get(2));
assertEquals("done",LOOP,count.get(3));
}
}