jetty-9 scheduledexecutorservice

This commit is contained in:
Greg Wilkins 2012-07-23 19:20:29 +10:00
parent 757323e120
commit 5b57573596
4 changed files with 55 additions and 41 deletions

View File

@ -1,11 +1,10 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
@ -17,10 +16,22 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
{
private static final int TICK=Integer.getInteger("org.eclipse.jetty.io.AsyncByteArrayEndPoint.TICK",100);
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private final Timer _timer;
private final ScheduledExecutorService _timer;
private AsyncConnection _connection;
private final TimerTask _checkTimeout=new TimeoutTask(this);
private final Runnable _checkTimeout=new Runnable()
{
@Override
public void run()
{
if (isOpen())
{
checkTimeout(System.currentTimeMillis());
if (isOpen())
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
}
}
};
private final ReadInterest _readInterest = new ReadInterest()
{
@ -42,25 +53,25 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
}
};
public AsyncByteArrayEndPoint(Timer timer)
public AsyncByteArrayEndPoint(ScheduledExecutorService timer)
{
super();
_timer=timer;
_timer.schedule(_checkTimeout,TICK,TICK);
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
}
public AsyncByteArrayEndPoint(Timer timer, byte[] input, int outputSize)
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, byte[] input, int outputSize)
{
super(input,outputSize);
_timer=timer;
_timer.schedule(_checkTimeout,TICK,TICK);
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
}
public AsyncByteArrayEndPoint(Timer timer, String input, int outputSize)
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, String input, int outputSize)
{
super(input,outputSize);
_timer=timer;
_timer.schedule(_checkTimeout,TICK,TICK);
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
}
@Override
@ -156,25 +167,4 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
public void onClose()
{
}
private static class TimeoutTask extends TimerTask
{
final WeakReference<AsyncByteArrayEndPoint> _endp;
TimeoutTask(AsyncByteArrayEndPoint endp)
{
_endp=new WeakReference<AsyncByteArrayEndPoint>(endp);
}
@Override
public void run()
{
AsyncByteArrayEndPoint endp = _endp.get();
if (endp==null)
cancel();
else
endp.checkTimeout(System.currentTimeMillis());
}
};
}

View File

@ -8,21 +8,38 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AsyncByteArrayEndPointTest
{
ScheduledExecutorService _timer;
@Before
public void before()
{
_timer = new ScheduledThreadPoolExecutor(1);
}
@After
public void after()
{
_timer.shutdownNow();
}
@Test
public void testReadable() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer());
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer);
endp.setInput("test input");
ByteBuffer buffer = BufferUtil.allocate(1024);
@ -81,7 +98,7 @@ public class AsyncByteArrayEndPointTest
@Test
public void testWrite() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer(),(byte[])null,15);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,(byte[])null,15);
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(10));
@ -109,7 +126,7 @@ public class AsyncByteArrayEndPointTest
@Test
public void testIdle() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer());
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer);
endp.setIdleTimeout(500);
endp.setInput("test");
endp.setGrowOutput(false);

View File

@ -21,6 +21,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AsyncByteArrayEndPoint;
@ -33,7 +35,7 @@ public class LocalHttpConnector extends HttpConnector
{
private static final Logger LOG = Log.getLogger(LocalHttpConnector.class);
private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<LocalEndPoint>();
private Timer _timer;
private ScheduledExecutorService _timer;
private LocalExecutor _executor;
public LocalHttpConnector()
@ -109,7 +111,7 @@ public class LocalHttpConnector extends HttpConnector
protected void doStart() throws Exception
{
super.doStart();
_timer=new Timer(String.format("LocalHttpConnector@%x:Timer",hashCode()),true);
_timer=new ScheduledThreadPoolExecutor(1);
_executor=new LocalExecutor(findExecutor());
}
@ -117,7 +119,7 @@ public class LocalHttpConnector extends HttpConnector
protected void doStop() throws Exception
{
super.doStop();
_timer.cancel();
_timer.shutdownNow();
_executor=null;
}

View File

@ -28,6 +28,8 @@ import java.util.Iterator;
import java.util.Locale;
import java.util.Timer;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
@ -58,6 +60,7 @@ public class ResponseTest
private Server _server;
private LocalHttpConnector _connector;
private HttpChannel _channel;
private ScheduledExecutorService _timer;
@Before
public void init() throws Exception
@ -67,8 +70,9 @@ public class ResponseTest
_server.addConnector(_connector);
_server.setHandler(new DumpHandler());
_server.start();
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(new Timer(true));
_timer=new ScheduledThreadPoolExecutor(1);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer);
HttpInput input = new HttpInput();
AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor()
{
@ -172,6 +176,7 @@ public class ResponseTest
{
_server.stop();
_server.join();
_timer.shutdownNow();
}
@Test