From 4caced258f3c0fa34a5c7fb0af541a9b8b164801 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 9 May 2012 16:57:18 +0200 Subject: [PATCH] jetty-9 removed IOFuture and added FutureCallback instead --- .../jetty/io/AbstractAsyncConnection.java | 23 +- .../jetty/io/AsyncByteArrayEndPoint.java | 227 +-------- .../org/eclipse/jetty/io/AsyncEndPoint.java | 100 +--- .../org/eclipse/jetty/io/FutureCallback.java | 101 ++++ .../jetty/io/SelectChannelEndPoint.java | 184 ++++--- .../org/eclipse/jetty/io/SelectorManager.java | 29 +- .../org/eclipse/jetty/io/SslConnection.java | 158 ++++-- .../eclipse/jetty/io/FutureCallbackTest.java | 197 ++++++++ .../org/eclipse/jetty/io/IOFutureTest.java | 459 ------------------ .../jetty/io/SelectChannelEndPointTest.java | 17 +- 10 files changed, 599 insertions(+), 896 deletions(-) create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/FutureCallback.java create mode 100644 jetty-io/src/test/java/org/eclipse/jetty/io/FutureCallbackTest.java delete mode 100644 jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java index 7f6c3df78a7..2e46b48e6ea 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java @@ -1,5 +1,7 @@ package org.eclipse.jetty.io; +import java.util.concurrent.atomic.AtomicBoolean; + import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -10,6 +12,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class); private final AsyncEndPoint _endp; private final ReadCallback _readCallback = new ReadCallback(); + private final AtomicBoolean _readInterested = new AtomicBoolean(); public AbstractAsyncConnection(AsyncEndPoint endp) { @@ -52,31 +55,37 @@ public abstract class AbstractAsyncConnection implements AsyncConnection else _endp.shutdownOutput(); } - - public IOFuture scheduleOnReadable() + + /* ------------------------------------------------------------ */ + public void scheduleOnReadable() { - IOFuture read=getEndPoint().readable(); - read.setCallback(_readCallback, null); - return read; + if (_readInterested.compareAndSet(false,true)) + getEndPoint().readable(null,_readCallback); } - + + /* ------------------------------------------------------------ */ public void onReadFail(Throwable cause) { LOG.debug("read failed: "+cause); } + /* ------------------------------------------------------------ */ @Override public String toString() { return String.format("%s@%x", getClass().getSimpleName(), hashCode()); } + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ private class ReadCallback implements Callback { @Override public void completed(Void context) { - onReadable(); + if (_readInterested.compareAndSet(true,false)) + onReadable(); } @Override diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index e17c3f24549..f1b6bcb0983 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -1,242 +1,55 @@ package org.eclipse.jetty.io; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import static org.eclipse.jetty.io.DoneIOFuture.COMPLETE; - public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint { public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class); - private final Lock _lock = new ReentrantLock(); - private volatile boolean _idlecheck; - - private volatile AsyncConnection _connection; - - private DispatchingIOFuture _readFuture; - private DispatchingIOFuture _writeFuture; - - private ByteBuffer[] _writeBuffers; - - - public AsyncConnection getAsyncConnection() + @Override + public void readable(C context, Callback callback) throws IllegalStateException { - return _connection; - } - - protected void dispatch(Runnable task) - { - new Thread(task).start(); - } - - public void setAsyncConnection(AsyncConnection connection) - { - _connection=connection; + // TODO Auto-generated method stub + } @Override - public IOFuture readable() throws IllegalStateException + public void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException { - _lock.lock(); - try - { - if (_readFuture!=null && !_readFuture.isDone()) - throw new IllegalStateException("previous read not complete"); - - _readFuture=new ReadFuture(); - - // TODO - - return _readFuture; - } - finally - { - _lock.unlock(); - } + // TODO Auto-generated method stub + } - @Override - public IOFuture write(ByteBuffer... buffers) throws IllegalStateException - { - _lock.lock(); - try - { - if (_writeFuture!=null && !_writeFuture.isDone()) - throw new IllegalStateException("previous write not complete"); - - flush(buffers); - - // Are we complete? - for (ByteBuffer b : buffers) - { - if (b.hasRemaining()) - { - _writeBuffers=buffers; - _writeFuture=new WriteFuture(); - // TODO - return _writeFuture; - } - } - return COMPLETE; - } - catch(IOException e) - { - return new DoneIOFuture(e); - } - finally - { - _lock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - private void completeWrite() - { - try - { - flush(_writeBuffers); - - // Are we complete? - for (ByteBuffer b : _writeBuffers) - { - if (b.hasRemaining()) - { - // TODO - return; - } - } - // we are complete and ready - _writeFuture.complete(); - } - catch(final IOException e) - { - _writeBuffers=null; - if (!_writeFuture.isDone()) - _writeFuture.fail(e); - } - - - } - - /* ------------------------------------------------------------ */ @Override public void setCheckForIdle(boolean check) { - _idlecheck=check; + // TODO Auto-generated method stub + } - /* ------------------------------------------------------------ */ @Override public boolean isCheckForIdle() { - return _idlecheck; + // TODO Auto-generated method stub + return false; } - - - - /* ------------------------------------------------------------ */ - public void checkForIdleOrReadWriteTimeout(long now) + @Override + public AsyncConnection getAsyncConnection() { - if (_idlecheck || !_readFuture.isDone() || !_writeFuture.isDone()) - { - long idleTimestamp=getIdleTimestamp(); - long max_idle_time=getMaxIdleTime(); - - if (idleTimestamp!=0 && max_idle_time>0) - { - long idleForMs=now-idleTimestamp; - - if (idleForMs>max_idle_time) - { - _lock.lock(); - try - { - if (_idlecheck) - _connection.onIdleExpired(idleForMs); - if (!_readFuture.isDone()) - _readFuture.fail(new TimeoutException()); - if (!_writeFuture.isDone()) - _writeFuture.fail(new TimeoutException()); - - notIdle(); - } - finally - { - _lock.unlock(); - } - } - } - } + // TODO Auto-generated method stub + return null; } - - private final class WriteFuture extends DispatchingIOFuture + @Override + public void setAsyncConnection(AsyncConnection connection) { - private WriteFuture() - { - super(_lock); - } - - @Override - protected void dispatch(Runnable task) - { - AsyncByteArrayEndPoint.this.dispatch(task); - } - - @Override - public void cancel() - { - _lock.lock(); - try - { - // TODO - cancelled(); - } - finally - { - _lock.unlock(); - } - } - } - - - - - private final class ReadFuture extends DispatchingIOFuture - { - private ReadFuture() - { - super(_lock); - } - - @Override - protected void dispatch(Runnable task) - { - AsyncByteArrayEndPoint.this.dispatch(task); - } - - @Override - public void cancel() - { - _lock.lock(); - try - { - // TODO ?? - cancelled(); - } - finally - { - _lock.unlock(); - } - } + // TODO Auto-generated method stub + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java index 95d48fa6686..57964bb7dfc 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -4,104 +4,40 @@ import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.concurrent.Future; +import org.eclipse.jetty.util.Callback; + /* ------------------------------------------------------------ */ /**Asynchronous End Point *

* This extension of EndPoint provides asynchronous scheduling methods. * The design of these has been influenced by NIO.2 Futures and Completion * handlers, but does not use those actual interfaces because: they have - * some inefficiencies (eg buffers must be allocated before read); they have - * unrequired overheads due to their generic nature (passing of attachments - * and returning operation counts); there is no need to pass timeouts as - * {@link EndPoint#getMaxIdleTime() is used. - *

- * The intent of this API is that it can be used in either: a polling mode (like {@link Future}) - * ; in a callback mode (like {@link CompletionHandler} mode; or blocking mod;e or a hybrid mode - *

Blocking read

- *
- * endpoint.readable().block();
- * endpoint.fill(buffer);
- * 
- *

Polling read

- *
- * IOFuture read = endpoint.readable();
- * ...
- * while (!read.isComplete())
- *   Thread.sleep(10);
- * endpoint.fill(buffer);
- * 
- *

Callback read

- *
- * endpoint.readable().setCallback(new IOCallback()
- * {
- *   public void onReady() { endpoint.fill(buffer); ... }
- *   public void onFail(IOException e) { ... }
- * }
- * 
- * - *

Blocking write

- *
- * endpoint.write(buffer).block();
- * 
- *

Polling write

- *
- * IOFuture write = endpoint.write(buffer);
- * ...
- * while (!write.isComplete())
- *   Thread.sleep(10);
- *
- * 
- *

Callback write

- *
- * endpoint.write(buffer0,buffer1).setCallback(new IOCallback()
- * {
- *   public void onReady() { ... }
- *   public void onFail(IOException e) { ... }
- * }
- * 
- *

Hybrid write

- *
- * IOFuture write = endpoint.write(buffer);
- * // wait a little bit
- * if (!write.block(10,TimeUnit.MILLISECONDS))
- * {
- *   // still not ready, so organize a callback
- *   write.setHandler(new IOCallback()
- *   {
- *     public void onReady() { ... }
- *     public void onFail(IOException e) { ... }
- *   });
- * ...
- * 
- * - *

Compatibility Notes

- * Some Async IO APIs have the concept of setting read interest. With this - * API calling {@link #readable()} is equivalent to setting read interest to true - * and calling {@link IOFuture#cancel()} is equivalent to setting read interest - * to false. + * some inefficiencies. + * */ public interface AsyncEndPoint extends EndPoint { /* ------------------------------------------------------------ */ - /** Schedule a read operation. + /** Asynchronous a readable notification. *

- * This method allows a {@link #fill(ByteBuffer)} operation to be scheduled - * with either blocking, polling or callback semantics. - * @return an {@link IOFuture} instance that will be ready when a call to {@link #fill(ByteBuffer)} will - * return immediately with data without blocking. - * @throws IllegalStateException if another read operation has been scheduled and has not timedout, been cancelled or is ready. + * This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF. + * @param context Context to return via the callback + * @param callback The callback to call when an error occurs or we are readable. + * @throws IllegalStateException if another read operation is concurrent. */ - IOFuture readable() throws IllegalStateException; + void readable(C context, Callback callback) throws IllegalStateException; /* ------------------------------------------------------------ */ - /** - * This method performs {@link #flush(ByteBuffer...)} operations and allows the completion of - * the entire write to be scheduled with blocking, polling or callback semantics. + /** Asynchronous write operation. + *

+ * This method performs {@link #flush(ByteBuffer...)} operation(s) and do a callback when all the data + * has been flushed or an error occurs. + * @param context Context to return via the callback + * @param callback The callback to call when an error occurs or we are readable. * @param buffers One or more {@link ByteBuffer}s that will be flushed. - * @return an {@link IOFuture} instance that will be ready when all the data in the buffers passed has been consumed by - * one or more calls to {@link #flush(ByteBuffer...)}. + * @throws IllegalStateException if another write operation is concurrent. */ - IOFuture write(ByteBuffer... buffers) throws IllegalStateException; + void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException; /* ------------------------------------------------------------ */ /** Set if the endpoint should be checked for idleness diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FutureCallback.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FutureCallback.java new file mode 100644 index 00000000000..c500e441252 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FutureCallback.java @@ -0,0 +1,101 @@ +package org.eclipse.jetty.io; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.util.Callback; + +public class FutureCallback implements Future,Callback +{ + private enum State {NOT_DONE,DOING,DONE}; + private final AtomicReference _state=new AtomicReference<>(State.NOT_DONE); + private final CountDownLatch _done= new CountDownLatch(1); + private Throwable _cause; + private C _context; + private boolean _completed; + + + @Override + public void completed(C context) + { + if (_state.compareAndSet(State.NOT_DONE,State.DOING)) + { + _context=context; + _completed=true; + if (_state.compareAndSet(State.DOING,State.DONE)) + { + _done.countDown(); + return; + } + } + else if (!isCancelled()) + throw new IllegalStateException(); + } + + @Override + public void failed(C context, Throwable cause) + { + if (_state.compareAndSet(State.NOT_DONE,State.DOING)) + { + _context=context; + _cause=cause; + if (_state.compareAndSet(State.DOING,State.DONE)) + { + _done.countDown(); + return; + } + } + else if (!isCancelled()) + throw new IllegalStateException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + failed(null,new CancellationException()); + return false; + } + + @Override + public boolean isCancelled() + { + return State.DONE.equals(_state.get())&&_cause instanceof CancellationException; + } + + @Override + public boolean isDone() + { + return State.DONE.equals(_state.get()); + } + + @Override + public C get() throws InterruptedException, ExecutionException + { + _done.await(); + if (_completed) + return _context; + if (_cause instanceof CancellationException) + throw (CancellationException) new CancellationException().initCause(_cause); + throw new ExecutionException(_cause); + } + + @Override + public C get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + if (!_done.await(timeout,unit)) + throw new TimeoutException(); + if (_completed) + return _context; + if (_cause instanceof TimeoutException) + throw (TimeoutException)_cause; + if (_cause instanceof CancellationException) + throw (CancellationException) new CancellationException().initCause(_cause); + throw new ExecutionException(_cause); + } + +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index bacf6779490..208ca9f8ecd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jetty.io.SelectorManager.SelectSet; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Timeout.Task; @@ -31,7 +32,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task; /** * An Endpoint that can be scheduled by {@link SelectorManager}. */ -public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint +public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableAsyncEndPoint { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); @@ -40,8 +41,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; - private DispatchingIOFuture _readFuture = new DispatchingIOFuture(true,_lock); - private DispatchingIOFuture _writeFuture = new DispatchingIOFuture(true,_lock); + private Callback _readCallback; + private Object _readContext; + private Callback _writeCallback; + private Object _writeContext; private SelectionKey _key; @@ -73,6 +76,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } /* ------------------------------------------------------------ */ + @Override public AsyncConnection getAsyncConnection() { return _connection; @@ -94,6 +98,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } /* ------------------------------------------------------------ */ + @Override public void setAsyncConnection(AsyncConnection connection) { AsyncConnection old = getAsyncConnection(); @@ -107,6 +112,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo * Called by selectSet to schedule handling * */ + @Override public void onSelected() { synchronized (_lock) @@ -126,10 +132,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo boolean can_write = (_key.isWritable() && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE); _interestOps = 0; - if (can_read && !_readFuture.isDone()) - _readFuture.complete(); - - if (can_write && _writeBuffers != null) + if (can_read) + readCompleted(); + if (can_write) completeWrite(); } @@ -168,33 +173,92 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } /* ------------------------------------------------------------ */ + @Override public void checkForIdleOrReadWriteTimeout(long now) { - if (_idlecheck || !_readFuture.isDone() || !_writeFuture.isDone()) + synchronized (_lock) { - long idleTimestamp = getIdleTimestamp(); - long max_idle_time = getMaxIdleTime(); - - if (idleTimestamp != 0 && max_idle_time > 0) + if (_idlecheck || _readCallback!=null || _writeCallback!=null) { - long idleForMs = now - idleTimestamp; + long idleTimestamp = getIdleTimestamp(); + long max_idle_time = getMaxIdleTime(); - if (idleForMs > max_idle_time) + if (idleTimestamp != 0 && max_idle_time > 0) { - synchronized (_lock) + long idleForMs = now - idleTimestamp; + + if (idleForMs > max_idle_time) { + notIdle(); + if (_idlecheck) _connection.onIdleExpired(idleForMs); - if (!_readFuture.isDone()) - _readFuture.fail(new TimeoutException()); - if (!_writeFuture.isDone()) - _writeFuture.fail(new TimeoutException()); - notIdle(); + + TimeoutException timeout = new TimeoutException(); + readFailed(timeout); + writeFailed(timeout); } } } } } + + /* ------------------------------------------------------------ */ + private void readCompleted() + { + if (_readCallback!=null) + { + Callback cb=_readCallback; + Object ctx=_readContext; + _readCallback=null; + _readContext=null; + System.err.printf("ReadComplete %s %s%n",ctx,cb); + cb.completed(ctx); // TODO after lock released? + } + } + + /* ------------------------------------------------------------ */ + private void writeCompleted() + { + if (_writeCallback!=null) + { + Callback cb=_writeCallback; + Object ctx=_writeContext; + _writeCallback=null; + _writeContext=null; + _writeBuffers=null; + System.err.printf("writeComplete %s %s%n",ctx,cb); + cb.completed(ctx); // TODO after lock released? + } + } + + /* ------------------------------------------------------------ */ + private void readFailed(Throwable cause) + { + if (_readCallback!=null) + { + Callback cb=_readCallback; + Object ctx=_readContext; + _readCallback=null; + _readContext=null; + System.err.printf("ReadFail %s %s%n",ctx,cb); + cb.failed(ctx,cause); // TODO after lock released? + } + } + + /* ------------------------------------------------------------ */ + private void writeFailed(Throwable cause) + { + if (_writeCallback!=null) + { + Callback cb=_writeCallback; + Object ctx=_writeContext; + _writeCallback=null; + _writeContext=null; + System.err.printf("writeFailed %s %s%n",ctx,cb); + cb.failed(ctx,cause); // TODO after lock released? + } + } /* ------------------------------------------------------------ */ @Override @@ -207,31 +271,29 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } /* ------------------------------------------------------------ */ - @Override - public IOFuture readable() throws IllegalStateException + @Override + public void readable(C context, Callback callback) throws IllegalStateException { synchronized (_lock) { - if (_readFuture != null && !_readFuture.isDone()) + if (_readCallback != null) throw new IllegalStateException("previous read not complete"); - - _readFuture = new InterestedFuture(SelectionKey.OP_READ); - _interestOps = _interestOps | SelectionKey.OP_READ; + _readContext=context; + _readCallback=callback; + _interestOps=_interestOps | SelectionKey.OP_READ; updateKey(); - - return _readFuture; } } /* ------------------------------------------------------------ */ @Override - public IOFuture write(ByteBuffer... buffers) + public void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException { synchronized (_lock) { try { - if (_writeFuture != null && !_writeFuture.isDone()) + if (_writeCallback!=null) throw new IllegalStateException("previous write not complete"); flush(buffers); @@ -241,18 +303,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo { if (b.hasRemaining()) { - _writeBuffers = buffers; - _writeFuture = new InterestedFuture(SelectionKey.OP_WRITE); + _writeBuffers=buffers; + _writeContext=context; + _writeCallback=callback; _interestOps = _interestOps | SelectionKey.OP_WRITE; updateKey(); - return _writeFuture; + return; } } - return DoneIOFuture.COMPLETE; + + callback.completed(context); } catch (IOException e) { - return new DoneIOFuture(e); + callback.failed(context,e); } } } @@ -260,6 +324,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo /* ------------------------------------------------------------ */ private void completeWrite() { + if (_writeBuffers==null) + return; + try { flush(_writeBuffers); @@ -274,13 +341,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } } // we are complete and ready - _writeFuture.complete(); + writeCompleted(); } - catch (final IOException e) + catch (IOException e) { - _writeBuffers = null; - if (!_writeFuture.isDone()) - _writeFuture.fail(e); + writeFailed(e); } } @@ -332,7 +397,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo /** * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey */ - void doUpdateKey() + @Override + public void doUpdateKey() { synchronized (_lock) { @@ -449,7 +515,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",hashCode(),getRemoteAddress(),getLocalAddress(),isOpen(), - isInputShutdown(),isOutputShutdown(),_interestOps,keyString,_readFuture,_writeFuture,getAsyncConnection()); + isInputShutdown(),isOutputShutdown(),_interestOps,keyString,_readCallback,_writeCallback,getAsyncConnection()); } /* ------------------------------------------------------------ */ @@ -458,38 +524,4 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo return _selectSet; } - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - private class InterestedFuture extends DispatchingIOFuture - { - private final int _interest; - - private InterestedFuture(int interest) - { - super(_lock); - _interest = interest; - } - - @Override - protected void dispatch(Runnable task) - { - if (!_manager.dispatch(task)) - { - LOG.warn("Dispatch failed: i=" + _interest); - throw new IllegalStateException(); - } - } - - @Override - public void cancel() - { - synchronized (_lock) - { - _interestOps = _interestOps & ~_interest; - updateKey(); - cancelled(); - } - } - } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index be1fe2a697b..1a6fb34968d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -351,7 +351,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @return the new endpoint {@link SelectChannelEndPoint} * @throws IOException */ - protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; + protected abstract SelectableAsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; /* ------------------------------------------------------------------------------- */ protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) @@ -453,7 +453,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa if (change instanceof EndPoint) { // Update the operations for a key. - SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; + SelectableAsyncEndPoint endpoint = (SelectableAsyncEndPoint)change; ch=endpoint.getChannel(); endpoint.doUpdateKey(); } @@ -595,17 +595,17 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa if (!key.isValid()) { key.cancel(); - SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); + SelectableAsyncEndPoint endpoint = (SelectableAsyncEndPoint)key.attachment(); if (endpoint != null) endpoint.doUpdateKey(); continue; } Object att = key.attachment(); - if (att instanceof SelectChannelEndPoint) + if (att instanceof SelectableAsyncEndPoint) { if (key.isReadable()||key.isWritable()) - ((SelectChannelEndPoint)att).onSelected(); + ((SelectableAsyncEndPoint)att).onSelected(); } else if (key.isConnectable()) { @@ -628,7 +628,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa AsyncEndPoint endpoint = createEndPoint(channel, key); key.attach(endpoint); // TODO: remove the cast - ((SelectChannelEndPoint)endpoint).onSelected(); + ((SelectableAsyncEndPoint)endpoint).onSelected(); } else { @@ -645,7 +645,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa if (key.isReadable()) { // TODO: remove the cast - ((SelectChannelEndPoint)endpoint).onSelected(); + ((SelectableAsyncEndPoint)endpoint).onSelected(); } } key = null; @@ -705,7 +705,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa for (AsyncEndPoint endp:_endPoints.keySet()) { // TODO: remove the cast - ((SelectChannelEndPoint)endp).checkForIdleOrReadWriteTimeout(idle_now); + ((SelectableAsyncEndPoint)endp).checkForIdleOrReadWriteTimeout(idle_now); } } public String toString() {return "Idle-"+super.toString();} @@ -840,7 +840,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } /* ------------------------------------------------------------ */ - public void destroyEndPoint(SelectChannelEndPoint endp) + public void destroyEndPoint(SelectableAsyncEndPoint endp) { LOG.debug("destroyEndPoint {}",endp); _endPoints.remove(endp); @@ -1020,4 +1020,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private interface ChangeTask extends Runnable {} + // TODO review this interface + public interface SelectableAsyncEndPoint extends AsyncEndPoint + { + void onSelected(); + + Channel getChannel(); + + void doUpdateKey(); + + void checkForIdleOrReadWriteTimeout(long idle_now); + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index 53592105d6b..4cf93bc3a30 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -15,6 +15,7 @@ package org.eclipse.jetty.io; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLEngine; @@ -45,10 +46,10 @@ public class SslConnection extends AbstractAsyncConnection private static final ThreadLocal __buffers = new ThreadLocal(); private final Lock _lock = new ReentrantLock(); + + private final AtomicBoolean _writing = new AtomicBoolean(); private final NetWriteCallback _netWriteCallback = new NetWriteCallback(); - private DispatchingIOFuture _appReadFuture = new DispatchingIOFuture(true,_lock); - private DispatchingIOFuture _appWriteFuture = new DispatchingIOFuture(true,_lock); private final SSLEngine _engine; private final SSLSession _session; @@ -63,25 +64,24 @@ public class SslConnection extends AbstractAsyncConnection private boolean _allowRenegotiate=true; private boolean _handshook; private boolean _oshut; - private IOFuture _netReadFuture; - private IOFuture _netWriteFuture; + + private final class NetWriteCallback implements Callback { @Override public void completed(Void context) { - _appEndPoint.completeWrite(); + if (_writing.compareAndSet(true,false)) + _appEndPoint.completeWrite(); } @Override public void failed(Void context, Throwable cause) { LOG.debug("write FAILED",cause); - if (!_appWriteFuture.isDone()) - _appWriteFuture.fail(cause); - else - LOG.warn("write FAILED",cause); + if (_writing.compareAndSet(true,false)) + _appEndPoint.writeFailed(cause); } } @@ -233,7 +233,6 @@ public class SslConnection extends AbstractAsyncConnection { LOG.debug("onReadable {}",this); - _netReadFuture=null; allocateBuffers(); boolean progress=true; @@ -262,10 +261,8 @@ public class SslConnection extends AbstractAsyncConnection finally { releaseBuffers(); - if (_appReadFuture!=null && !_appReadFuture.isDone() && _netReadFuture==null && !BufferUtil.isFull(_inNet)) - _netReadFuture=scheduleOnReadable(); - - LOG.debug("!onReadable {} {}",this,_netReadFuture); + if (_appEndPoint._readCallback!=null && !BufferUtil.isFull(_inNet)) + scheduleOnReadable(); _lock.unlock(); } @@ -279,8 +276,7 @@ public class SslConnection extends AbstractAsyncConnection _lock.lock(); try { - if (!_appReadFuture.isDone()) - _appReadFuture.fail(cause); + _appEndPoint.readFailed(cause); } finally { @@ -382,8 +378,8 @@ public class SslConnection extends AbstractAsyncConnection finally { // Has the net data consumed allowed us to release net backpressure? - if (BufferUtil.compact(_inNet) && !_appReadFuture.isDone() && _netReadFuture==null) - _netReadFuture=scheduleOnReadable(); + if (BufferUtil.compact(_inNet) && _appEndPoint._readCallback!=null) + scheduleOnReadable(); releaseBuffers(); _lock.unlock(); @@ -393,7 +389,7 @@ public class SslConnection extends AbstractAsyncConnection private boolean wrap(final ByteBuffer outApp) throws IOException { - if (_netWriteFuture!=null && !_netWriteFuture.isDone()) + if (_writing.get()) return false; final SSLEngineResult result; @@ -440,15 +436,8 @@ public class SslConnection extends AbstractAsyncConnection throw new IOException(result.toString()); } - if (BufferUtil.hasContent(_outNet)) - { - IOFuture write =_endp.write(_outNet); - if (write.isDone()) - return true; - - _netWriteFuture=write; - _netWriteFuture.setCallback(_netWriteCallback, null); - } + if (BufferUtil.hasContent(_outNet) && _writing.compareAndSet(false,true)) + _endp.write(null,_netWriteCallback,_outNet); return result.bytesConsumed()>0 || result.bytesProduced()>0 ; } @@ -457,9 +446,8 @@ public class SslConnection extends AbstractAsyncConnection { if (BufferUtil.isEmpty(_inNet)) { - if (_netReadFuture==null) - _netReadFuture=scheduleOnReadable(); - LOG.debug("{} unwrap read {}",_session,_netReadFuture); + scheduleOnReadable(); + LOG.debug("{} unwrap read {}",_session); return false; } @@ -495,8 +483,8 @@ public class SslConnection extends AbstractAsyncConnection // need to wait for more net data if (_endp.isInputShutdown()) _inNet.clear().limit(0); - else if (_netReadFuture==null) - _netReadFuture=scheduleOnReadable(); + else + scheduleOnReadable(); break; @@ -522,8 +510,8 @@ public class SslConnection extends AbstractAsyncConnection } // If any bytes were produced and we have an app read waiting, make it ready. - if (result.bytesProduced()>0 && !_appReadFuture.isDone()) - _appReadFuture.complete(); + if (result.bytesProduced()>0) + _appEndPoint.readCompleted(); return result.bytesConsumed()>0 || result.bytesProduced()>0; } @@ -546,12 +534,71 @@ public class SslConnection extends AbstractAsyncConnection public class AppEndPoint extends AbstractEndPoint implements AsyncEndPoint { ByteBuffer[] _writeBuffers; - + private Callback _readCallback; + private Object _readContext; + private Callback _writeCallback; + private Object _writeContext; + + AppEndPoint() { super(_endp.getLocalAddress(),_endp.getRemoteAddress()); } + /* ------------------------------------------------------------ */ + private void readCompleted() + { + if (_readCallback!=null) + { + Callback cb=_readCallback; + Object ctx=_readContext; + _readCallback=null; + _readContext=null; + cb.completed(ctx); // TODO after lock released? + } + } + + /* ------------------------------------------------------------ */ + private void writeCompleted() + { + if (_writeCallback!=null) + { + Callback cb=_writeCallback; + Object ctx=_writeContext; + _writeCallback=null; + _writeContext=null; + cb.completed(ctx); // TODO after lock released? + } + } + + /* ------------------------------------------------------------ */ + private void readFailed(Throwable cause) + { + if (_readCallback!=null) + { + Callback cb=_readCallback; + Object ctx=_readContext; + _readCallback=null; + _readContext=null; + cb.failed(ctx,cause); // TODO after lock released? + } + } + + /* ------------------------------------------------------------ */ + private void writeFailed(Throwable cause) + { + if (_writeCallback!=null) + { + Callback cb=_writeCallback; + Object ctx=_writeContext; + _writeCallback=null; + _writeContext=null; + cb.failed(ctx,cause); // TODO after lock released? + } + } + + + public SSLEngine getSslEngine() { return _engine; @@ -712,7 +759,7 @@ public class SslConnection extends AbstractAsyncConnection _engine.getHandshakeStatus(), i, o, u, _endp.isInputShutdown(), _oshut, - _appReadFuture,_appWriteFuture, + _readCallback,_writeCallback, _appConnection); } @@ -729,22 +776,26 @@ public class SslConnection extends AbstractAsyncConnection return _endp.getCreatedTimeStamp(); } - @Override - public IOFuture readable() throws IllegalStateException + @Override + public void readable(C context, Callback callback) throws IllegalStateException { - LOG.debug("{} sslEndp.read()",_session); _lock.lock(); try { + if (_readCallback != null) + throw new IllegalStateException("previous read not complete"); + // Do we already have application input data? if (BufferUtil.hasContent(_inApp)) - return DoneIOFuture.COMPLETE; + { + callback.completed(context); + return; + } // No, we need to schedule a network read - _appReadFuture=new DispatchingIOFuture(_lock); - if (_netReadFuture==null) - _netReadFuture=scheduleOnReadable(); - return _appReadFuture; + _readContext=context; + _readCallback=callback; + scheduleOnReadable(); } finally { @@ -753,12 +804,12 @@ public class SslConnection extends AbstractAsyncConnection } @Override - public IOFuture write(ByteBuffer... buffers) + public void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException { _lock.lock(); try { - if (!_appWriteFuture.isDone()) + if (_writeCallback!=null) throw new IllegalStateException("previous write not complete"); // Try to process all @@ -769,15 +820,16 @@ public class SslConnection extends AbstractAsyncConnection if (b.hasRemaining()) { _writeBuffers=buffers; - _appWriteFuture=new DispatchingIOFuture(_lock); - return _appWriteFuture; + _writeContext=context; + _writeCallback=callback; + return; } } - return DoneIOFuture.COMPLETE; + callback.completed(context); } catch (IOException e) { - return new DoneIOFuture(e); + callback.failed(context,e); } finally { @@ -798,11 +850,11 @@ public class SslConnection extends AbstractAsyncConnection if (b.hasRemaining()) return; } - _appWriteFuture.complete(); + writeCompleted(); } catch (IOException e) { - _appWriteFuture.fail(e); + writeFailed(e); } finally { diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/FutureCallbackTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/FutureCallbackTest.java new file mode 100644 index 00000000000..ff2e925807d --- /dev/null +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/FutureCallbackTest.java @@ -0,0 +1,197 @@ +package org.eclipse.jetty.io; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; + + +import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; + +public class FutureCallbackTest +{ + @Test + public void testNotDone() + { + FutureCallback fcb= new FutureCallback<>(); + Assert.assertFalse(fcb.isDone()); + Assert.assertFalse(fcb.isCancelled()); + } + + @Test + public void testGetNotDone() throws Exception + { + FutureCallback fcb= new FutureCallback<>(); + + long start=System.currentTimeMillis(); + try + { + fcb.get(500,TimeUnit.MILLISECONDS); + Assert.fail(); + } + catch(TimeoutException e) + { + } + Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(50L)); + } + + @Test + public void testDone() throws Exception + { + FutureCallback fcb= new FutureCallback<>(); + fcb.completed("Ctx"); + Assert.assertTrue(fcb.isDone()); + Assert.assertFalse(fcb.isCancelled()); + + long start=System.currentTimeMillis(); + Assert.assertEquals("Ctx",fcb.get()); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + } + + @Test + public void testGetDone() throws Exception + { + final FutureCallback fcb= new FutureCallback<>(); + final CountDownLatch latch = new CountDownLatch(1); + + new Thread(new Runnable(){ + public void run() + { + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + fcb.completed("Ctx"); + } + }).start(); + + latch.await(); + long start=System.currentTimeMillis(); + Assert.assertEquals("Ctx",fcb.get(10000,TimeUnit.MILLISECONDS)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); + + Assert.assertTrue(fcb.isDone()); + Assert.assertFalse(fcb.isCancelled()); + } + + + + @Test + public void testFailed() throws Exception + { + FutureCallback fcb= new FutureCallback<>(); + Exception ex=new Exception("FAILED"); + fcb.failed("Ctx",ex); + Assert.assertTrue(fcb.isDone()); + Assert.assertFalse(fcb.isCancelled()); + + long start=System.currentTimeMillis(); + try + { + fcb.get(); + Assert.fail(); + } + catch(ExecutionException ee) + { + Assert.assertEquals(ex,ee.getCause()); + } + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); + } + + @Test + public void testGetFailed() throws Exception + { + final FutureCallback fcb= new FutureCallback<>(); + final Exception ex=new Exception("FAILED"); + final CountDownLatch latch = new CountDownLatch(1); + + new Thread(new Runnable(){ + public void run() + { + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + fcb.failed("Ctx",ex); + } + }).start(); + + latch.await(); + long start=System.currentTimeMillis(); + try + { + fcb.get(10000,TimeUnit.MILLISECONDS); + Assert.fail(); + } + catch(ExecutionException ee) + { + Assert.assertEquals(ex,ee.getCause()); + } + Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); + + Assert.assertTrue(fcb.isDone()); + Assert.assertFalse(fcb.isCancelled()); + } + + + + @Test + public void testCancelled() throws Exception + { + FutureCallback fcb= new FutureCallback<>(); + fcb.cancel(true); + Assert.assertTrue(fcb.isDone()); + Assert.assertTrue(fcb.isCancelled()); + + long start=System.currentTimeMillis(); + try + { + fcb.get(); + Assert.fail(); + } + catch(CancellationException e) + { + Assert.assertThat(e.getCause(),Matchers.instanceOf(CancellationException.class)); + } + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); + } + + @Test + public void testGetCancelled() throws Exception + { + final FutureCallback fcb= new FutureCallback<>(); + final CountDownLatch latch = new CountDownLatch(1); + + new Thread(new Runnable(){ + public void run() + { + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + fcb.cancel(true); + } + }).start(); + + latch.await(); + long start=System.currentTimeMillis(); + try + { + fcb.get(10000,TimeUnit.MILLISECONDS); + Assert.fail(); + } + catch(CancellationException e) + { + Assert.assertThat(e.getCause(),Matchers.instanceOf(CancellationException.class)); + } + Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); + + Assert.assertTrue(fcb.isDone()); + Assert.assertTrue(fcb.isCancelled()); + + } + + +} diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java deleted file mode 100644 index 7710f9ad24a..00000000000 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java +++ /dev/null @@ -1,459 +0,0 @@ -package org.eclipse.jetty.io; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.eclipse.jetty.util.Callback; -import org.junit.Assert; -import org.junit.Test; - -import static org.hamcrest.number.OrderingComparison.greaterThan; -import static org.hamcrest.number.OrderingComparison.lessThan; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -public class IOFutureTest -{ - @Test - public void testReadyCompleted() throws Exception - { - IOFuture future = new DoneIOFuture(); - - assertTrue(future.isDone()); - assertTrue(future.isComplete()); - - long start=System.currentTimeMillis(); - future.block(); - Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L)); - - start=System.currentTimeMillis(); - future.block(1000,TimeUnit.MILLISECONDS); - Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L)); - - final AtomicBoolean ready = new AtomicBoolean(false); - final AtomicReference fail = new AtomicReference<>(); - - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - ready.set(true); - } - - @Override - public void failed(Object context, Throwable cause) - { - fail.set(cause); - } - }, null); - - assertTrue(ready.get()); - assertNull(fail.get()); - } - - - @Test - public void testFailedCompleted() throws Exception - { - Exception ex=new Exception("failed"); - IOFuture future = new DoneIOFuture(ex); - - assertTrue(future.isDone()); - try - { - future.isComplete(); - Assert.fail(); - } - catch(ExecutionException e) - { - Assert.assertEquals(ex,e.getCause()); - } - - - long start=System.currentTimeMillis(); - try - { - future.block(); - Assert.fail(); - } - catch(ExecutionException e) - { - Assert.assertEquals(ex,e.getCause()); - } - Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L)); - - - - start=System.currentTimeMillis(); - try - { - future.block(1000,TimeUnit.MILLISECONDS); - Assert.fail(); - } - catch(ExecutionException e) - { - Assert.assertEquals(ex,e.getCause()); - } - Assert.assertThat(System.currentTimeMillis()-start,lessThan(10L)); - - - final AtomicBoolean ready = new AtomicBoolean(false); - final AtomicReference fail = new AtomicReference<>(); - - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - ready.set(true); - } - - @Override - public void failed(Object context, Throwable cause) - { - fail.set(cause); - } - }, null); - - assertFalse(ready.get()); - assertEquals(ex,fail.get()); - } - - - - @Test - public void testInCompleted() throws Exception - { - IOFuture future = new DispatchingIOFuture(); - - assertFalse(future.isDone()); - assertFalse(future.isComplete()); - - long start=System.currentTimeMillis(); - future.block(10,TimeUnit.MILLISECONDS); - Assert.assertThat(System.currentTimeMillis()-start,greaterThan(9L)); - - final AtomicBoolean ready = new AtomicBoolean(false); - final AtomicReference fail = new AtomicReference<>(); - - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - ready.set(true); - } - - @Override - public void failed(Object context, Throwable cause) - { - fail.set(cause); - } - }, null); - - assertFalse(ready.get()); - assertNull(fail.get()); - } - - - @Test - public void testBlockWaitsWhenNotCompleted() throws Exception - { - DispatchingIOFuture future = new DispatchingIOFuture(); - - assertFalse(future.isDone()); - assertFalse(future.isComplete()); - - final AtomicBoolean completed = new AtomicBoolean(false); - final AtomicReference failure = new AtomicReference<>(); - - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - completed.set(true); - } - - @Override - public void failed(Object context, Throwable cause) - { - failure.set(cause); - } - }, null); - - long sleep = 1000; - long start = System.nanoTime(); - assertFalse(future.block(sleep, TimeUnit.MILLISECONDS)); - assertThat(System.nanoTime() - start, greaterThan(TimeUnit.MILLISECONDS.toNanos(sleep / 2))); - - assertFalse(completed.get()); - assertNull(failure.get()); - } - - @Test - public void testTimedBlockWokenUpWhenCompleted() throws Exception - { - final DispatchingIOFuture future = new DispatchingIOFuture(); - - final CountDownLatch completed = new CountDownLatch(1); - final AtomicReference failure = new AtomicReference<>(); - - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - completed.countDown(); - } - - @Override - public void failed(Object context, Throwable cause) - { - failure.set(cause); - } - }, null); - - long start = System.nanoTime(); - final long delay = 500; - new Thread() - { - @Override - public void run() - { - try - { - // Want the call to block() below to happen before the call to complete() here - TimeUnit.MILLISECONDS.sleep(delay); - future.complete(); - } - catch (InterruptedException x) - { - Assert.fail(); - } - } - }.start(); - - assertTrue(future.block(delay * 4, TimeUnit.MILLISECONDS)); - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - Assert.assertThat(elapsed, greaterThan(delay / 2)); - Assert.assertThat(elapsed, lessThan(delay * 2)); - - assertTrue(future.isDone()); - assertTrue(future.isComplete()); - assertTrue(completed.await(delay * 4, TimeUnit.MILLISECONDS)); - assertNull(failure.get()); - } - - @Test - public void testBlockWokenUpWhenCompleted() throws Exception - { - final DispatchingIOFuture future = new DispatchingIOFuture(); - - final CountDownLatch completed = new CountDownLatch(1); - final AtomicReference failure = new AtomicReference<>(); - - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - completed.countDown(); - } - - @Override - public void failed(Object context, Throwable cause) - { - failure.set(cause); - } - }, null); - - final long delay = 500; - long start = System.nanoTime(); - new Thread() - { - @Override - public void run() - { - try - { - // Want the call to block() below to happen before the call to complete() here - TimeUnit.MILLISECONDS.sleep(delay); - future.complete(); - } - catch (InterruptedException x) - { - Assert.fail(); - } - } - }.start(); - - future.block(); - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - Assert.assertThat(elapsed, greaterThan(delay / 2)); - Assert.assertThat(elapsed, lessThan(delay * 2)); - - assertTrue(future.isDone()); - assertTrue(future.isComplete()); - assertTrue(completed.await(delay * 4, TimeUnit.MILLISECONDS)); - assertNull(failure.get()); - } - - @Test - public void testTimedBlockWokenUpOnFailure() throws Exception - { - final DispatchingIOFuture future = new DispatchingIOFuture(); - final Exception ex = new Exception("failed"); - - final AtomicBoolean completed = new AtomicBoolean(false); - final AtomicReference failure = new AtomicReference<>(); - final CountDownLatch failureLatch = new CountDownLatch(1); - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - completed.set(true); - } - - @Override - public void failed(Object context, Throwable x) - { - failure.set(x); - failureLatch.countDown(); - } - }, null); - - final long delay = 500; - long start = System.nanoTime(); - new Thread() - { - @Override - public void run() - { - try - { - // Want the call to block() below to happen before the call to fail() here - TimeUnit.MILLISECONDS.sleep(delay); - future.fail(ex); - } - catch (InterruptedException x) - { - Assert.fail(); - } - } - }.start(); - - try - { - future.block(delay * 4, TimeUnit.MILLISECONDS); - Assert.fail(); - } - catch (ExecutionException e) - { - Assert.assertSame(ex, e.getCause()); - } - - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - Assert.assertThat(elapsed, greaterThan(delay / 2)); - Assert.assertThat(elapsed, lessThan(delay * 2)); - - assertTrue(future.isDone()); - try - { - future.isComplete(); - Assert.fail(); - } - catch (ExecutionException e) - { - assertSame(ex, e.getCause()); - } - - assertFalse(completed.get()); - assertTrue(failureLatch.await(delay * 4, TimeUnit.MILLISECONDS)); - assertSame(ex, failure.get()); - } - - @Test - public void testBlockWokenUpOnFailure() throws Exception - { - final DispatchingIOFuture future = new DispatchingIOFuture(); - final Exception ex = new Exception("failed"); - - final AtomicBoolean completed = new AtomicBoolean(false); - final AtomicReference failure = new AtomicReference<>(); - final CountDownLatch failureLatch = new CountDownLatch(1); - future.setCallback(new Callback() - { - @Override - public void completed(Object context) - { - completed.set(true); - } - - @Override - public void failed(Object context, Throwable x) - { - failure.set(x); - failureLatch.countDown(); - } - }, null); - - final long delay = 500; - long start = System.nanoTime(); - new Thread() - { - @Override - public void run() - { - try - { - // Want the call to block() below to happen before the call to fail() here - TimeUnit.MILLISECONDS.sleep(delay); - future.fail(ex); - } - catch (InterruptedException x) - { - Assert.fail(); - } - } - }.start(); - - try - { - future.block(); - Assert.fail(); - } - catch (ExecutionException e) - { - Assert.assertSame(ex, e.getCause()); - } - - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - Assert.assertThat(elapsed, greaterThan(delay / 2)); - Assert.assertThat(elapsed, lessThan(delay * 2)); - - assertTrue(future.isDone()); - try - { - future.isComplete(); - Assert.fail(); - } - catch (ExecutionException e) - { - assertSame(ex, e.getCause()); - } - - assertFalse(completed.get()); - assertTrue(failureLatch.await(delay * 4, TimeUnit.MILLISECONDS)); - assertSame(ex, failure.get()); - } -} diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 1a85f518a48..07ab46f83bc 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -138,12 +138,17 @@ public class SelectChannelEndPointTest int filled=_endp.fill(_in); if (filled>0) progress=true; + System.err.println("filled "+filled); // If the tests wants to block, then block while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) { - _endp.readable().block(); + FutureCallback blockingRead= new FutureCallback<>(); + System.err.println("blocking read on "+blockingRead); + _endp.readable(null,blockingRead); + blockingRead.get(); filled=_endp.fill(_in); + System.err.println("FILLED "+filled); progress|=filled>0; } @@ -157,7 +162,11 @@ public class SelectChannelEndPointTest ByteBuffer out=_out.duplicate(); BufferUtil.clear(_out); for (int i=0;i<_writeCount;i++) - _endp.write(out.asReadOnlyBuffer()).block(); + { + FutureCallback blockingWrite= new FutureCallback<>(); + _endp.write(null,blockingWrite,out.asReadOnlyBuffer()); + blockingWrite.get(); + } progress=true; } @@ -171,7 +180,9 @@ public class SelectChannelEndPointTest // Timeout does not close, so echo exception then shutdown try { - _endp.write(BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))).block(); + FutureCallback blockingWrite= new FutureCallback<>(); + _endp.write(null,blockingWrite,BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))); + blockingWrite.get(); _endp.shutdownOutput(); } catch(Exception e2)