jetty-9 AsyncByteArrayEndPointTest

This commit is contained in:
Greg Wilkins 2012-05-11 00:30:28 +02:00
parent 6d70fa1c2a
commit bd00e3615d
6 changed files with 233 additions and 51 deletions

View File

@ -1,7 +1,13 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -9,48 +15,160 @@ import org.eclipse.jetty.util.log.Logger;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint
{
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private static final Timer _timer = new Timer();
private boolean _checkForIdle;
private AsyncConnection _connection;
private final ReadInterest _readInterest = new ReadInterest()
{
@Override
protected boolean readIsPossible() throws IOException
{
if (_closed)
throw new ClosedChannelException();
return _in==null || BufferUtil.hasContent(_in);
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
protected void scheduleCompleteWrite()
{
}
};
public AsyncByteArrayEndPoint()
{
super();
}
public AsyncByteArrayEndPoint(byte[] input, int outputSize)
{
super(input,outputSize);
}
public AsyncByteArrayEndPoint(String input, int outputSize)
{
super(input,outputSize);
}
@Override
public void setInput(ByteBuffer in)
{
super.setInput(in);
if (in==null || BufferUtil.hasContent(in))
_readInterest.readable();
}
@Override
public ByteBuffer takeOutput()
{
ByteBuffer b = super.takeOutput();
_writeFlusher.completeWrite();
return b;
}
@Override
public void setOutput(ByteBuffer out)
{
super.setOutput(out);
_writeFlusher.completeWrite();
}
@Override
public void reset()
{
_readInterest.close();
_writeFlusher.close();
super.reset();
}
@Override
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
{
// TODO Auto-generated method stub
_readInterest.readable(context,callback);
}
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
{
// TODO Auto-generated method stub
_writeFlusher.write(context,callback,buffers);
}
@Override
public void setCheckForIdle(boolean check)
{
// TODO Auto-generated method stub
_checkForIdle=check;
if (check)
{
final TimerTask task=new TimerTask()
{
@Override
public void run()
{
checkForIdleOrReadWriteTimeout(System.currentTimeMillis());
if (_checkForIdle)
_timer.schedule(this,1000);
}
};
_timer.schedule(task,1000);
}
}
@Override
public boolean isCheckForIdle()
{
// TODO Auto-generated method stub
return false;
return _checkForIdle;
}
@Override
public AsyncConnection getAsyncConnection()
{
// TODO Auto-generated method stub
return null;
return _connection;
}
@Override
public void setAsyncConnection(AsyncConnection connection)
{
// TODO Auto-generated method stub
_connection=connection;
}
public void checkForIdleOrReadWriteTimeout(long now)
{
synchronized (this)
{
if (_checkForIdle || _readInterest.isInterested() || _writeFlusher.isWriting())
{
long idleTimestamp = getIdleTimestamp();
long max_idle_time = getMaxIdleTime();
if (idleTimestamp != 0 && max_idle_time > 0)
{
long idleForMs = now - idleTimestamp;
if (idleForMs > max_idle_time)
{
notIdle();
if (_checkForIdle)
_connection.onIdleExpired(idleForMs);
TimeoutException timeout = new TimeoutException();
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
}
}
@Override
public void onClose()
{
setCheckForIdle(false);
super.onClose();
}
}

View File

@ -2,6 +2,8 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import java.util.concurrent.Future;
import org.eclipse.jetty.util.Callback;
@ -23,9 +25,9 @@ public interface AsyncEndPoint extends EndPoint
* This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF.
* @param context Context to return via the callback
* @param callback The callback to call when an error occurs or we are readable.
* @throws IllegalStateException if another read operation is concurrent.
* @throws ReadPendingException if another read operation is concurrent.
*/
<C> void readable(C context, Callback<C> callback) throws IllegalStateException;
<C> void readable(C context, Callback<C> callback) throws ReadPendingException;
/* ------------------------------------------------------------ */
/** Asynchronous write operation.
@ -35,9 +37,9 @@ public interface AsyncEndPoint extends EndPoint
* @param context Context to return via the callback
* @param callback The callback to call when an error occurs or we are readable.
* @param buffers One or more {@link ByteBuffer}s that will be flushed.
* @throws IllegalStateException if another write operation is concurrent.
* @throws WritePendingException if another write operation is concurrent.
*/
<C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException;
<C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException;
/* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness

View File

@ -144,9 +144,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public String takeOutputString()
{
String s=BufferUtil.toString(_out,StringUtil.__UTF8_CHARSET);
BufferUtil.clear(_out);
return s;
ByteBuffer buffer=takeOutput();
return BufferUtil.toString(buffer,StringUtil.__UTF8_CHARSET);
}
/* ------------------------------------------------------------ */

View File

@ -1,5 +1,6 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadPendingException;
import java.util.concurrent.TimeoutException;
@ -7,7 +8,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.Callback;
public class ReadInterest
/* ------------------------------------------------------------ */
/**
* A Utility class to help implement {@link AsyncEndPoint#readable(Object, Callback)}
* by keeping and calling the context and callback objects.
*/
public abstract class ReadInterest
{
private final AtomicBoolean _interested = new AtomicBoolean(false);
private volatile Callback _readCallback;
@ -25,13 +32,20 @@ public class ReadInterest
throw new ReadPendingException();
_readContext=context;
_readCallback=callback;
if (makeInterested())
completed();
try
{
if (readIsPossible())
readable();
}
catch(IOException e)
{
failed(e);
}
}
/* ------------------------------------------------------------ */
public void completed()
public void readable()
{
if (_interested.compareAndSet(true,false))
{
@ -76,9 +90,6 @@ public class ReadInterest
}
/* ------------------------------------------------------------ */
protected boolean makeInterested()
{
throw new IllegalStateException("Unimplemented");
}
abstract protected boolean readIsPossible() throws IOException;
}

View File

@ -37,8 +37,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
{
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
private final Object _lock = this;
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
@ -59,7 +57,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private final ReadInterest _readInterest = new ReadInterest()
{
@Override
protected boolean makeInterested()
protected boolean readIsPossible()
{
_interestOps=_interestOps | SelectionKey.OP_READ;
updateKey();
@ -133,7 +131,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
boolean can_read;
boolean can_write;
synchronized (_lock)
synchronized (this)
{
_selected = true;
try
@ -153,7 +151,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
}
}
if (can_read)
_readInterest.completed();
_readInterest.readable();
if (can_write)
_writeFlusher.completeWrite();
}
@ -188,7 +186,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
@Override
public void checkForIdleOrReadWriteTimeout(long now)
{
synchronized (_lock)
synchronized (this)
{
if (_idlecheck || _readInterest.isInterested() || _writeFlusher.isWriting())
{
@ -208,7 +206,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
TimeoutException timeout = new TimeoutException();
_readInterest.failed(timeout);
_writeFlusher.failWrite(timeout);
_writeFlusher.failed(timeout);
}
}
}
@ -256,7 +254,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
*/
private void updateKey()
{
synchronized (_lock)
synchronized (this)
{
if (!_selected)
{
@ -290,7 +288,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
@Override
public void doUpdateKey()
{
synchronized (_lock)
synchronized (this)
{
_changing = false;
if (getChannel().isOpen())

View File

@ -7,10 +7,22 @@ import java.nio.channels.WritePendingException;
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class WriteFlusher
/* ------------------------------------------------------------ */
/**
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #scheduleCompleteWrite()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()}
* method to be called when a subsequent call to flush should be able to make more progress.
*
*/
abstract public class WriteFlusher
{
private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0];
private final AtomicBoolean _writing = new AtomicBoolean(false);
private final EndPoint _endp;
@ -41,7 +53,7 @@ public class WriteFlusher
_writeContext=context;
_writeCallback=callback;
scheduleCompleteWrite();
_writing.set(true);
_writing.set(true); // Needed as memory barrier
return;
}
}
@ -59,20 +71,47 @@ public class WriteFlusher
}
/* ------------------------------------------------------------ */
protected void scheduleCompleteWrite()
{
// _interestOps = _interestOps | SelectionKey.OP_WRITE;
// updateKey();
}
abstract protected void scheduleCompleteWrite();
/* ------------------------------------------------------------ */
public void completeWrite()
/* Remove empty buffers from the start of a multi buffer array
*/
private ByteBuffer[] compact(ByteBuffer[] buffers)
{
if (buffers.length<2)
return buffers;
int b=0;
while (b<buffers.length && BufferUtil.isEmpty(buffers[b]))
b++;
if (b==0)
return buffers;
if (b==buffers.length)
return NO_BUFFERS;
ByteBuffer[] compact=new ByteBuffer[buffers.length-b];
for (int i=0;i<compact.length;i++)
compact[i]=buffers[b+i];
return compact;
}
/* ------------------------------------------------------------ */
/**
* Complete a write that has not completed and that called
* {@link #scheduleCompleteWrite()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)}
* is likely to be able to progress.
* @return true if a write was in progress
*/
public boolean completeWrite()
{
if (!_writing.get())
return;
return false;
try
{
_writeBuffers=compact(_writeBuffers);
_endp.flush(_writeBuffers);
// Are we complete?
@ -81,7 +120,7 @@ public class WriteFlusher
if (b.hasRemaining())
{
scheduleCompleteWrite();
return;
return true;
}
}
@ -90,6 +129,7 @@ public class WriteFlusher
Object context=_writeContext;
_writeBuffers=null;
_writeCallback=null;
_writeContext=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.completed(context);
@ -104,30 +144,44 @@ public class WriteFlusher
throw new ConcurrentModificationException();
callback.failed(context,e);
}
return true;
}
/* ------------------------------------------------------------ */
public void failWrite(Throwable th)
/**
* Fail the write in progress and cause any calls to get to throw
* the cause wrapped as an execution exception.
* @return true if a write was in progress
*/
public boolean failed(Throwable cause)
{
if (!_writing.compareAndSet(true,false))
return;
return false;
Callback callback=_writeCallback;
Object context=_writeContext;
_writeBuffers=null;
_writeCallback=null;
callback.failed(context,th);
callback.failed(context,cause);
return true;
}
/* ------------------------------------------------------------ */
public void close()
/**
* Fail the write with a {@link ClosedChannelException}. This is similar
* to a call to {@link #failed(Throwable)}, except that the exception is
* not instantiated unless a write was in progress.
* @return true if a write was in progress
*/
public boolean close()
{
if (!_writing.compareAndSet(true,false))
return;
return false;
Callback callback=_writeCallback;
Object context=_writeContext;
_writeBuffers=null;
_writeCallback=null;
callback.failed(context,new ClosedChannelException());
return true;
}
/* ------------------------------------------------------------ */