jetty-9 work in progress

This commit is contained in:
Greg Wilkins 2012-05-07 13:37:23 +02:00
parent 6cefe6c8f7
commit 2b78e69ac9
10 changed files with 65 additions and 129 deletions

View File

@ -59,7 +59,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
@Override
public IOFuture scheduleOnReadable()
{
IOFuture read=getEndPoint().read();
IOFuture read=getEndPoint().readable();
read.setCallback(_readCallback);
return read;
}

View File

@ -9,7 +9,7 @@ public abstract class AbstractEndPoint implements EndPoint
private final long _created=System.currentTimeMillis();
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private int _maxIdleTime;
private volatile int _maxIdleTime;
private volatile long _idleTimestamp;

View File

@ -47,7 +47,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
}
@Override
public IOFuture read() throws IllegalStateException
public IOFuture readable() throws IllegalStateException
{
_lock.lock();
try

View File

@ -19,44 +19,44 @@ import java.util.concurrent.Future;
* ; in a callback mode (like {@link CompletionHandler} mode; or blocking mod;e or a hybrid mode
* <h3>Blocking read</h3>
* <pre>
* endpoint.read().complete();
* endpoint.readable().block();
* endpoint.fill(buffer);
* </pre>
* <h3>Polling read</h3>
* <pre>
* IOFuture read = endpoint.read();
* IOFuture read = endpoint.readable();
* ...
* if (read.isReady())
* endpoint.fill(buffer);
* while (!read.isReady())
* Thread.sleep(10);
* endpoint.fill(buffer);
* </pre>
* <h3>Callback read</h3>
* <pre>
* endpoint.read().setHandler(new IOCallback()
* endpoint.readable().setCallback(new IOCallback()
* {
* public void onReady() { endpoint.fill(buffer); ... }
* public void onFail(IOException e) { ... }
* public void onTimeout() { ... }
* }
* </pre>
*
* <h3>Blocking write</h3>
* <pre>
* endpoint.write(buffer).complete();
* endpoint.write(buffer).block();
* </pre>
* <h3>Polling write</h3>
* <pre>
* IOFuture write = endpoint.write(buffer);
* ...
* if (write.isReady())
* // do next write
* while (!write.isReady())
* Thread.sleep(10);
*
* </pre>
* <h3>Callback write</h3>
* <pre>
* endpoint.write(buffer).setHandler(new IOCallback()
* endpoint.write(buffer0,buffer1).setCallback(new IOCallback()
* {
* public void onReady() { ... }
* public void onFail(IOException e) { ... }
* public void onTimeout() { ... }
* }
* </pre>
* <h3>Hybrid write</h3>
@ -70,14 +70,13 @@ import java.util.concurrent.Future;
* {
* public void onReady() { ... }
* public void onFail(IOException e) { ... }
* public void onTimeout() { ... }
* });
* ...
* </pre>
*
* <h2>Compatibility Notes</h2>
* Some Async IO APIs have the concept of setting read interest. With this
* API calling {@link #read()} is equivalent to setting read interest to true
* API calling {@link #readable()} is equivalent to setting read interest to true
* and calling {@link IOFuture#cancel()} is equivalent to setting read interest
* to false.
*/
@ -95,7 +94,7 @@ public interface AsyncEndPoint extends EndPoint
* return immediately with data without blocking.
* @throws IllegalStateException if another read operation has been scheduled and has not timedout, been cancelled or is ready.
*/
IOFuture read() throws IllegalStateException;
IOFuture readable() throws IllegalStateException;
/* ------------------------------------------------------------ */
/** Schedule a write operation.

View File

@ -69,8 +69,12 @@ public class ChannelEndPoint extends AbstractEndPoint
if (_oshut)
close();
}
protected final void shutdownChannelOutput() throws IOException
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
@Override
public void shutdownOutput() throws IOException
{
LOG.debug("oshut {}",this);
_oshut = true;
@ -101,15 +105,6 @@ public class ChannelEndPoint extends AbstractEndPoint
}
}
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
@Override
public void shutdownOutput() throws IOException
{
shutdownChannelOutput();
}
@Override
public boolean isOutputShutdown()
{

View File

@ -209,7 +209,7 @@ public class DispatchedIOFuture implements IOFuture
protected void dispatch(Runnable callback)
{
callback.run();
new Thread(callback).run();
}
private void dispatchReady()

View File

@ -1,56 +0,0 @@
package org.eclipse.jetty.io;
import java.util.concurrent.locks.Lock;
/* ------------------------------------------------------------ */
/** A Dispatched IOFuture that retains the Runnable.
* This IOFuture captures a dispatched task as a runnable that can be executed later.
* This is often used when the {@link #ready()} or {@link #fail(Throwable)} method is
* called holding locks that should not be held during the execution of the runnable.
*/
final class RunnableIOFuture extends DispatchedIOFuture
{
private volatile Runnable _task;
RunnableIOFuture(boolean ready, Lock lock)
{
super(ready,lock);
}
RunnableIOFuture(Lock lock)
{
super(lock);
}
RunnableIOFuture()
{
}
@Override
protected void dispatch(Runnable callback)
{
if (_task!=null)
throw new IllegalStateException();
_task=callback;
}
public Runnable takeTask()
{
Runnable t=_task;
_task=null;
return t;
}
public void run()
{
Runnable task=takeTask();
if (task!=null)
task.run();
}
public boolean isDispatched()
{
return _task!=null;
}
}

View File

@ -124,8 +124,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
return;
}
boolean can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0);
boolean can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0);
//TODO do we need to test interest here ???
boolean can_read=(_key.isReadable() && (_key.interestOps()&SelectionKey.OP_READ)==SelectionKey.OP_READ);
boolean can_write=(_key.isWritable() && (_key.interestOps()&SelectionKey.OP_WRITE)==SelectionKey.OP_WRITE);
_interestOps=0;
if (can_read && !_readFuture.isComplete())
@ -217,7 +218,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
@Override
public IOFuture read() throws IllegalStateException
public IOFuture readable() throws IllegalStateException
{
_lock.lock();
try
@ -321,27 +322,36 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
*/
private void updateKey()
{
if (!_selected)
if (!_lock.tryLock())
throw new IllegalStateException();
try
{
int current_ops=-1;
if (getChannel().isOpen())
if (!_selected)
{
try
int current_ops=-1;
if (getChannel().isOpen())
{
current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
try
{
current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
}
catch(Exception e)
{
_key=null;
LOG.ignore(e);
}
}
catch(Exception e)
if (_interestOps!=current_ops && !_changing)
{
_key=null;
LOG.ignore(e);
_changing=true;
_selectSet.addChange(this);
_selectSet.wakeup();
}
}
if (_interestOps!=current_ops && !_changing)
{
_changing=true;
_selectSet.addChange(this);
_selectSet.wakeup();
}
}
finally
{
_lock.unlock();
}
}

View File

@ -49,8 +49,8 @@ public class SslConnection extends AbstractAsyncConnection
private final Lock _lock = new ReentrantLock();
private final IOFuture.Callback _netWriteCallback = new NetWriteCallback();
private RunnableIOFuture _appReadFuture = new RunnableIOFuture(true,_lock);
private RunnableIOFuture _appWriteFuture = new RunnableIOFuture(true,_lock);
private DispatchedIOFuture _appReadFuture = new DispatchedIOFuture(true,_lock);
private DispatchedIOFuture _appWriteFuture = new DispatchedIOFuture(true,_lock);
private final SSLEngine _engine;
private final SSLSession _session;
@ -83,10 +83,7 @@ public class SslConnection extends AbstractAsyncConnection
{
LOG.debug("write FAILED",cause);
if (!_appWriteFuture.isComplete())
{
_appWriteFuture.fail(cause);
_appWriteFuture.run();
}
else
LOG.warn("write FAILED",cause);
}
@ -228,7 +225,6 @@ public class SslConnection extends AbstractAsyncConnection
@Override
public void onClose()
{
// TODO is this right?
_appConnection.onClose();
}
@ -256,7 +252,7 @@ public class SslConnection extends AbstractAsyncConnection
allocateBuffers();
boolean progress=true;
while(progress && (_appReadFuture==null || !_appReadFuture.isDispatched()))
while(progress)
{
progress=false;
@ -287,11 +283,6 @@ public class SslConnection extends AbstractAsyncConnection
LOG.debug("!onReadable {} {}",this,_netReadFuture);
_lock.unlock();
// Run any ready callback from _appReadFuture in this thread.
_appReadFuture.run();
_appWriteFuture.run();
}
}
@ -600,9 +591,6 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
_lock.unlock();
_appReadFuture.run();
_appWriteFuture.run();
}
}
@ -764,7 +752,7 @@ public class SslConnection extends AbstractAsyncConnection
}
@Override
public IOFuture read() throws IllegalStateException
public IOFuture readable() throws IllegalStateException
{
LOG.debug("{} sslEndp.read()",_session);
_lock.lock();
@ -775,7 +763,7 @@ public class SslConnection extends AbstractAsyncConnection
return COMPLETE;
// No, we need to schedule a network read
_appReadFuture=new RunnableIOFuture(_lock);
_appReadFuture=new DispatchedIOFuture(_lock);
if (_netReadFuture==null)
_netReadFuture=scheduleOnReadable();
return _appReadFuture;
@ -803,7 +791,7 @@ public class SslConnection extends AbstractAsyncConnection
if (b.hasRemaining())
{
_writeBuffers=buffers;
_appWriteFuture=new RunnableIOFuture(_lock);
_appWriteFuture=new DispatchedIOFuture(_lock);
return _appWriteFuture;
}
}
@ -816,8 +804,6 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
_lock.unlock();
_appReadFuture.run();
_appWriteFuture.run();
}
}
@ -843,9 +829,8 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
_lock.unlock();
_appReadFuture.run();
_appWriteFuture.run();
}
}
}
}

