jetty-9 do not recycle IOFutures

This commit is contained in:
Greg Wilkins 2012-05-04 21:41:27 +02:00
parent 9a5f86188e
commit 6cefe6c8f7
7 changed files with 120 additions and 107 deletions

View File

@ -13,6 +13,9 @@ import org.eclipse.jetty.util.log.Logger;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint
{
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private final Lock _lock = new ReentrantLock();
@ -20,54 +23,10 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
private volatile AsyncConnection _connection;
private DispatchedIOFuture _readFuture = new DispatchedIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
AsyncByteArrayEndPoint.this.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
// TODO ??
cancelled();
}
finally
{
_lock.unlock();
}
}
};
private DispatchedIOFuture _readFuture;
private DispatchedIOFuture _writeFuture;
private ByteBuffer[] _writeBuffers;
private DispatchedIOFuture _writeFuture = new DispatchedIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
AsyncByteArrayEndPoint.this.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
// TODO
cancelled();
}
finally
{
_lock.unlock();
}
}
};
@ -93,10 +52,10 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
_lock.lock();
try
{
if (!_readFuture.isComplete())
if (_readFuture!=null && !_readFuture.isComplete())
throw new IllegalStateException("previous read not complete");
_readFuture.recycle();
_readFuture=new ReadFuture();
// TODO
@ -114,7 +73,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
_lock.lock();
try
{
if (!_writeFuture.isComplete())
if (_writeFuture!=null && !_writeFuture.isComplete())
throw new IllegalStateException("previous write not complete");
flush(buffers);
@ -125,7 +84,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
if (b.hasRemaining())
{
_writeBuffers=buffers;
_writeFuture.recycle();
_writeFuture=new WriteFuture();
// TODO
return _writeFuture;
}
@ -222,4 +181,67 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
}
}
private final class WriteFuture extends DispatchedIOFuture
{
private WriteFuture()
{
super(_lock);
}
@Override
protected void dispatch(Runnable task)
{
AsyncByteArrayEndPoint.this.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
// TODO
cancelled();
}
finally
{
_lock.unlock();
}
}
}
private final class ReadFuture extends DispatchedIOFuture
{
private ReadFuture()
{
super(_lock);
}
@Override
protected void dispatch(Runnable task)
{
AsyncByteArrayEndPoint.this.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
// TODO ??
cancelled();
}
finally
{
_lock.unlock();
}
}
}
}

View File

@ -109,25 +109,6 @@ public class DispatchedIOFuture implements IOFuture
}
}
public void recycle()
{
// System.err.println(this);new Throwable().printStackTrace();
_lock.lock();
try
{
if (!_complete)
throw new IllegalStateException();
_ready=false;
_cause=null;
_complete=false;
_callback=null;
}
finally
{
_lock.unlock();
}
}
@Override
public boolean isComplete()
{

View File

@ -17,6 +17,15 @@ final class RunnableIOFuture extends DispatchedIOFuture
super(ready,lock);
}
RunnableIOFuture(Lock lock)
{
super(lock);
}
RunnableIOFuture()
{
}
@Override
protected void dispatch(Runnable callback)
{
@ -44,11 +53,4 @@ final class RunnableIOFuture extends DispatchedIOFuture
return _task!=null;
}
@Override
public void recycle()
{
if (_task!=null)
throw new IllegalStateException("unrun task");
super.recycle();
}
}

View File

