jetty-9 cleanups

This commit is contained in:
Greg Wilkins 2012-05-03 21:41:54 +02:00
parent f0eded5fdb
commit 017d6e7c2a
11 changed files with 294 additions and 37 deletions

View File

@ -9,7 +9,9 @@ public abstract class AbstractEndPoint implements EndPoint
private final long _created=System.currentTimeMillis();
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private int _maxIdleTime;
private int _maxIdleTime;
private volatile long _lastNotIdleTimestamp;
protected AbstractEndPoint(InetSocketAddress local,InetSocketAddress remote)
{
@ -23,6 +25,7 @@ public abstract class AbstractEndPoint implements EndPoint
return _created;
}
@Override
public int getMaxIdleTime()
{
@ -49,6 +52,18 @@ public abstract class AbstractEndPoint implements EndPoint
return _remote;
}
/* ------------------------------------------------------------ */
public long getNotIdleTimestamp()
{
return _lastNotIdleTimestamp;
}
/* ------------------------------------------------------------ */
protected void notIdle()
{
_lastNotIdleTimestamp=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
@Override
public String toString()

View File

@ -0,0 +1,225 @@
package org.eclipse.jetty.io;
import static org.eclipse.jetty.io.CompletedIOFuture.COMPLETE;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint
{
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private final Lock _lock = new ReentrantLock();
private volatile boolean _idlecheck;
private volatile AsyncConnection _connection;
private DispatchedIOFuture _readFuture = new DispatchedIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
AsyncByteArrayEndPoint.this.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
// TODO ??
cancelled();
}
finally
{
_lock.unlock();
}
}
};
private ByteBuffer[] _writeBuffers;
private DispatchedIOFuture _writeFuture = new DispatchedIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
AsyncByteArrayEndPoint.this.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
// TODO
cancelled();
}
finally
{
_lock.unlock();
}
}
};
@Override
public AsyncConnection getAsyncConnection()
{
return _connection;
}
protected void dispatch(Runnable task)
{
new Thread(task).start();
}
public void setAsyncConnection(AbstractAsyncConnection connection)
{
_connection=connection;
}
@Override
public IOFuture read() throws IllegalStateException
{
_lock.lock();
try
{
if (!_readFuture.isComplete())
throw new IllegalStateException("previous read not complete");
_readFuture.recycle();
// TODO
return _readFuture;
}
finally
{
_lock.unlock();
}
}
@Override
public IOFuture write(ByteBuffer... buffers) throws IllegalStateException
{
_lock.lock();
try
{
if (!_writeFuture.isComplete())
throw new IllegalStateException("previous write not complete");
flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
_writeBuffers=buffers;
_writeFuture.recycle();
// TODO
return _writeFuture;
}
}
return COMPLETE;
}
catch(IOException e)
{
return new CompletedIOFuture(e);
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
private void completeWrite()
{
try
{
flush(_writeBuffers);
// Are we complete?
for (ByteBuffer b : _writeBuffers)
{
if (b.hasRemaining())
{
// TODO
return;
}
}
// we are complete and ready
_writeFuture.ready();
}
catch(final IOException e)
{
_writeBuffers=null;
if (!_writeFuture.isComplete())
_writeFuture.fail(e);
}
}
/* ------------------------------------------------------------ */
@Override
public void setCheckForIdle(boolean check)
{
_idlecheck=check;
}
/* ------------------------------------------------------------ */
@Override
public boolean isCheckForIdle()
{
return _idlecheck;
}
/* ------------------------------------------------------------ */
public void checkForIdleOrReadWriteTimeout(long now)
{
if (_idlecheck || !_readFuture.isComplete() || !_writeFuture.isComplete())
{
long idleTimestamp=getNotIdleTimestamp();
long max_idle_time=getMaxIdleTime();
if (idleTimestamp!=0 && max_idle_time>0)
{
long idleForMs=now-idleTimestamp;
if (idleForMs>max_idle_time)
{
_lock.lock();
try
{
if (_idlecheck)
_connection.onIdleExpired(idleForMs);
if (!_readFuture.isComplete())
_readFuture.fail(new TimeoutException());
if (!_writeFuture.isComplete())
_writeFuture.fail(new TimeoutException());
}
finally
{
notIdle();
_lock.unlock();
}
}
}
}
}
}

View File

@ -122,6 +122,6 @@ public interface AsyncEndPoint extends EndPoint
* @return Timestamp in ms since epoch of when the last data was
* filled or flushed from this endpoint.
*/
long getActivityTimestamp();
long getNotIdleTimestamp();
}

View File