View File

@ -150,7 +150,7 @@ public class SelectChannelEndPointTest
// If the tests wants to block, then block
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt)
{
_endp.read().block();
_endp.readable().block();
filled=_endp.fill(_in);
progress|=filled>0;
}
@ -179,6 +179,7 @@ public class SelectChannelEndPointTest
// Timeout does not close, so echo exception then shutdown
try
{
System.err.println(e);
_endp.write(BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))).block();
_endp.shutdownOutput();
}
@ -208,11 +209,13 @@ public class SelectChannelEndPointTest
@Override
public void onIdleExpired(long idleForMs)
{
/*System.err.println("IDLE "+idleForMs);
/*
System.err.println("IDLE "+idleForMs);
System.err.println("last "+(System.currentTimeMillis()-_last));
System.err.println("ENDP "+_endp);
System.err.println("tran "+_endp.getTransport());
System.err.println();*/
System.err.println();
*/
super.onIdleExpired(idleForMs);
}
@ -511,7 +514,7 @@ public class SelectChannelEndPointTest
server.configureBlocking(false);
_manager.register(server);
int writes = 10000;
int writes = 100000;
final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET);
byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET);
@ -542,7 +545,7 @@ public class SelectChannelEndPointTest
for (byte b0 : bytes)
{
int b = in.read();
assertTrue(b>0);
Assert.assertThat(b,greaterThan(0));
assertEquals(0xff&b0,b);
}