@ -40,8 +40,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
private final DispatchedIOFuture _readFuture = new InterestedFuture(SelectionKey.OP_READ,true,_lock);
private final DispatchedIOFuture _writeFuture = new InterestedFuture(SelectionKey.OP_WRITE,true,_lock);
private DispatchedIOFuture _readFuture=new DispatchedIOFuture(true,_lock);
private DispatchedIOFuture _writeFuture=new DispatchedIOFuture(true,_lock);
private SelectionKey _key;
@ -221,10 +222,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_lock.lock();
try
{
if (!_readFuture.isComplete())
if (_readFuture!=null && !_readFuture.isComplete())
throw new IllegalStateException("previous read not complete");
_readFuture.recycle();
_readFuture=new InterestedFuture(SelectionKey.OP_READ);
_interestOps=_interestOps|SelectionKey.OP_READ;
updateKey();
@ -244,7 +245,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_lock.lock();
try
{
if (!_writeFuture.isComplete())
if (_writeFuture!=null && !_writeFuture.isComplete())
throw new IllegalStateException("previous write not complete");
flush(buffers);
@ -255,7 +256,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
if (b.hasRemaining())
{
_writeBuffers=buffers;
_writeFuture.recycle();
_writeFuture=new InterestedFuture(SelectionKey.OP_WRITE);
_interestOps=_interestOps|SelectionKey.OP_WRITE;
updateKey();
return _writeFuture;
@ -500,9 +501,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private class InterestedFuture extends DispatchedIOFuture
{
final int _interest;
private InterestedFuture(int interest,boolean ready, Lock lock)
private InterestedFuture(int interest)
{
super(ready,lock);
super(_lock);
_interest=interest;
}

View File

@ -16,10 +16,7 @@ package org.eclipse.jetty.io;
import static org.eclipse.jetty.io.CompletedIOFuture.COMPLETE;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -50,11 +47,11 @@ public class SslConnection extends AbstractAsyncConnection
private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
private final Lock _lock = new ReentrantLock();
private final RunnableIOFuture _appReadFuture = new RunnableIOFuture(true,_lock);
private final RunnableIOFuture _appWriteFuture = new RunnableIOFuture(true,_lock);
private final IOFuture.Callback _netWriteCallback = new NetWriteCallback();
private RunnableIOFuture _appReadFuture = new RunnableIOFuture(true,_lock);
private RunnableIOFuture _appWriteFuture = new RunnableIOFuture(true,_lock);
private final SSLEngine _engine;
private final SSLSession _session;
private AbstractAsyncConnection _appConnection;
@ -259,7 +256,7 @@ public class SslConnection extends AbstractAsyncConnection
allocateBuffers();
boolean progress=true;
while(progress && !_appReadFuture.isDispatched())
while(progress && (_appReadFuture==null || !_appReadFuture.isDispatched()))
{
progress=false;
@ -284,7 +281,7 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
releaseBuffers();
if (!_appReadFuture.isComplete() && _netReadFuture==null && !BufferUtil.isFull(_inNet))
if (_appReadFuture!=null && !_appReadFuture.isComplete() && _netReadFuture==null && !BufferUtil.isFull(_inNet))
_netReadFuture=scheduleOnReadable();
LOG.debug("!onReadable {} {}",this,_netReadFuture);
@ -778,7 +775,7 @@ public class SslConnection extends AbstractAsyncConnection
return COMPLETE;
// No, we need to schedule a network read
_appReadFuture.recycle();
_appReadFuture=new RunnableIOFuture(_lock);
if (_netReadFuture==null)
_netReadFuture=scheduleOnReadable();
return _appReadFuture;
@ -806,7 +803,7 @@ public class SslConnection extends AbstractAsyncConnection
if (b.hasRemaining())
{
_writeBuffers=buffers;
_appWriteFuture.recycle();
_appWriteFuture=new RunnableIOFuture(_lock);
return _appWriteFuture;
}
}

View File

@ -163,7 +163,7 @@ public class IOFutureTest
@Test
public void testReady() throws Exception
{
final DispatchedIOFuture future = new DispatchedIOFuture();
DispatchedIOFuture future = new DispatchedIOFuture();
assertFalse(future.isComplete());
assertFalse(future.isReady());
@ -195,13 +195,14 @@ public class IOFutureTest
start=System.currentTimeMillis();
final DispatchedIOFuture f0=future;
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){}
future.ready();
f0.ready();
}
}.start();
@ -215,17 +216,20 @@ public class IOFutureTest
assertEquals((Throwable)null,fail.get());
ready.set(false);
future.recycle();
future = new DispatchedIOFuture();
assertFalse(future.isComplete());
assertFalse(future.isReady());
start=System.currentTimeMillis();
final DispatchedIOFuture f1=future;
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){}
future.ready();
f1.ready();
}
}.start();
@ -236,14 +240,13 @@ public class IOFutureTest
assertTrue(future.isReady());
assertFalse(ready.get()); // no callback set
assertEquals((Throwable)null,fail.get());
}
@Test
public void testFail() throws Exception
{
final DispatchedIOFuture future = new DispatchedIOFuture();
DispatchedIOFuture future = new DispatchedIOFuture();
final Exception ex=new Exception("failed");
assertFalse(future.isComplete());
@ -275,13 +278,14 @@ public class IOFutureTest
assertEquals((Throwable)null,fail.get());
start=System.currentTimeMillis();
final DispatchedIOFuture f0=future;
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){}
future.fail(ex);
f0.fail(ex);
}
}.start();
@ -310,19 +314,21 @@ public class IOFutureTest
assertFalse(ready.get());
assertEquals(ex,fail.get());
future=new DispatchedIOFuture();
ready.set(false);
fail.set(null);
future.recycle();
assertFalse(future.isComplete());
assertFalse(future.isReady());
start=System.currentTimeMillis();
final DispatchedIOFuture f1=future;
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){}
future.fail(ex);
f1.fail(ex);
}
}.start();

View File

@ -187,6 +187,10 @@ public class SelectChannelEndPointTest
e2.printStackTrace();
}
}
catch(InterruptedException e)
{
// e.printStackTrace();
}
catch(Exception e)
{
e.printStackTrace();
@ -204,11 +208,11 @@ public class SelectChannelEndPointTest
@Override
public void onIdleExpired(long idleForMs)
{
System.err.println("IDLE "+idleForMs);
/*System.err.println("IDLE "+idleForMs);
System.err.println("last "+(System.currentTimeMillis()-_last));
System.err.println("ENDP "+_endp);
System.err.println("tran "+_endp.getTransport());
System.err.println();
System.err.println();*/
super.onIdleExpired(idleForMs);
}
@ -507,7 +511,7 @@ public class SelectChannelEndPointTest
server.configureBlocking(false);
_manager.register(server);
int writes = 1000;
int writes = 10000;
final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET);
byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET);