jetty-9 refactored common IOFuture pattern

This commit is contained in:
Greg Wilkins 2012-05-04 08:52:27 +02:00
parent a1d8c640ce
commit ad108c42c9
6 changed files with 129 additions and 85 deletions

View File

@ -48,7 +48,6 @@ import org.eclipse.jetty.util.log.Logger;
* <p>This class is not synchronized as it is expected that modifications will only be performed by a
* single thread.
*
*
*/
public class HttpFields implements Iterable<HttpFields.Field>
{

View File

@ -180,6 +180,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
/* ------------------------------------------------------------ */
/*
*/
@Override
public boolean isInputShutdown()
{
return _ishut||_closed;
@ -194,6 +195,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint
return _oshut||_closed;
}
/* ------------------------------------------------------------ */
private void shutdownInput() throws IOException
{
_ishut=true;
if (_oshut)
close();
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#shutdownOutput()
@ -202,6 +211,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void shutdownOutput() throws IOException
{
_oshut=true;
if (_ishut)
close();
}
/* ------------------------------------------------------------ */
@ -233,7 +244,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
if (_closed)
throw new IOException("CLOSED");
if (_in==null)
_ishut=true;
shutdownInput();
if (_ishut)
return -1;
return BufferUtil.append(_in,buffer);

View File

@ -60,47 +60,14 @@ public class ChannelEndPoint extends AbstractEndPoint
return _channel.isOpen();
}
/** Shutdown the channel Input.
* Cannot be overridden. To override, see {@link #shutdownInput()}
* @throws IOException
*/
protected final void shutdownChannelInput() throws IOException
{
LOG.debug("ishut {}", this);
_ishut = true;
if (_channel.isOpen())
{
if (_socket != null)
{
try
{
if (!_socket.isInputShutdown())
{
_socket.shutdownInput();
}
}
catch (SocketException e)
{
LOG.debug(e.toString());
LOG.ignore(e);
}
finally
{
if (_oshut)
{
close();
}
}
}
}
}
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
public void shutdownInput() throws IOException
private void shutdownInput() throws IOException
{
shutdownChannelInput();
_ishut=true;
if (_oshut)
close();
}
protected final void shutdownChannelOutput() throws IOException

View File

@ -45,13 +45,35 @@ public interface EndPoint
/* ------------------------------------------------------------ */
long getCreatedTimeStamp();
/**
* Shutdown any backing output stream associated with the endpoint
/* ------------------------------------------------------------ */
/** Shutdown the output.
* <p>This call indicates that no more data will be sent on this endpoint that
* that the remote end should read an EOF once all previously sent data has been
* consumed. Shutdown may be done either at the TCP/IP level, as a protocol exchange (Eg
* TLS close handshake) or both.
* <p>
* If the endpoint has {@link #isInputShutdown()} true, then this call has the same effect
* as {@link #close()}.
* @throws IOException
*/
void shutdownOutput() throws IOException;
/* ------------------------------------------------------------ */
/** Test if output is shutdown.
* The output is shutdown by a call to {@link #shutdownOutput()}
* or {@link #close()}.
* @return true if the output is shutdown or the endpoint is closed.
*/
boolean isOutputShutdown();
/* ------------------------------------------------------------ */
/** Test if the input is shutdown.
* The input is shutdown if an EOF has been read while doing
* a {@link #fill(ByteBuffer)}. Once the input is shutdown, all calls to
* {@link #fill(ByteBuffer)} will return -1, until such time as the
* end point is close, when they will return {@link EofException}.
* @return True if the input is shutdown or the endpoint is closed.
*/
boolean isInputShutdown();
/**
@ -67,8 +89,8 @@ public interface EndPoint
* @param buffer The buffer to fill. The position and limit are modified during the fill. After the
* operation, the position is unchanged and the limit is increased to reflect the new data filled.
* @return an <code>int</code> value indicating the number of bytes
* filled or -1 if EOF is reached.
* @throws EofException If input is shutdown or the endpoint is closed.
* filled or -1 if EOF is read or the input is shutdown.
* @throws EofException If the endpoint is closed.
*/
int fill(ByteBuffer buffer) throws IOException;

View File

@ -0,0 +1,52 @@
package org.eclipse.jetty.io;
import java.util.concurrent.locks.Lock;
/* ------------------------------------------------------------ */
/** A Dispatched IOFuture that retains the Runnable.
* This IOFuture captures a dispatched task as a runnable that can be executed later.
* This is often used when the {@link #ready()} or {@link #fail(Throwable)} method is
* called holding locks that should not be held during the execution of the runnable.
*/
final class RunnableIOFuture extends DispatchedIOFuture
{
private volatile Runnable _task;
RunnableIOFuture(boolean ready, Lock lock)
{
super(ready,lock);
}
@Override
protected void dispatch(Runnable callback)
{
if (_task!=null)
throw new IllegalStateException();
_task=callback;
}
public Runnable takeTask()
{
Runnable t=_task;
_task=null;
return t;
}
public void run()
{
takeTask().run();
}
public boolean isDispatched()
{
return _task!=null;
}
@Override
public void recycle()
{
if (_task!=null)
throw new IllegalStateException("unrun task");
super.recycle();
}
}

View File

@ -45,46 +45,15 @@ import org.eclipse.jetty.util.log.Logger;
public class SslConnection extends AbstractAsyncConnection
{
static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.ssl");
private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0);
private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
private final Lock _lock = new ReentrantLock();
private final DispatchedIOFuture _appReadFuture = new DispatchedIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable callback)
{
if (_appReadTask!=null)
throw new IllegalStateException();
_appReadTask=callback;
}
};
private IOFuture.Callback _writeCallback = new IOFuture.Callback()
{
@Override
public void onReady()
{
_appEndPoint.completeWrite();
}
@Override
public void onFail(Throwable cause)
{
LOG.debug("write FAILED",cause);
if (!_appWriteFuture.isComplete())
_appWriteFuture.fail(cause);
else
LOG.warn("write FAILED",cause);
}
};
private final RunnableIOFuture _appReadFuture = new RunnableIOFuture(true,_lock);
private final RunnableIOFuture _appWriteFuture = new RunnableIOFuture(true,_lock);
private final IOFuture.Callback _netWriteCallback = new NetWriteCallback();
private final DispatchedIOFuture _appWriteFuture = new DispatchedIOFuture(true,_lock);
private Runnable _appReadTask;
private final SSLEngine _engine;
private final SSLSession _session;
private AbstractAsyncConnection _appConnection;
@ -101,6 +70,31 @@ public class SslConnection extends AbstractAsyncConnection
private IOFuture _netReadFuture;
private IOFuture _netWriteFuture;
private final class NetWriteCallback implements IOFuture.Callback
{
@Override
public void onReady()
{
_appEndPoint.completeWrite();
}
@Override
public void onFail(Throwable cause)
{
LOG.debug("write FAILED",cause);
if (!_appWriteFuture.isComplete())
{
_appWriteFuture.fail(cause);
_appWriteFuture.run();
}
else
LOG.warn("write FAILED",cause);
}
}
/* ------------------------------------------------------------ */
/* this is a half baked buffer pool
*/
@ -273,7 +267,7 @@ public class SslConnection extends AbstractAsyncConnection
allocateBuffers();
boolean progress=true;
while(progress && _appReadTask==null)
while(progress && !_appReadFuture.isDispatched())
{
progress=false;
@ -307,12 +301,8 @@ public class SslConnection extends AbstractAsyncConnection
}
// Run any ready callback from _appReadFuture in this thread.
if (_appReadTask!=null)
{
Runnable task=_appReadTask;
_appReadTask=null;
task.run();
}
if (_appReadFuture.isDispatched())
_appReadFuture.run();
}
/* ------------------------------------------------------------ */
@ -486,7 +476,7 @@ public class SslConnection extends AbstractAsyncConnection
return true;
_netWriteFuture=write;
_netWriteFuture.setCallback(_writeCallback);
_netWriteFuture.setCallback(_netWriteCallback);
}
return result.bytesConsumed()>0 || result.bytesProduced()>0 ;
@ -841,6 +831,9 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
_lock.unlock();
if (_appWriteFuture.isDispatched())
_appWriteFuture.run();
}
}
}