jetty-9 ConcurrentScheduler refinements
This commit is contained in:
parent
3f3b269cca
commit
4fce4d0e53
|
@ -157,6 +157,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
|
|||
_runner=Thread.currentThread();
|
||||
while(isRunning())
|
||||
{
|
||||
|
||||
try
|
||||
{
|
||||
// Work out how long to sleep for and execute expired events
|
||||
|
@ -181,7 +182,10 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
|
|||
}
|
||||
else
|
||||
{
|
||||
long interval=event._executeAt-now;
|
||||
_timerQ.add(event);
|
||||
if (interval<sleep)
|
||||
sleep=interval;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -208,7 +212,6 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
|
|||
else if (event._executeAt<=now)
|
||||
{
|
||||
i.remove();
|
||||
event.execute();
|
||||
if (event.compareAndSet(State.SCHEDULED,State.DONE))
|
||||
event.execute();
|
||||
}
|
||||
|
@ -437,7 +440,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
|
|||
|
||||
// If the previous next is this (still linked normally)
|
||||
QNode prev_next = prev.get();
|
||||
if (prev_next==this)
|
||||
if (prev_next==this && prev.isDelayed())
|
||||
return prev;
|
||||
|
||||
if (prev_next==null || prev_next.isDelayed())
|
||||
|
|
|
@ -100,6 +100,42 @@ public class SchedulerTest
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoExecution() throws Exception
|
||||
{
|
||||
final AtomicLong executed = new AtomicLong();
|
||||
long expected=System.currentTimeMillis()+3000;
|
||||
Scheduler.Task task=_scheduler.schedule(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
executed.set(System.currentTimeMillis());
|
||||
}
|
||||
},3000,TimeUnit.MILLISECONDS);
|
||||
|
||||
Thread.sleep(4000);
|
||||
Assert.assertFalse(task.cancel());
|
||||
Assert.assertThat(executed.get(),Matchers.greaterThanOrEqualTo(expected));
|
||||
Assert.assertThat(expected-executed.get(),Matchers.lessThan(1000L));
|
||||
|
||||
final AtomicLong executed1 = new AtomicLong();
|
||||
long expected1=System.currentTimeMillis()+3000;
|
||||
Scheduler.Task task1=_scheduler.schedule(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
executed1.set(System.currentTimeMillis());
|
||||
}
|
||||
},3000,TimeUnit.MILLISECONDS);
|
||||
|
||||
Thread.sleep(4000);
|
||||
Assert.assertFalse(task1.cancel());
|
||||
Assert.assertThat(executed1.get(),Matchers.greaterThanOrEqualTo(expected1));
|
||||
Assert.assertThat(expected1-executed1.get(),Matchers.lessThan(1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQuickCancel() throws Exception
|
||||
{
|
||||
|
@ -142,7 +178,7 @@ public class SchedulerTest
|
|||
@Slow
|
||||
public void testManySchedulesAndCancels() throws Exception
|
||||
{
|
||||
schedule(100,3000,1800,50);
|
||||
schedule(100,10000,3800,200);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -178,17 +214,21 @@ public class SchedulerTest
|
|||
long now = System.currentTimeMillis();
|
||||
long start=now;
|
||||
long end=start+duration;
|
||||
|
||||
while (now+interval<end)
|
||||
boolean last=false;
|
||||
while (!last)
|
||||
{
|
||||
final long expected=now+delay;
|
||||
int cancel=random.nextInt(interval);
|
||||
boolean expected_to_execute=false;
|
||||
if (cancel==0)
|
||||
final boolean expected_to_execute;
|
||||
|
||||
last=now+2*interval>end;
|
||||
if (cancel==0 || last)
|
||||
{
|
||||
expected_to_execute=true;
|
||||
cancel=delay+1000;
|
||||
}
|
||||
else
|
||||
expected_to_execute=false;
|
||||
|
||||
schedules.incrementAndGet();
|
||||
Scheduler.Task task=_scheduler.schedule(new Runnable()
|
||||
|
@ -197,7 +237,11 @@ public class SchedulerTest
|
|||
public void run()
|
||||
{
|
||||
long lateness=System.currentTimeMillis()-expected;
|
||||
if (expected_to_execute)
|
||||
executions.set(lateness);
|
||||
else
|
||||
executions.set(6666);
|
||||
|
||||
}
|
||||
},delay,TimeUnit.MILLISECONDS);
|
||||
|
||||
|
@ -205,14 +249,18 @@ public class SchedulerTest
|
|||
now = System.currentTimeMillis();
|
||||
if (task.cancel())
|
||||
{
|
||||
long lateness=now-expected;
|
||||
if (expected_to_execute)
|
||||
cancellations.set(now-expected);
|
||||
cancellations.set(lateness);
|
||||
else
|
||||
cancellations.set(0);
|
||||
}
|
||||
else if (!expected_to_execute)
|
||||
else
|
||||
{
|
||||
cancellations.set(9999); // flags failure
|
||||
if (!expected_to_execute)
|
||||
{
|
||||
cancellations.set(9999);
|
||||
}
|
||||
}
|
||||
|
||||
Thread.yield();
|
||||
|
|
Loading…
Reference in New Issue