jetty-9 removed IOFuture and added FutureCallback instead

This commit is contained in:
Greg Wilkins 2012-05-09 16:57:18 +02:00
parent 775376ac98
commit 4caced258f
10 changed files with 599 additions and 896 deletions

View File

@ -1,5 +1,7 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -10,6 +12,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class); private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class);
private final AsyncEndPoint _endp; private final AsyncEndPoint _endp;
private final ReadCallback _readCallback = new ReadCallback(); private final ReadCallback _readCallback = new ReadCallback();
private final AtomicBoolean _readInterested = new AtomicBoolean();
public AbstractAsyncConnection(AsyncEndPoint endp) public AbstractAsyncConnection(AsyncEndPoint endp)
{ {
@ -53,29 +56,35 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
_endp.shutdownOutput(); _endp.shutdownOutput();
} }
public IOFuture scheduleOnReadable() /* ------------------------------------------------------------ */
public void scheduleOnReadable()
{ {
IOFuture read=getEndPoint().readable(); if (_readInterested.compareAndSet(false,true))
read.setCallback(_readCallback, null); getEndPoint().readable(null,_readCallback);
return read;
} }
/* ------------------------------------------------------------ */
public void onReadFail(Throwable cause) public void onReadFail(Throwable cause)
{ {
LOG.debug("read failed: "+cause); LOG.debug("read failed: "+cause);
} }
/* ------------------------------------------------------------ */
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x", getClass().getSimpleName(), hashCode()); return String.format("%s@%x", getClass().getSimpleName(), hashCode());
} }
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class ReadCallback implements Callback<Void> private class ReadCallback implements Callback<Void>
{ {
@Override @Override
public void completed(Void context) public void completed(Void context)
{ {
if (_readInterested.compareAndSet(true,false))
onReadable(); onReadable();
} }

View File

@ -1,242 +1,55 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import static org.eclipse.jetty.io.DoneIOFuture.COMPLETE;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint
{ {
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class); public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private final Lock _lock = new ReentrantLock(); @Override
private volatile boolean _idlecheck; public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
private volatile AsyncConnection _connection;
private DispatchingIOFuture _readFuture;
private DispatchingIOFuture _writeFuture;
private ByteBuffer[] _writeBuffers;
public AsyncConnection getAsyncConnection()
{ {
return _connection; // TODO Auto-generated method stub
}
protected void dispatch(Runnable task)
{
new Thread(task).start();
}
public void setAsyncConnection(AsyncConnection connection)
{
_connection=connection;
} }
@Override @Override
public IOFuture readable() throws IllegalStateException public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
{ {
_lock.lock(); // TODO Auto-generated method stub
try
{
if (_readFuture!=null && !_readFuture.isDone())
throw new IllegalStateException("previous read not complete");
_readFuture=new ReadFuture();
// TODO
return _readFuture;
}
finally
{
_lock.unlock();
}
}
@Override
public IOFuture write(ByteBuffer... buffers) throws IllegalStateException
{
_lock.lock();
try
{
if (_writeFuture!=null && !_writeFuture.isDone())
throw new IllegalStateException("previous write not complete");
flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
_writeBuffers=buffers;
_writeFuture=new WriteFuture();
// TODO
return _writeFuture;
}
}
return COMPLETE;
}
catch(IOException e)
{
return new DoneIOFuture(e);
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
private void completeWrite()
{
try
{
flush(_writeBuffers);
// Are we complete?
for (ByteBuffer b : _writeBuffers)
{
if (b.hasRemaining())
{
// TODO
return;
}
}
// we are complete and ready
_writeFuture.complete();
}
catch(final IOException e)
{
_writeBuffers=null;
if (!_writeFuture.isDone())
_writeFuture.fail(e);
}
} }
/* ------------------------------------------------------------ */
@Override @Override
public void setCheckForIdle(boolean check) public void setCheckForIdle(boolean check)
{ {
_idlecheck=check; // TODO Auto-generated method stub
} }
/* ------------------------------------------------------------ */
@Override @Override
public boolean isCheckForIdle() public boolean isCheckForIdle()
{ {
return _idlecheck; // TODO Auto-generated method stub
} return false;
/* ------------------------------------------------------------ */
public void checkForIdleOrReadWriteTimeout(long now)
{
if (_idlecheck || !_readFuture.isDone() || !_writeFuture.isDone())
{
long idleTimestamp=getIdleTimestamp();
long max_idle_time=getMaxIdleTime();
if (idleTimestamp!=0 && max_idle_time>0)
{
long idleForMs=now-idleTimestamp;
if (idleForMs>max_idle_time)
{
_lock.lock();
try
{
if (_idlecheck)
_connection.onIdleExpired(idleForMs);
if (!_readFuture.isDone())
_readFuture.fail(new TimeoutException());
if (!_writeFuture.isDone())
_writeFuture.fail(new TimeoutException());
notIdle();
}
finally
{
_lock.unlock();
}
}
}
}
}
private final class WriteFuture extends DispatchingIOFuture
{
private WriteFuture()
{
super(_lock);
} }
@Override @Override
protected void dispatch(Runnable task) public AsyncConnection getAsyncConnection()
{ {
AsyncByteArrayEndPoint.this.dispatch(task); // TODO Auto-generated method stub
return null;
} }
@Override @Override
public void cancel() public void setAsyncConnection(AsyncConnection connection)
{ {
_lock.lock(); // TODO Auto-generated method stub
try
{
// TODO
cancelled();
}
finally
{
_lock.unlock();
}
}
}
private final class ReadFuture extends DispatchingIOFuture
{
private ReadFuture()
{
super(_lock);
}
@Override
protected void dispatch(Runnable task)
{
AsyncByteArrayEndPoint.this.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
// TODO ??
cancelled();
}
finally
{
_lock.unlock();
}
}
} }

View File

@ -4,104 +4,40 @@ import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.eclipse.jetty.util.Callback;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/**Asynchronous End Point /**Asynchronous End Point
* <p> * <p>
* This extension of EndPoint provides asynchronous scheduling methods. * This extension of EndPoint provides asynchronous scheduling methods.
* The design of these has been influenced by NIO.2 Futures and Completion * The design of these has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because: they have * handlers, but does not use those actual interfaces because: they have
* some inefficiencies (eg buffers must be allocated before read); they have * some inefficiencies.
* unrequired overheads due to their generic nature (passing of attachments
* and returning operation counts); there is no need to pass timeouts as
* {@link EndPoint#getMaxIdleTime() is used.
* <p>
* The intent of this API is that it can be used in either: a polling mode (like {@link Future})
* ; in a callback mode (like {@link CompletionHandler} mode; or blocking mod;e or a hybrid mode
* <h3>Blocking read</h3>
* <pre>
* endpoint.readable().block();
* endpoint.fill(buffer);
* </pre>
* <h3>Polling read</h3>
* <pre>
* IOFuture read = endpoint.readable();
* ...
* while (!read.isComplete())
* Thread.sleep(10);
* endpoint.fill(buffer);
* </pre>
* <h3>Callback read</h3>
* <pre>
* endpoint.readable().setCallback(new IOCallback()
* {
* public void onReady() { endpoint.fill(buffer); ... }
* public void onFail(IOException e) { ... }
* }
* </pre>
* *
* <h3>Blocking write</h3>
* <pre>
* endpoint.write(buffer).block();
* </pre>
* <h3>Polling write</h3>
* <pre>
* IOFuture write = endpoint.write(buffer);
* ...
* while (!write.isComplete())
* Thread.sleep(10);
*
* </pre>
* <h3>Callback write</h3>
* <pre>
* endpoint.write(buffer0,buffer1).setCallback(new IOCallback()
* {
* public void onReady() { ... }
* public void onFail(IOException e) { ... }
* }
* </pre>
* <h3>Hybrid write</h3>
* <pre>
* IOFuture write = endpoint.write(buffer);
* // wait a little bit
* if (!write.block(10,TimeUnit.MILLISECONDS))
* {
* // still not ready, so organize a callback
* write.setHandler(new IOCallback()
* {
* public void onReady() { ... }
* public void onFail(IOException e) { ... }
* });
* ...
* </pre>
*
* <h2>Compatibility Notes</h2>
* Some Async IO APIs have the concept of setting read interest. With this
* API calling {@link #readable()} is equivalent to setting read interest to true
* and calling {@link IOFuture#cancel()} is equivalent to setting read interest
* to false.
*/ */
public interface AsyncEndPoint extends EndPoint public interface AsyncEndPoint extends EndPoint
{ {
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Schedule a read operation. /** Asynchronous a readable notification.
* <p> * <p>
* This method allows a {@link #fill(ByteBuffer)} operation to be scheduled * This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF.
* with either blocking, polling or callback semantics. * @param context Context to return via the callback
* @return an {@link IOFuture} instance that will be ready when a call to {@link #fill(ByteBuffer)} will * @param callback The callback to call when an error occurs or we are readable.
* return immediately with data without blocking. * @throws IllegalStateException if another read operation is concurrent.
* @throws IllegalStateException if another read operation has been scheduled and has not timedout, been cancelled or is ready.
*/ */
IOFuture readable() throws IllegalStateException; <C> void readable(C context, Callback<C> callback) throws IllegalStateException;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /** Asynchronous write operation.
* This method performs {@link #flush(ByteBuffer...)} operations and allows the completion of * <p>
* the entire write to be scheduled with blocking, polling or callback semantics. * This method performs {@link #flush(ByteBuffer...)} operation(s) and do a callback when all the data
* has been flushed or an error occurs.
* @param context Context to return via the callback
* @param callback The callback to call when an error occurs or we are readable.
* @param buffers One or more {@link ByteBuffer}s that will be flushed. * @param buffers One or more {@link ByteBuffer}s that will be flushed.
* @return an {@link IOFuture} instance that will be ready when all the data in the buffers passed has been consumed by * @throws IllegalStateException if another write operation is concurrent.
* one or more calls to {@link #flush(ByteBuffer...)}.
*/ */
IOFuture write(ByteBuffer... buffers) throws IllegalStateException; <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness /** Set if the endpoint should be checked for idleness

View File

@ -0,0 +1,101 @@
package org.eclipse.jetty.io;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
public class FutureCallback<C> implements Future<C>,Callback<C>
{
private enum State {NOT_DONE,DOING,DONE};
private final AtomicReference<State> _state=new AtomicReference<>(State.NOT_DONE);
private final CountDownLatch _done= new CountDownLatch(1);
private Throwable _cause;
private C _context;
private boolean _completed;
@Override
public void completed(C context)
{
if (_state.compareAndSet(State.NOT_DONE,State.DOING))
{
_context=context;
_completed=true;
if (_state.compareAndSet(State.DOING,State.DONE))
{
_done.countDown();
return;
}
}
else if (!isCancelled())
throw new IllegalStateException();
}
@Override
public void failed(C context, Throwable cause)
{
if (_state.compareAndSet(State.NOT_DONE,State.DOING))
{
_context=context;
_cause=cause;
if (_state.compareAndSet(State.DOING,State.DONE))
{
_done.countDown();
return;
}
}
else if (!isCancelled())
throw new IllegalStateException();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
failed(null,new CancellationException());
return false;
}
@Override
public boolean isCancelled()
{
return State.DONE.equals(_state.get())&&_cause instanceof CancellationException;
}
@Override
public boolean isDone()
{
return State.DONE.equals(_state.get());
}
@Override
public C get() throws InterruptedException, ExecutionException
{
_done.await();
if (_completed)
return _context;
if (_cause instanceof CancellationException)
throw (CancellationException) new CancellationException().initCause(_cause);
throw new ExecutionException(_cause);
}
@Override
public C get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
if (!_done.await(timeout,unit))
throw new TimeoutException();
if (_completed)
return _context;
if (_cause instanceof TimeoutException)
throw (TimeoutException)_cause;
if (_cause instanceof CancellationException)
throw (CancellationException) new CancellationException().initCause(_cause);
throw new ExecutionException(_cause);
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task; import org.eclipse.jetty.util.thread.Timeout.Task;
@ -31,7 +32,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
/** /**
* An Endpoint that can be scheduled by {@link SelectorManager}. * An Endpoint that can be scheduled by {@link SelectorManager}.
*/ */
public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableAsyncEndPoint
{ {
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
@ -40,8 +41,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private final SelectorManager.SelectSet _selectSet; private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager; private final SelectorManager _manager;
private DispatchingIOFuture _readFuture = new DispatchingIOFuture(true,_lock); private Callback _readCallback;
private DispatchingIOFuture _writeFuture = new DispatchingIOFuture(true,_lock); private Object _readContext;
private Callback _writeCallback;
private Object _writeContext;
private SelectionKey _key; private SelectionKey _key;
@ -73,6 +76,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public AsyncConnection getAsyncConnection() public AsyncConnection getAsyncConnection()
{ {
return _connection; return _connection;
@ -94,6 +98,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public void setAsyncConnection(AsyncConnection connection) public void setAsyncConnection(AsyncConnection connection)
{ {
AsyncConnection old = getAsyncConnection(); AsyncConnection old = getAsyncConnection();
@ -107,6 +112,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
* Called by selectSet to schedule handling * Called by selectSet to schedule handling
* *
*/ */
@Override
public void onSelected() public void onSelected()
{ {
synchronized (_lock) synchronized (_lock)
@ -126,10 +132,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
boolean can_write = (_key.isWritable() && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE); boolean can_write = (_key.isWritable() && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE);
_interestOps = 0; _interestOps = 0;
if (can_read && !_readFuture.isDone()) if (can_read)
_readFuture.complete(); readCompleted();
if (can_write)
if (can_write && _writeBuffers != null)
completeWrite(); completeWrite();
} }
@ -168,9 +173,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public void checkForIdleOrReadWriteTimeout(long now) public void checkForIdleOrReadWriteTimeout(long now)
{ {
if (_idlecheck || !_readFuture.isDone() || !_writeFuture.isDone()) synchronized (_lock)
{
if (_idlecheck || _readCallback!=null || _writeCallback!=null)
{ {
long idleTimestamp = getIdleTimestamp(); long idleTimestamp = getIdleTimestamp();
long max_idle_time = getMaxIdleTime(); long max_idle_time = getMaxIdleTime();
@ -181,21 +189,77 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
if (idleForMs > max_idle_time) if (idleForMs > max_idle_time)
{ {
synchronized (_lock) notIdle();
{
if (_idlecheck) if (_idlecheck)
_connection.onIdleExpired(idleForMs); _connection.onIdleExpired(idleForMs);
if (!_readFuture.isDone())
_readFuture.fail(new TimeoutException()); TimeoutException timeout = new TimeoutException();
if (!_writeFuture.isDone()) readFailed(timeout);
_writeFuture.fail(new TimeoutException()); writeFailed(timeout);
notIdle();
} }
} }
} }
} }
} }
/* ------------------------------------------------------------ */
private void readCompleted()
{
if (_readCallback!=null)
{
Callback cb=_readCallback;
Object ctx=_readContext;
_readCallback=null;
_readContext=null;
System.err.printf("ReadComplete %s %s%n",ctx,cb);
cb.completed(ctx); // TODO after lock released?
}
}
/* ------------------------------------------------------------ */
private void writeCompleted()
{
if (_writeCallback!=null)
{
Callback cb=_writeCallback;
Object ctx=_writeContext;
_writeCallback=null;
_writeContext=null;
_writeBuffers=null;
System.err.printf("writeComplete %s %s%n",ctx,cb);
cb.completed(ctx); // TODO after lock released?
}
}
/* ------------------------------------------------------------ */
private void readFailed(Throwable cause)
{
if (_readCallback!=null)
{
Callback cb=_readCallback;
Object ctx=_readContext;
_readCallback=null;
_readContext=null;
System.err.printf("ReadFail %s %s%n",ctx,cb);
cb.failed(ctx,cause); // TODO after lock released?
}
}
/* ------------------------------------------------------------ */
private void writeFailed(Throwable cause)
{
if (_writeCallback!=null)
{
Callback cb=_writeCallback;
Object ctx=_writeContext;
_writeCallback=null;
_writeContext=null;
System.err.printf("writeFailed %s %s%n",ctx,cb);
cb.failed(ctx,cause); // TODO after lock released?
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public int fill(ByteBuffer buffer) throws IOException public int fill(ByteBuffer buffer) throws IOException
@ -208,30 +272,28 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public IOFuture readable() throws IllegalStateException public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
{ {
synchronized (_lock) synchronized (_lock)
{ {
if (_readFuture != null && !_readFuture.isDone()) if (_readCallback != null)
throw new IllegalStateException("previous read not complete"); throw new IllegalStateException("previous read not complete");
_readContext=context;
_readFuture = new InterestedFuture(SelectionKey.OP_READ); _readCallback=callback;
_interestOps = _interestOps | SelectionKey.OP_READ; _interestOps=_interestOps | SelectionKey.OP_READ;
updateKey(); updateKey();
return _readFuture;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public IOFuture write(ByteBuffer... buffers) public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
{ {
synchronized (_lock) synchronized (_lock)
{ {
try try
{ {
if (_writeFuture != null && !_writeFuture.isDone()) if (_writeCallback!=null)
throw new IllegalStateException("previous write not complete"); throw new IllegalStateException("previous write not complete");
flush(buffers); flush(buffers);
@ -241,18 +303,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{ {
if (b.hasRemaining()) if (b.hasRemaining())
{ {
_writeBuffers = buffers; _writeBuffers=buffers;
_writeFuture = new InterestedFuture(SelectionKey.OP_WRITE); _writeContext=context;
_writeCallback=callback;
_interestOps = _interestOps | SelectionKey.OP_WRITE; _interestOps = _interestOps | SelectionKey.OP_WRITE;
updateKey(); updateKey();
return _writeFuture; return;
} }
} }
return DoneIOFuture.COMPLETE;
callback.completed(context);
} }
catch (IOException e) catch (IOException e)
{ {
return new DoneIOFuture(e); callback.failed(context,e);
} }
} }
} }
@ -260,6 +324,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void completeWrite() private void completeWrite()
{ {
if (_writeBuffers==null)
return;
try try
{ {
flush(_writeBuffers); flush(_writeBuffers);
@ -274,13 +341,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
} }
} }
// we are complete and ready // we are complete and ready
_writeFuture.complete(); writeCompleted();
} }
catch (final IOException e) catch (IOException e)
{ {
_writeBuffers = null; writeFailed(e);
if (!_writeFuture.isDone())
_writeFuture.fail(e);
} }
} }
@ -332,7 +397,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/** /**
* Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
*/ */
void doUpdateKey() @Override
public void doUpdateKey()
{ {
synchronized (_lock) synchronized (_lock)
{ {
@ -449,7 +515,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
} }
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",hashCode(),getRemoteAddress(),getLocalAddress(),isOpen(), return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",hashCode(),getRemoteAddress(),getLocalAddress(),isOpen(),
isInputShutdown(),isOutputShutdown(),_interestOps,keyString,_readFuture,_writeFuture,getAsyncConnection()); isInputShutdown(),isOutputShutdown(),_interestOps,keyString,_readCallback,_writeCallback,getAsyncConnection());
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -458,38 +524,4 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
return _selectSet; return _selectSet;
} }
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class InterestedFuture extends DispatchingIOFuture
{
private final int _interest;
private InterestedFuture(int interest)
{
super(_lock);
_interest = interest;
}
@Override
protected void dispatch(Runnable task)
{
if (!_manager.dispatch(task))
{
LOG.warn("Dispatch failed: i=" + _interest);
throw new IllegalStateException();
}
}
@Override
public void cancel()
{
synchronized (_lock)
{
_interestOps = _interestOps & ~_interest;
updateKey();
cancelled();
}
}
}
} }

View File

@ -351,7 +351,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @return the new endpoint {@link SelectChannelEndPoint} * @return the new endpoint {@link SelectChannelEndPoint}
* @throws IOException * @throws IOException
*/ */
protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; protected abstract SelectableAsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
@ -453,7 +453,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (change instanceof EndPoint) if (change instanceof EndPoint)
{ {
// Update the operations for a key. // Update the operations for a key.
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; SelectableAsyncEndPoint endpoint = (SelectableAsyncEndPoint)change;
ch=endpoint.getChannel(); ch=endpoint.getChannel();
endpoint.doUpdateKey(); endpoint.doUpdateKey();
} }
@ -595,17 +595,17 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (!key.isValid()) if (!key.isValid())
{ {
key.cancel(); key.cancel();
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); SelectableAsyncEndPoint endpoint = (SelectableAsyncEndPoint)key.attachment();
if (endpoint != null) if (endpoint != null)
endpoint.doUpdateKey(); endpoint.doUpdateKey();
continue; continue;
} }
Object att = key.attachment(); Object att = key.attachment();
if (att instanceof SelectChannelEndPoint) if (att instanceof SelectableAsyncEndPoint)
{ {
if (key.isReadable()||key.isWritable()) if (key.isReadable()||key.isWritable())
((SelectChannelEndPoint)att).onSelected(); ((SelectableAsyncEndPoint)att).onSelected();
} }
else if (key.isConnectable()) else if (key.isConnectable())
{ {
@ -628,7 +628,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
AsyncEndPoint endpoint = createEndPoint(channel, key); AsyncEndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint); key.attach(endpoint);
// TODO: remove the cast // TODO: remove the cast
((SelectChannelEndPoint)endpoint).onSelected(); ((SelectableAsyncEndPoint)endpoint).onSelected();
} }
else else
{ {
@ -645,7 +645,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (key.isReadable()) if (key.isReadable())
{ {
// TODO: remove the cast // TODO: remove the cast
((SelectChannelEndPoint)endpoint).onSelected(); ((SelectableAsyncEndPoint)endpoint).onSelected();
} }
} }
key = null; key = null;
@ -705,7 +705,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
for (AsyncEndPoint endp:_endPoints.keySet()) for (AsyncEndPoint endp:_endPoints.keySet())
{ {
// TODO: remove the cast // TODO: remove the cast
((SelectChannelEndPoint)endp).checkForIdleOrReadWriteTimeout(idle_now); ((SelectableAsyncEndPoint)endp).checkForIdleOrReadWriteTimeout(idle_now);
} }
} }
public String toString() {return "Idle-"+super.toString();} public String toString() {return "Idle-"+super.toString();}
@ -840,7 +840,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void destroyEndPoint(SelectChannelEndPoint endp) public void destroyEndPoint(SelectableAsyncEndPoint endp)
{ {
LOG.debug("destroyEndPoint {}",endp); LOG.debug("destroyEndPoint {}",endp);
_endPoints.remove(endp); _endPoints.remove(endp);
@ -1020,4 +1020,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private interface ChangeTask extends Runnable private interface ChangeTask extends Runnable
{} {}
// TODO review this interface
public interface SelectableAsyncEndPoint extends AsyncEndPoint
{
void onSelected();
Channel getChannel();
void doUpdateKey();
void checkForIdleOrReadWriteTimeout(long idle_now);
}
} }

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
@ -45,10 +46,10 @@ public class SslConnection extends AbstractAsyncConnection
private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>(); private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
private final Lock _lock = new ReentrantLock(); private final Lock _lock = new ReentrantLock();
private final AtomicBoolean _writing = new AtomicBoolean();
private final NetWriteCallback _netWriteCallback = new NetWriteCallback(); private final NetWriteCallback _netWriteCallback = new NetWriteCallback();
private DispatchingIOFuture _appReadFuture = new DispatchingIOFuture(true,_lock);
private DispatchingIOFuture _appWriteFuture = new DispatchingIOFuture(true,_lock);
private final SSLEngine _engine; private final SSLEngine _engine;
private final SSLSession _session; private final SSLSession _session;
@ -63,14 +64,15 @@ public class SslConnection extends AbstractAsyncConnection
private boolean _allowRenegotiate=true; private boolean _allowRenegotiate=true;
private boolean _handshook; private boolean _handshook;
private boolean _oshut; private boolean _oshut;
private IOFuture _netReadFuture;
private IOFuture _netWriteFuture;
private final class NetWriteCallback implements Callback<Void> private final class NetWriteCallback implements Callback<Void>
{ {
@Override @Override
public void completed(Void context) public void completed(Void context)
{ {
if (_writing.compareAndSet(true,false))
_appEndPoint.completeWrite(); _appEndPoint.completeWrite();
} }
@ -78,10 +80,8 @@ public class SslConnection extends AbstractAsyncConnection
public void failed(Void context, Throwable cause) public void failed(Void context, Throwable cause)
{ {
LOG.debug("write FAILED",cause); LOG.debug("write FAILED",cause);
if (!_appWriteFuture.isDone()) if (_writing.compareAndSet(true,false))
_appWriteFuture.fail(cause); _appEndPoint.writeFailed(cause);
else
LOG.warn("write FAILED",cause);
} }
} }
@ -233,7 +233,6 @@ public class SslConnection extends AbstractAsyncConnection
{ {
LOG.debug("onReadable {}",this); LOG.debug("onReadable {}",this);
_netReadFuture=null;
allocateBuffers(); allocateBuffers();
boolean progress=true; boolean progress=true;
@ -262,10 +261,8 @@ public class SslConnection extends AbstractAsyncConnection
finally finally
{ {
releaseBuffers(); releaseBuffers();
if (_appReadFuture!=null && !_appReadFuture.isDone() && _netReadFuture==null && !BufferUtil.isFull(_inNet)) if (_appEndPoint._readCallback!=null && !BufferUtil.isFull(_inNet))
_netReadFuture=scheduleOnReadable(); scheduleOnReadable();
LOG.debug("!onReadable {} {}",this,_netReadFuture);
_lock.unlock(); _lock.unlock();
} }
@ -279,8 +276,7 @@ public class SslConnection extends AbstractAsyncConnection
_lock.lock(); _lock.lock();
try try
{ {
if (!_appReadFuture.isDone()) _appEndPoint.readFailed(cause);
_appReadFuture.fail(cause);
} }
finally finally
{ {
@ -382,8 +378,8 @@ public class SslConnection extends AbstractAsyncConnection
finally finally
{ {
// Has the net data consumed allowed us to release net backpressure? // Has the net data consumed allowed us to release net backpressure?
if (BufferUtil.compact(_inNet) && !_appReadFuture.isDone() && _netReadFuture==null) if (BufferUtil.compact(_inNet) && _appEndPoint._readCallback!=null)
_netReadFuture=scheduleOnReadable(); scheduleOnReadable();
releaseBuffers(); releaseBuffers();
_lock.unlock(); _lock.unlock();
@ -393,7 +389,7 @@ public class SslConnection extends AbstractAsyncConnection
private boolean wrap(final ByteBuffer outApp) throws IOException private boolean wrap(final ByteBuffer outApp) throws IOException
{ {
if (_netWriteFuture!=null && !_netWriteFuture.isDone()) if (_writing.get())
return false; return false;
final SSLEngineResult result; final SSLEngineResult result;
@ -440,15 +436,8 @@ public class SslConnection extends AbstractAsyncConnection
throw new IOException(result.toString()); throw new IOException(result.toString());
} }
if (BufferUtil.hasContent(_outNet)) if (BufferUtil.hasContent(_outNet) && _writing.compareAndSet(false,true))
{ _endp.write(null,_netWriteCallback,_outNet);
IOFuture write =_endp.write(_outNet);
if (write.isDone())
return true;
_netWriteFuture=write;
_netWriteFuture.setCallback(_netWriteCallback, null);
}
return result.bytesConsumed()>0 || result.bytesProduced()>0 ; return result.bytesConsumed()>0 || result.bytesProduced()>0 ;
} }
@ -457,9 +446,8 @@ public class SslConnection extends AbstractAsyncConnection
{ {
if (BufferUtil.isEmpty(_inNet)) if (BufferUtil.isEmpty(_inNet))
{ {
if (_netReadFuture==null) scheduleOnReadable();
_netReadFuture=scheduleOnReadable(); LOG.debug("{} unwrap read {}",_session);
LOG.debug("{} unwrap read {}",_session,_netReadFuture);
return false; return false;
} }
@ -495,8 +483,8 @@ public class SslConnection extends AbstractAsyncConnection
// need to wait for more net data // need to wait for more net data
if (_endp.isInputShutdown()) if (_endp.isInputShutdown())
_inNet.clear().limit(0); _inNet.clear().limit(0);
else if (_netReadFuture==null) else
_netReadFuture=scheduleOnReadable(); scheduleOnReadable();
break; break;
@ -522,8 +510,8 @@ public class SslConnection extends AbstractAsyncConnection
} }
// If any bytes were produced and we have an app read waiting, make it ready. // If any bytes were produced and we have an app read waiting, make it ready.
if (result.bytesProduced()>0 && !_appReadFuture.isDone()) if (result.bytesProduced()>0)
_appReadFuture.complete(); _appEndPoint.readCompleted();
return result.bytesConsumed()>0 || result.bytesProduced()>0; return result.bytesConsumed()>0 || result.bytesProduced()>0;
} }
@ -546,12 +534,71 @@ public class SslConnection extends AbstractAsyncConnection
public class AppEndPoint extends AbstractEndPoint implements AsyncEndPoint public class AppEndPoint extends AbstractEndPoint implements AsyncEndPoint
{ {
ByteBuffer[] _writeBuffers; ByteBuffer[] _writeBuffers;
private Callback _readCallback;
private Object _readContext;
private Callback _writeCallback;
private Object _writeContext;
AppEndPoint() AppEndPoint()
{ {
super(_endp.getLocalAddress(),_endp.getRemoteAddress()); super(_endp.getLocalAddress(),_endp.getRemoteAddress());
} }
/* ------------------------------------------------------------ */
private void readCompleted()
{
if (_readCallback!=null)
{
Callback cb=_readCallback;
Object ctx=_readContext;
_readCallback=null;
_readContext=null;
cb.completed(ctx); // TODO after lock released?
}
}
/* ------------------------------------------------------------ */
private void writeCompleted()
{
if (_writeCallback!=null)
{
Callback cb=_writeCallback;
Object ctx=_writeContext;
_writeCallback=null;
_writeContext=null;
cb.completed(ctx); // TODO after lock released?
}
}
/* ------------------------------------------------------------ */
private void readFailed(Throwable cause)
{
if (_readCallback!=null)
{
Callback cb=_readCallback;
Object ctx=_readContext;
_readCallback=null;
_readContext=null;
cb.failed(ctx,cause); // TODO after lock released?
}
}
/* ------------------------------------------------------------ */
private void writeFailed(Throwable cause)
{
if (_writeCallback!=null)
{
Callback cb=_writeCallback;
Object ctx=_writeContext;
_writeCallback=null;
_writeContext=null;
cb.failed(ctx,cause); // TODO after lock released?
}
}
public SSLEngine getSslEngine() public SSLEngine getSslEngine()
{ {
return _engine; return _engine;
@ -712,7 +759,7 @@ public class SslConnection extends AbstractAsyncConnection
_engine.getHandshakeStatus(), _engine.getHandshakeStatus(),
i, o, u, i, o, u,
_endp.isInputShutdown(), _oshut, _endp.isInputShutdown(), _oshut,
_appReadFuture,_appWriteFuture, _readCallback,_writeCallback,
_appConnection); _appConnection);
} }
@ -730,21 +777,25 @@ public class SslConnection extends AbstractAsyncConnection
} }
@Override @Override
public IOFuture readable() throws IllegalStateException public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
{ {
LOG.debug("{} sslEndp.read()",_session);
_lock.lock(); _lock.lock();
try try
{ {
if (_readCallback != null)
throw new IllegalStateException("previous read not complete");
// Do we already have application input data? // Do we already have application input data?
if (BufferUtil.hasContent(_inApp)) if (BufferUtil.hasContent(_inApp))
return DoneIOFuture.COMPLETE; {
callback.completed(context);
return;
}
// No, we need to schedule a network read // No, we need to schedule a network read
_appReadFuture=new DispatchingIOFuture(_lock); _readContext=context;
if (_netReadFuture==null) _readCallback=callback;
_netReadFuture=scheduleOnReadable(); scheduleOnReadable();
return _appReadFuture;
} }
finally finally
{ {
@ -753,12 +804,12 @@ public class SslConnection extends AbstractAsyncConnection
} }
@Override @Override
public IOFuture write(ByteBuffer... buffers) public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
{ {
_lock.lock(); _lock.lock();
try try
{ {
if (!_appWriteFuture.isDone()) if (_writeCallback!=null)
throw new IllegalStateException("previous write not complete"); throw new IllegalStateException("previous write not complete");
// Try to process all // Try to process all
@ -769,15 +820,16 @@ public class SslConnection extends AbstractAsyncConnection
if (b.hasRemaining()) if (b.hasRemaining())
{ {
_writeBuffers=buffers; _writeBuffers=buffers;
_appWriteFuture=new DispatchingIOFuture(_lock); _writeContext=context;
return _appWriteFuture; _writeCallback=callback;
return;
} }
} }
return DoneIOFuture.COMPLETE; callback.completed(context);
} }
catch (IOException e) catch (IOException e)
{ {
return new DoneIOFuture(e); callback.failed(context,e);
} }
finally finally
{ {
@ -798,11 +850,11 @@ public class SslConnection extends AbstractAsyncConnection
if (b.hasRemaining()) if (b.hasRemaining())
return; return;
} }
_appWriteFuture.complete(); writeCompleted();
} }
catch (IOException e) catch (IOException e)
{ {
_appWriteFuture.fail(e); writeFailed(e);
} }
finally finally
{ {

View File

@ -0,0 +1,197 @@
package org.eclipse.jetty.io;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class FutureCallbackTest
{
@Test
public void testNotDone()
{
FutureCallback<String> fcb= new FutureCallback<>();
Assert.assertFalse(fcb.isDone());
Assert.assertFalse(fcb.isCancelled());
}
@Test
public void testGetNotDone() throws Exception
{
FutureCallback<String> fcb= new FutureCallback<>();
long start=System.currentTimeMillis();
try
{
fcb.get(500,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(TimeoutException e)
{
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(50L));
}
@Test
public void testDone() throws Exception
{
FutureCallback<String> fcb= new FutureCallback<>();
fcb.completed("Ctx");
Assert.assertTrue(fcb.isDone());
Assert.assertFalse(fcb.isCancelled());
long start=System.currentTimeMillis();
Assert.assertEquals("Ctx",fcb.get());
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
}
@Test
public void testGetDone() throws Exception
{
final FutureCallback<String> fcb= new FutureCallback<>();
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable(){
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.completed("Ctx");
}
}).start();
latch.await();
long start=System.currentTimeMillis();
Assert.assertEquals("Ctx",fcb.get(10000,TimeUnit.MILLISECONDS));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
Assert.assertTrue(fcb.isDone());
Assert.assertFalse(fcb.isCancelled());
}
@Test
public void testFailed() throws Exception
{
FutureCallback<String> fcb= new FutureCallback<>();
Exception ex=new Exception("FAILED");
fcb.failed("Ctx",ex);
Assert.assertTrue(fcb.isDone());
Assert.assertFalse(fcb.isCancelled());
long start=System.currentTimeMillis();
try
{
fcb.get();
Assert.fail();
}
catch(ExecutionException ee)
{
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
}
@Test
public void testGetFailed() throws Exception
{
final FutureCallback<String> fcb= new FutureCallback<>();
final Exception ex=new Exception("FAILED");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable(){
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.failed("Ctx",ex);
}
}).start();
latch.await();
long start=System.currentTimeMillis();
try
{
fcb.get(10000,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(ExecutionException ee)
{
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
Assert.assertTrue(fcb.isDone());
Assert.assertFalse(fcb.isCancelled());
}
@Test
public void testCancelled() throws Exception
{
FutureCallback<String> fcb= new FutureCallback<>();
fcb.cancel(true);
Assert.assertTrue(fcb.isDone());
Assert.assertTrue(fcb.isCancelled());
long start=System.currentTimeMillis();
try
{
fcb.get();
Assert.fail();
}
catch(CancellationException e)
{
Assert.assertThat(e.getCause(),Matchers.instanceOf(CancellationException.class));
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
}
@Test
public void testGetCancelled() throws Exception
{
final FutureCallback<String> fcb= new FutureCallback<>();
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable(){
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.cancel(true);
}
}).start();
latch.await();
long start=System.currentTimeMillis();
try
{
fcb.get(10000,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(CancellationException e)
{
Assert.assertThat(e.getCause(),Matchers.instanceOf(CancellationException.class));
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
Assert.assertTrue(fcb.isDone());
Assert.assertTrue(fcb.isCancelled());
}
}

View File

@ -1,459 +0,0 @@
package org.eclipse.jetty.io;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.hamcrest.number.OrderingComparison.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class IOFutureTest
{
@Test
public void testReadyCompleted() throws Exception
{
IOFuture future = new DoneIOFuture();
assertTrue(future.isDone());
assertTrue(future.isComplete());
long start=System.currentTimeMillis();
future.block();
Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L));
start=System.currentTimeMillis();
future.block(1000,TimeUnit.MILLISECONDS);
Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L));
final AtomicBoolean ready = new AtomicBoolean(false);
final AtomicReference<Throwable> fail = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
ready.set(true);
}
@Override
public void failed(Object context, Throwable cause)
{
fail.set(cause);
}
}, null);
assertTrue(ready.get());
assertNull(fail.get());
}
@Test
public void testFailedCompleted() throws Exception
{
Exception ex=new Exception("failed");
IOFuture future = new DoneIOFuture(ex);
assertTrue(future.isDone());
try
{
future.isComplete();
Assert.fail();
}
catch(ExecutionException e)
{
Assert.assertEquals(ex,e.getCause());
}
long start=System.currentTimeMillis();
try
{
future.block();
Assert.fail();
}
catch(ExecutionException e)
{
Assert.assertEquals(ex,e.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L));
start=System.currentTimeMillis();
try
{
future.block(1000,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(ExecutionException e)
{
Assert.assertEquals(ex,e.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L));
final AtomicBoolean ready = new AtomicBoolean(false);
final AtomicReference<Throwable> fail = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
ready.set(true);
}
@Override
public void failed(Object context, Throwable cause)
{
fail.set(cause);
}
}, null);
assertFalse(ready.get());
assertEquals(ex,fail.get());
}
@Test
public void testInCompleted() throws Exception
{
IOFuture future = new DispatchingIOFuture();
assertFalse(future.isDone());
assertFalse(future.isComplete());
long start=System.currentTimeMillis();
future.block(10,TimeUnit.MILLISECONDS);
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(9L));
final AtomicBoolean ready = new AtomicBoolean(false);
final AtomicReference<Throwable> fail = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
ready.set(true);
}
@Override
public void failed(Object context, Throwable cause)
{
fail.set(cause);
}
}, null);
assertFalse(ready.get());
assertNull(fail.get());
}
@Test
public void testBlockWaitsWhenNotCompleted() throws Exception
{
DispatchingIOFuture future = new DispatchingIOFuture();
assertFalse(future.isDone());
assertFalse(future.isComplete());
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicReference<Throwable> failure = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.set(true);
}
@Override
public void failed(Object context, Throwable cause)
{
failure.set(cause);
}
}, null);
long sleep = 1000;
long start = System.nanoTime();
assertFalse(future.block(sleep, TimeUnit.MILLISECONDS));
assertThat(System.nanoTime() - start, greaterThan(TimeUnit.MILLISECONDS.toNanos(sleep / 2)));
assertFalse(completed.get());
assertNull(failure.get());
}
@Test
public void testTimedBlockWokenUpWhenCompleted() throws Exception
{
final DispatchingIOFuture future = new DispatchingIOFuture();
final CountDownLatch completed = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.countDown();
}
@Override
public void failed(Object context, Throwable cause)
{
failure.set(cause);
}
}, null);
long start = System.nanoTime();
final long delay = 500;
new Thread()
{
@Override
public void run()
{
try
{
// Want the call to block() below to happen before the call to complete() here
TimeUnit.MILLISECONDS.sleep(delay);
future.complete();
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
assertTrue(future.block(delay * 4, TimeUnit.MILLISECONDS));
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
assertTrue(future.isComplete());
assertTrue(completed.await(delay * 4, TimeUnit.MILLISECONDS));
assertNull(failure.get());
}
@Test
public void testBlockWokenUpWhenCompleted() throws Exception
{
final DispatchingIOFuture future = new DispatchingIOFuture();
final CountDownLatch completed = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.countDown();
}
@Override
public void failed(Object context, Throwable cause)
{
failure.set(cause);
}
}, null);
final long delay = 500;
long start = System.nanoTime();
new Thread()
{
@Override
public void run()
{
try
{
// Want the call to block() below to happen before the call to complete() here
TimeUnit.MILLISECONDS.sleep(delay);
future.complete();
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
future.block();
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
assertTrue(future.isComplete());
assertTrue(completed.await(delay * 4, TimeUnit.MILLISECONDS));
assertNull(failure.get());
}
@Test
public void testTimedBlockWokenUpOnFailure() throws Exception
{
final DispatchingIOFuture future = new DispatchingIOFuture();
final Exception ex = new Exception("failed");
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch failureLatch = new CountDownLatch(1);
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.set(true);
}
@Override
public void failed(Object context, Throwable x)
{
failure.set(x);
failureLatch.countDown();
}
}, null);
final long delay = 500;
long start = System.nanoTime();
new Thread()
{
@Override
public void run()
{
try
{
// Want the call to block() below to happen before the call to fail() here
TimeUnit.MILLISECONDS.sleep(delay);
future.fail(ex);
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
try
{
future.block(delay * 4, TimeUnit.MILLISECONDS);
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertSame(ex, e.getCause());
}
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
try
{
future.isComplete();
Assert.fail();
}
catch (ExecutionException e)
{
assertSame(ex, e.getCause());
}
assertFalse(completed.get());
assertTrue(failureLatch.await(delay * 4, TimeUnit.MILLISECONDS));
assertSame(ex, failure.get());
}
@Test
public void testBlockWokenUpOnFailure() throws Exception
{
final DispatchingIOFuture future = new DispatchingIOFuture();
final Exception ex = new Exception("failed");
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch failureLatch = new CountDownLatch(1);
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.set(true);
}
@Override
public void failed(Object context, Throwable x)
{
failure.set(x);
failureLatch.countDown();
}
}, null);
final long delay = 500;
long start = System.nanoTime();
new Thread()
{
@Override
public void run()
{
try
{
// Want the call to block() below to happen before the call to fail() here
TimeUnit.MILLISECONDS.sleep(delay);
future.fail(ex);
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
try
{
future.block();
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertSame(ex, e.getCause());
}
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
try
{
future.isComplete();
Assert.fail();
}
catch (ExecutionException e)
{
assertSame(ex, e.getCause());
}
assertFalse(completed.get());
assertTrue(failureLatch.await(delay * 4, TimeUnit.MILLISECONDS));
assertSame(ex, failure.get());
}
}

View File

@ -138,12 +138,17 @@ public class SelectChannelEndPointTest
int filled=_endp.fill(_in); int filled=_endp.fill(_in);
if (filled>0) if (filled>0)
progress=true; progress=true;
System.err.println("filled "+filled);
// If the tests wants to block, then block // If the tests wants to block, then block
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt)
{ {
_endp.readable().block(); FutureCallback<Void> blockingRead= new FutureCallback<>();
System.err.println("blocking read on "+blockingRead);
_endp.readable(null,blockingRead);
blockingRead.get();
filled=_endp.fill(_in); filled=_endp.fill(_in);
System.err.println("FILLED "+filled);
progress|=filled>0; progress|=filled>0;
} }
@ -157,7 +162,11 @@ public class SelectChannelEndPointTest
ByteBuffer out=_out.duplicate(); ByteBuffer out=_out.duplicate();
BufferUtil.clear(_out); BufferUtil.clear(_out);
for (int i=0;i<_writeCount;i++) for (int i=0;i<_writeCount;i++)
_endp.write(out.asReadOnlyBuffer()).block(); {
FutureCallback<Void> blockingWrite= new FutureCallback<>();
_endp.write(null,blockingWrite,out.asReadOnlyBuffer());
blockingWrite.get();
}
progress=true; progress=true;
} }
@ -171,7 +180,9 @@ public class SelectChannelEndPointTest
// Timeout does not close, so echo exception then shutdown // Timeout does not close, so echo exception then shutdown
try try
{ {
_endp.write(BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))).block(); FutureCallback<Void> blockingWrite= new FutureCallback<>();
_endp.write(null,blockingWrite,BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in)));
blockingWrite.get();
_endp.shutdownOutput(); _endp.shutdownOutput();
} }
catch(Exception e2) catch(Exception e2)