397769 - TimerScheduler does not relinquish cancelled tasks.

Implemented ScheduledExecutorScheduler, using JDK's ScheduledExecutorService implementation and configuring it to remove tasks on cancel.
This commit is contained in:
Simone Bordet 2013-01-09 17:01:08 +01:00
parent 39cae560d8
commit fa412b4668
2 changed files with 75 additions and 20 deletions

View File

@ -0,0 +1,69 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
public class ScheduledExecutorScheduler extends AbstractLifeCycle implements Scheduler
{
private volatile ScheduledThreadPoolExecutor scheduler;
@Override
protected void doStart() throws Exception
{
scheduler = new ScheduledThreadPoolExecutor(1);
scheduler.setRemoveOnCancelPolicy(true);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
scheduler.shutdownNow();
super.doStop();
scheduler = null;
}
@Override
public Task schedule(Runnable task, long delay, TimeUnit unit)
{
ScheduledFuture<?> result = scheduler.schedule(task, delay, unit);
return new ScheduledFutureTask(result);
}
private class ScheduledFutureTask implements Task
{
private final ScheduledFuture<?> scheduledFuture;
public ScheduledFutureTask(ScheduledFuture<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}
@Override
public boolean cancel()
{
return scheduledFuture.cancel(false);
}
}
}

View File

@ -22,8 +22,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -44,15 +42,12 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class SchedulerTest public class SchedulerTest
{ {
private static final BenchmarkHelper benchmark = new BenchmarkHelper();
private static final Executor executor = Executors.newFixedThreadPool(256);
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> data() public static Collection<Object[]> data()
{ {
Object[][] data = new Object[][]{ Object[][] data = new Object[][]{
{new TimerScheduler()}/*, {new TimerScheduler()},
{new ScheduledExecutionServiceScheduler()}, {new ScheduledExecutorScheduler()}/*,
{new ConcurrentScheduler(0)}, {new ConcurrentScheduler(0)},
{new ConcurrentScheduler(1500)}, {new ConcurrentScheduler(1500)},
{new ConcurrentScheduler(executor,1500)}*/ {new ConcurrentScheduler(executor,1500)}*/
@ -60,8 +55,7 @@ public class SchedulerTest
return Arrays.asList(data); return Arrays.asList(data);
} }
// Scheduler _scheduler=new SimpleScheduler(); private Scheduler _scheduler;
Scheduler _scheduler;
public SchedulerTest(Scheduler scheduler) public SchedulerTest(Scheduler scheduler)
{ {
@ -98,7 +92,6 @@ public class SchedulerTest
Assert.assertFalse(task.cancel()); Assert.assertFalse(task.cancel());
Assert.assertThat(executed.get(),Matchers.greaterThanOrEqualTo(expected)); Assert.assertThat(executed.get(),Matchers.greaterThanOrEqualTo(expected));
Assert.assertThat(expected-executed.get(),Matchers.lessThan(1000L)); Assert.assertThat(expected-executed.get(),Matchers.lessThan(1000L));
} }
@Test @Test
@ -179,7 +172,7 @@ public class SchedulerTest
public void testTaskThrowsException() throws Exception public void testTaskThrowsException() throws Exception
{ {
long delay = 500; long delay = 500;
Scheduler.Task task=_scheduler.schedule(new Runnable() _scheduler.schedule(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -218,6 +211,7 @@ public class SchedulerTest
public void testBenchmark() throws Exception public void testBenchmark() throws Exception
{ {
schedule(2000,10000,2000,50); schedule(2000,10000,2000,50);
BenchmarkHelper benchmark = new BenchmarkHelper();
benchmark.startStatistics(); benchmark.startStatistics();
System.err.println(_scheduler); System.err.println(_scheduler);
schedule(2000,30000,2000,50); schedule(2000,30000,2000,50);
@ -226,7 +220,6 @@ public class SchedulerTest
private void schedule(int threads,final int duration, final int delay, final int interval) throws Exception private void schedule(int threads,final int duration, final int delay, final int interval) throws Exception
{ {
final Random random = new Random(1);
Thread[] test = new Thread[threads]; Thread[] test = new Thread[threads];
final AtomicInteger schedules = new AtomicInteger(); final AtomicInteger schedules = new AtomicInteger();
@ -242,6 +235,7 @@ public class SchedulerTest
{ {
try try
{ {
Random random = new Random();
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long start=now; long start=now;
long end=start+duration; long end=start+duration;
@ -301,7 +295,6 @@ public class SchedulerTest
{ {
e.printStackTrace(); e.printStackTrace();
} }
} }
}; };
} }
@ -312,10 +305,6 @@ public class SchedulerTest
for (Thread thread : test) for (Thread thread : test)
thread.join(); thread.join();
// System.err.println(schedules);
// System.err.println(executions);
// System.err.println(cancellations);
// there were some executions and cancellations // there were some executions and cancellations
Assert.assertThat(executions.getCount(),Matchers.greaterThan(0L)); Assert.assertThat(executions.getCount(),Matchers.greaterThan(0L));
Assert.assertThat(cancellations.getCount(),Matchers.greaterThan(0L)); Assert.assertThat(cancellations.getCount(),Matchers.greaterThan(0L));
@ -333,7 +322,4 @@ public class SchedulerTest
// No cancellations long after expected executions // No cancellations long after expected executions
Assert.assertThat(cancellations.getMax(),Matchers.lessThan(500L)); Assert.assertThat(cancellations.getMax(),Matchers.lessThan(500L));
} }
} }