From 5b57573596d8811fe169c8275e7f74aa8876d694 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Mon, 23 Jul 2012 19:20:29 +1000 Subject: [PATCH] jetty-9 scheduledexecutorservice --- .../jetty/io/AsyncByteArrayEndPoint.java | 54 ++++++++----------- .../jetty/io/AsyncByteArrayEndPointTest.java | 25 +++++++-- .../jetty/server/LocalHttpConnector.java | 8 +-- .../eclipse/jetty/server/ResponseTest.java | 9 +++- 4 files changed, 55 insertions(+), 41 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index 297936361c4..27d11d5558e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -1,11 +1,10 @@ package org.eclipse.jetty.io; import java.io.IOException; -import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.util.Timer; -import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.util.BufferUtil; @@ -17,10 +16,22 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn { private static final int TICK=Integer.getInteger("org.eclipse.jetty.io.AsyncByteArrayEndPoint.TICK",100); public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class); - private final Timer _timer; + private final ScheduledExecutorService _timer; private AsyncConnection _connection; - private final TimerTask _checkTimeout=new TimeoutTask(this); + private final Runnable _checkTimeout=new Runnable() + { + @Override + public void run() + { + if (isOpen()) + { + checkTimeout(System.currentTimeMillis()); + if (isOpen()) + _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); + } + } + }; private final ReadInterest _readInterest = new ReadInterest() { @@ -42,25 +53,25 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn } }; - public AsyncByteArrayEndPoint(Timer timer) + public AsyncByteArrayEndPoint(ScheduledExecutorService timer) { super(); _timer=timer; - _timer.schedule(_checkTimeout,TICK,TICK); + _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); } - public AsyncByteArrayEndPoint(Timer timer, byte[] input, int outputSize) + public AsyncByteArrayEndPoint(ScheduledExecutorService timer, byte[] input, int outputSize) { super(input,outputSize); _timer=timer; - _timer.schedule(_checkTimeout,TICK,TICK); + _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); } - public AsyncByteArrayEndPoint(Timer timer, String input, int outputSize) + public AsyncByteArrayEndPoint(ScheduledExecutorService timer, String input, int outputSize) { super(input,outputSize); _timer=timer; - _timer.schedule(_checkTimeout,TICK,TICK); + _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); } @Override @@ -156,25 +167,4 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn public void onClose() { } - - private static class TimeoutTask extends TimerTask - { - final WeakReference _endp; - - TimeoutTask(AsyncByteArrayEndPoint endp) - { - _endp=new WeakReference(endp); - } - - @Override - public void run() - { - AsyncByteArrayEndPoint endp = _endp.get(); - if (endp==null) - cancel(); - else - endp.checkTimeout(System.currentTimeMillis()); - } - }; - } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java index 434cfd09534..5b4b539426c 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java @@ -8,21 +8,38 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.nio.ByteBuffer; -import java.util.Timer; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class AsyncByteArrayEndPointTest { + ScheduledExecutorService _timer; + + @Before + public void before() + { + _timer = new ScheduledThreadPoolExecutor(1); + } + + @After + public void after() + { + _timer.shutdownNow(); + } + @Test public void testReadable() throws Exception { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer()); + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); endp.setInput("test input"); ByteBuffer buffer = BufferUtil.allocate(1024); @@ -81,7 +98,7 @@ public class AsyncByteArrayEndPointTest @Test public void testWrite() throws Exception { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer(),(byte[])null,15); + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,(byte[])null,15); endp.setGrowOutput(false); endp.setOutput(BufferUtil.allocate(10)); @@ -109,7 +126,7 @@ public class AsyncByteArrayEndPointTest @Test public void testIdle() throws Exception { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer()); + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); endp.setIdleTimeout(500); endp.setInput("test"); endp.setGrowOutput(false); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java index 1569eb01651..c6ee3d37e0f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java @@ -21,6 +21,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.AsyncByteArrayEndPoint; @@ -33,7 +35,7 @@ public class LocalHttpConnector extends HttpConnector { private static final Logger LOG = Log.getLogger(LocalHttpConnector.class); private final BlockingQueue _connects = new LinkedBlockingQueue(); - private Timer _timer; + private ScheduledExecutorService _timer; private LocalExecutor _executor; public LocalHttpConnector() @@ -109,7 +111,7 @@ public class LocalHttpConnector extends HttpConnector protected void doStart() throws Exception { super.doStart(); - _timer=new Timer(String.format("LocalHttpConnector@%x:Timer",hashCode()),true); + _timer=new ScheduledThreadPoolExecutor(1); _executor=new LocalExecutor(findExecutor()); } @@ -117,7 +119,7 @@ public class LocalHttpConnector extends HttpConnector protected void doStop() throws Exception { super.doStop(); - _timer.cancel(); + _timer.shutdownNow(); _executor=null; } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index a9ea58917eb..f7a9c48a32c 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -28,6 +28,8 @@ import java.util.Iterator; import java.util.Locale; import java.util.Timer; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import javax.servlet.ServletException; import javax.servlet.http.Cookie; @@ -58,6 +60,7 @@ public class ResponseTest private Server _server; private LocalHttpConnector _connector; private HttpChannel _channel; + private ScheduledExecutorService _timer; @Before public void init() throws Exception @@ -67,8 +70,9 @@ public class ResponseTest _server.addConnector(_connector); _server.setHandler(new DumpHandler()); _server.start(); - - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer(true)); + _timer=new ScheduledThreadPoolExecutor(1); + + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); HttpInput input = new HttpInput(); AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor() { @@ -172,6 +176,7 @@ public class ResponseTest { _server.stop(); _server.join(); + _timer.shutdownNow(); } @Test