jetty-9 ConcurrentScheduler refinements

This commit is contained in:
Greg Wilkins 2012-09-03 15:24:36 +10:00
parent bb3dcc1c5b
commit c3776764f4
15 changed files with 152 additions and 90 deletions

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -51,7 +51,7 @@ public class ByteArrayEndPointTest
@Before @Before
public void before() throws Exception public void before() throws Exception
{ {
_scheduler = new SimpleScheduler(); _scheduler = new TimerScheduler();
_scheduler.start(); _scheduler.start();
} }

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -54,7 +54,7 @@ public class SelectChannelEndPointInterestsTest
threadPool = new QueuedThreadPool(); threadPool = new QueuedThreadPool();
threadPool.start(); threadPool.start();
scheduler = new SimpleScheduler(); scheduler = new TimerScheduler();
scheduler.start(); scheduler.start();
connector = ServerSocketChannel.open(); connector = ServerSocketChannel.open();

View File

@ -44,7 +44,7 @@ import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -56,7 +56,7 @@ public class SelectChannelEndPointTest
protected volatile EndPoint _lastEndPoint; protected volatile EndPoint _lastEndPoint;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected Scheduler _scheduler = new SimpleScheduler(); protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager() protected SelectorManager _manager = new SelectorManager()
{ {
@Override @Override

View File

@ -43,7 +43,7 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -60,7 +60,7 @@ public class SslConnectionTest
private volatile FutureCallback<Void> _writeCallback; private volatile FutureCallback<Void> _writeCallback;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected Scheduler _scheduler = new SimpleScheduler(); protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager() protected SelectorManager _manager = new SelectorManager()
{ {
@Override @Override

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
/** /**
* <p>Partial implementation of {@link Connector}</p> * <p>Partial implementation of {@link Connector}</p>
@ -79,7 +79,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
{ {
_server=server; _server=server;
_executor=executor!=null?executor:_server.getThreadPool(); _executor=executor!=null?executor:_server.getThreadPool();
_scheduler=scheduler!=null?scheduler:new SimpleScheduler(); _scheduler=scheduler!=null?scheduler:new TimerScheduler();
_byteBufferPool = pool!=null?pool:new MappedByteBufferPool(); _byteBufferPool = pool!=null?pool:new MappedByteBufferPool();
_sslContextFactory = sslContextFactory; _sslContextFactory = sslContextFactory;

View File

@ -233,7 +233,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{ {
try try
{ {
_request.setHandled(false); // TODO: is this right here ? _request.setHandled(false);
_response.getHttpOutput().reopen(); _response.getHttpOutput().reopen();
if (_state.isInitial()) if (_state.isInitial())

View File

@ -48,7 +48,7 @@ import org.eclipse.jetty.server.session.HashSessionIdManager;
import org.eclipse.jetty.server.session.HashSessionManager; import org.eclipse.jetty.server.session.HashSessionManager;
import org.eclipse.jetty.server.session.HashedSession; import org.eclipse.jetty.server.session.HashedSession;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -65,7 +65,7 @@ public class ResponseTest
public void init() throws Exception public void init() throws Exception
{ {
_server = new Server(); _server = new Server();
_scheduler = new SimpleScheduler(); _scheduler = new TimerScheduler();
LocalConnector connector = new LocalConnector(_server, null, _scheduler, null, null, 1); LocalConnector connector = new LocalConnector(_server, null, _scheduler, null, null, 1);
_server.addConnector(connector); _server.addConnector(connector);
_server.setHandler(new DumpHandler()); _server.setHandler(new DumpHandler());

View File

@ -38,7 +38,7 @@ import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -55,7 +55,7 @@ public class AsyncTimeoutTest
ByteBufferPool bufferPool = new MappedByteBufferPool(); ByteBufferPool bufferPool = new MappedByteBufferPool();
Executor threadPool = Executors.newCachedThreadPool(); Executor threadPool = Executors.newCachedThreadPool();
Scheduler scheduler = new SimpleScheduler(); Scheduler scheduler = new TimerScheduler();
scheduler.start(); // TODO need to use jetty lifecycles better here scheduler.start(); // TODO need to use jetty lifecycles better here
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None()) Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None())
@ -102,7 +102,7 @@ public class AsyncTimeoutTest
ByteBufferPool bufferPool = new MappedByteBufferPool(); ByteBufferPool bufferPool = new MappedByteBufferPool();
Executor threadPool = Executors.newCachedThreadPool(); Executor threadPool = Executors.newCachedThreadPool();
Scheduler scheduler = new SimpleScheduler(); Scheduler scheduler = new TimerScheduler();
scheduler.start(); scheduler.start();
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None()) Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None())

View File

@ -49,7 +49,7 @@ import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.generator.Generator; import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
@ -87,7 +87,7 @@ public class StandardSessionTest
{ {
bufferPool = new MappedByteBufferPool(); bufferPool = new MappedByteBufferPool();
threadPool = Executors.newCachedThreadPool(); threadPool = Executors.newCachedThreadPool();
scheduler = new SimpleScheduler(); scheduler = new TimerScheduler();
scheduler.start(); scheduler.start();
generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator,new FlowControlStrategy.None()); session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator,new FlowControlStrategy.None());

View File

@ -51,7 +51,7 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
public class SPDYClient public class SPDYClient
{ {
@ -167,7 +167,7 @@ public class SPDYClient
{ {
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>(); private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new MappedByteBufferPool(); private final ByteBufferPool bufferPool = new MappedByteBufferPool();
private final Scheduler scheduler = new SimpleScheduler(); private final Scheduler scheduler = new TimerScheduler();
private final Executor executor; private final Executor executor;
private final SslContextFactory sslContextFactory; private final SslContextFactory sslContextFactory;
private final SelectorManager selector; private final SelectorManager selector;

View File

@ -59,15 +59,26 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
}); });
private final Queue _delayQ; private final Queue _delayQ;
public ConcurrentScheduler()
{
this(null,8192);
}
public ConcurrentScheduler(Executor executor) public ConcurrentScheduler(Executor executor)
{ {
this(executor,8192); this(executor,8192);
} }
public ConcurrentScheduler(int delayQms)
{
this(null,delayQms);
}
public ConcurrentScheduler(Executor executor,int delayQms) public ConcurrentScheduler(Executor executor,int delayQms)
{ {
_executor = executor; _executor = executor;
addBean(_executor,false); if (_executor!=null)
addBean(_executor,false);
_delayQ=new Queue(delayQms); _delayQ=new Queue(delayQms);
} }
@ -75,7 +86,10 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
super.doStart(); super.doStart();
_executor.execute(this); if (_executor==null)
new Thread(this).start();
else
_executor.execute(this);
} }
@ -110,10 +124,9 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
long interval=event._executeAt-now; long interval=event._executeAt-now;
// Should we execute this event? // Should we execute this event?
if (interval<=0 && event._state.compareAndSet(State.NEW,State.DONE)) if (interval<=0 && event.compareAndSet(State.NEW,State.DONE))
{ event.execute();
_executor.execute(event._task);
}
// Should we delay this event // Should we delay this event
else if (_delayQ._delay>0 && interval>_delayQ._delay) else if (_delayQ._delay>0 && interval>_delayQ._delay)
{ {
@ -121,7 +134,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
_delayQ.add(event,dequeue_at); _delayQ.add(event,dequeue_at);
} }
// else we schedule the event // else we schedule the event
else if (event._state.compareAndSet(State.NEW,State.SCHEDULED)) else if (event.compareAndSet(State.NEW,State.SCHEDULED))
{ {
_timerQ.add(event); _timerQ.add(event);
if (interval<=MAX_SLEEP) if (interval<=MAX_SLEEP)
@ -160,7 +173,15 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
{ {
Event event=next.dequeue(); Event event=next.dequeue();
if (event!=null) if (event!=null)
_timerQ.add(event); {
if (event._executeAt<=now)
{
if (event.compareAndSet(State.SCHEDULED,State.DONE))
event.execute();
}
else
_timerQ.add(event);
}
} }
else else
{ {
@ -185,10 +206,9 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
else if (event._executeAt<=now) else if (event._executeAt<=now)
{ {
i.remove(); i.remove();
if (event._state.compareAndSet(State.SCHEDULED,State.DONE)) event.execute();
{ if (event.compareAndSet(State.SCHEDULED,State.DONE))
_executor.execute(event._task); event.execute();
}
} }
// else how long do we need to wait? // else how long do we need to wait?
else else
@ -218,27 +238,33 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
} }
} }
@Override
public String toString()
{
return String.format("%s@%x{%d,%s}",this.getClass().getSimpleName(),hashCode(),_delayQ._delay,_executor);
}
enum State { NEW, DELAYED, SCHEDULED, CANCELLED, DONE }; enum State { NEW, DELAYED, SCHEDULED, CANCELLED, DONE };
private class Event implements Scheduler.Task private class Event extends AtomicReference<State> implements Scheduler.Task
{ {
/* extends AtomicReference as a minor optimisation rather than holding a _state field */
final Runnable _task; final Runnable _task;
final long _executeAt; final long _executeAt;
final AtomicReference<State> _state=new AtomicReference<>(State.NEW);
volatile QNode _node; volatile QNode _node;
public Event(Runnable task, long executeAt) public Event(Runnable task, long executeAt)
{ {
super(); super(State.NEW);
_task = task; _task = task;
_executeAt = executeAt; _executeAt = executeAt;
} }
public boolean isScheduled() public boolean isScheduled()
{ {
return _state.get()==State.SCHEDULED; return get()==State.SCHEDULED;
} }
@Override @Override
@ -246,7 +272,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
{ {
while(true) while(true)
{ {
switch(_state.get()) switch(get())
{ {
case NEW: case NEW:
throw new IllegalStateException(); throw new IllegalStateException();
@ -255,14 +281,14 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
case CANCELLED: case CANCELLED:
return false; return false;
case DELAYED: case DELAYED:
if (_state.compareAndSet(State.DELAYED,State.CANCELLED)) if (compareAndSet(State.DELAYED,State.CANCELLED))
{ {
_node.cancel(); _node.cancel();
return true; return true;
} }
break; break;
case SCHEDULED: case SCHEDULED:
if (_state.compareAndSet(State.SCHEDULED,State.CANCELLED)) if (compareAndSet(State.SCHEDULED,State.CANCELLED))
{ {
_timerQ.remove(this); _timerQ.remove(this);
return true; return true;
@ -272,10 +298,18 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
} }
} }
public void execute()
{
if (_executor==null)
_task.run();
else
_executor.execute(_task);
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("Event@%x{%s,%d,%s}",hashCode(),_state,_executeAt,_task); return String.format("Event@%x{%s,%d,%s}",hashCode(),get(),_executeAt,_task);
} }
} }
@ -304,19 +338,19 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
Queue(int delay) Queue(int delay)
{ {
_delay=delay; _delay=delay;
_head._next.set(_tail); _head.set(_tail);
_tail._prev=_head; _tail._prev=_head;
} }
void clear() void clear()
{ {
_head._next.set(_tail); _head.set(_tail);
_tail._prev=_head; _tail._prev=_head;
} }
void add(Event event, long dequeue_at) void add(Event event, long dequeue_at)
{ {
if (event._state.compareAndSet(State.NEW,State.DELAYED)) if (event.compareAndSet(State.NEW,State.DELAYED))
{ {
while (true) while (true)
{ {
@ -325,7 +359,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
if (prev!=null) if (prev!=null)
{ {
QNode node = new QNode(event,dequeue_at,prev,_tail); QNode node = new QNode(event,dequeue_at,prev,_tail);
if (prev._next.compareAndSet(_tail,node)) if (prev.compareAndSet(_tail,node))
{ {
_tail._prev=node; _tail._prev=node;
event._node=node; event._node=node;
@ -368,19 +402,19 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
* Roughly based on public domain lock free queue algorithm from: * Roughly based on public domain lock free queue algorithm from:
* http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm * http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm
*/ */
private static class QNode private static class QNode extends AtomicReference<QNode>
{ {
/* extends AtomicReference as a minor optimisation rather than holding a _next field */
final Event _event; final Event _event;
final long _dequeueAt; final long _dequeueAt;
final AtomicReference<QNode> _next=new AtomicReference<>();
volatile QNode _prev; volatile QNode _prev;
QNode(Event event, long dequeue_at, QNode prev, QNode next) QNode(Event event, long dequeue_at, QNode prev, QNode next)
{ {
super(next);
_event=event; _event=event;
_dequeueAt=dequeue_at; _dequeueAt=dequeue_at;
_prev=prev; _prev=prev;
_next.set(next);
} }
/** /**
@ -400,7 +434,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
return event.scanForPrevOf(this); return event.scanForPrevOf(this);
// If the previous next is this (still linked normally) // If the previous next is this (still linked normally)
QNode prev_next = prev._next.get(); QNode prev_next = prev.get();
if (prev_next==this) if (prev_next==this)
return prev; return prev;
@ -448,7 +482,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
*/ */
QNode next() QNode next()
{ {
QNode next = _next.get(); QNode next = get();
while (true) while (true)
{ {
if (next == null) if (next == null)
@ -459,21 +493,21 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
next._prev=this; next._prev=this;
return next; return next;
} }
QNode next_next = next._next.get(); QNode next_next = next.get();
_next.compareAndSet(next, next_next); compareAndSet(next, next_next);
next = next_next; next = next_next;
} }
} }
public boolean cancel() public boolean cancel()
{ {
if (_event._state.compareAndSet(State.DELAYED,State.CANCELLED)) if (_event.compareAndSet(State.DELAYED,State.CANCELLED))
{ {
QNode prev = _prev; QNode prev = _prev;
QNode next = _next.get(); QNode next = get();
if (prev != null && next != null && next.isDelayed()) if (prev != null && next != null && next.isDelayed())
{ {
if (prev._next.compareAndSet(this, next)) if (prev.compareAndSet(this, next))
next._prev=prev; next._prev=prev;
} }
return true; return true;
@ -483,13 +517,13 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
public Event dequeue() public Event dequeue()
{ {
if (_event._state.compareAndSet(State.DELAYED,State.SCHEDULED)) if (_event.compareAndSet(State.DELAYED,State.SCHEDULED))
{ {
QNode prev = _prev; QNode prev = _prev;
QNode next = _next.get(); QNode next = get();
if (prev != null && next != null && next.isDelayed()) if (prev != null && next != null && next.isDelayed())
{ {
if (prev._next.compareAndSet(this, next)) if (prev.compareAndSet(this, next))
next._prev=prev; next._prev=prev;
} }
return _event; return _event;
@ -499,12 +533,12 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
public boolean isDelayed() public boolean isDelayed()
{ {
return _event!=null && _event._state.get()==State.DELAYED; return _event!=null && _event.get()==State.DELAYED;
} }
public boolean isTail() public boolean isTail()
{ {
return _event==null && _next.get()==null; return _event==null && get()==null;
} }
@ -512,7 +546,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
public String toString() public String toString()
{ {
QNode p=_prev; QNode p=_prev;
QNode n=_next.get(); QNode n=get();
return String.format("QNode@%x{%x<-%s->%x}",hashCode(),p==null?0:p.hashCode(),_event,n==null?0:n.hashCode()); return String.format("QNode@%x{%x<-%s->%x}",hashCode(),p==null?0:p.hashCode(),_event,n==null?0:n.hashCode());
} }
} }

View File

@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
public class SimpleScheduler extends AbstractLifeCycle implements Scheduler public class TimerScheduler extends AbstractLifeCycle implements Scheduler
{ {
/* this class uses the Timer class rather than an ScheduledExecutionService because /* this class uses the Timer class rather than an ScheduledExecutionService because
* it uses the same algorithm internally and the signature is cheaper to use as there are no * it uses the same algorithm internally and the signature is cheaper to use as there are no
@ -37,12 +37,12 @@ public class SimpleScheduler extends AbstractLifeCycle implements Scheduler
Timer _timer; Timer _timer;
final String _name; final String _name;
public SimpleScheduler() public TimerScheduler()
{ {
this(null); this(null);
} }
public SimpleScheduler(String name) public TimerScheduler(String name)
{ {
_name=name; _name=name;
} }

View File

@ -28,11 +28,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BenchmarkHelper;
import org.eclipse.jetty.util.statistic.SampleStatistic; import org.eclipse.jetty.util.statistic.SampleStatistic;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -41,15 +43,18 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class SchedulerTest public class SchedulerTest
{ {
public static Executor executor = Executors.newFixedThreadPool(256); private static final BenchmarkHelper benchmark = new BenchmarkHelper();
private static final Executor executor = Executors.newFixedThreadPool(256);
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> data() public static Collection<Object[]> data()
{ {
Object[][] data = new Object[][]{ Object[][] data = new Object[][]{
{new SimpleScheduler()}, {new TimerScheduler()},
{new ConcurrentScheduler(executor,0)}, {new ScheduledExecutionServiceScheduler()},
{new ConcurrentScheduler(executor,2000)} {new ConcurrentScheduler(0)},
{new ConcurrentScheduler(1500)},
{new ConcurrentScheduler(executor,1500)}
}; };
return Arrays.asList(data); return Arrays.asList(data);
} }
@ -132,14 +137,30 @@ public class SchedulerTest
Thread.sleep(1500); Thread.sleep(1500);
Assert.assertEquals(0,executed.get()); Assert.assertEquals(0,executed.get());
} }
@Test @Test
@Slow @Slow
public void testManySchedulesAndCancels() throws Exception public void testManySchedulesAndCancels() throws Exception
{ {
final Random random = new Random(); schedule(500,5000,2000,50);
Thread[] test = new Thread[500]; }
@Test
@Slow
@Ignore
public void testBenchmark() throws Exception
{
schedule(2000,10000,2000,50);
benchmark.startStatistics();
System.err.println(_scheduler);
schedule(2000,30000,2000,50);
benchmark.stopStatistics();
}
private void schedule(int threads,final int duration, final int delay, final int interval) throws Exception
{
final Random random = new Random(1);
Thread[] test = new Thread[threads];
final AtomicInteger schedules = new AtomicInteger(); final AtomicInteger schedules = new AtomicInteger();
final SampleStatistic executions = new SampleStatistic(); final SampleStatistic executions = new SampleStatistic();
@ -156,18 +177,18 @@ public class SchedulerTest
{ {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long start=now; long start=now;
long end=start+5000; long end=start+duration;
while (now<end) while (now+interval<end)
{ {
final int delay=random.nextInt((int)(end-now)); final long expected=now+delay;
final long expected = now+delay; int cancel=random.nextInt(interval);
boolean expected_to_execute=false;
int cancel=random.nextInt(200);
if (cancel==0) if (cancel==0)
cancel=(int)(end-now)+1000; {
else expected_to_execute=true;
cancel=cancel/4; cancel=delay+1000;
}
schedules.incrementAndGet(); schedules.incrementAndGet();
Scheduler.Task task=_scheduler.schedule(new Runnable() Scheduler.Task task=_scheduler.schedule(new Runnable()
@ -184,10 +205,17 @@ public class SchedulerTest
now = System.currentTimeMillis(); now = System.currentTimeMillis();
if (task.cancel()) if (task.cancel())
{ {
long lateness=now-expected; if (expected_to_execute)
cancellations.set(lateness); cancellations.set(now-expected);
else
cancellations.set(0);
}
else if (!expected_to_execute)
{
cancellations.set(9999); // flags failure
} }
Thread.yield();
} }
} }
catch (InterruptedException e) catch (InterruptedException e)
@ -205,9 +233,9 @@ public class SchedulerTest
for (Thread thread : test) for (Thread thread : test)
thread.join(); thread.join();
//System.err.println(schedules); // System.err.println(schedules);
//System.err.println(executions); // System.err.println(executions);
//System.err.println(cancellations); // System.err.println(cancellations);
// there were some executions and cancellations // there were some executions and cancellations
Assert.assertThat(executions.getCount(),Matchers.greaterThan(0L)); Assert.assertThat(executions.getCount(),Matchers.greaterThan(0L));

View File

@ -35,7 +35,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.eclipse.jetty.websocket.api.Extension; import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry; import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -69,7 +69,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketClientFactory(Executor threadPool) public WebSocketClientFactory(Executor threadPool)
{ {
this(threadPool,new SimpleScheduler()); this(threadPool,new TimerScheduler());
} }
public WebSocketClientFactory(Executor threadPool, Scheduler scheduler) public WebSocketClientFactory(Executor threadPool, Scheduler scheduler)
@ -110,7 +110,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketClientFactory(SslContextFactory sslContextFactory) public WebSocketClientFactory(SslContextFactory sslContextFactory)
{ {
this(new QueuedThreadPool(),new SimpleScheduler(),sslContextFactory); this(new QueuedThreadPool(),new TimerScheduler(),sslContextFactory);
} }
@Override @Override

View File

@ -43,7 +43,7 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SimpleScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.Extension; import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry; import org.eclipse.jetty.websocket.api.ExtensionRegistry;
@ -78,7 +78,7 @@ public class WebSocketServerFactory extends AggregateLifeCycle implements WebSoc
/** /**
* Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler. * Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
*/ */
private final Scheduler scheduler = new SimpleScheduler(); private final Scheduler scheduler = new TimerScheduler();
private final String supportedVersions; private final String supportedVersions;
private final WebSocketPolicy basePolicy; private final WebSocketPolicy basePolicy;
private final EventMethodsCache methodsCache; private final EventMethodsCache methodsCache;