@ -28,8 +28,11 @@ import org.eclipse.jetty.util.StringUtil;
*/
public class ByteArrayEndPoint extends AbstractEndPoint
{
public final static InetSocketAddress NOIP=new InetSocketAddress(0);
protected ByteBuffer _in;
protected ByteBuffer _out;
protected boolean _ishut;
protected boolean _oshut;
protected boolean _closed;
protected boolean _growOutput;
@ -40,7 +43,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteArrayEndPoint()
{
super(null,null);
super(NOIP,NOIP);
_in=BufferUtil.EMPTY_BUFFER;
_out=BufferUtil.allocate(1024);
}
@ -51,7 +54,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteArrayEndPoint(byte[] input, int outputSize)
{
super(null,null);
super(NOIP,NOIP);
_in=input==null?null:ByteBuffer.wrap(input);
_out=BufferUtil.allocate(outputSize);
}
@ -62,7 +65,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteArrayEndPoint(String input, int outputSize)
{
super(null,null);
super(NOIP,NOIP);
setInput(input);
_out=BufferUtil.allocate(outputSize);
}
@ -176,7 +179,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.jetty.io.EndPoint#isOutputShutdown()
*/
public boolean isInputShutdown()
{
return _ishut||_closed;
}
/* ------------------------------------------------------------ */
/*
*/
@Override
public boolean isOutputShutdown()
@ -223,6 +233,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
if (_closed)
throw new IOException("CLOSED");
if (_in==null)
_ishut=true;
if (_ishut)
return -1;
return BufferUtil.append(_in,buffer);
}
@ -234,10 +246,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override
public int flush(ByteBuffer... buffers) throws IOException
{
if (_oshut)
throw new IOException("oshut");
if (_closed)
throw new IOException("CLOSED");
if (_oshut)
throw new IOException("OSHUT");
int len=0;
@ -271,6 +283,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public void reset()
{
_ishut=false;
_oshut=false;
_closed=false;
_in=null;

View File

@ -20,7 +20,6 @@ import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.util.BufferUtil;
@ -150,6 +149,7 @@ public class ChannelEndPoint extends AbstractEndPoint
return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown();
}
@Override
public boolean isInputShutdown()
{
return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown();

View File

@ -5,20 +5,20 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.thread.ThreadPool;
public class CompleteIOFuture implements IOFuture
public class CompletedIOFuture implements IOFuture
{
private final boolean _ready;
private final Throwable _cause;
public final static CompleteIOFuture COMPLETE=new CompleteIOFuture();
public final static CompletedIOFuture COMPLETE=new CompletedIOFuture();
public CompleteIOFuture()
public CompletedIOFuture()
{
_ready=true;
_cause=null;
}
public CompleteIOFuture(Throwable cause)
public CompletedIOFuture(Throwable cause)
{
_ready=false;
_cause=cause;

View File

@ -51,6 +51,8 @@ public interface EndPoint
void shutdownOutput() throws IOException;
boolean isOutputShutdown();
boolean isInputShutdown();
/**
* Close any backing stream associated with the endpoint

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task;
import static org.eclipse.jetty.io.CompleteIOFuture.COMPLETE;
import static org.eclipse.jetty.io.CompletedIOFuture.COMPLETE;
/* ------------------------------------------------------------ */
/**
@ -54,7 +54,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private boolean _open;
private volatile boolean _idlecheck;
private volatile long _lastNotIdleTimestamp;
private volatile AbstractAsyncConnection _connection;
private DispatchedIOFuture _readFuture = new DispatchedIOFuture(true,_lock)
@ -154,12 +153,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_manager.endPointUpgraded(this,old);
}
/* ------------------------------------------------------------ */
@Override
public long getActivityTimestamp()
{
return _lastNotIdleTimestamp;
}
/* ------------------------------------------------------------ */
/** Called by selectSet to schedule handling
@ -229,18 +222,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
return _idlecheck;
}
/* ------------------------------------------------------------ */
protected void notIdle()
{
_lastNotIdleTimestamp=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
public void checkForIdleOrReadWriteTimeout(long now)
{
if (_idlecheck || !_readFuture.isComplete() || !_writeFuture.isComplete())
{
long idleTimestamp=_lastNotIdleTimestamp;
long idleTimestamp=getNotIdleTimestamp();
long max_idle_time=getMaxIdleTime();
if (idleTimestamp!=0 && max_idle_time>0)
@ -261,7 +249,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
finally
{
_lastNotIdleTimestamp=now;
notIdle();
_lock.unlock();
}
}
@ -330,7 +318,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
catch(IOException e)
{
return new CompleteIOFuture(e);
return new CompletedIOFuture(e);
}
finally
{

View File

@ -13,7 +13,7 @@
package org.eclipse.jetty.io;
import static org.eclipse.jetty.io.CompleteIOFuture.COMPLETE;
import static org.eclipse.jetty.io.CompletedIOFuture.COMPLETE;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -641,6 +641,20 @@ public class SslConnection extends AbstractAsyncConnection
}
}
@Override
public boolean isInputShutdown()
{
_lock.lock();
try
{
return !isOpen()||_engine.isInboundDone();
}
finally
{
_lock.unlock();
}
}
@Override
public void close() throws IOException
{
@ -741,9 +755,9 @@ public class SslConnection extends AbstractAsyncConnection
@Override
public long getActivityTimestamp()
public long getNotIdleTimestamp()
{
return _endp.getActivityTimestamp();
return _endp.getNotIdleTimestamp();
}
@Override
@ -806,7 +820,7 @@ public class SslConnection extends AbstractAsyncConnection
}
catch (IOException e)
{
return new CompleteIOFuture(e);
return new CompletedIOFuture(e);
}
finally
{

View File

@ -20,7 +20,7 @@ public class IOFutureTest
@Test
public void testReadyCompleted() throws Exception
{
IOFuture future = new CompleteIOFuture();
IOFuture future = new CompletedIOFuture();
assertTrue(future.isComplete());
assertTrue(future.isReady());
@ -60,7 +60,7 @@ public class IOFutureTest
public void testFailedCompleted() throws Exception
{
Exception ex=new Exception("failed");
IOFuture future = new CompleteIOFuture(ex);
IOFuture future = new CompletedIOFuture(ex);
assertTrue(future.isComplete());
try

View File

@ -29,7 +29,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.CompleteIOFuture;
import org.eclipse.jetty.io.CompletedIOFuture;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IOFuture;
import org.eclipse.jetty.io.DispatchedIOFuture;
@ -458,7 +458,7 @@ public class HttpConnection extends AbstractAsyncConnection
{
if (BufferUtil.hasContent(b2))
return _endp.write(b2);
return CompleteIOFuture.COMPLETE;
return CompletedIOFuture.COMPLETE;
}
}
}