diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java index 29d90a3c573..a5201436c2f 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java @@ -232,7 +232,7 @@ public class HttpGenerator return Result.NEED_BUFFER; // Copy the content - _contentPrepared+=BufferUtil.flipPutFlip(content,buffer); + _contentPrepared+=BufferUtil.append(content,buffer); // are we full? if (BufferUtil.isFull(buffer)) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java new file mode 100644 index 00000000000..795102822ce --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java @@ -0,0 +1,84 @@ +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + + +public abstract class AbstractAsyncConnection implements AsyncConnection +{ + private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class); + protected final AsyncEndPoint _endp; + + private IOFuture.Callback _readCallback = new IOFuture.Callback() + { + @Override + public void onReady() + { + onReadable(); + } + + @Override + public void onFail(Throwable cause) + { + LOG.debug("FAILED: "+cause); + } + }; + + public AbstractAsyncConnection(AsyncEndPoint endp) + { + _endp=endp; + } + + @Override + public AsyncEndPoint getEndPoint() + { + return _endp; + } + + + @Override + public void onIdleExpired(long idleForMs) + { + try + { + LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp); + if (_endp.isOutputShutdown()) + _endp.close(); + else + _endp.shutdownOutput(); + } + catch(IOException e) + { + LOG.ignore(e); + + try + { + _endp.close(); + } + catch(IOException e2) + { + LOG.ignore(e2); + } + } + } + + @Override + public IOFuture scheduleOnReadable() + { + IOFuture read=getEndPoint().read(); + read.setCallback(_readCallback); + return read; + } + + @Override + public String toString() + { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java new file mode 100644 index 00000000000..1fcdb2fdec9 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -0,0 +1,65 @@ +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public abstract class AbstractEndPoint implements EndPoint +{ + private final long _created=System.currentTimeMillis(); + private final InetSocketAddress _local; + private final InetSocketAddress _remote; + private int _maxIdleTime; + + protected AbstractEndPoint(InetSocketAddress local,InetSocketAddress remote) + { + _local=local; + _remote=remote; + } + + @Override + public long getCreatedTimeStamp() + { + return _created; + } + + @Override + public int getMaxIdleTime() + { + return _maxIdleTime; + } + + @Override + public void setMaxIdleTime(int timeMs) throws IOException + { + _maxIdleTime=timeMs; + } + + /* ------------------------------------------------------------ */ + @Override + public InetSocketAddress getLocalAddress() + { + return _local; + } + + /* ------------------------------------------------------------ */ + @Override + public InetSocketAddress getRemoteAddress() + { + return _remote; + } + + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + return String.format("%s@%x{%s%s,o=%b,os=%b}", + getClass().getSimpleName(), + hashCode(), + getRemoteAddress(), + getLocalAddress(), + isOpen(), + isOutputShutdown()); + } + +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncConnection.java similarity index 78% rename from jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/AsyncConnection.java index 3bb524c03a1..048e55cfaf6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncConnection.java @@ -14,17 +14,15 @@ package org.eclipse.jetty.io; - -public interface Connection +public interface AsyncConnection { - EndPoint getEndPoint(); + AsyncEndPoint getEndPoint(); - int getMaxIdleTime(); + IOFuture scheduleOnReadable(); + + void onReadable(); + void onInputShutdown(); + void onClose(); + void onIdleExpired(long idleForMs); - /** - * @return the timestamp at which the connection was created - */ - long getCreatedTimeStamp(); - - boolean isIdle(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java new file mode 100644 index 00000000000..cf4853600e3 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -0,0 +1,125 @@ +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.Future; + +/* ------------------------------------------------------------ */ +/**Asynchronous End Point + *

+ * This extension of EndPoint provides asynchronous scheduling methods. + * The design of these has been influenced by NIO.2 Futures and Completion + * handlers, but does not use those actual interfaces because: they have + * some inefficiencies (eg buffers must be allocated before read); they have + * unrequired overheads due to their generic nature (passing of attachments + * and returning operation counts); there is no need to pass timeouts as + * {@link EndPoint#getMaxIdleTime() is used. + *

+ * The intent of this API is that it can be used in either: a polling mode (like {@link Future}) + * ; in a callback mode (like {@link CompletionHandler} mode; or blocking mod;e or a hybrid mode + *

Blocking read

+ *
+ * endpoint.read().complete();
+ * endpoint.fill(buffer);
+ * 
+ *

Polling read

+ *
+ * IOFuture read = endpoint.read();
+ * ...
+ * if (read.isReady())
+ *   endpoint.fill(buffer);
+ * 
+ *

Callback read

+ *
+ * endpoint.read().setHandler(new IOCallback()
+ * {
+ *   public void onReady() { endpoint.fill(buffer); ... }
+ *   public void onFail(IOException e) { ... }
+ *   public void onTimeout() { ... }
+ * }
+ * 
+ * + *

Blocking write

+ *
+ * endpoint.write(buffer).complete();
+ * 
+ *

Polling write

+ *
+ * IOFuture write = endpoint.write(buffer);
+ * ...
+ * if (write.isReady())
+ *   // do next write
+ * 
+ *

Callback write

+ *
+ * endpoint.write(buffer).setHandler(new IOCallback()
+ * {
+ *   public void onReady() { ... }
+ *   public void onFail(IOException e) { ... }
+ *   public void onTimeout() { ... }
+ * }
+ * 
+ *

Hybrid write

+ *
+ * IOFuture write = endpoint.write(buffer);
+ * if (write.isReady())
+ *   // write next
+ * else
+ *   write.setHandler(new IOCallback()
+ *   {
+ *     public void onReady() { ... }
+ *     public void onFail(IOException e) { ... }
+ *     public void onTimeout() { ... }
+ *   }
+ * 
+ * + *

Compatibility Notes

+ * Some Async IO APIs have the concept of setting read interest. With this + * API calling {@link #read()} is equivalent to setting read interest to true + * and calling {@link IOFuture#cancel()} is equivalent to setting read interest + * to false. + */ +public interface AsyncEndPoint extends EndPoint +{ + /* ------------------------------------------------------------ */ + AsyncConnection getAsyncConnection(); + + /* ------------------------------------------------------------ */ + /** Schedule a read operation. + *

+ * This method allows a {@link #fill(ByteBuffer)} operation to be scheduled + * with either blocking, polling or callback semantics. + * @return an {@link IOFuture} instance that will be ready when a call to {@link #fill(ByteBuffer)} will + * return immediately with data without blocking. + * @throws IllegalStateException if another read operation has been scheduled and has not timedout, been cancelled or is ready. + */ + IOFuture read() throws IllegalStateException; + + /* ------------------------------------------------------------ */ + /** Schedule a write operation. + * This method performs {@link #flush(ByteBuffer...)} operations and allows the completion of + * the entire write to be scheduled with blocking, polling or callback semantics. + * @param buffers One or more {@link ByteBuffer}s that will be flushed. + * @return an {@link IOFuture} instance that will be ready when all the data in the buffers passed has been consumed by + * one or more calls to {@link #flush(ByteBuffer)}. + */ + IOFuture write(ByteBuffer... buffers) throws IllegalStateException; + + /* ------------------------------------------------------------ */ + /** Set if the endpoint should be checked for idleness + */ + void setCheckForIdle(boolean check); + + /* ------------------------------------------------------------ */ + /** Get if the endpoint should be checked for idleness + */ + boolean isCheckForIdle(); + + /* ------------------------------------------------------------ */ + /** + * @return Timestamp in ms since epoch of when the last data was + * filled or flushed from this endpoint. + */ + long getActivityTimestamp(); + +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index b2a8cb0d236..e04fd82441f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -20,20 +20,17 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.BufferUtil; - /* ------------------------------------------------------------ */ /** ByteArrayEndPoint. - * * */ -public class ByteArrayEndPoint implements EndPoint +public class ByteArrayEndPoint extends AbstractEndPoint { protected byte[] _inBytes; protected ByteBuffer _in; protected ByteBuffer _out; protected boolean _closed; protected boolean _growOutput; - protected int _maxIdleTime; /* ------------------------------------------------------------ */ /** @@ -41,6 +38,7 @@ public class ByteArrayEndPoint implements EndPoint */ public ByteArrayEndPoint() { + super(null,null); } /* ------------------------------------------------------------ */ @@ -49,6 +47,7 @@ public class ByteArrayEndPoint implements EndPoint */ public ByteArrayEndPoint(byte[] input, int outputSize) { + super(null,null); _inBytes=input; _in=ByteBuffer.wrap(input); _out=ByteBuffer.allocate(outputSize); @@ -100,16 +99,6 @@ public class ByteArrayEndPoint implements EndPoint return !_closed; } - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.jetty.io.EndPoint#isInputShutdown() - */ - @Override - public boolean isInputShutdown() - { - return _closed; - } - /* ------------------------------------------------------------ */ /* * @see org.eclipse.jetty.io.EndPoint#isOutputShutdown() @@ -130,16 +119,6 @@ public class ByteArrayEndPoint implements EndPoint close(); } - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.io.EndPoint#shutdownInput() - */ - @Override - public void shutdownInput() throws IOException - { - close(); - } - /* ------------------------------------------------------------ */ /* * @see org.eclipse.io.EndPoint#close() @@ -160,7 +139,7 @@ public class ByteArrayEndPoint implements EndPoint if (_closed) throw new IOException("CLOSED"); if (_in!=null) - return BufferUtil.flipPutFlip(_in,buffer); + return BufferUtil.append(_in,buffer); return 0; } @@ -217,20 +196,6 @@ public class ByteArrayEndPoint implements EndPoint _out.clear(); } - /* ------------------------------------------------------------ */ - @Override - public InetSocketAddress getLocalAddress() - { - return null; - } - - /* ------------------------------------------------------------ */ - @Override - public InetSocketAddress getRemoteAddress() - { - return null; - } - /* ------------------------------------------------------------ */ /* * @see org.eclipse.io.EndPoint#getConnection() @@ -259,26 +224,5 @@ public class ByteArrayEndPoint implements EndPoint _growOutput=growOutput; } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime() - */ - @Override - public int getMaxIdleTime() - { - return _maxIdleTime; - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int) - */ - @Override - public void setMaxIdleTime(int timeMs) throws IOException - { - _maxIdleTime=timeMs; - } - - } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index c15c94758e3..af1e11e7ecf 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -32,56 +32,24 @@ import org.eclipse.jetty.util.log.Logger; *

Holds the channel and socket for an NIO endpoint. * */ -public class ChannelEndPoint implements EndPoint +public class ChannelEndPoint extends AbstractEndPoint { private static final Logger LOG = Log.getLogger(ChannelEndPoint.class); private final ByteChannel _channel; private final Socket _socket; - private final InetSocketAddress _local; - private final InetSocketAddress _remote; - private volatile int _maxIdleTime; private volatile boolean _ishut; private volatile boolean _oshut; - public ChannelEndPoint(ByteChannel channel) throws IOException + public ChannelEndPoint(SocketChannel channel) throws IOException { - super(); - this._channel = channel; - _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null; - if (_socket!=null) - { - _local=(InetSocketAddress)_socket.getLocalSocketAddress(); - _remote=(InetSocketAddress)_socket.getRemoteSocketAddress(); - _maxIdleTime=_socket.getSoTimeout(); - } - else - { - _local=_remote=null; - } - } + super((InetSocketAddress)channel.socket().getLocalSocketAddress(), + (InetSocketAddress)channel.socket().getRemoteSocketAddress() ); - protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException - { this._channel = channel; - _maxIdleTime=maxIdleTime; - _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null; - if (_socket!=null) - { - _local=(InetSocketAddress)_socket.getLocalSocketAddress(); - _remote=(InetSocketAddress)_socket.getRemoteSocketAddress(); - _socket.setSoTimeout(_maxIdleTime); - } - else - { - _local=_remote=null; - } - } - - - public boolean isBlocking() - { - return !(_channel instanceof SelectableChannel) || ((SelectableChannel)_channel).isBlocking(); + _socket=channel.socket(); + setMaxIdleTime(_socket.getSoTimeout()); + _socket.setSoTimeout(0); } /* @@ -131,7 +99,6 @@ public class ChannelEndPoint implements EndPoint /* (non-Javadoc) * @see org.eclipse.io.EndPoint#close() */ - @Override public void shutdownInput() throws IOException { shutdownChannelInput(); @@ -183,7 +150,6 @@ public class ChannelEndPoint implements EndPoint return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown(); } - @Override public boolean isInputShutdown() { return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown(); @@ -267,20 +233,6 @@ public class ChannelEndPoint implements EndPoint return _channel; } - /* ------------------------------------------------------------ */ - @Override - public InetSocketAddress getLocalAddress() - { - return _local; - } - - /* ------------------------------------------------------------ */ - @Override - public InetSocketAddress getRemoteAddress() - { - return _remote; - } - /* ------------------------------------------------------------ */ @Override public Object getTransport() @@ -293,24 +245,5 @@ public class ChannelEndPoint implements EndPoint { return _socket; } - - /* ------------------------------------------------------------ */ - @Override - public int getMaxIdleTime() - { - return _maxIdleTime; - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int) - */ - @Override - public void setMaxIdleTime(int timeMs) throws IOException - { - //if (_socket!=null && timeMs!=_maxIdleTime) - // _socket.setSoTimeout(timeMs>0?timeMs:0); - _maxIdleTime=timeMs; - } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CompleteIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CompleteIOFuture.java new file mode 100644 index 00000000000..bd85e4cc6f4 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CompleteIOFuture.java @@ -0,0 +1,82 @@ +package org.eclipse.jetty.io; + +import java.util.concurrent.ExecutionException; + +import org.eclipse.jetty.util.thread.ThreadPool; + +public class CompleteIOFuture implements IOFuture +{ + private final boolean _ready; + private final Throwable _cause; + + public final static CompleteIOFuture COMPLETE=new CompleteIOFuture(); + + public CompleteIOFuture() + { + _ready=true; + _cause=null; + } + + public CompleteIOFuture(Throwable cause) + { + _ready=false; + _cause=cause; + } + + @Override + public boolean isReady() throws ExecutionException + { + if (_ready) + return true; + + throw new ExecutionException(_cause); + } + + @Override + public void cancel() throws UnsupportedOperationException + { + throw new UnsupportedOperationException(); + } + + @Override + public void await() throws ExecutionException + { + isReady(); + } + + @Override + public void setCallback(final Callback callback) + { + dispatch(new Runnable() + { + @Override + public void run() + { + if (_ready) + callback.onReady(); + else + callback.onFail(_cause); + } + }); + } + + protected void dispatch(Runnable callback) + { + callback.run(); + } + + @Override + public boolean isComplete() + { + return true; + } + + @Override + public String toString() + { + return String.format("CIOF@%x{r=%b,c=%s}", + hashCode(), + _ready, + _cause); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index b1019fdfb70..138d88dd5ef 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -25,6 +25,26 @@ import java.nio.ByteBuffer; */ public interface EndPoint { + /* ------------------------------------------------------------ */ + /** + * @return The local Inet address to which this EndPoint is bound, or null + * if this EndPoint does not represent a network connection. + */ + InetSocketAddress getLocalAddress(); + + /* ------------------------------------------------------------ */ + /** + * @return The remote Inet address to which this EndPoint is bound, or null + * if this EndPoint does not represent a network connection. + */ + InetSocketAddress getRemoteAddress(); + + /* ------------------------------------------------------------ */ + boolean isOpen(); + + /* ------------------------------------------------------------ */ + long getCreatedTimeStamp(); + /** * Shutdown any backing output stream associated with the endpoint */ @@ -32,13 +52,6 @@ public interface EndPoint boolean isOutputShutdown(); - /** - * Shutdown any backing input stream associated with the endpoint - */ - void shutdownInput() throws IOException; - - boolean isInputShutdown(); - /** * Close any backing stream associated with the endpoint */ @@ -68,38 +81,16 @@ public interface EndPoint */ int flush(ByteBuffer... buffer) throws IOException; - - /* ------------------------------------------------------------ */ - /** - * @return The local Inet address to which this EndPoint is bound, or null - * if this EndPoint does not represent a network connection. - */ - InetSocketAddress getLocalAddress(); - - /* ------------------------------------------------------------ */ - /** - * @return The remote Inet address to which this EndPoint is bound, or null - * if this EndPoint does not represent a network connection. - */ - InetSocketAddress getRemoteAddress(); - - /* ------------------------------------------------------------ */ - boolean isOpen(); - /* ------------------------------------------------------------ */ /** * @return The underlying transport object (socket, channel, etc.) */ Object getTransport(); - /* ------------------------------------------------------------ */ /** Get the max idle time in ms. *

The max idle time is the time the endpoint can be idle before - * extraordinary handling takes place. This loosely corresponds to - * the {@link java.net.Socket#getSoTimeout()} for blocking connections, - * but {@link AsyncEndPoint} implementations must use other mechanisms - * to implement the max idle time. + * extraordinary handling takes place. * @return the max idle time in ms or if ms <= 0 implies an infinite timeout */ int getMaxIdleTime(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/IOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/IOFuture.java new file mode 100644 index 00000000000..0ab3efa4553 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/IOFuture.java @@ -0,0 +1,66 @@ +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +/* ------------------------------------------------------------ */ +/* ------------------------------------------------------------ */ +/* ------------------------------------------------------------ */ +/** Async IO Future interface. + *

+ * This interface make the future status of an IO operation available via + * polling ({@link #isReady()}, blocking ({@link #await()} or callback ({@link #setCallback(Callback)} + * + */ +public interface IOFuture +{ + /* ------------------------------------------------------------ */ + /** Indicate if this Future is complete. + * If this future has completed by becoming ready, excepting or timeout. + * @return True if this future has completed by becoming ready, excepting or timeout. + */ + boolean isComplete(); + + /* ------------------------------------------------------------ */ + /** Indicate the readyness of the IO system. + * For input, ready means that there is data + * ready to be consumed. For output ready means that the prior operation + * has completed and another may be initiated. + * @return True if the IO operation is ready. + * @throws ExecutionException If an exception occurs during the IO operation + */ + boolean isReady() throws ExecutionException; + + /* ------------------------------------------------------------ */ + /** Cancel the IO operation. + * @throws UnsupportedOperationException If the operation cannot be cancelled. + */ + void cancel() throws UnsupportedOperationException; + + /* ------------------------------------------------------------ */ + /** Wait until complete. + *

This call blocks the calling thread until this AsyncIO is ready or + * an exception or until a timeout due to {@link EndPoint#getMaxIdleTime()}. + * @throws InterruptedException if interrupted while blocking + * @throws ExecutionException If any exception occurs during the IO operation + */ + void await() throws InterruptedException, ExecutionException; + + /* ------------------------------------------------------------ */ + /** Set an IOCallback. + * Set an {@link Callback} instance to be called when the IO operation is ready or if + * there is a failure or timeout. + * @param callback + */ + void setCallback(Callback callback); + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + interface Callback + { + void onReady(); + void onFail(Throwable cause); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/RecycledIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/RecycledIOFuture.java new file mode 100644 index 00000000000..dee4c4973e9 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/RecycledIOFuture.java @@ -0,0 +1,241 @@ +package org.eclipse.jetty.io; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class RecycledIOFuture implements IOFuture +{ + private final Lock _lock; + private final Condition _block; + + private boolean _complete; + private boolean _ready; + private Throwable _cause; + + private Callback _callback; + + public RecycledIOFuture() + { + _lock = new ReentrantLock(); + _block = _lock.newCondition(); + } + + public RecycledIOFuture(Lock lock) + { + _lock = lock; + _block = _lock.newCondition(); + } + + public RecycledIOFuture(boolean ready,Lock lock) + { + _ready=ready; + _complete=ready; + _lock = lock; + _block = _lock.newCondition(); + } + + public void fail(final Throwable cause) + { + _lock.lock(); + try + { + if (_complete) + throw new IllegalStateException("complete",cause); + + _cause=cause; + _complete=true; + + if (_callback!=null) + dispatchFail(); + _block.signal(); + } + finally + { + _lock.unlock(); + } + } + + public void ready() + { + _lock.lock(); + try + { + if (_complete) + throw new IllegalStateException(); + _ready=true; + _complete=true; + + if (_callback!=null) + dispatchReady(); + _block.signal(); + } + finally + { + _lock.unlock(); + } + } + + protected void cancelled() + { + _lock.lock(); + try + { + if (_complete) + throw new IllegalStateException(); + _ready=false; + _complete=true; + _block.signal(); + } + finally + { + _lock.unlock(); + } + } + + public void recycle() + { + _lock.lock(); + try + { + if (!_complete) + throw new IllegalStateException(); + _ready=false; + _cause=null; + _complete=false; + _callback=null; + } + finally + { + _lock.unlock(); + } + } + + @Override + public boolean isComplete() + { + _lock.lock(); + try + { + return _complete; + } + finally + { + _lock.unlock(); + } + } + + @Override + public boolean isReady() throws ExecutionException + { + _lock.lock(); + try + { + if (_complete) + { + if (_ready) + return true; + throw new ExecutionException(_cause); + } + + return false; + } + finally + { + _lock.unlock(); + } + } + + @Override + public void cancel() throws UnsupportedOperationException + { + throw new UnsupportedOperationException(); + } + + @Override + public void await() throws InterruptedException, ExecutionException + { + _lock.lock(); + try + { + if (!_complete) + _block.await(); + isReady(); + } + finally + { + _lock.unlock(); + } + } + + @Override + public void setCallback(Callback callback) + { + _lock.lock(); + try + { + if (_callback!=null) + throw new IllegalStateException(); + _callback=callback; + + if (_complete) + { + if (_ready) + dispatchReady(); + else + dispatchFail(); + } + } + finally + { + _lock.unlock(); + } + } + + protected void dispatch(Runnable callback) + { + callback.run(); + } + + private void dispatchReady() + { + final Callback callback=_callback; + _callback=null; + dispatch(new Runnable() + { + @Override + public void run() + { + callback.onReady(); + } + }); + } + + private void dispatchFail() + { + final Callback callback=_callback; + final Throwable cause=_cause; + _callback=null; + dispatch(new Runnable() + { + @Override + public void run() + { + callback.onFail(cause); + } + }); + } + + + + @Override + public String toString() + { + return String.format("RIOF@%x{c=%b,r=%b,c=%s}", + hashCode(), + _complete, + _ready, + _cause); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 5933a21a074..75c283f0514 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -25,16 +26,18 @@ import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Timeout.Task; +import static org.eclipse.jetty.io.CompleteIOFuture.COMPLETE; /* ------------------------------------------------------------ */ /** * An Endpoint that can be scheduled by {@link SelectorManager}. */ -public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint +public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint { public static final Logger LOG=Log.getLogger(SelectChannelEndPoint.class); - + private final Lock _lock = new ReentrantLock(); + private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; private SelectionKey _key; @@ -52,22 +55,80 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable private volatile boolean _idlecheck; private volatile long _lastNotIdleTimestamp; - private volatile SelectableConnection _connection; + private volatile AbstractAsyncConnection _connection; + + private RecycledIOFuture _readFuture = new RecycledIOFuture(true,_lock) + { + @Override + protected void dispatch(Runnable task) + { + _manager.dispatch(task); + } + + @Override + public void cancel() + { + _lock.lock(); + try + { + _interestOps=_interestOps&~SelectionKey.OP_READ; + updateKey(); + cancelled(); + } + finally + { + _lock.unlock(); + } + } + }; + + private ByteBuffer[] _writeBuffers; + private RecycledIOFuture _writeFuture = new RecycledIOFuture(true,_lock) + { + @Override + protected void dispatch(Runnable task) + { + _manager.dispatch(task); + } + + @Override + public void cancel() + { + _lock.lock(); + try + { + _interestOps=_interestOps&~SelectionKey.OP_WRITE; + updateKey(); + cancelled(); + } + finally + { + _lock.unlock(); + } + } + }; /* ------------------------------------------------------------ */ public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) throws IOException { - super(channel, maxIdleTime); - + super(channel); _manager = selectSet.getManager(); _selectSet = selectSet; _open=true; _key = key; + setMaxIdleTime(maxIdleTime); setCheckForIdle(true); } + /* ------------------------------------------------------------ */ + @Override + public AsyncConnection getAsyncConnection() + { + return _connection; + } + /* ------------------------------------------------------------ */ public SelectionKey getSelectionKey() { @@ -85,9 +146,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable /* ------------------------------------------------------------ */ - public void setSelectableConnection(SelectableConnection connection) + public void setAsyncConnection(AbstractAsyncConnection connection) { - Connection old=getSelectableConnection(); + AsyncConnection old=getAsyncConnection(); _connection=connection; if (old!=null && old!=connection) _manager.endPointUpgraded(this,old); @@ -95,7 +156,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable /* ------------------------------------------------------------ */ @Override - public long getLastNotIdleTimestamp() + public long getActivityTimestamp() { return _lastNotIdleTimestamp; } @@ -104,7 +165,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable /** Called by selectSet to schedule handling * */ - public void onSelected() throws IOException + public void onSelected() { _lock.lock(); _selected=true; @@ -113,7 +174,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable // If there is no key, then do nothing if (_key == null || !_key.isValid()) { - this.notifyAll(); + // TODO wake ups? return; } @@ -122,22 +183,15 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable _interestOps=0; if (can_read) - { - Runnable task=getSelectableConnection().onReadable(); - if (task!=null) - _manager.dispatch(task); - } - if (can_write) - { - Runnable task=getSelectableConnection().onWriteable(); - if (task!=null) - _manager.dispatch(task); - } + _readFuture.ready(); + + if (can_write && _writeBuffers!=null) + completeWrite(); if (isInputShutdown() && !_ishutCalled) { _ishutCalled=true; - getSelectableConnection().onInputShutdown(); + getAsyncConnection().onInputShutdown(); } } finally @@ -148,71 +202,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable } } - /* ------------------------------------------------------------ */ - @Override - public boolean isReadInterested() - { - _lock.lock(); - try - { - return (_interestOps&SelectionKey.OP_READ)!=0; - } - finally - { - _lock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - @Override - public void setReadInterested(boolean interested) - { - _lock.lock(); - try - { - _interestOps=interested?(_interestOps|SelectionKey.OP_READ):(_interestOps&~SelectionKey.OP_READ); - if (!_selected) - updateKey(); - } - finally - { - _lock.unlock(); - } - } - /* ------------------------------------------------------------ */ - @Override - public boolean isWriteInterested() - { - _lock.lock(); - try - { - return (_interestOps&SelectionKey.OP_READ)!=0; - } - finally - { - _lock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - @Override - public void setWriteInterested(boolean interested) - { - _lock.lock(); - try - { - _interestOps=interested?(_interestOps|SelectionKey.OP_WRITE):(_interestOps&~SelectionKey.OP_WRITE); - if (!_selected) - updateKey(); - } - finally - { - _lock.unlock(); - } - } - - /* ------------------------------------------------------------ */ public void cancelTimeout(Task task) { @@ -229,7 +219,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable @Override public void setCheckForIdle(boolean check) { - _idlecheck=true; + _idlecheck=check; } /* ------------------------------------------------------------ */ @@ -246,9 +236,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable } /* ------------------------------------------------------------ */ - public void checkForIdle(long now) - { - if (_idlecheck) + public void checkForIdleOrReadWriteTimeout(long now) + { + if (_idlecheck || !_readFuture.isComplete() || !_writeFuture.isComplete()) { long idleTimestamp=_lastNotIdleTimestamp; long max_idle_time=getMaxIdleTime(); @@ -259,20 +249,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable if (idleForMs>max_idle_time) { - onIdleExpired(idleForMs); - _lastNotIdleTimestamp=now; + _lock.lock(); + try + { + if (_idlecheck) + _connection.onIdleExpired(idleForMs); + if (!_readFuture.isComplete()) + _readFuture.fail(new TimeoutException()); + if (!_writeFuture.isComplete()) + _writeFuture.fail(new TimeoutException()); + } + finally + { + _lastNotIdleTimestamp=now; + _lock.unlock(); + } } } } } - /* ------------------------------------------------------------ */ - @Override - public void onIdleExpired(long idleForMs) - { - getSelectableConnection().onIdleExpired(idleForMs); - } - /* ------------------------------------------------------------ */ @Override public int fill(ByteBuffer buffer) throws IOException @@ -283,6 +279,96 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable return fill; } + /* ------------------------------------------------------------ */ + @Override + public IOFuture read() throws IllegalStateException + { + _lock.lock(); + try + { + if (!_readFuture.isComplete()) + throw new IllegalStateException("previous read not complete"); + + _readFuture.recycle(); + _interestOps=_interestOps|SelectionKey.OP_READ; + updateKey(); + + return _readFuture; + } + finally + { + _lock.unlock(); + } + } + + + /* ------------------------------------------------------------ */ + @Override + public IOFuture write(ByteBuffer... buffers) + { + _lock.lock(); + try + { + if (!_writeFuture.isComplete()) + throw new IllegalStateException("previous write not complete"); + + flush(buffers); + + // Are we complete? + for (ByteBuffer b : buffers) + { + if (b.hasRemaining()) + { + _writeBuffers=buffers; + _writeFuture.recycle(); + _interestOps=_interestOps|SelectionKey.OP_WRITE; + updateKey(); + return _writeFuture; + } + } + return COMPLETE; + } + catch(IOException e) + { + return new CompleteIOFuture(e); + } + finally + { + _lock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + private void completeWrite() + { + try + { + flush(_writeBuffers); + + // Are we complete? + for (ByteBuffer b : _writeBuffers) + { + if (b.hasRemaining()) + { + _interestOps=_interestOps|SelectionKey.OP_WRITE; + return; + } + } + // we are complete and ready + _writeFuture.ready(); + } + catch(final IOException e) + { + _writeBuffers=null; + if (!_writeFuture.isComplete()) + _writeFuture.fail(e); + } + + + } + + + /* ------------------------------------------------------------ */ @Override public int flush(ByteBuffer... buffers) throws IOException @@ -299,25 +385,28 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable */ private void updateKey() { - int current_ops=-1; - if (getChannel().isOpen()) + if (!_selected) { - try + int current_ops=-1; + if (getChannel().isOpen()) { - current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); + try + { + current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); + } + catch(Exception e) + { + _key=null; + LOG.ignore(e); + } } - catch(Exception e) + if (_interestOps!=current_ops && !_changing) { - _key=null; - LOG.ignore(e); + _changing=true; + _selectSet.addChange(this); + _selectSet.wakeup(); } } - if (_interestOps!=current_ops && !_changing) - { - _changing=true; - _selectSet.addChange(this); - _selectSet.wakeup(); - } } @@ -457,7 +546,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable isOutputShutdown(), _interestOps, keyString, - getSelectableConnection()); + getAsyncConnection()); } /* ------------------------------------------------------------ */ @@ -466,12 +555,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable return _selectSet; } - /* ------------------------------------------------------------ */ - @Override - public SelectableConnection getSelectableConnection() - { - return _connection; - } - + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java deleted file mode 100644 index 226aa1eb1e8..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java +++ /dev/null @@ -1,228 +0,0 @@ -package org.eclipse.jetty.io; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; - - -public abstract class SelectableConnection implements Connection -{ - private static final Logger LOG = Log.getLogger(SelectableConnection.class); - - protected final Lock _lock=new ReentrantLock(); - protected final SelectableEndPoint _endp; - private final long _createdTimeStamp; - private final Condition _readable=_lock.newCondition(); - private final Condition _writeable=_lock.newCondition(); - private Thread _readBlocked; - private Thread _writeBlocked; - - private final Runnable _reader=new Runnable() - { - @Override - public void run() - { - try - { - doRead(); - } - catch(Throwable th) - { - LOG.warn(th); - } - } - }; - private final Runnable _writer=new Runnable() - { - @Override - public void run() - { - try - { - doWrite(); - } - catch(Throwable th) - { - LOG.warn(th); - } - } - }; - - private volatile int _maxIdleTime=-1; - - public SelectableConnection(SelectableEndPoint endp) - { - _endp=endp; - _createdTimeStamp = System.currentTimeMillis(); - } - - @Override - public EndPoint getEndPoint() - { - return _endp; - } - - public SelectableEndPoint getSelectableEndPoint() - { - return _endp; - } - - @Override - public long getCreatedTimeStamp() - { - return _createdTimeStamp; - } - - public Runnable onReadable() - { - _lock.lock(); - try - { - if (_readBlocked!=null) - _readable.signal(); - else - return _reader; - } - finally - { - _lock.unlock(); - } - return null; - } - - public Runnable onWriteable() - { - _lock.lock(); - try - { - if (_writeBlocked!=null) - _writeable.signal(); - else - return _writer; - } - finally - { - _lock.unlock(); - } - return null; - } - - public boolean blockReadable() - { - _lock.lock(); - boolean readable=false; - try - { - if (_readBlocked!=null) - throw new IllegalStateException("already blocked by "+_readBlocked); - - _readBlocked=Thread.currentThread(); - _endp.setReadInterested(true); - readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS); - } - catch(InterruptedException e) - { - LOG.ignore(e); - } - finally - { - if (!readable) - _endp.setReadInterested(false); - _readBlocked=null; - _lock.unlock(); - } - return readable; - } - - public boolean blockWriteable() - { - _lock.lock(); - boolean writeable=false; - try - { - if (_writeBlocked!=null) - throw new IllegalStateException("already blocked by "+_writeBlocked); - _writeBlocked=Thread.currentThread(); - _endp.setWriteInterested(true); - writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS); - } - catch(InterruptedException e) - { - LOG.ignore(e); - } - finally - { - if (!writeable) - _endp.setWriteInterested(false); - _writeBlocked=null; - _lock.unlock(); - } - return writeable; - } - - public void onIdleExpired(long idleForMs) - { - try - { - LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp); - if (_endp.isInputShutdown() || _endp.isOutputShutdown()) - _endp.close(); - else - _endp.shutdownOutput(); - } - catch(IOException e) - { - LOG.ignore(e); - - try - { - _endp.close(); - } - catch(IOException e2) - { - LOG.ignore(e2); - } - } - } - - protected void doRead() - { - throw new IllegalStateException(); - } - - protected void doWrite() - { - throw new IllegalStateException(); - } - - @Override - public int getMaxIdleTime() - { - int max=_maxIdleTime; - return (max==-1)?_endp.getMaxIdleTime():max; - } - - public void setMaxIdleTime(int max) - { - _maxIdleTime=max; - } - - @Override - public String toString() - { - return String.format("%s@%x rb=%s wb=%b", getClass().getSimpleName(), hashCode(),_readBlocked,_writeBlocked); - } - - public void onInputShutdown() throws IOException - { - } - - public void onClose() - { - } -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java deleted file mode 100644 index fcdc3cb85e1..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.eclipse.jetty.io; - -public interface SelectableEndPoint extends EndPoint -{ - public abstract void setWriteInterested(boolean interested); - - public abstract boolean isWriteInterested(); - - public abstract void setReadInterested(boolean interested); - - public abstract boolean isReadInterested(); - - /* ------------------------------------------------------------ */ - SelectableConnection getSelectableConnection(); - - /* ------------------------------------------------------------ */ - /** Callback when idle. - *

An endpoint is idle if there has been no IO activity for - * {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true. - * @param idleForMs TODO - */ - public void onIdleExpired(long idleForMs); - - /* ------------------------------------------------------------ */ - /** Set if the endpoint should be checked for idleness - */ - public void setCheckForIdle(boolean check); - - /* ------------------------------------------------------------ */ - /** Get if the endpoint should be checked for idleness - */ - public boolean isCheckForIdle(); - - public long getLastNotIdleTimestamp(); - - public void checkForIdle(long now); - -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index c6dd98f9a8b..40f8c5783be 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -337,10 +337,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa protected abstract void endPointOpened(SelectChannelEndPoint endpoint); /* ------------------------------------------------------------ */ - protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,Connection oldConnection); + protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,AsyncConnection oldConnection); /* ------------------------------------------------------------------------------- */ - public abstract SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment); + public abstract AbstractAsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment); /* ------------------------------------------------------------ */ /** @@ -700,7 +700,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { for (SelectChannelEndPoint endp:_endPoints.keySet()) { - endp.checkForIdle(idle_now); + endp.checkForIdleOrReadWriteTimeout(idle_now); } } public String toString() {return "Idle-"+super.toString();} @@ -839,7 +839,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { LOG.debug("destroyEndPoint {}",endp); _endPoints.remove(endp); - SelectableConnection connection=endp.getSelectableConnection(); + AsyncConnection connection=endp.getAsyncConnection(); if (connection!=null) connection.onClose(); endPointClosed(endp); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index ad4a3cd7c59..5452b077283 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -13,6 +13,8 @@ package org.eclipse.jetty.io; +import static org.eclipse.jetty.io.CompleteIOFuture.COMPLETE; + import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -40,27 +42,62 @@ import org.eclipse.jetty.util.log.Logger; * it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to * expose a source/sink of unencrypted data to another connection (eg HttpConnection). */ -public class SslConnection extends SelectableConnection +public class SslConnection extends AbstractAsyncConnection { static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0); private static final ThreadLocal __buffers = new ThreadLocal(); + + private final Lock _lock = new ReentrantLock(); + + private final RecycledIOFuture _appReadFuture = new RecycledIOFuture(true,_lock) + { + @Override + protected void dispatch(Runnable callback) + { + if (_appReadTask!=null) + throw new IllegalStateException(); + _appReadTask=callback; + } + }; + + + private IOFuture.Callback _writeable = new IOFuture.Callback() + { + @Override + public void onReady() + { + _appEndPoint.completeWrite(); + } + + @Override + public void onFail(Throwable cause) + { + LOG.warn("FAILED: "+cause); + } + }; + + private final RecycledIOFuture _appWriteFuture = new RecycledIOFuture(true,_lock); + + private Runnable _appReadTask; private final SSLEngine _engine; private final SSLSession _session; - private SelectableConnection _appConnection; + private AbstractAsyncConnection _appConnection; private final AppEndPoint _appEndPoint; private int _allocations; private SslBuffers _buffers; private ByteBuffer _inNet; private ByteBuffer _inApp; private ByteBuffer _outNet; - private SelectableEndPoint _endp; + private AsyncEndPoint _endp; private boolean _allowRenegotiate=true; private boolean _handshook; - private boolean _ishut; + private boolean _eofIn; private boolean _oshut; + private IOFuture _netReadFuture; + private IOFuture _netWriteFuture; /* ------------------------------------------------------------ */ /* this is a half baked buffer pool @@ -81,13 +118,13 @@ public class SslConnection extends SelectableConnection } /* ------------------------------------------------------------ */ - public SslConnection(SSLEngine engine,SelectableEndPoint endp) + public SslConnection(SSLEngine engine,AsyncEndPoint endp) { this(engine,endp,System.currentTimeMillis()); } /* ------------------------------------------------------------ */ - public SslConnection(SSLEngine engine,SelectableEndPoint endp, long timeStamp) + public SslConnection(SSLEngine engine,AsyncEndPoint endp, long timeStamp) { super(endp); _engine=engine; @@ -98,7 +135,7 @@ public class SslConnection extends SelectableConnection /* ------------------------------------------------------------ */ - public void setAppConnection(SelectableConnection connection) + public void setAppConnection(AbstractAsyncConnection connection) { _appConnection=connection; } @@ -192,12 +229,7 @@ public class SslConnection extends SelectableConnection _lock.unlock(); } } - /* ------------------------------------------------------------ */ - @Override - public boolean isIdle() - { - return _appConnection.isIdle(); - } + /* ------------------------------------------------------------ */ @Override @@ -228,49 +260,35 @@ public class SslConnection extends SelectableConnection /* ------------------------------------------------------------ */ @Override - public void doRead() + public void onReadable() { - LOG.debug("do Read {}",_endp); + LOG.debug("onReadable {}",this); _lock.lock(); try { + _netReadFuture=null; allocateBuffers(); boolean progress=true; - while(progress) + while(progress && _appReadTask==null) { progress=false; - // Fill the input buffer with everything available + // Read into the input network buffer if (!BufferUtil.isFull(_inNet)) - progress|=_endp.fill(_inNet)>0; + { + int filled = _endp.fill(_inNet); + LOG.debug("filled {}",filled); + if (filled<0) + _eofIn=true; + else if (filled>0) + progress=true; + } + // process the data progress|=process(null); - if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested()) - { - _appEndPoint._readInterested=false; - progress=true; - Runnable task =_appConnection.onReadable(); - - if (task!=null) - { - // We have a task from the application connection. We could - // dispatch this to a thread, but we are likely just to return afterwards. - // So we unlock (so another thread can call doRead if the app blocks) and - // call the app ourselves. - try - { - _lock.unlock(); - task.run(); - } - finally - { - _lock.lock(); - } - } - } } } catch(IOException e) @@ -280,41 +298,20 @@ public class SslConnection extends SelectableConnection finally { releaseBuffers(); - _endp.setReadInterested(_appEndPoint.isReadInterested()); - _endp.setWriteInterested(BufferUtil.hasContent(_outNet)); - LOG.debug("done Read {}",_endp); + if (!_appReadFuture.isComplete() && _netReadFuture==null) + _netReadFuture=scheduleOnReadable(); + + LOG.debug("!onReadable {} {}",this,_netReadFuture); + _lock.unlock(); } - } - - /* ------------------------------------------------------------ */ - @Override - public void doWrite() - { - _lock.lock(); - try + + // Run any ready callback from _appReadFuture in this thread. + if (_appReadTask!=null) { - while (BufferUtil.hasContent(_outNet)) - { - int written = _endp.flush(_outNet); - - if (written>0 && _appEndPoint.isWriteInterested()) - { - Runnable task =_appConnection.onWriteable(); - if (task!=null) - task.run(); - } - } - } - catch(IOException e) - { - LOG.warn(e); - } - finally - { - if (BufferUtil.hasContent(_outNet)) - _endp.setWriteInterested(true); - _lock.unlock(); + Runnable task=_appReadTask; + _appReadTask=null; + task.run(); } } @@ -322,7 +319,10 @@ public class SslConnection extends SelectableConnection private boolean process(ByteBuffer appOut) throws IOException { boolean some_progress=false; - _lock.lock(); + + if (!_lock.tryLock()) + throw new IllegalStateException(); + try { allocateBuffers(); @@ -346,12 +346,12 @@ public class SslConnection extends SelectableConnection case NOT_HANDSHAKING: { // Try unwrapping some application data - if (!BufferUtil.isFull(_inApp) && BufferUtil.hasContent(_inNet) && unwrap()) - progress=true; + if (!BufferUtil.isFull(_inApp) && BufferUtil.hasContent(_inNet)) + progress|=unwrap(); // Try wrapping some application data - if (BufferUtil.hasContent(appOut) && !BufferUtil.isFull(_outNet) && wrap(appOut)) - progress=true; + if (BufferUtil.hasContent(appOut) && !BufferUtil.isFull(_outNet)) + progress|=wrap(appOut); } break; @@ -372,8 +372,8 @@ public class SslConnection extends SelectableConnection // The SSL needs to send some handshake data to the other side if (_handshook && !_allowRenegotiate) _endp.close(); - else if (wrap(appOut)) - progress=true; + else + progress|=wrap(appOut); } break; @@ -382,18 +382,17 @@ public class SslConnection extends SelectableConnection // The SSL needs to receive some handshake data from the other side if (_handshook && !_allowRenegotiate) _endp.close(); - else if (BufferUtil.isEmpty(_inNet) && _endp.isInputShutdown()) + else if (BufferUtil.isEmpty(_inNet) && _eofIn) _endp.close(); - else if (unwrap()) - progress=true; + else + progress|=unwrap(); } break; } // pass on ishut/oshut state - if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inNet)) + if (_endp.isOpen() && _eofIn && BufferUtil.isEmpty(_inNet)) _engine.closeInbound(); - if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outNet)) _endp.shutdownOutput(); @@ -417,6 +416,9 @@ public class SslConnection extends SelectableConnection private boolean wrap(final ByteBuffer outApp) throws IOException { + if (_netWriteFuture!=null && !_netWriteFuture.isComplete()) + return false; + final SSLEngineResult result; int pos=BufferUtil.flipToFill(_outNet); @@ -461,16 +463,29 @@ public class SslConnection extends SelectableConnection throw new IOException(result.toString()); } - int flushed = _endp.flush(_outNet); + if (BufferUtil.hasContent(_outNet)) + { + IOFuture write =_endp.write(_outNet); + if (write.isComplete()) + return true; + + _netWriteFuture=write; + _netWriteFuture.setCallback(_writeable); + } - return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0; + return result.bytesConsumed()>0 || result.bytesProduced()>0 ; } private boolean unwrap() throws IOException { if (BufferUtil.isEmpty(_inNet)) + { + if (_netReadFuture==null) + _netReadFuture=scheduleOnReadable(); + LOG.debug("{} unwrap read {}",_session,_netReadFuture); return false; - + } + final SSLEngineResult result; int pos = BufferUtil.flipToFill(_inApp); @@ -501,9 +516,11 @@ public class SslConnection extends SelectableConnection { case BUFFER_UNDERFLOW: // need to wait for more net data - _inNet.compact().flip(); - if (_endp.isInputShutdown()) + if (_eofIn) _inNet.clear().limit(0); + else if (_netReadFuture==null) + _netReadFuture=scheduleOnReadable(); + break; case BUFFER_OVERFLOW: @@ -527,8 +544,9 @@ public class SslConnection extends SelectableConnection throw new IOException(result.toString()); } - //if (LOG.isDebugEnabled() && result.bytesProduced()>0) - // LOG.debug("{} unwrapped '{}'",_session,buffer); + // If any bytes were produced and we have an app read waiting, make it ready. + if (result.bytesProduced()>0 && !_appReadFuture.isComplete()) + _appReadFuture.ready(); return result.bytesConsumed()>0 || result.bytesProduced()>0; } @@ -540,7 +558,7 @@ public class SslConnection extends SelectableConnection } /* ------------------------------------------------------------ */ - public SelectableEndPoint getAppEndPoint() + public AsyncEndPoint getAppEndPoint() { return _appEndPoint; } @@ -554,10 +572,14 @@ public class SslConnection extends SelectableConnection /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - public class AppEndPoint implements SelectableEndPoint + public class AppEndPoint extends AbstractEndPoint implements AsyncEndPoint { - boolean _readInterested; - boolean _writeInterested; + ByteBuffer[] _writeBuffers; + + AppEndPoint() + { + super(_endp.getLocalAddress(),_endp.getRemoteAddress()); + } public SSLEngine getSslEngine() { @@ -600,30 +622,6 @@ public class SslConnection extends SelectableConnection } } - @Override - public void shutdownInput() throws IOException - { - LOG.debug("{} ssl endp.ishut!",_session); - // We do not do a closeInput here, as SSL does not support half close. - // isInputShutdown works it out itself from buffer state and underlying endpoint state. - } - - @Override - public boolean isInputShutdown() - { - _lock.lock(); - try - { - return _endp.isInputShutdown() && - !(_inApp!=null&&BufferUtil.hasContent(_inApp)) && - !(_inNet!=null&&BufferUtil.hasContent(_inNet)); - } - finally - { - _lock.unlock(); - } - } - @Override public void close() throws IOException { @@ -642,7 +640,7 @@ public class SslConnection extends SelectableConnection process(null); if (BufferUtil.hasContent(_inApp)) - BufferUtil.flipPutFlip(_inApp,buffer); + BufferUtil.append(_inApp,buffer); } finally { @@ -650,7 +648,7 @@ public class SslConnection extends SelectableConnection } int filled=buffer.remaining()-size; - if (filled==0 && isInputShutdown()) + if (filled==0 && _eofIn) return -1; return filled; } @@ -689,17 +687,6 @@ public class SslConnection extends SelectableConnection return _endp; } - public void flush() throws IOException - { - process(null); - } - - @Override - public void onIdleExpired(long idleForMs) - { - _endp.onIdleExpired(idleForMs); - } - @Override public void setCheckForIdle(boolean check) { @@ -712,40 +699,6 @@ public class SslConnection extends SelectableConnection return _endp.isCheckForIdle(); } - @Override - public InetSocketAddress getLocalAddress() - { - return _endp.getLocalAddress(); - } - - @Override - public InetSocketAddress getRemoteAddress() - { - return _endp.getRemoteAddress(); - } - - @Override - public int getMaxIdleTime() - { - return _endp.getMaxIdleTime(); - } - - @Override - public void setMaxIdleTime(int timeMs) throws IOException - { - _endp.setMaxIdleTime(timeMs); - } - - @Override - public SelectableConnection getSelectableConnection() - { - return _appConnection; - } - - public void setSelectableConnection(SelectableConnection connection) - { - _appConnection=(SelectableConnection)connection; - } @Override public String toString() @@ -759,23 +712,48 @@ public class SslConnection extends SelectableConnection int i = inbound == null? -1 : inbound.remaining(); int o = outbound == null ? -1 : outbound.remaining(); int u = unwrap == null ? -1 : unwrap.remaining(); - return String.format("SSL %s %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}", + return String.format("SSL %s %s i/o/u=%d/%d/%d eof=%b oshut=%b {%s}", super.toString(), _engine.getHandshakeStatus(), i, o, u, - _ishut, _oshut, + _eofIn, _oshut, _appConnection); } + @Override - public void setWriteInterested(boolean interested) + public long getActivityTimestamp() { + return _endp.getActivityTimestamp(); + } + + @Override + public long getCreatedTimeStamp() + { + return _endp.getCreatedTimeStamp(); + } + + @Override + public AsyncConnection getAsyncConnection() + { + return _appConnection; + } + + @Override + public IOFuture read() throws IllegalStateException + { + LOG.debug("{} sslEndp.read()",_session); _lock.lock(); try { - _writeInterested=interested; - if (interested) - _endp.setWriteInterested(true); + // Do we already have application input data? + if (BufferUtil.hasContent(_inApp)) + return COMPLETE; + + // No, we need to schedule a network read + _appReadFuture.recycle(); + scheduleOnReadable(); + return _appReadFuture; } finally { @@ -784,42 +762,64 @@ public class SslConnection extends SelectableConnection } @Override - public boolean isWriteInterested() - { - return _writeInterested; - } - - @Override - public void setReadInterested(boolean interested) + public IOFuture write(ByteBuffer... buffers) { _lock.lock(); try { - _readInterested=interested; - if (interested) - _endp.setReadInterested(true); + if (!_appWriteFuture.isComplete()) + throw new IllegalStateException("previous write not complete"); + + // Try to process all + for (ByteBuffer b : buffers) + { + process(b); + + if (b.hasRemaining()) + { + _writeBuffers=buffers; + _appWriteFuture.recycle(); + return _appWriteFuture; + } + } + return COMPLETE; + } + catch (IOException e) + { + return new CompleteIOFuture(e); } finally { _lock.unlock(); } } - - @Override - public boolean isReadInterested() + + void completeWrite() { - return _readInterested; - } + _lock.lock(); + try + { + if (!_appWriteFuture.isComplete()) + throw new IllegalStateException("previous write not complete"); - @Override - public long getLastNotIdleTimestamp() - { - return _endp.getLastNotIdleTimestamp(); - } - - @Override - public void checkForIdle(long now) - { + // Try to process all + for (ByteBuffer b : _writeBuffers) + { + process(b); + + if (b.hasRemaining()) + return; + } + _appWriteFuture.ready(); + } + catch (IOException e) + { + _appWriteFuture.fail(e); + } + finally + { + _lock.unlock(); + } } } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java index ae02dc36296..d640f53bd77 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java @@ -17,14 +17,14 @@ public class BufferUtilTest ByteBuffer from=BufferUtil.toBuffer("12345"); BufferUtil.clear(to); - assertEquals(5,BufferUtil.flipPutFlip(from,to)); + assertEquals(5,BufferUtil.append(from,to)); assertTrue(BufferUtil.isEmpty(from)); assertEquals("12345",BufferUtil.toString(to)); from=BufferUtil.toBuffer("XX67890ZZ"); from.position(2); - assertEquals(5,BufferUtil.flipPutFlip(from,to)); + assertEquals(5,BufferUtil.append(from,to)); assertEquals(2,from.remaining()); assertEquals("1234567890",BufferUtil.toString(to)); } @@ -37,14 +37,14 @@ public class BufferUtilTest ByteBuffer from=BufferUtil.toBuffer("12345"); BufferUtil.clear(to); - assertEquals(5,BufferUtil.flipPutFlip(from,to)); + assertEquals(5,BufferUtil.append(from,to)); assertTrue(BufferUtil.isEmpty(from)); assertEquals("12345",BufferUtil.toString(to)); from=BufferUtil.toBuffer("XX67890ZZ"); from.position(2); - assertEquals(5,BufferUtil.flipPutFlip(from,to)); + assertEquals(5,BufferUtil.append(from,to)); assertEquals(2,from.remaining()); assertEquals("1234567890",BufferUtil.toString(to)); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/EndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/EndPointTest.java index 4b62edce1c8..71b9ef68670 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/EndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/EndPointTest.java @@ -36,10 +36,8 @@ public abstract class EndPointTest // Client and server are open assertTrue(c.client.isOpen()); - assertFalse(c.client.isInputShutdown()); assertFalse(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertFalse(c.server.isInputShutdown()); assertFalse(c.server.isOutputShutdown()); // Server sends response and closes output @@ -48,10 +46,8 @@ public abstract class EndPointTest // client server are open, server is oshut assertTrue(c.client.isOpen()); - assertFalse(c.client.isInputShutdown()); assertFalse(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertFalse(c.server.isInputShutdown()); assertTrue(c.server.isOutputShutdown()); // Client reads response @@ -62,10 +58,8 @@ public abstract class EndPointTest // Client and server are open, server is oshut assertTrue(c.client.isOpen()); - assertFalse(c.client.isInputShutdown()); assertFalse(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertFalse(c.server.isInputShutdown()); assertTrue(c.server.isOutputShutdown()); // Client reads -1 @@ -75,10 +69,8 @@ public abstract class EndPointTest // Client and server are open, server is oshut, client is ishut assertTrue(c.client.isOpen()); - assertTrue(c.client.isInputShutdown()); assertFalse(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertFalse(c.server.isInputShutdown()); assertTrue(c.server.isOutputShutdown()); // Client shutsdown output, which is a close because already ishut @@ -86,10 +78,8 @@ public abstract class EndPointTest // Client is closed. Server is open and oshut assertFalse(c.client.isOpen()); - assertTrue(c.client.isInputShutdown()); assertTrue(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertFalse(c.server.isInputShutdown()); assertTrue(c.server.isOutputShutdown()); // Server reads close @@ -99,10 +89,8 @@ public abstract class EndPointTest // Client and Server are closed assertFalse(c.client.isOpen()); - assertTrue(c.client.isInputShutdown()); assertTrue(c.client.isOutputShutdown()); assertFalse(c.server.isOpen()); - assertTrue(c.server.isInputShutdown()); assertTrue(c.server.isOutputShutdown()); } @@ -121,38 +109,30 @@ public abstract class EndPointTest assertEquals("request",BufferUtil.toString(buffer)); assertTrue(c.client.isOpen()); - assertFalse(c.client.isInputShutdown()); assertFalse(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertFalse(c.server.isInputShutdown()); assertFalse(c.server.isOutputShutdown()); c.client.close(); assertFalse(c.client.isOpen()); - assertTrue(c.client.isInputShutdown()); assertTrue(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertFalse(c.server.isInputShutdown()); assertFalse(c.server.isOutputShutdown()); len = c.server.fill(buffer); assertEquals(-1,len); assertFalse(c.client.isOpen()); - assertTrue(c.client.isInputShutdown()); assertTrue(c.client.isOutputShutdown()); assertTrue(c.server.isOpen()); - assertTrue(c.server.isInputShutdown()); assertFalse(c.server.isOutputShutdown()); c.server.shutdownOutput(); assertFalse(c.client.isOpen()); - assertTrue(c.client.isInputShutdown()); assertTrue(c.client.isOutputShutdown()); assertFalse(c.server.isOpen()); - assertTrue(c.server.isInputShutdown()); assertTrue(c.server.isOutputShutdown()); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index 068bc2f5e17..a7b25bc39a1 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -11,7 +11,7 @@ import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLSocket; -import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SslConnection; @@ -46,15 +46,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest } @Override - protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint) + protected AbstractAsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) { SSLEngine engine = __sslCtxFactory.newSslEngine(); engine.setUseClientMode(false); SslConnection connection = new SslConnection(engine,endpoint); - SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint()); + AbstractAsyncConnection delegate = super.newConnection(channel,connection.getAppEndPoint()); connection.setAppConnection(delegate); - connection.getAppEndPoint().setReadInterested(endpoint.isReadInterested()); return connection; } @@ -73,12 +72,6 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest // SSL does not do half closes } - @Override - public void testBlockIn() throws Exception - { - super.testBlockIn(); - } - @Test public void testTcpClose() throws Exception diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 100187a7745..ebcfd4b210d 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -19,10 +19,11 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.SelectableConnection; +import org.eclipse.jetty.io.AsyncConnection; +import org.eclipse.jetty.io.AbstractAsyncConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; @@ -36,7 +37,7 @@ import org.junit.Test; public class SelectChannelEndPointTest { - protected SelectableEndPoint _lastEndp; + protected volatile AsyncEndPoint _lastEndp; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected SelectorManager _manager = new SelectorManager() @@ -58,22 +59,22 @@ public class SelectChannelEndPointTest } @Override - protected void endPointUpgraded(SelectChannelEndPoint endpoint, Connection oldConnection) + protected void endPointUpgraded(SelectChannelEndPoint endpoint, AsyncConnection oldConnection) { } @Override - public SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment) + public AbstractAsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment) { - return SelectChannelEndPointTest.this.newConnection(channel,endpoint); + AbstractAsyncConnection connection = SelectChannelEndPointTest.this.newConnection(channel,endpoint); + return connection; } @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000); - endp.setReadInterested(true); - endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + endp.setAsyncConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); _lastEndp=endp; return endp; } @@ -81,10 +82,13 @@ public class SelectChannelEndPointTest // Must be volatile or the test may fail spuriously private volatile int _blockAt=0; + private volatile int _writeCount=1; @Before public void startManager() throws Exception { + _writeCount=1; + _lastEndp=null; _connector = ServerSocketChannel.open(); _connector.socket().bind(null); _threadPool.start(); @@ -104,26 +108,29 @@ public class SelectChannelEndPointTest return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort()); } - protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint) + protected AbstractAsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) { - return new TestConnection(endpoint); + AbstractAsyncConnection connection = new TestConnection(endpoint); + connection.scheduleOnReadable(); + return connection; } - public class TestConnection extends SelectableConnection + public class TestConnection extends AbstractAsyncConnection { ByteBuffer _in = BufferUtil.allocate(32*1024); ByteBuffer _out = BufferUtil.allocate(32*1024); - public TestConnection(SelectableEndPoint endp) + public TestConnection(AsyncEndPoint endp) { super(endp); } @Override - public void doRead() + public void onReadable() { try { + _endp.setCheckForIdle(false); boolean progress=true; while(progress) { @@ -132,37 +139,64 @@ public class SelectChannelEndPointTest // Fill the input buffer with everything available if (!BufferUtil.isFull(_in)) progress|=_endp.fill(_in)>0; + // If the tests wants to block, then block - while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt && blockReadable()) - progress|=_endp.fill(_in)>0; - - // Copy to the out buffer - if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0) - progress=true; - - // Try non blocking write - if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0) - progress=true; - - // Try blocking write - while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out) && blockWriteable()) + while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) { - if (_endp.flush(_out)>0) - progress=true; + _endp.read().await(); + progress|=_endp.fill(_in)>0; + } + + // Copy to the out buffer + if (BufferUtil.hasContent(_in) && BufferUtil.append(_in,_out)>0) + progress=true; + + // Blocking writes + if (BufferUtil.hasContent(_out)) + { + ByteBuffer out=_out.duplicate(); + BufferUtil.clear(_out); + for (int i=0;i<_writeCount;i++) + { + _endp.write(out.asReadOnlyBuffer()).await(); + } + progress=true; } } } catch(ClosedChannelException e) { - System.err.println(e); + // System.err.println(e); } - catch(IOException e) + catch(ExecutionException e) + { + // Timeout does not close, so echo exception then shutdown + try + { + // System.err.println("Execution Exception! "+e); + _endp.write(BufferUtil.toBuffer("Timeout: "+BufferUtil.toString(_in))).await(); + _endp.shutdownOutput(); + } + catch(Exception e2) + { + e2.printStackTrace(); + } + } + catch(InterruptedException e) + { + // System.err.println(e); + } + catch(Exception e) { e.printStackTrace(); } finally { - _endp.setReadInterested(true); + if (_endp.isOpen()) + { + _endp.setCheckForIdle(true); + scheduleOnReadable(); + } } } @@ -185,12 +219,6 @@ public class SelectChannelEndPointTest public void onClose() { } - - @Override - public boolean isIdle() - { - return false; - } } @@ -201,7 +229,7 @@ public class SelectChannelEndPointTest { Socket client = newClient(); - client.setSoTimeout(500); + client.setSoTimeout(60000); SocketChannel server = _connector.accept(); server.configureBlocking(false); @@ -220,6 +248,7 @@ public class SelectChannelEndPointTest } // wait for read timeout + client.setSoTimeout(500); long start=System.currentTimeMillis(); try { @@ -310,7 +339,7 @@ public class SelectChannelEndPointTest @Test - public void testBlockIn() throws Exception + public void testBlockRead() throws Exception { Socket client = newClient(); @@ -330,6 +359,8 @@ public class SelectChannelEndPointTest clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.flush(); + while(_lastEndp==null); + _lastEndp.setMaxIdleTime(10*specifiedTimeout); Thread.sleep(2 * specifiedTimeout); @@ -405,7 +436,64 @@ public class SelectChannelEndPointTest assertFalse(_lastEndp.isOpen()); } + + + @Test + public void testBlockedReadIdle() throws Exception + { + Socket client = newClient(); + OutputStream clientOutputStream = client.getOutputStream(); + + client.setSoTimeout(5000); + SocketChannel server = _connector.accept(); + server.configureBlocking(false); + + _manager.register(server); + + // Write client to server + clientOutputStream.write("HelloWorld".getBytes("UTF-8")); + + // Verify echo server to client + for (char c : "HelloWorld".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b>0); + assertEquals(c,(char)b); + } + + // Set Max idle + _lastEndp.setMaxIdleTime(500); + + // Write 8 and cause block waiting for 10 + _blockAt=10; + clientOutputStream.write("12345678".getBytes("UTF-8")); + clientOutputStream.flush(); + + // read until idle shutdown received + long start=System.currentTimeMillis(); + int b=client.getInputStream().read(); + assertEquals('T',b); + long idle=System.currentTimeMillis()-start; + assertTrue(idle>400); + assertTrue(idle<2000); + + for (char c : "imeout: 12345678".toCharArray()) + { + b = client.getInputStream().read(); + assertTrue(b>0); + assertEquals(c,(char)b); + } + + // But endpoint is still open. + assertTrue(_lastEndp.isOpen()); + + // Wait for another idle callback + Thread.sleep(2000); + // endpoint is closed. + + assertFalse(_lastEndp.isOpen()); + } @Test @@ -478,4 +566,51 @@ public class SelectChannelEndPointTest assertTrue(latch.await(100,TimeUnit.SECONDS)); } + + + @Test + public void testWriteBlock() throws Exception + { + Socket client = newClient(); + + client.setSoTimeout(10000); + + SocketChannel server = _connector.accept(); + server.configureBlocking(false); + + _manager.register(server); + + // Write client to server + _writeCount=10000; + String data="Now is the time for all good men to come to the aid of the party"; + client.getOutputStream().write(data.getBytes("UTF-8")); + + for (int i=0;i<_writeCount;i++) + { + // Verify echo server to client + for (int j=0;j0); + assertEquals("test-"+i+"/"+j,c,(char)b); + } + if (i==0) + _lastEndp.setMaxIdleTime(60000); + if (i%100==0) + TimeUnit.MILLISECONDS.sleep(10); + } + + + client.close(); + + int i=0; + while (server.isOpen()) + { + assert(i++<10); + Thread.sleep(10); + } + + } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 8b69863b37b..b898772700b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -29,7 +29,7 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers.Type; -import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.component.AggregateLifeCycle; @@ -1167,7 +1167,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht } /* ------------------------------------------------------------ */ - protected void connectionOpened(Connection connection) + protected void connectionOpened(AsyncConnection connection) { if (_statsStartedAt.get() == -1) return; @@ -1176,13 +1176,13 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht } /* ------------------------------------------------------------ */ - protected void connectionUpgraded(Connection oldConnection, Connection newConnection) + protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection) { _requestStats.set((oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getHttpChannel().getRequests():0); } /* ------------------------------------------------------------ */ - protected void connectionClosed(Connection connection) + protected void connectionClosed(AsyncConnection connection) { if (_statsStartedAt.get() == -1) return; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java index 9b1e9e3d6d9..1c8c38b0401 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java @@ -239,9 +239,6 @@ public interface Connector extends LifeCycle */ void setMaxIdleTime(int ms); - /* ------------------------------------------------------------ */ - int getLowResourceMaxIdleTime(); - void setLowResourceMaxIdleTime(int ms); /* ------------------------------------------------------------ */ /** @@ -357,25 +354,4 @@ public interface Connector extends LifeCycle public long getStatsOnMs(); - /* ------------------------------------------------------------ */ - /** Check if low on resources. - * For most connectors, low resources is measured by calling - * {@link ThreadPool#isLowOnThreads()} on the connector threadpool - * or the server threadpool if there is no connector threadpool. - *

- * For blocking connectors, low resources is used to trigger - * usage of {@link #getLowResourceMaxIdleTime()} for the timeout - * of an idle connection. - *

- * for non-blocking connectors, the number of connections is - * used instead of this method, to select the timeout of an - * idle connection. - *

- * For all connectors, low resources is used to trigger the - * usage of {@link #getLowResourceMaxIdleTime()} for read and - * write operations. - * - * @return true if this connector is low on resources. - */ - public boolean isLowResources(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 960b898c386..8f114ce5348 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -39,7 +39,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MimeTypes; -import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.io.UncheckedPrintWriter; @@ -75,7 +75,7 @@ public abstract class HttpChannel private int _requests; private final Server _server; - private final Connection _connection; + private final AsyncConnection _connection; private final HttpURI _uri; private final HttpFields _requestFields; @@ -111,7 +111,7 @@ public abstract class HttpChannel /** Constructor * */ - public HttpChannel(Server server,Connection connection) + public HttpChannel(Server server,AsyncConnection connection) { _server = server; _connection = connection; @@ -124,7 +124,7 @@ public abstract class HttpChannel } /* ------------------------------------------------------------ */ - public Connection getConnection() + public AsyncConnection getConnection() { return _connection; } @@ -452,7 +452,7 @@ public abstract class HttpChannel /* ------------------------------------------------------------ */ /** - * @see org.eclipse.jetty.io.Connection#isSuspended() + * @see org.eclipse.jetty.io.AsyncConnection#isSuspended() */ public boolean isSuspended() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 3b1ef45500b..efde5795b3b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -23,10 +23,10 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpGenerator.Action; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.SelectableConnection; -import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.AbstractAsyncConnection; +import org.eclipse.jetty.io.AsyncConnection; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.io.SelectableEndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -34,7 +34,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task; /** */ -public class HttpConnection extends SelectableConnection +public class HttpConnection extends AbstractAsyncConnection { private static final Logger LOG = Log.getLogger(HttpConnection.class); @@ -71,7 +71,7 @@ public class HttpConnection extends SelectableConnection /** Constructor * */ - public HttpConnection(Connector connector, SelectableEndPoint endpoint, Server server) + public HttpConnection(Connector connector, AsyncEndPoint endpoint, Server server) { super(endpoint); _connector = connector; @@ -145,23 +145,6 @@ public class HttpConnection extends SelectableConnection return _generator; } - /* ------------------------------------------------------------ */ - @Override - public boolean isIdle() - { - return _parser.isIdle() && _generator.isIdle(); - } - - /* ------------------------------------------------------------ */ - @Override - public int getMaxIdleTime() - { - if (_connector.isLowResources() && _endp.getMaxIdleTime()==_connector.getMaxIdleTime()) - return _connector.getLowResourceMaxIdleTime(); - if (_endp.getMaxIdleTime()>0) - return _endp.getMaxIdleTime(); - return _connector.getMaxIdleTime(); - } /* ------------------------------------------------------------ */ @Override @@ -176,9 +159,9 @@ public class HttpConnection extends SelectableConnection /* ------------------------------------------------------------ */ @Override - public void doRead() + public void onReadable() { - Connection connection = this; + AsyncConnection connection = this; boolean progress=true; try @@ -201,7 +184,7 @@ public class HttpConnection extends SelectableConnection if (BufferUtil.hasContent(_requestBuffer) && _parser.parseNext(_requestBuffer)) { // don't check for idle while dispatched (unless blocking IO is done). - getSelectableEndPoint().setCheckForIdle(false); + getEndPoint().setCheckForIdle(false); try { _channel.handleRequest(); @@ -211,7 +194,7 @@ public class HttpConnection extends SelectableConnection // If we are not suspended if (!_channel.getRequest().getAsyncContinuation().isAsyncStarted()) // reenable idle checking unless request is suspended - getSelectableEndPoint().setCheckForIdle(true); + getEndPoint().setCheckForIdle(true); } } @@ -236,7 +219,7 @@ public class HttpConnection extends SelectableConnection // look for a switched connection instance? if (_channel.getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) { - Connection switched=(Connection)_channel.getRequest().getAttribute("org.eclipse.jetty.io.Connection"); + AsyncConnection switched=(AsyncConnection)_channel.getRequest().getAttribute("org.eclipse.jetty.io.Connection"); if (switched!=null) connection=switched; } @@ -426,42 +409,31 @@ public class HttpConnection extends SelectableConnection switch(_toFlush) { case 10: - _endp.flush(_responseHeader,_responseBuffer); - _toFlush=(BufferUtil.hasContent(_responseHeader)?8:0)+(BufferUtil.hasContent(_responseBuffer)?2:0); + _endp.write(_responseHeader,_responseBuffer); break; case 9: - _endp.flush(_responseHeader,_content); - _toFlush=(BufferUtil.hasContent(_responseHeader)?8:0)+(BufferUtil.hasContent(_content)?1:0); - if (_toFlush==0) - _content=null; + _endp.write(_responseHeader,_content); + _content=null; break; case 8: - _endp.flush(_responseHeader); - _toFlush=(BufferUtil.hasContent(_responseHeader)?8:0); + _endp.write(_responseHeader); break; case 6: - _endp.flush(_chunk,_responseBuffer); - _toFlush=(BufferUtil.hasContent(_chunk)?4:0)+(BufferUtil.hasContent(_responseBuffer)?2:0); + _endp.write(_chunk,_responseBuffer); break; case 5: - _endp.flush(_chunk,_content); - _toFlush=(BufferUtil.hasContent(_chunk)?4:0)+(BufferUtil.hasContent(_content)?1:0); - if (_toFlush==0) - _content=null; + _endp.write(_chunk,_content); + _content=null; break; case 4: - _endp.flush(_chunk); - _toFlush=(BufferUtil.hasContent(_chunk)?4:0); + _endp.write(_chunk); break; case 2: - _endp.flush(_responseBuffer); - _toFlush=(BufferUtil.hasContent(_responseBuffer)?2:0); + _endp.write(_responseBuffer); break; case 1: - _endp.flush(_content); - _toFlush=(BufferUtil.hasContent(_content)?1:0); - if (_toFlush==0) - _content=null; + _endp.write(_content); + _content=null; break; case 0: default: @@ -486,13 +458,20 @@ public class HttpConnection extends SelectableConnection /* ------------------------------------------------------------ */ @Override - public void onInputShutdown() throws IOException + public void onInputShutdown() { // If we don't have a committed response and we are not suspended if (_generator.isIdle() && !_channel.getRequest().getAsyncContinuation().isSuspended()) { // then no more can happen, so close. - _endp.close(); + try + { + _endp.close(); + } + catch (IOException e) + { + LOG.debug(e); + } } // Make idle parser seek EOF @@ -513,7 +492,7 @@ public class HttpConnection extends SelectableConnection @Override public long getMaxIdleTime() { - return HttpConnection.this.getMaxIdleTime(); + return getEndPoint().getMaxIdleTime(); } @Override @@ -661,7 +640,7 @@ public class HttpConnection extends SelectableConnection try { // Wait until we can read - blockReadable(); + getEndPoint().blockReadable(); // We will need a buffer to read into if (_requestBuffer==null) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index c85b347dd12..7a21dafcc9d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -20,7 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.ByteArrayEndPoint; -import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -107,7 +107,7 @@ public class LocalConnector extends AbstractConnector ByteArrayEndPoint endPoint = new ByteArrayEndPoint(_requestsBuffer.asArray(), 1024) { @Override - public void setConnection(Connection connection) + public void setConnection(AsyncConnection connection) { if (getConnection()!=null && connection!=getConnection()) connectionUpgraded(getConnection(),connection); @@ -127,8 +127,8 @@ public class LocalConnector extends AbstractConnector { while (true) { - final Connection con = endPoint.getConnection(); - final Connection next = con.handle(); + final AsyncConnection con = endPoint.getConnection(); + final AsyncConnection next = con.handle(); if (next!=con) { endPoint.setConnection(next); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 668c0c54990..d734c45f94b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -21,7 +21,7 @@ import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint; -import org.eclipse.jetty.io.SelectableEndPoint; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; @@ -350,7 +350,7 @@ public class ConnectHandler extends HandlerWrapper { } - private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException + private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, AsyncConnection connection) throws IOException { // Set the new connection as request attribute and change the status to 101 // so that Jetty understands that it has to upgrade the connection @@ -428,7 +428,7 @@ public class ConnectHandler extends HandlerWrapper } @Override - public Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) + public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment; proxyToServer.setTimeStamp(System.currentTimeMillis()); @@ -450,19 +450,19 @@ public class ConnectHandler extends HandlerWrapper } @Override - protected void endPointClosed(SelectableEndPoint endpoint) + protected void endPointClosed(AsyncEndPoint endpoint) { } @Override - protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) + protected void endPointUpgraded(ConnectedEndPoint endpoint, AsyncConnection oldConnection) { } } - public class ProxyToServerConnection implements Connection + public class ProxyToServerConnection implements AsyncConnection { private final CountDownLatch _ready = new CountDownLatch(1); private final ByteBuffer _buffer = new IndirectNIOBuffer(1024); @@ -487,7 +487,7 @@ public class ConnectHandler extends HandlerWrapper return builder.append(")").toString(); } - public Connection handle() throws IOException + public AsyncConnection handle() throws IOException { _logger.debug("{}: begin reading from server", this); try @@ -677,7 +677,7 @@ public class ConnectHandler extends HandlerWrapper } } - public class ClientToProxyConnection implements Connection + public class ClientToProxyConnection implements AsyncConnection { private final ByteBuffer _buffer = new IndirectNIOBuffer(1024); private final ConcurrentMap _context; @@ -704,7 +704,7 @@ public class ConnectHandler extends HandlerWrapper return builder.append(")").toString(); } - public Connection handle() throws IOException + public AsyncConnection handle() throws IOException { _logger.debug("{}: begin reading from client", this); try diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java index 73acefe707f..caae7fd67bf 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java @@ -23,7 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint; -import org.eclipse.jetty.io.SelectableEndPoint; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.SelectorManager; /** @@ -55,13 +55,13 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key) throws IOException { NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, _maxIdleTime, listeners); - endPoint.setSelectableConnection(selectSet.getManager().newConnection(channel,endPoint, key.attachment())); + endPoint.setAsyncConnection(selectSet.getManager().newConnection(channel,endPoint, key.attachment())); endPoint.notifyOpened(); return endPoint; } @Override - protected void endPointClosed(SelectableEndPoint endpoint) + protected void endPointClosed(AsyncEndPoint endpoint) { super.endPointClosed(endpoint); ((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index 05850212263..3341dab03f2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -21,11 +21,11 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import org.eclipse.jetty.continuation.Continuation; -import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint; -import org.eclipse.jetty.io.SelectableConnection; -import org.eclipse.jetty.io.SelectableEndPoint; +import org.eclipse.jetty.io.AbstractAsyncConnection; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.server.HttpConnection; @@ -244,18 +244,18 @@ public class SelectChannelConnector extends AbstractNIOConnector protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); - endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + endp.setAsyncConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); return endp; } /* ------------------------------------------------------------------------------- */ - protected void endPointClosed(SelectableEndPoint endpoint) + protected void endPointClosed(AsyncEndPoint endpoint) { connectionClosed(endpoint.getSelectableConnection()); } /* ------------------------------------------------------------------------------- */ - protected SelectableConnection newConnection(SocketChannel channel,final SelectableEndPoint endpoint) + protected AbstractAsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint) { return new HttpConnection(SelectChannelConnector.this,endpoint,getServer()); } @@ -289,13 +289,13 @@ public class SelectChannelConnector extends AbstractNIOConnector } @Override - protected void endPointUpgraded(SelectChannelEndPoint endpoint, Connection oldConnection) + protected void endPointUpgraded(SelectChannelEndPoint endpoint, AsyncConnection oldConnection) { connectionUpgraded(oldConnection,endpoint.getSelectableConnection()); } @Override - public SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment) + public AbstractAsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment) { return SelectChannelConnector.this.newConnection(channel,endpoint); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java index 5fc717b3c36..a27e1651375 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java @@ -542,13 +542,13 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements /* ------------------------------------------------------------------------------- */ @Override - protected Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint) + protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) { try { SSLEngine engine = createSSLEngine(channel); SslConnection connection = newSslConnection(endpoint, engine); - Connection delegate = newPlainConnection(channel, connection.getAppEndPoint()); + AsyncConnection delegate = newPlainConnection(channel, connection.getAppEndPoint()); connection.getAppEndPoint().setConnection(delegate); connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate()); return connection; @@ -559,7 +559,7 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements } } - protected Connection newPlainConnection(SocketChannel channel, AsyncEndPoint endPoint) + protected AsyncConnection newPlainConnection(SocketChannel channel, AsyncEndPoint endPoint) { return super.newConnection(channel, endPoint); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractConnectorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractConnectorTest.java index 8c69d6167c8..b10b128d446 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractConnectorTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractConnectorTest.java @@ -59,7 +59,7 @@ public class AbstractConnectorTest _server = new Server(); _connector = new SelectChannelConnector() { - public void connectionClosed(Connection connection) + public void connectionClosed(AsyncConnection connection) { super.connectionClosed(connection); _closed.countDown(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java index 4a29118ae64..afb9bf3e43e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java @@ -76,7 +76,7 @@ public class HttpWriterTest AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,generator,null) { @Override - public Connection handle() throws IOException + public AsyncConnection handle() throws IOException { return null; } @@ -169,7 +169,7 @@ public class HttpWriterTest AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,hb,null) { @Override - public Connection handle() throws IOException + public AsyncConnection handle() throws IOException { return null; } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 91a8af7121e..df78045dee5 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -622,7 +622,7 @@ public class ResponseTest } @Override - public Connection handle() throws IOException + public AsyncConnection handle() throws IOException { return this; } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java index 091a2c325b6..b45e0242662 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java @@ -6,14 +6,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.SelectableEndPoint; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.util.IO; import org.junit.Test; public class SelectChannelAsyncContextTest extends LocalAsyncContextTest { - volatile SelectableEndPoint _endp; + volatile AsyncEndPoint _endp; @Override protected Connector initConnector() @@ -24,7 +24,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest public void customize(EndPoint endpoint, Request request) throws IOException { super.customize(endpoint,request); - _endp=(SelectableEndPoint)endpoint; + _endp=(AsyncEndPoint)endpoint; } }; @@ -54,7 +54,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest try { TimeUnit.MILLISECONDS.sleep(200); - SelectableEndPoint endp=_endp; + AsyncEndPoint endp=_endp; if (endp!=null && endp.isOpen()) endp.asyncDispatch(); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index c6aedaaacc2..da2b21a82e1 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -271,7 +271,7 @@ public class BufferUtil * @param to Buffer to put bytes to in flush mode. The buffer is flipToFill before the put and flipToFlush after. * @return number of bytes moved */ - public static int flipPutFlip(ByteBuffer from, ByteBuffer to) + public static int append(ByteBuffer from, ByteBuffer to) { int pos= flipToFill(to); try @@ -636,6 +636,9 @@ public class BufferUtil return "null"; StringBuilder buf = new StringBuilder(); + buf.append(buffer.getClass().getSimpleName()); + buf.append("@"); + buf.append(Integer.toHexString(buffer.hashCode())); buf.append("[p="); buf.append(buffer.position()); buf.append(",l=");