diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java similarity index 87% rename from jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index 3fc4981137e..2479e7cb5c7 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -23,26 +23,26 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** - *

A convenience base implementation of {@link AsyncConnection}.

- *

This class uses the capabilities of the {@link AsyncEndPoint} API to provide a + *

A convenience base implementation of {@link Connection}.

+ *

This class uses the capabilities of the {@link EndPoint} API to provide a * more traditional style of async reading. A call to {@link #fillInterested()} * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} * as appropriate.

*/ -public abstract class AbstractAsyncConnection implements AsyncConnection +public abstract class AbstractConnection implements Connection { - private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class); + private static final Logger LOG = Log.getLogger(AbstractConnection.class); private final AtomicBoolean _readInterested = new AtomicBoolean(); - private final AsyncEndPoint _endp; + private final EndPoint _endp; private final Callback _readCallback; - public AbstractAsyncConnection(AsyncEndPoint endp, Executor executor) + public AbstractConnection(EndPoint endp, Executor executor) { this(endp, executor, false); } - public AbstractAsyncConnection(AsyncEndPoint endp, Executor executor, final boolean executeOnlyFailure) + public AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnlyFailure) { if (executor == null) throw new IllegalArgumentException("Executor must not be null!"); @@ -72,7 +72,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), AbstractAsyncConnection.this.hashCode()); + return String.format("%s@%x", getClass().getSimpleName(), AbstractConnection.this.hashCode()); } }; } @@ -139,7 +139,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection } @Override - public AsyncEndPoint getEndPoint() + public EndPoint getEndPoint() { return _endp; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index e1918dd1b43..1abe9f9bdcf 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -1,14 +1,23 @@ package org.eclipse.jetty.io; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ReadPendingException; +import java.nio.channels.WritePendingException; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; public abstract class AbstractEndPoint implements EndPoint { + private static final Logger LOG = Log.getLogger(AbstractEndPoint.class); private final long _created=System.currentTimeMillis(); private final InetSocketAddress _local; private final InetSocketAddress _remote; private volatile long _idleTimeout; private volatile long _idleTimestamp=System.currentTimeMillis(); + private volatile Connection _connection; protected AbstractEndPoint(InetSocketAddress local,InetSocketAddress remote) @@ -58,6 +67,30 @@ public abstract class AbstractEndPoint implements EndPoint _idleTimestamp=System.currentTimeMillis(); } + @Override + public Connection getConnection() + { + return _connection; + } + + @Override + public void setConnection(Connection connection) + { + _connection = connection; + } + + @Override + public void onOpen() + { + LOG.debug("onOpen {}",this); + } + + @Override + public void onClose() + { + LOG.debug("onClose {}",this); + } + @Override public String toString() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java deleted file mode 100644 index 52d4b924263..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ /dev/null @@ -1,172 +0,0 @@ -package org.eclipse.jetty.io; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; - -public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint, Runnable -{ - private static final Logger LOG = Log.getLogger(AsyncByteArrayEndPoint.class); - - private final ReadInterest _readInterest = new ReadInterest() - { - @Override - protected boolean needsFill() throws IOException - { - if (_closed) - throw new ClosedChannelException(); - return _in == null || BufferUtil.hasContent(_in); - } - }; - private final WriteFlusher _writeFlusher = new WriteFlusher(this) - { - @Override - protected void onIncompleteFlushed() - { - // Don't need to do anything here as takeOutput does the signalling. - } - }; - private final AtomicReference> _timeout = new AtomicReference<>(); - private final ScheduledExecutorService _scheduler; - private volatile AsyncConnection _connection; - - public AsyncByteArrayEndPoint(ScheduledExecutorService scheduler, long idleTimeout) - { - _scheduler = scheduler; - setIdleTimeout(idleTimeout); - } - - public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, byte[] input, int outputSize) - { - super(input, outputSize); - _scheduler = timer; - setIdleTimeout(idleTimeout); - } - - public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, String input, int outputSize) - { - super(input, outputSize); - _scheduler = timer; - setIdleTimeout(idleTimeout); - } - - @Override - public void setIdleTimeout(long idleTimeout) - { - super.setIdleTimeout(idleTimeout); - scheduleIdleTimeout(idleTimeout); - } - - private void scheduleIdleTimeout(long delay) - { - Future newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(this, delay, TimeUnit.MILLISECONDS) : null; - Future oldTimeout = _timeout.getAndSet(newTimeout); - if (oldTimeout != null) - oldTimeout.cancel(false); - } - - @Override - public void run() - { - if (isOpen()) - { - long idleTimestamp = getIdleTimestamp(); - long idleTimeout = getIdleTimeout(); - long idleElapsed = System.currentTimeMillis() - idleTimestamp; - long idleLeft = idleTimeout - idleElapsed; - - if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) - { - if (idleTimestamp != 0 && idleTimeout > 0) - { - if (idleLeft < 0) - { - if (isOutputShutdown()) - close(); - notIdle(); - - TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms"); - _readInterest.failed(timeout); - _writeFlusher.failed(timeout); - } - } - } - scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout); - } - } - - @Override - public void setInput(ByteBuffer in) - { - super.setInput(in); - if (in == null || BufferUtil.hasContent(in)) - _readInterest.readable(); - } - - @Override - public ByteBuffer takeOutput() - { - ByteBuffer b = super.takeOutput(); - _writeFlusher.completeWrite(); - return b; - } - - @Override - public void setOutput(ByteBuffer out) - { - super.setOutput(out); - _writeFlusher.completeWrite(); - } - - @Override - public void reset() - { - _readInterest.close(); - _writeFlusher.close(); - super.reset(); - } - - @Override - public void fillInterested(C context, Callback callback) throws IllegalStateException - { - _readInterest.register(context, callback); - } - - @Override - public void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException - { - _writeFlusher.write(context, callback, buffers); - } - - @Override - public AsyncConnection getAsyncConnection() - { - return _connection; - } - - @Override - public void setAsyncConnection(AsyncConnection connection) - { - _connection = connection; - } - - @Override - public void onOpen() - { - } - - @Override - public void onClose() - { - } -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java deleted file mode 100644 index 1d770dc0416..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ /dev/null @@ -1,129 +0,0 @@ -// ======================================================================== -// Copyright (c) 2004-2012 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== - -package org.eclipse.jetty.io; - -import java.nio.ByteBuffer; -import java.nio.channels.ReadPendingException; -import java.nio.channels.WritePendingException; - -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.ExecutorCallback; -import org.eclipse.jetty.util.FutureCallback; - -/** - *

{@link AsyncEndPoint} add asynchronous scheduling methods to {@link EndPoint}.

- *

The design of these has been influenced by NIO.2 Futures and Completion - * handlers, but does not use those actual interfaces because they have - * some inefficiencies.

- *

This class will frequently be used in conjunction with some of the utility - * implementations of {@link Callback}, such as {@link FutureCallback} and - * {@link ExecutorCallback}. Examples are:

- * - *

Blocking Read

- *

A FutureCallback can be used to block until an endpoint is ready to be filled - * from: - *

- * FutureCallback<String> future = new FutureCallback<>();
- * endpoint.fillInterested("ContextObj",future);
- * ...
- * String context = future.get(); // This blocks
- * int filled=endpoint.fill(mybuffer);
- * 

- * - *

Dispatched Read

- *

By using a different callback, the read can be done asynchronously in its own dispatched thread: - *

- * endpoint.fillInterested("ContextObj",new ExecutorCallback<String>(executor)
- * {
- *   public void onCompleted(String context)
- *   {
- *     int filled=endpoint.fill(mybuffer);
- *     ...
- *   }
- *   public void onFailed(String context,Throwable cause) {...}
- * });
- * 

- *

The executor callback can also be customized to not dispatch in some circumstances when - * it knows it can use the callback thread and does not need to dispatch.

- * - *

Blocking Write

- *

The write contract is that the callback complete is not called until all data has been - * written or there is a failure. For blocking this looks like: - *

- * FutureCallback<String> future = new FutureCallback<>();
- * endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
- * String context = future.get(); // This blocks
- * 

- * - *

Dispatched Write

- *

Note also that multiple buffers may be passed in write so that gather writes - * can be done: - *

- * endpoint.write("ContextObj",new ExecutorCallback<String>(executor)
- * {
- *   public void onCompleted(String context)
- *   {
- *     int filled=endpoint.fill(mybuffer);
- *     ...
- *   }
- *   public void onFailed(String context,Throwable cause) {...}
- * },headerBuffer,contentBuffer);
- * 

- */ -public interface AsyncEndPoint extends EndPoint -{ - /** - *

Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.

- * - * @param context the context to return via the callback - * @param callback the callback to call when an error occurs or we are readable. - * @throws ReadPendingException if another read operation is concurrent. - */ - void fillInterested(C context, Callback callback) throws ReadPendingException; - - /** - *

Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either - * all the data has been flushed or an error occurs.

- * - * @param context the context to return via the callback - * @param callback the callback to call when an error occurs or the write completed. - * @param buffers one or more {@link ByteBuffer}s that will be flushed. - * @throws WritePendingException if another write operation is concurrent. - */ - void write(C context, Callback callback, ByteBuffer... buffers) throws WritePendingException; - - /** - * @return the {@link AsyncConnection} associated with this {@link AsyncEndPoint} - * @see #setAsyncConnection(AsyncConnection) - */ - AsyncConnection getAsyncConnection(); - - /** - * @param connection the {@link AsyncConnection} associated with this {@link AsyncEndPoint} - * @see #getAsyncConnection() - */ - void setAsyncConnection(AsyncConnection connection); - - /** - *

Callback method invoked when this {@link AsyncEndPoint} is opened.

- * @see #onClose() - */ - void onOpen(); - - /** - *

Callback method invoked when this {@link AsyncEndPoint} is close.

- * @see #onOpen() - */ - void onClose(); -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index b942f9e7b41..09584d8d583 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -16,9 +16,16 @@ package org.eclipse.jetty.io; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.StringUtil; @@ -30,12 +37,35 @@ public class ByteArrayEndPoint extends AbstractEndPoint { public final static InetSocketAddress NOIP=new InetSocketAddress(0); + private final AtomicReference> _timeout = new AtomicReference<>(); + private final ScheduledExecutorService _scheduler; + protected ByteBuffer _in; protected ByteBuffer _out; protected boolean _ishut; protected boolean _oshut; protected boolean _closed; protected boolean _growOutput; + + + private final ReadInterest _readInterest = new ReadInterest() + { + @Override + protected boolean needsFill() throws IOException + { + if (_closed) + throw new ClosedChannelException(); + return _in == null || BufferUtil.hasContent(_in); + } + }; + private final WriteFlusher _writeFlusher = new WriteFlusher(this) + { + @Override + protected void onIncompleteFlushed() + { + // Don't need to do anything here as takeOutput does the signalling. + } + }; /* ------------------------------------------------------------ */ /** @@ -43,9 +73,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint */ public ByteArrayEndPoint() { - super(NOIP,NOIP); - _in=BufferUtil.EMPTY_BUFFER; - _out=BufferUtil.allocate(1024); + this(null,0,null,null); } /* ------------------------------------------------------------ */ @@ -54,9 +82,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint */ public ByteArrayEndPoint(byte[] input, int outputSize) { - super(NOIP,NOIP); - _in=input==null?null:ByteBuffer.wrap(input); - _out=BufferUtil.allocate(outputSize); + this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize)); } /* ------------------------------------------------------------ */ @@ -65,10 +91,35 @@ public class ByteArrayEndPoint extends AbstractEndPoint */ public ByteArrayEndPoint(String input, int outputSize) { - super(NOIP,NOIP); - setInput(input); - _out=BufferUtil.allocate(outputSize); + this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize)); } + + public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs) + { + this(timer,idleTimeoutMs,null,null); + } + + public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs, byte[] input, int outputSize) + { + this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize)); + } + + public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs, String input, int outputSize) + { + this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize)); + } + + public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output) + { + super(NOIP,NOIP); + _in=input==null?BufferUtil.EMPTY_BUFFER:input; + _out=output==null?BufferUtil.allocate(1024):output; + _scheduler = timer; + setIdleTimeout(idleTimeoutMs); + } + + + /* ------------------------------------------------------------ */ @@ -95,6 +146,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void setInput(ByteBuffer in) { _in = in; + if (in == null || BufferUtil.hasContent(in)) + _readInterest.readable(); } /* ------------------------------------------------------------ */ @@ -135,6 +188,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint { ByteBuffer b=_out; _out=BufferUtil.allocate(b.capacity()); + _writeFlusher.completeWrite(); return b; } @@ -164,6 +218,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void setOutput(ByteBuffer out) { _out = out; + _writeFlusher.completeWrite(); } /* ------------------------------------------------------------ */ @@ -195,7 +250,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint } /* ------------------------------------------------------------ */ - private void shutdownInput() throws IOException + private void shutdownInput() { _ishut=true; if (_oshut) @@ -222,7 +277,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void close() { _closed=true; - // TODO: for sbordet to fix - onClose(); Moved invocation to AsycnByteArrayEndPoint for now (GW) } /* ------------------------------------------------------------ */ @@ -299,6 +353,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint */ public void reset() { + _readInterest.close(); + _writeFlusher.close(); _ishut=false; _oshut=false; _closed=false; @@ -334,5 +390,73 @@ public class ByteArrayEndPoint extends AbstractEndPoint _growOutput=growOutput; } + /* ------------------------------------------------------------ */ + @Override + public void setIdleTimeout(long idleTimeout) + { + super.setIdleTimeout(idleTimeout); + scheduleIdleTimeout(idleTimeout); + } + + /* ------------------------------------------------------------ */ + private void scheduleIdleTimeout(long delay) + { + if (delay>0 && _scheduler==null) + throw new IllegalStateException(); + + Future newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(_timeoutTask, delay, TimeUnit.MILLISECONDS) : null; + Future oldTimeout = _timeout.getAndSet(newTimeout); + if (oldTimeout != null) + oldTimeout.cancel(false); + } + + /* ------------------------------------------------------------ */ + private final Runnable _timeoutTask = new Runnable() + { + @Override + public void run() + { + if (isOpen()) + { + long idleTimestamp = getIdleTimestamp(); + long idleTimeout = getIdleTimeout(); + long idleElapsed = System.currentTimeMillis() - idleTimestamp; + long idleLeft = idleTimeout - idleElapsed; + + if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) + { + if (idleTimestamp != 0 && idleTimeout > 0) + { + if (idleLeft < 0) + { + if (isOutputShutdown()) + close(); + notIdle(); + + TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms"); + _readInterest.failed(timeout); + _writeFlusher.failed(timeout); + } + } + } + scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout); + } + } + }; + + + /* ------------------------------------------------------------ */ + @Override + public void fillInterested(C context, Callback callback) throws IllegalStateException + { + _readInterest.register(context, callback); + } + + /* ------------------------------------------------------------ */ + @Override + public void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException + { + _writeFlusher.write(context, callback, buffers); + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 1aebec2784a..4e4d11c83ca 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -22,9 +22,12 @@ import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; import java.nio.channels.ClosedChannelException; import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ReadPendingException; import java.nio.channels.SocketChannel; +import java.nio.channels.WritePendingException; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -194,4 +197,16 @@ public class ChannelEndPoint extends AbstractEndPoint { return _socket; } + + @Override + public void fillInterested(C context, Callback callback) throws ReadPendingException + { + throw new UnsupportedOperationException(); + } + + @Override + public void write(C context, Callback callback, ByteBuffer... buffers) throws WritePendingException + { + throw new UnsupportedOperationException(); + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java similarity index 55% rename from jetty-io/src/main/java/org/eclipse/jetty/io/AsyncConnection.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java index aa57fe26624..0f3497f0938 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java @@ -16,27 +16,27 @@ package org.eclipse.jetty.io; import org.eclipse.jetty.util.Callback; /** - *

An {@link AsyncConnection} is associated to an {@link AsyncEndPoint} so that I/O events - * happening on the {@link AsyncEndPoint} can be processed by the {@link AsyncConnection}.

- *

A typical implementation of {@link AsyncConnection} overrides {@link #onOpen()} to - * {@link AsyncEndPoint#fillInterested(Object, Callback) set read interest} on the {@link AsyncEndPoint}, - * and when the {@link AsyncEndPoint} signals read readyness, this {@link AsyncConnection} can + *

An {@link Connection} is associated to an {@link EndPoint} so that I/O events + * happening on the {@link EndPoint} can be processed by the {@link Connection}.

+ *

A typical implementation of {@link Connection} overrides {@link #onOpen()} to + * {@link EndPoint#fillInterested(Object, Callback) set read interest} on the {@link EndPoint}, + * and when the {@link EndPoint} signals read readyness, this {@link Connection} can * read bytes from the network and interpret them.

*/ -public interface AsyncConnection +public interface Connection { /** - *

Callback method invoked when this {@link AsyncConnection} is opened.

+ *

Callback method invoked when this {@link Connection} is opened.

*/ void onOpen(); /** - *

Callback method invoked when this {@link AsyncConnection} is closed.

+ *

Callback method invoked when this {@link Connection} is closed.

*/ void onClose(); /** - * @return the {@link AsyncEndPoint} associated with this {@link AsyncConnection} + * @return the {@link EndPoint} associated with this {@link Connection} */ - AsyncEndPoint getEndPoint(); + EndPoint getEndPoint(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 80713e07e28..8f28d260263 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -17,12 +17,78 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ReadPendingException; +import java.nio.channels.WritePendingException; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExecutorCallback; +import org.eclipse.jetty.util.FutureCallback; /** * * A transport EndPoint + * + *

Asynchronous Methods

+ *

The asynchronous scheduling methods of {@link EndPoint} + * has been influenced by NIO.2 Futures and Completion + * handlers, but does not use those actual interfaces because they have + * some inefficiencies.

+ *

This class will frequently be used in conjunction with some of the utility + * implementations of {@link Callback}, such as {@link FutureCallback} and + * {@link ExecutorCallback}. Examples are:

+ * + *

Blocking Read

+ *

A FutureCallback can be used to block until an endpoint is ready to be filled + * from: + *

+ * FutureCallback<String> future = new FutureCallback<>();
+ * endpoint.fillInterested("ContextObj",future);
+ * ...
+ * String context = future.get(); // This blocks
+ * int filled=endpoint.fill(mybuffer);
+ * 

+ * + *

Dispatched Read

+ *

By using a different callback, the read can be done asynchronously in its own dispatched thread: + *

+ * endpoint.fillInterested("ContextObj",new ExecutorCallback<String>(executor)
+ * {
+ *   public void onCompleted(String context)
+ *   {
+ *     int filled=endpoint.fill(mybuffer);
+ *     ...
+ *   }
+ *   public void onFailed(String context,Throwable cause) {...}
+ * });
+ * 

+ *

The executor callback can also be customized to not dispatch in some circumstances when + * it knows it can use the callback thread and does not need to dispatch.

+ * + *

Blocking Write

+ *

The write contract is that the callback complete is not called until all data has been + * written or there is a failure. For blocking this looks like: + *

+ * FutureCallback<String> future = new FutureCallback<>();
+ * endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
+ * String context = future.get(); // This blocks
+ * 

+ * + *

Dispatched Write

+ *

Note also that multiple buffers may be passed in write so that gather writes + * can be done: + *

+ * endpoint.write("ContextObj",new ExecutorCallback<String>(executor)
+ * {
+ *   public void onCompleted(String context)
+ *   {
+ *     int filled=endpoint.fill(mybuffer);
+ *     ...
+ *   }
+ *   public void onFailed(String context,Throwable cause) {...}
+ * },headerBuffer,contentBuffer);
+ * 

*/ public interface EndPoint extends Closeable { @@ -126,5 +192,48 @@ public interface EndPoint extends Closeable void setIdleTimeout(long idleTimeout); + /** + *

Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.

+ * + * @param context the context to return via the callback + * @param callback the callback to call when an error occurs or we are readable. + * @throws ReadPendingException if another read operation is concurrent. + */ + void fillInterested(C context, Callback callback) throws ReadPendingException; + + /** + *

Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either + * all the data has been flushed or an error occurs.

+ * + * @param context the context to return via the callback + * @param callback the callback to call when an error occurs or the write completed. + * @param buffers one or more {@link ByteBuffer}s that will be flushed. + * @throws WritePendingException if another write operation is concurrent. + */ + void write(C context, Callback callback, ByteBuffer... buffers) throws WritePendingException; + + /** + * @return the {@link Connection} associated with this {@link EndPoint} + * @see #setConnection(Connection) + */ + Connection getConnection(); + + /** + * @param connection the {@link Connection} associated with this {@link EndPoint} + * @see #getConnection() + */ + void setConnection(Connection connection); + + /** + *

Callback method invoked when this {@link EndPoint} is opened.

+ * @see #onClose() + */ + void onOpen(); + + /** + *

Callback method invoked when this {@link EndPoint} is close.

+ * @see #onOpen() + */ + void onClose(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java index d2897543b4b..c7edac3181e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java @@ -10,7 +10,7 @@ import org.eclipse.jetty.util.Callback; /* ------------------------------------------------------------ */ /** - * A Utility class to help implement {@link AsyncEndPoint#fillInterested(Object, Callback)} + * A Utility class to help implement {@link EndPoint#fillInterested(Object, Callback)} * by keeping state and calling the context and callback objects. * */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index f3731633d95..96944afa9d8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -32,7 +32,7 @@ import org.eclipse.jetty.util.log.Logger; /** * An ChannelEndpoint that can be scheduled by {@link SelectorManager}. */ -public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, SelectorManager.SelectableAsyncEndPoint +public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); @@ -45,8 +45,38 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, checkIdleTimeout(); } }; + + + private final Runnable _updateTask = new Runnable() + { + @Override + public void run() + { + try + { + if (getChannel().isOpen()) + { + int oldInterestOps = _key.interestOps(); + int newInterestOps = _interestOps; + if (newInterestOps != oldInterestOps) + setKeyInterests(oldInterestOps, newInterestOps); + } + } + catch (CancelledKeyException x) + { + LOG.debug("Ignoring key update for concurrently closed channel {}", this); + close(); + } + catch (Exception x) + { + LOG.warn("Ignoring key update for " + this, x); + close(); + } + } + }; + /** - * true if {@link ManagedSelector#destroyEndPoint(AsyncEndPoint)} has not been called + * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called */ private final AtomicBoolean _open = new AtomicBoolean(); private final ReadInterest _readInterest = new ReadInterest() @@ -68,7 +98,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, private final SelectorManager.ManagedSelector _selector; private final SelectionKey _key; private final ScheduledExecutorService _scheduler; - private volatile AsyncConnection _connection; /** * The desired value for {@link SelectionKey#interestOps()} */ @@ -131,16 +160,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } @Override - public AsyncConnection getAsyncConnection() + public void setConnection(Connection connection) { - return _connection; - } - - @Override - public void setAsyncConnection(AsyncConnection connection) - { - AsyncConnection old = getAsyncConnection(); - _connection = connection; + // TODO should this be on AbstractEndPoint? + Connection old = getConnection(); + super.setConnection(connection); if (old != null && old != connection) _selector.getSelectorManager().connectionUpgraded(this, old); } @@ -210,7 +234,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, { _interestOps = newInterestOps; LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this); - _selector.submit(this); + _selector.submit(_updateTask); } else { @@ -218,30 +242,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } } - @Override - public void run() - { - try - { - if (getChannel().isOpen()) - { - int oldInterestOps = _key.interestOps(); - int newInterestOps = _interestOps; - if (newInterestOps != oldInterestOps) - setKeyInterests(oldInterestOps, newInterestOps); - } - } - catch (CancelledKeyException x) - { - LOG.debug("Ignoring key update for concurrently closed channel {}", this); - close(); - } - catch (Exception x) - { - LOG.warn("Ignoring key update for " + this, x); - close(); - } - } private void setKeyInterests(int oldInterestOps, int newInterestOps) { @@ -260,12 +260,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, @Override public void onOpen() { + super.onOpen(); _open.compareAndSet(false, true); } @Override public void onClose() { + super.onClose(); _writeFlusher.close(); _readInterest.close(); } @@ -290,6 +292,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}", hashCode(), getRemoteAddress(), getLocalAddress(), isOpen(), isInputShutdown(), - isOutputShutdown(), _interestOps, keyString, _readInterest, _writeFlusher, getAsyncConnection()); + isOutputShutdown(), _interestOps, keyString, _readInterest, _writeFlusher, getConnection()); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index ead6868333a..9287bfab176 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -44,7 +44,7 @@ import org.eclipse.jetty.util.log.Logger; *

{@link SelectorManager} manages a number of {@link ManagedSelector}s that * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.

*

{@link SelectorManager} subclasses implement methods to return protocol-specific - * {@link AsyncEndPoint}s and {@link AsyncConnection}s.

+ * {@link EndPoint}s and {@link Connection}s.

*/ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { @@ -152,7 +152,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * * @param endpoint the endpoint being opened */ - protected void endPointOpened(AsyncEndPoint endpoint) + protected void endPointOpened(EndPoint endpoint) { endpoint.onOpen(); } @@ -162,7 +162,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * * @param endpoint the endpoint being closed */ - protected void endPointClosed(AsyncEndPoint endpoint) + protected void endPointClosed(EndPoint endpoint) { endpoint.onClose(); } @@ -172,7 +172,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * * @param connection the connection just opened */ - public void connectionOpened(AsyncConnection connection) + public void connectionOpened(Connection connection) { connection.onOpen(); } @@ -182,7 +182,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * * @param connection the connection just closed */ - public void connectionClosed(AsyncConnection connection) + public void connectionClosed(Connection connection) { connection.onClose(); } @@ -193,10 +193,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @param endpoint the endpoint holding the new connection * @param oldConnection the previous connection */ - public void connectionUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) + public void connectionUpgraded(EndPoint endpoint, Connection oldConnection) { connectionClosed(oldConnection); - connectionOpened(endpoint.getAsyncConnection()); + connectionOpened(endpoint.getConnection()); } /** @@ -213,7 +213,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } /** - *

Factory method to create {@link AsyncEndPoint}.

+ *

Factory method to create {@link EndPoint}.

*

This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)} * or {@link #accept(SocketChannel)}.

* @@ -222,12 +222,12 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @param selectionKey the selection key * @return a new endpoint * @throws IOException if the endPoint cannot be created - * @see #newConnection(SocketChannel, AsyncEndPoint, Object) + * @see #newConnection(SocketChannel, EndPoint, Object) */ - protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException; + protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException; /** - *

Factory method to create {@link AsyncConnection}.

+ *

Factory method to create {@link Connection}.

* * @param channel the channel associated to the connection * @param endpoint the endpoint @@ -236,7 +236,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @throws IOException * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey) */ - public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) throws IOException; + public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException; @Override public String dump() @@ -254,7 +254,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa /** *

{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.

*

{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events - * happen for registered channels. When events happen, it notifies the {@link AsyncEndPoint} associated + * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated * with the channel.

*/ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable @@ -413,9 +413,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa Object attachment = key.attachment(); try { - if (attachment instanceof SelectableAsyncEndPoint) + if (attachment instanceof SelectableEndPoint) { - ((SelectableAsyncEndPoint)attachment).onSelected(); + ((SelectableEndPoint)attachment).onSelected(); } else if (key.isConnectable()) { @@ -427,7 +427,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa if (connected) { key.interestOps(0); - AsyncEndPoint endpoint = createEndPoint(channel, key); + EndPoint endpoint = createEndPoint(channel, key); key.attach(endpoint); } else @@ -482,21 +482,21 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _selector.wakeup(); } - private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException + private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException { - AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey); + EndPoint endPoint = newEndPoint(channel, this, selectionKey); endPointOpened(endPoint); - AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment()); - endPoint.setAsyncConnection(asyncConnection); + Connection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment()); + endPoint.setConnection(asyncConnection); connectionOpened(asyncConnection); LOG.debug("Created {}", endPoint); return endPoint; } - public void destroyEndPoint(AsyncEndPoint endPoint) + public void destroyEndPoint(EndPoint endPoint) { LOG.debug("Destroyed {}", endPoint); - connectionClosed(endPoint.getAsyncConnection()); + connectionClosed(endPoint.getConnection()); endPointClosed(endPoint); } @@ -608,7 +608,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa try { SelectionKey key = _channel.register(_selector, 0, null); - AsyncEndPoint endpoint = createEndPoint(_channel, key); + EndPoint endpoint = createEndPoint(_channel, key); key.attach(endpoint); } catch (IOException x) @@ -685,10 +685,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } /** - * A {@link SelectableAsyncEndPoint} is an {@link AsyncEndPoint} that wish to be notified of + * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of * non-blocking events by the {@link ManagedSelector}. */ - public interface SelectableAsyncEndPoint extends AsyncEndPoint + public interface SelectableEndPoint extends EndPoint { /** *

Callback method invoked when a read or write events has been detected by the {@link ManagedSelector} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 30f7275f1d6..35871736c3a 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -13,7 +13,7 @@ import org.eclipse.jetty.util.Callback; /* ------------------------------------------------------------ */ /** - * A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} + * A Utility class to help implement {@link EndPoint#write(Object, Callback, ByteBuffer...)} * by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written. * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been * written after a call to flush and should organise for the {@link #completeWrite()} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 9edc472aa8f..a2505a56a9d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -24,10 +24,10 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import org.eclipse.jetty.io.AbstractAsyncConnection; +import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractEndPoint; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.ReadInterest; @@ -40,11 +40,11 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** - * An AsyncConnection that acts as an intercepter between an AsyncEndPoint providing SSL encrypted data - * and another consumer of an AsyncEndPoint (typically an {@link AsyncConnection} like HttpConnection) that + * An AsyncConnection that acts as an intercepter between an EndPoint providing SSL encrypted data + * and another consumer of an EndPoint (typically an {@link Connection} like HttpConnection) that * wants unencrypted data. *

- * The connector uses an {@link AsyncEndPoint} (typically {@link SelectChannelEndPoint}) as + * The connector uses an {@link EndPoint} (typically {@link SelectChannelEndPoint}) as * it's source/sink of encrypted data. It then provides an endpoint via {@link #getDecryptedEndPoint()} to * expose a source/sink of unencrypted data to another connection (eg HttpConnection). *

@@ -52,15 +52,15 @@ import org.eclipse.jetty.util.log.Logger; * asynchronous callbacks, and active methods that do schedule asynchronous callbacks. *

* The passive methods are {@link DecryptedEndPoint#fill(ByteBuffer)} and {@link DecryptedEndPoint#flush(ByteBuffer...)}. They make best - * effort attempts to progress the connection using only calls to the encrypted {@link AsyncEndPoint#fill(ByteBuffer)} and {@link AsyncEndPoint#flush(ByteBuffer...)} + * effort attempts to progress the connection using only calls to the encrypted {@link EndPoint#fill(ByteBuffer)} and {@link EndPoint#flush(ByteBuffer...)} * methods. They will never block nor schedule any readInterest or write callbacks. If a fill/flush cannot progress either because * of network congestion or waiting for an SSL handshake message, then the fill/flush will simply return with zero bytes filled/flushed. * Specifically, if a flush cannot proceed because it needs to receive a handshake message, then the flush will attempt to fill bytes from the - * encrypted endpoint, but if insufficient bytes are read it will NOT call {@link AsyncEndPoint#fillInterested(Object, Callback)}. + * encrypted endpoint, but if insufficient bytes are read it will NOT call {@link EndPoint#fillInterested(Object, Callback)}. *

* It is only the active methods : {@link DecryptedEndPoint#fillInterested(Object, Callback)} and * {@link DecryptedEndPoint#write(Object, Callback, ByteBuffer...)} that may schedule callbacks by calling the encrypted - * {@link AsyncEndPoint#fillInterested(Object, Callback)} and {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} + * {@link EndPoint#fillInterested(Object, Callback)} and {@link EndPoint#write(Object, Callback, ByteBuffer...)} * methods. For normal data handling, the decrypted fillInterest method will result in an encrypted fillInterest and a decrypted * write will result in an encrypted write. However, due to SSL handshaking requirements, it is also possible for a decrypted fill * to call the encrypted write and for the decrypted flush to call the encrypted fillInterested methods. @@ -70,7 +70,7 @@ import org.eclipse.jetty.util.log.Logger; * be called again and make another best effort attempt to progress the connection. * */ -public class SslConnection extends AbstractAsyncConnection +public class SslConnection extends AbstractConnection { private static final Logger LOG = Log.getLogger(SslConnection.class); private final ByteBufferPool _bufferPool; @@ -82,7 +82,7 @@ public class SslConnection extends AbstractAsyncConnection private final boolean _encryptedDirectBuffers = false; private final boolean _decryptedDirectBuffers = false; - public SslConnection(ByteBufferPool byteBufferPool, Executor executor, AsyncEndPoint endPoint, SSLEngine sslEngine) + public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine) { super(endPoint, executor, true); this._bufferPool = byteBufferPool; @@ -95,7 +95,7 @@ public class SslConnection extends AbstractAsyncConnection return _sslEngine; } - public AsyncEndPoint getDecryptedEndPoint() + public EndPoint getDecryptedEndPoint() { return _decryptedEndPoint; } @@ -113,7 +113,7 @@ public class SslConnection extends AbstractAsyncConnection if (_sslEngine.getUseClientMode()) _decryptedEndPoint.write(null, new Callback.Empty<>(), BufferUtil.EMPTY_BUFFER); - getDecryptedEndPoint().getAsyncConnection().onOpen(); + getDecryptedEndPoint().getConnection().onOpen(); } catch (SSLException x) { @@ -187,25 +187,14 @@ public class SslConnection extends AbstractAsyncConnection } /* ------------------------------------------------------------ */ - public class DecryptedEndPoint extends AbstractEndPoint implements AsyncEndPoint + public class DecryptedEndPoint extends AbstractEndPoint implements EndPoint { - private AsyncConnection _connection; private boolean _fillRequiresFlushToProgress; private boolean _flushRequiresFillToProgress; private boolean _cannotAcceptMoreAppDataToFlush; private boolean _needToFillMoreDataToProgress; private boolean _ishut = false; - @Override - public void onOpen() - { - } - - @Override - public void onClose() - { - } - private final Callback _writeCallback = new Callback() { @@ -553,7 +542,7 @@ public class SslConnection extends AbstractAsyncConnection // or busy handshaking, then zero bytes may be taken from appOuts and this method // will return 0 (even if some handshake bytes were flushed and filled). // it is the applications responsibility to call flush again - either in a busy loop - // or better yet by using AsyncEndPoint#write to do the flushing. + // or better yet by using EndPoint#write to do the flushing. LOG.debug("{} flush enter {}", SslConnection.this, Arrays.toString(appOuts)); try @@ -714,18 +703,6 @@ public class SslConnection extends AbstractAsyncConnection return _ishut; } - @Override - public AsyncConnection getAsyncConnection() - { - return _connection; - } - - @Override - public void setAsyncConnection(AsyncConnection connection) - { - _connection = connection; - } - @Override public String toString() { diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java deleted file mode 100644 index 23d9b6b7e60..00000000000 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java +++ /dev/null @@ -1,192 +0,0 @@ -package org.eclipse.jetty.io; - -import static junit.framework.Assert.assertEquals; -import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; - -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.FutureCallback; -import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AsyncByteArrayEndPointTest -{ - private ScheduledExecutorService _scheduler; - - @Before - public void before() - { - _scheduler = Executors.newSingleThreadScheduledExecutor(); - } - - @After - public void after() - { - _scheduler.shutdownNow(); - } - - @Test - public void testReadable() throws Exception - { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000); - endp.setInput("test input"); - - ByteBuffer buffer = BufferUtil.allocate(1024); - FutureCallback fcb = new FutureCallback<>(); - - endp.fillInterested("CTX", fcb); - assertTrue(fcb.isDone()); - assertEquals("CTX", fcb.get()); - assertEquals(10, endp.fill(buffer)); - assertEquals("test input", BufferUtil.toString(buffer)); - - fcb = new FutureCallback<>(); - endp.fillInterested("CTX", fcb); - assertFalse(fcb.isDone()); - assertEquals(0, endp.fill(buffer)); - - endp.setInput(" more"); - assertTrue(fcb.isDone()); - assertEquals("CTX", fcb.get()); - assertEquals(5, endp.fill(buffer)); - assertEquals("test input more", BufferUtil.toString(buffer)); - - fcb = new FutureCallback<>(); - endp.fillInterested("CTX", fcb); - assertFalse(fcb.isDone()); - assertEquals(0, endp.fill(buffer)); - - endp.setInput((ByteBuffer)null); - assertTrue(fcb.isDone()); - assertEquals("CTX", fcb.get()); - assertEquals(-1, endp.fill(buffer)); - - fcb = new FutureCallback<>(); - endp.fillInterested("CTX", fcb); - assertTrue(fcb.isDone()); - assertEquals("CTX", fcb.get()); - assertEquals(-1, endp.fill(buffer)); - - endp.close(); - - fcb = new FutureCallback<>(); - endp.fillInterested("CTX", fcb); - assertTrue(fcb.isDone()); - try - { - fcb.get(); - fail(); - } - catch (ExecutionException e) - { - assertThat(e.toString(), containsString("Closed")); - } - } - - @Test - public void testWrite() throws Exception - { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000, (byte[])null, 15); - endp.setGrowOutput(false); - endp.setOutput(BufferUtil.allocate(10)); - - ByteBuffer data = BufferUtil.toBuffer("Data."); - ByteBuffer more = BufferUtil.toBuffer(" Some more."); - - FutureCallback fcb = new FutureCallback<>(); - endp.write("CTX", fcb, data); - assertTrue(fcb.isDone()); - assertEquals("CTX", fcb.get()); - assertEquals("Data.", endp.getOutputString()); - - fcb = new FutureCallback<>(); - endp.write("CTX", fcb, more); - assertFalse(fcb.isDone()); - - assertEquals("Data. Some", endp.getOutputString()); - assertEquals("Data. Some", endp.takeOutputString()); - - assertTrue(fcb.isDone()); - assertEquals("CTX", fcb.get()); - assertEquals(" more.", endp.getOutputString()); - } - - @Test - public void testIdle() throws Exception - { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 500); - endp.setInput("test"); - endp.setGrowOutput(false); - endp.setOutput(BufferUtil.allocate(5)); - - // no idle check - assertTrue(endp.isOpen()); - Thread.sleep(1000); - assertTrue(endp.isOpen()); - - // normal read - ByteBuffer buffer = BufferUtil.allocate(1024); - FutureCallback fcb = new FutureCallback<>(); - - endp.fillInterested(null, fcb); - assertTrue(fcb.isDone()); - assertEquals(null, fcb.get()); - assertEquals(4, endp.fill(buffer)); - assertEquals("test", BufferUtil.toString(buffer)); - - // read timeout - fcb = new FutureCallback<>(); - endp.fillInterested(null, fcb); - long start = System.currentTimeMillis(); - try - { - fcb.get(); - fail(); - } - catch (ExecutionException t) - { - assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class)); - } - assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L)); - assertTrue(endp.isOpen()); - - // write timeout - fcb = new FutureCallback<>(); - start = System.currentTimeMillis(); - - endp.write(null, fcb, BufferUtil.toBuffer("This is too long")); - try - { - fcb.get(); - fail(); - } - catch (ExecutionException t) - { - assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class)); - } - assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L)); - assertTrue(endp.isOpen()); - - // Still no idle close - Thread.sleep(1000); - assertTrue(endp.isOpen()); - - // shutdown out - endp.shutdownOutput(); - - // idle close - Thread.sleep(1000); - assertFalse(endp.isOpen()); - } -} diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java index a9e74f427ee..bfe09e36274 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java @@ -2,17 +2,41 @@ package org.eclipse.jetty.io; import static junit.framework.Assert.assertEquals; import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.FutureCallback; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class ByteArrayEndPointTest { + private ScheduledExecutorService _scheduler; + + @Before + public void before() + { + _scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + @After + public void after() + { + _scheduler.shutdownNow(); + } + @Test public void testFill() throws Exception { @@ -104,6 +128,159 @@ public class ByteArrayEndPointTest assertEquals("data.",BufferUtil.toString(endp.takeOutput())); } - + + @Test + public void testReadable() throws Exception + { + ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000); + endp.setInput("test input"); + + ByteBuffer buffer = BufferUtil.allocate(1024); + FutureCallback fcb = new FutureCallback<>(); + + endp.fillInterested("CTX", fcb); + assertTrue(fcb.isDone()); + assertEquals("CTX", fcb.get()); + assertEquals(10, endp.fill(buffer)); + assertEquals("test input", BufferUtil.toString(buffer)); + + fcb = new FutureCallback<>(); + endp.fillInterested("CTX", fcb); + assertFalse(fcb.isDone()); + assertEquals(0, endp.fill(buffer)); + + endp.setInput(" more"); + assertTrue(fcb.isDone()); + assertEquals("CTX", fcb.get()); + assertEquals(5, endp.fill(buffer)); + assertEquals("test input more", BufferUtil.toString(buffer)); + + fcb = new FutureCallback<>(); + endp.fillInterested("CTX", fcb); + assertFalse(fcb.isDone()); + assertEquals(0, endp.fill(buffer)); + + endp.setInput((ByteBuffer)null); + assertTrue(fcb.isDone()); + assertEquals("CTX", fcb.get()); + assertEquals(-1, endp.fill(buffer)); + + fcb = new FutureCallback<>(); + endp.fillInterested("CTX", fcb); + assertTrue(fcb.isDone()); + assertEquals("CTX", fcb.get()); + assertEquals(-1, endp.fill(buffer)); + + endp.close(); + + fcb = new FutureCallback<>(); + endp.fillInterested("CTX", fcb); + assertTrue(fcb.isDone()); + try + { + fcb.get(); + fail(); + } + catch (ExecutionException e) + { + assertThat(e.toString(), containsString("Closed")); + } + } + + @Test + public void testWrite() throws Exception + { + ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000, (byte[])null, 15); + endp.setGrowOutput(false); + endp.setOutput(BufferUtil.allocate(10)); + + ByteBuffer data = BufferUtil.toBuffer("Data."); + ByteBuffer more = BufferUtil.toBuffer(" Some more."); + + FutureCallback fcb = new FutureCallback<>(); + endp.write("CTX", fcb, data); + assertTrue(fcb.isDone()); + assertEquals("CTX", fcb.get()); + assertEquals("Data.", endp.getOutputString()); + + fcb = new FutureCallback<>(); + endp.write("CTX", fcb, more); + assertFalse(fcb.isDone()); + + assertEquals("Data. Some", endp.getOutputString()); + assertEquals("Data. Some", endp.takeOutputString()); + + assertTrue(fcb.isDone()); + assertEquals("CTX", fcb.get()); + assertEquals(" more.", endp.getOutputString()); + } + + @Test + public void testIdle() throws Exception + { + ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 500); + endp.setInput("test"); + endp.setGrowOutput(false); + endp.setOutput(BufferUtil.allocate(5)); + + // no idle check + assertTrue(endp.isOpen()); + Thread.sleep(1000); + assertTrue(endp.isOpen()); + + // normal read + ByteBuffer buffer = BufferUtil.allocate(1024); + FutureCallback fcb = new FutureCallback<>(); + + endp.fillInterested(null, fcb); + assertTrue(fcb.isDone()); + assertEquals(null, fcb.get()); + assertEquals(4, endp.fill(buffer)); + assertEquals("test", BufferUtil.toString(buffer)); + + // read timeout + fcb = new FutureCallback<>(); + endp.fillInterested(null, fcb); + long start = System.currentTimeMillis(); + try + { + fcb.get(); + fail(); + } + catch (ExecutionException t) + { + assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class)); + } + assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L)); + assertTrue(endp.isOpen()); + + // write timeout + fcb = new FutureCallback<>(); + start = System.currentTimeMillis(); + + endp.write(null, fcb, BufferUtil.toBuffer("This is too long")); + try + { + fcb.get(); + fail(); + } + catch (ExecutionException t) + { + assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class)); + } + assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L)); + assertTrue(endp.isOpen()); + + // Still no idle close + Thread.sleep(1000); + assertTrue(endp.isOpen()); + + // shutdown out + endp.shutdownOutput(); + + // idle close + Thread.sleep(1000); + assertFalse(endp.isOpen()); + } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java index 617b2ca26e8..a5dbb8315f9 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java @@ -62,7 +62,7 @@ public class SelectChannelEndPointInterestsTest } @Override - protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException { return new SelectChannelEndPoint(channel, selector, selectionKey, scheduler, 60000) { @@ -76,9 +76,9 @@ public class SelectChannelEndPointInterestsTest } @Override - public AsyncConnection newConnection(SocketChannel channel, final AsyncEndPoint endPoint, Object attachment) + public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment) { - return new AbstractAsyncConnection(endPoint, threadPool) + return new AbstractConnection(endPoint, threadPool) { @Override public void onOpen() @@ -109,7 +109,7 @@ public class SelectChannelEndPointInterestsTest init(new Interested() { @Override - public void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection) + public void onFillable(EndPoint endPoint, AbstractConnection connection) { ByteBuffer input = BufferUtil.allocate(2); int read = fill(endPoint, input); @@ -143,7 +143,7 @@ public class SelectChannelEndPointInterestsTest writeBlocked.set(true); } - private int fill(AsyncEndPoint endPoint, ByteBuffer buffer) + private int fill(EndPoint endPoint, ByteBuffer buffer) { try { @@ -191,7 +191,7 @@ public class SelectChannelEndPointInterestsTest private interface Interested { - void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection); + void onFillable(EndPoint endPoint, AbstractConnection connection); void onIncompleteFlush(); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index e718498da51..d79845914d2 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -53,14 +53,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest } @Override - protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) + protected Connection newConnection(SocketChannel channel, EndPoint endpoint) { SSLEngine engine = __sslCtxFactory.newSslEngine(); engine.setUseClientMode(false); SslConnection sslConnection = new SslConnection(__byteBufferPool, _threadPool, endpoint, engine); - AsyncConnection appConnection = super.newConnection(channel,sslConnection.getDecryptedEndPoint()); - sslConnection.getDecryptedEndPoint().setAsyncConnection(appConnection); + Connection appConnection = super.newConnection(channel,sslConnection.getDecryptedEndPoint()); + sslConnection.getDecryptedEndPoint().setConnection(appConnection); _manager.connectionOpened(appConnection); return sslConnection; diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 9580c17c068..9a233aad653 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -51,7 +51,7 @@ import org.junit.Test; public class SelectChannelEndPointTest { protected CountDownLatch _lastEndPointLatch; - protected volatile AsyncEndPoint _lastEndPoint; + protected volatile EndPoint _lastEndPoint; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor(); @@ -64,7 +64,7 @@ public class SelectChannelEndPointTest } @Override - public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) + public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) { return SelectChannelEndPointTest.this.newConnection(channel, endpoint); } @@ -108,18 +108,18 @@ public class SelectChannelEndPointTest return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort()); } - protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) + protected Connection newConnection(SocketChannel channel, EndPoint endpoint) { return new TestConnection(endpoint); } - public class TestConnection extends AbstractAsyncConnection + public class TestConnection extends AbstractConnection { ByteBuffer _in = BufferUtil.allocate(32 * 1024); ByteBuffer _out = BufferUtil.allocate(32 * 1024); long _last = -1; - public TestConnection(AsyncEndPoint endp) + public TestConnection(EndPoint endp) { super(endp, _threadPool); } @@ -134,7 +134,7 @@ public class SelectChannelEndPointTest @Override public synchronized void onFillable() { - AsyncEndPoint _endp = getEndPoint(); + EndPoint _endp = getEndPoint(); try { _last = System.currentTimeMillis(); @@ -547,7 +547,7 @@ public class SelectChannelEndPointTest System.err.println("time=" + (now - start)); System.err.println("last=" + (now - last)); System.err.println("endp=" + _lastEndPoint); - System.err.println("conn=" + _lastEndPoint.getAsyncConnection()); + System.err.println("conn=" + _lastEndPoint.getConnection()); e.printStackTrace(); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java index 434fa5a7454..fcc9c4d73ee 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java @@ -37,7 +37,7 @@ public class SslConnectionTest private static SslContextFactory __sslCtxFactory=new SslContextFactory(); private static ByteBufferPool __byteBufferPool = new StandardByteBufferPool(); - protected volatile AsyncEndPoint _lastEndp; + protected volatile EndPoint _lastEndp; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor(); @@ -50,14 +50,14 @@ public class SslConnectionTest } @Override - public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) + public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) { SSLEngine engine = __sslCtxFactory.newSslEngine(); engine.setUseClientMode(false); SslConnection sslConnection = new SslConnection(__byteBufferPool, _threadPool, endpoint, engine); - AsyncConnection appConnection = new TestConnection(sslConnection.getDecryptedEndPoint()); - sslConnection.getDecryptedEndPoint().setAsyncConnection(appConnection); + Connection appConnection = new TestConnection(sslConnection.getDecryptedEndPoint()); + sslConnection.getDecryptedEndPoint().setConnection(appConnection); connectionOpened(appConnection); return sslConnection; @@ -105,11 +105,11 @@ public class SslConnectionTest _connector.close(); } - public class TestConnection extends AbstractAsyncConnection + public class TestConnection extends AbstractConnection { ByteBuffer _in = BufferUtil.allocate(8*1024); - public TestConnection(AsyncEndPoint endp) + public TestConnection(EndPoint endp) { super(endp, _threadPool); } @@ -130,7 +130,7 @@ public class SslConnectionTest @Override public synchronized void onFillable() { - AsyncEndPoint endp = getEndPoint(); + EndPoint endp = getEndPoint(); try { boolean progress=true; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 293c50ff6a4..dec6bd341ea 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -22,8 +22,8 @@ import java.util.concurrent.ThreadFactory; import javax.net.ssl.SSLEngine; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.io.ssl.SslConnection; @@ -179,7 +179,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co return _httpConfig; } - protected AsyncConnection newConnection(AsyncEndPoint endp) throws IOException + protected Connection newConnection(EndPoint endp) throws IOException { // TODO make this a plugable configurable connection factory for HTTP, HTTPS, SPDY & Websocket @@ -188,8 +188,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co SSLEngine engine = _sslContextFactory.createSSLEngine(endp.getRemoteAddress()); SslConnection ssl_connection = new SslConnection(getByteBufferPool(), getExecutor(), endp, engine); - AsyncConnection http_connection = new HttpConnection(_httpConfig,this,ssl_connection.getDecryptedEndPoint()); - ssl_connection.getDecryptedEndPoint().setAsyncConnection(http_connection); + Connection http_connection = new HttpConnection(_httpConfig,this,ssl_connection.getDecryptedEndPoint()); + ssl_connection.getDecryptedEndPoint().setConnection(http_connection); return ssl_connection; } return new HttpConnection(_httpConfig,this,endp); @@ -389,19 +389,19 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co _name = name; } - protected void connectionOpened(AsyncConnection connection) + protected void connectionOpened(Connection connection) { _stats.connectionOpened(); } - protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection) + protected void connectionUpgraded(Connection oldConnection, Connection newConnection) { long duration = System.currentTimeMillis() - oldConnection.getEndPoint().getCreatedTimeStamp(); int requests = (oldConnection instanceof HttpConnection) ? ((HttpConnection)oldConnection).getHttpChannel().getRequests() : 0; _stats.connectionUpgraded(duration, requests, requests); } - protected void connectionClosed(AsyncConnection connection) + protected void connectionClosed(Connection connection) { long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp(); // TODO: remove casts to HttpConnection diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index fb96d19957f..b31d448737f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -37,8 +37,8 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MimeTypes; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.UncheckedPrintWriter; import org.eclipse.jetty.util.BufferUtil; @@ -72,7 +72,7 @@ public abstract class HttpChannel private final Server _server; - private final AsyncConnection _connection; + private final Connection _connection; private final HttpURI _uri; private final ChannelEventHandler _handler = new ChannelEventHandler(); @@ -100,7 +100,7 @@ public abstract class HttpChannel /* ------------------------------------------------------------ */ - public HttpChannel(Server server,AsyncConnection connection,HttpInput input) + public HttpChannel(Server server,Connection connection,HttpInput input) { _server = server; _connection = connection; @@ -127,7 +127,7 @@ public abstract class HttpChannel } /* ------------------------------------------------------------ */ - public AsyncEndPoint getEndPoint() + public EndPoint getEndPoint() { return getConnection().getEndPoint(); } @@ -190,7 +190,7 @@ public abstract class HttpChannel } /* ------------------------------------------------------------ */ - public AsyncConnection getConnection() + public Connection getConnection() { return _connection; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 4685c2f4651..2a6024f91c5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -24,9 +24,9 @@ import org.eclipse.jetty.http.HttpGenerator.Action; import org.eclipse.jetty.http.HttpGenerator.ResponseInfo; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.AbstractAsyncConnection; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BufferUtil; @@ -37,7 +37,7 @@ import org.eclipse.jetty.util.log.Logger; /** * A Connection that handles the HTTP protocol */ -public class HttpConnection extends AbstractAsyncConnection +public class HttpConnection extends AbstractConnection { public static final Logger LOG = Log.getLogger(HttpConnection.class); @@ -75,7 +75,7 @@ public class HttpConnection extends AbstractAsyncConnection } /* ------------------------------------------------------------ */ - public HttpConnection(HttpConfiguration config, Connector connector, AsyncEndPoint endpoint) + public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endpoint) { super(endpoint,connector.getExecutor()); @@ -195,7 +195,7 @@ public class HttpConnection extends AbstractAsyncConnection /* ------------------------------------------------------------ */ /** Parse and handle HTTP messages. *

- * This method is normally called as the {@link AbstractAsyncConnection} onReadable callback. + * This method is normally called as the {@link AbstractConnection} onReadable callback. * However, it can also be called {@link HttpChannelOverHttp#completed()} if there is unconsumed * data in the _requestBuffer, as a result of resuming a suspended request when there is a pipelined * request already read into the buffer. @@ -279,7 +279,7 @@ public class HttpConnection extends AbstractAsyncConnection } // return if the connection has been changed - if (getEndPoint().getAsyncConnection()!=this) + if (getEndPoint().getConnection()!=this) return; } else if (_headerBytes>= _httpConfig.getRequestHeaderSize()) @@ -437,11 +437,11 @@ public class HttpConnection extends AbstractAsyncConnection // Handle connection upgrades if (getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) { - AsyncConnection connection=(AsyncConnection)getRequest().getAttribute(UPGRADE_CONNECTION_ATTR); + Connection connection=(Connection)getRequest().getAttribute(UPGRADE_CONNECTION_ATTR); if (connection!=null) { LOG.debug("Upgrade from {} to {}",this,connection); - getEndPoint().setAsyncConnection(connection); + getEndPoint().setConnection(connection); HttpConnection.this.reset(); return; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 00134d664ed..fff6c558ef9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -22,8 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.io.AsyncByteArrayEndPoint; -import org.eclipse.jetty.io.AsyncConnection; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; @@ -158,15 +158,15 @@ public class LocalConnector extends AbstractConnector { LOG.debug("accepting {}",acceptorID); LocalEndPoint endp = _connects.take(); - AsyncConnection connection=newConnection(endp); - endp.setAsyncConnection(connection); + Connection connection=newConnection(endp); + endp.setConnection(connection); endp.onOpen(); connection.onOpen(); connectionOpened(connection); } - public class LocalEndPoint extends AsyncByteArrayEndPoint + public class LocalEndPoint extends ByteArrayEndPoint { private CountDownLatch _closed = new CountDownLatch(1); @@ -191,8 +191,8 @@ public class LocalConnector extends AbstractConnector super.close(); if (was_open) { - connectionClosed(getAsyncConnection()); - getAsyncConnection().onClose(); + connectionClosed(getConnection()); + getConnection().onClose(); onClose(); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java index c5f42b6fef5..65664afde44 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java @@ -24,8 +24,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.continuation.Continuation; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; @@ -237,9 +237,9 @@ public class SelectChannelConnector extends AbstractNetConnector return new SelectChannelEndPoint(channel,selectSet,key, getScheduler(), getIdleTimeout()); } - protected void endPointClosed(AsyncEndPoint endpoint) + protected void endPointClosed(EndPoint endpoint) { - connectionClosed(endpoint.getAsyncConnection()); + connectionClosed(endpoint.getConnection()); } /* ------------------------------------------------------------ */ @@ -257,25 +257,25 @@ public class SelectChannelConnector extends AbstractNetConnector } @Override - protected void endPointClosed(AsyncEndPoint endpoint) + protected void endPointClosed(EndPoint endpoint) { - SelectChannelConnector.this.connectionClosed(endpoint.getAsyncConnection()); + SelectChannelConnector.this.connectionClosed(endpoint.getConnection()); super.endPointClosed(endpoint); } @Override - protected void endPointOpened(AsyncEndPoint endpoint) + protected void endPointOpened(EndPoint endpoint) { // TODO handle max connections and low resources super.endPointOpened(endpoint); - SelectChannelConnector.this.connectionOpened(endpoint.getAsyncConnection()); + SelectChannelConnector.this.connectionOpened(endpoint.getConnection()); } @Override - public void connectionUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) + public void connectionUpgraded(EndPoint endpoint, Connection oldConnection) { super.connectionUpgraded(endpoint, oldConnection); - SelectChannelConnector.this.connectionUpgraded(oldConnection, endpoint.getAsyncConnection()); + SelectChannelConnector.this.connectionUpgraded(oldConnection, endpoint.getConnection()); } @Override @@ -285,7 +285,7 @@ public class SelectChannelConnector extends AbstractNetConnector } @Override - public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException { return SelectChannelConnector.this.newConnection(endpoint); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java index 52be91c8f90..163fae4e8b8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java @@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint; @@ -88,7 +88,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector } @Override - protected void endPointClosed(AsyncEndPoint endpoint) + protected void endPointClosed(EndPoint endpoint) { super.endPointClosed(endpoint); ((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java index 4cf56be084b..f082e9162c9 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java @@ -150,7 +150,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture // Get the server side endpoint EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS); if (endp instanceof SslConnection.DecryptedEndPoint) - endp=((SslConnection.DecryptedEndPoint)endp).getAsyncConnection().getEndPoint(); + endp=((SslConnection.DecryptedEndPoint)endp).getConnection().getEndPoint(); // read the response String result=IO.toString(is); @@ -223,7 +223,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture // Get the server side endpoint EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS); if (endp instanceof SslConnection.DecryptedEndPoint) - endp=((SslConnection.DecryptedEndPoint)endp).getAsyncConnection().getEndPoint(); + endp=((SslConnection.DecryptedEndPoint)endp).getConnection().getEndPoint(); // read the response String result=IO.toString(is); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 96ecd8a8ad1..4c620850869 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -38,9 +38,10 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpGenerator.ResponseInfo; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpURI; -import org.eclipse.jetty.io.AbstractAsyncConnection; -import org.eclipse.jetty.io.AsyncByteArrayEndPoint; -import org.eclipse.jetty.io.AsyncConnection; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.session.HashSessionIdManager; @@ -71,9 +72,9 @@ public class ResponseTest _server.start(); _timer=new ScheduledThreadPoolExecutor(1); - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,5000); + AbstractEndPoint endp = new ByteArrayEndPoint(_timer,5000); HttpInput input = new HttpInput(); - AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor() + Connection connection = new AbstractConnection(endp,new Executor() { @Override public void execute(Runnable command) diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java index 874d908752e..5763e00db7c 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java @@ -28,8 +28,8 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.HandlerWrapper; import org.eclipse.jetty.util.log.Log; @@ -62,14 +62,14 @@ public class SelectChannelStatisticsTest _connector = new SelectChannelConnector(_server) { @Override - protected void endPointClosed(AsyncEndPoint endpoint) + protected void endPointClosed(EndPoint endpoint) { //System.err.println("Endpoint closed "+endpoint); super.endPointClosed(endpoint); } @Override - public void connectionClosed(AsyncConnection connection) + public void connectionClosed(Connection connection) { //System.err.println("Connection closed "+connection); super.connectionClosed(connection); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java index 57eb77d1e19..6ea43951568 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java @@ -26,8 +26,8 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.After; import org.junit.Assert; @@ -46,7 +46,7 @@ public class SlowClientWithPipelinedRequestTest connector = new SelectChannelConnector(server) { @Override - protected AsyncConnection newConnection(AsyncEndPoint endpoint) + protected Connection newConnection(EndPoint endpoint) { return new HttpConnection(getHttpConfig(),this,endpoint) { diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ContextHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ContextHandlerTest.java index bcb2e89e32f..0b10ce06b52 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ContextHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ContextHandlerTest.java @@ -431,8 +431,8 @@ public class ContextHandlerTest private static final class WriterHandler extends AbstractHandler { - boolean error; - Throwable throwable; + volatile boolean error; + volatile Throwable throwable; public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SSLCloseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SSLCloseTest.java index 772ef521bac..3d400806f00 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SSLCloseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SSLCloseTest.java @@ -31,7 +31,7 @@ import javax.servlet.http.HttpServletResponse; import junit.framework.TestCase; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.SelectChannelConnector; @@ -43,7 +43,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler; */ public class SSLCloseTest extends TestCase { - private static AsyncEndPoint __endp; + private static EndPoint __endp; private static class CredulousTM implements TrustManager, X509TrustManager { public X509Certificate[] getAcceptedIssuers() diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/AsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/AsyncConnectionFactory.java index a5b4741a944..34fa32008c7 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/AsyncConnectionFactory.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/AsyncConnectionFactory.java @@ -15,10 +15,10 @@ package org.eclipse.jetty.spdy; import java.nio.channels.SocketChannel; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; public interface AsyncConnectionFactory { - public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment); + public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment); } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyAsyncEndPoint.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyAsyncEndPoint.java index 45b9d0228ab..2b7189c1ad9 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyAsyncEndPoint.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyAsyncEndPoint.java @@ -19,14 +19,14 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; -public class EmptyAsyncEndPoint implements AsyncEndPoint +public class EmptyEndPoint implements EndPoint { private boolean checkForIdle; - private AsyncConnection connection; + private Connection connection; private boolean oshut; private boolean closed; private long maxIdleTime; @@ -38,13 +38,13 @@ public class EmptyAsyncEndPoint implements AsyncEndPoint } @Override - public AsyncConnection getAsyncConnection() + public Connection getConnection() { return connection; } @Override - public void setAsyncConnection(AsyncConnection connection) + public void setConnection(Connection connection) { this.connection = connection; } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientAsyncConnection.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientAsyncConnection.java index fa5d58922de..84d33b34def 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientAsyncConnection.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientAsyncConnection.java @@ -18,15 +18,15 @@ import java.nio.channels.SocketChannel; import java.util.List; import java.util.concurrent.Executor; -import org.eclipse.jetty.io.AbstractAsyncConnection; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.npn.NextProtoNego; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection implements NextProtoNego.ClientProvider +public class NextProtoNegoClientAsyncConnection extends AbstractConnection implements NextProtoNego.ClientProvider { private final Logger logger = Log.getLogger(getClass()); private final SocketChannel channel; @@ -34,7 +34,7 @@ public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection private final SPDYClient client; private volatile boolean completed; - public NextProtoNegoClientAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment, Executor executor, SPDYClient client) + public NextProtoNegoClientAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment, Executor executor, SPDYClient client) { super(endPoint, executor); this.channel = channel; @@ -86,8 +86,8 @@ public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection public void unsupported() { // Server does not support NPN, but this is a SPDY client, so hardcode SPDY - AsyncEndPoint endPoint = getEndPoint(); - AsyncConnection connection = client.getDefaultAsyncConnectionFactory().newAsyncConnection(channel, endPoint, attachment); + EndPoint endPoint = getEndPoint(); + Connection connection = client.getDefaultAsyncConnectionFactory().newAsyncConnection(channel, endPoint, attachment); client.replaceAsyncConnection(endPoint, connection); completed = true; } @@ -98,8 +98,8 @@ public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection String protocol = client.selectProtocol(protocols); if (protocol == null) return null; - AsyncEndPoint endPoint = getEndPoint(); - AsyncConnection connection = client.getAsyncConnectionFactory(protocol).newAsyncConnection(channel, endPoint, attachment); + EndPoint endPoint = getEndPoint(); + Connection connection = client.getAsyncConnectionFactory(protocol).newAsyncConnection(channel, endPoint, attachment); client.replaceAsyncConnection(endPoint, connection); completed = true; return protocol; diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoServerAsyncConnection.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoServerAsyncConnection.java index 82329477c3b..18f43502c99 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoServerAsyncConnection.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoServerAsyncConnection.java @@ -17,22 +17,22 @@ import java.io.IOException; import java.nio.channels.SocketChannel; import java.util.List; -import org.eclipse.jetty.io.AbstractAsyncConnection; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.npn.NextProtoNego; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class NextProtoNegoServerAsyncConnection extends AbstractAsyncConnection implements NextProtoNego.ServerProvider +public class NextProtoNegoServerAsyncConnection extends AbstractConnection implements NextProtoNego.ServerProvider { private final Logger logger = Log.getLogger(getClass()); private final SocketChannel channel; private final SPDYServerConnector connector; private volatile boolean completed; - public NextProtoNegoServerAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, SPDYServerConnector connector) + public NextProtoNegoServerAsyncConnection(SocketChannel channel, EndPoint endPoint, SPDYServerConnector connector) { super(endPoint, connector.getExecutor()); this.channel = channel; @@ -77,8 +77,8 @@ public class NextProtoNegoServerAsyncConnection extends AbstractAsyncConnection public void unsupported() { AsyncConnectionFactory asyncConnectionFactory = connector.getDefaultAsyncConnectionFactory(); - AsyncEndPoint endPoint = getEndPoint(); - AsyncConnection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector); + EndPoint endPoint = getEndPoint(); + Connection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector); connector.replaceAsyncConnection(endPoint, connection); completed = true; } @@ -93,8 +93,8 @@ public class NextProtoNegoServerAsyncConnection extends AbstractAsyncConnection public void protocolSelected(String protocol) { AsyncConnectionFactory asyncConnectionFactory = connector.getAsyncConnectionFactory(protocol); - AsyncEndPoint endPoint = getEndPoint(); - AsyncConnection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector); + EndPoint endPoint = getEndPoint(); + Connection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector); connector.replaceAsyncConnection(endPoint, connection); completed = true; } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java index b11e62e1376..810ad41020b 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java @@ -17,8 +17,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.Executor; -import org.eclipse.jetty.io.AbstractAsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.spdy.parser.Parser; @@ -26,7 +26,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class SPDYAsyncConnection extends AbstractAsyncConnection implements Controller, IdleListener +public class SPDYAsyncConnection extends AbstractConnection implements Controller, IdleListener { private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class); private final ByteBufferPool bufferPool; @@ -34,7 +34,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont private volatile ISession session; private volatile boolean idle = false; - public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor) + public SPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor) { super(endPoint, executor); this.bufferPool = bufferPool; @@ -61,7 +61,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont protected int read(ByteBuffer buffer) { - AsyncEndPoint endPoint = getEndPoint(); + EndPoint endPoint = getEndPoint(); while (true) { int filled = fill(endPoint, buffer); @@ -81,7 +81,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont } } - private int fill(AsyncEndPoint endPoint, ByteBuffer buffer) + private int fill(EndPoint endPoint, ByteBuffer buffer) { try { @@ -99,7 +99,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont @Override public int write(ByteBuffer buffer, final Callback callback, StandardSession.FrameBytes context) { - AsyncEndPoint endPoint = getEndPoint(); + EndPoint endPoint = getEndPoint(); int remaining = buffer.remaining(); endPoint.write(context, callback, buffer); return remaining - buffer.remaining(); @@ -108,7 +108,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont @Override public void close(boolean onlyOutput) { - AsyncEndPoint endPoint = getEndPoint(); + EndPoint endPoint = getEndPoint(); // We need to gently close first, to allow // SSL close alerts to be sent by Jetty logger.debug("Shutting down output {}", endPoint); diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index 22082b89c46..4a85325fbbf 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -31,8 +31,8 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import javax.net.ssl.SSLEngine; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; @@ -184,10 +184,10 @@ public class SPDYClient return FlowControlStrategyFactory.newFlowControlStrategy(version); } - public void replaceAsyncConnection(AsyncEndPoint endPoint, AsyncConnection connection) + public void replaceAsyncConnection(EndPoint endPoint, Connection connection) { - AsyncConnection oldConnection = endPoint.getAsyncConnection(); - endPoint.setAsyncConnection(connection); + Connection oldConnection = endPoint.getConnection(); + endPoint.setConnection(connection); factory.selector.connectionUpgraded(endPoint, oldConnection); } @@ -306,14 +306,14 @@ public class SPDYClient { @Override - protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { SessionPromise attachment = (SessionPromise)key.attachment(); long clientIdleTimeout = attachment.client.getIdleTimeout(); if (clientIdleTimeout < 0) clientIdleTimeout = idleTimeout; - AsyncEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, scheduler, clientIdleTimeout); + EndPoint result = new SelectChannelEndPoint(channel, selectSet, key, scheduler, clientIdleTimeout); return result; } @@ -325,7 +325,7 @@ public class SPDYClient } @Override - public AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint, final Object attachment) + public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) { SessionPromise sessionPromise = (SessionPromise)attachment; final SPDYClient client = sessionPromise.client; @@ -345,9 +345,9 @@ public class SPDYClient } }; - AsyncEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); + EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); NextProtoNegoClientAsyncConnection connection = new NextProtoNegoClientAsyncConnection(channel, sslEndPoint, attachment, client.factory.threadPool, client); - sslEndPoint.setAsyncConnection(connection); + sslEndPoint.setConnection(connection); connectionOpened(connection); NextProtoNego.put(engine, connection); @@ -357,8 +357,8 @@ public class SPDYClient else { AsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory(); - AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment); - endPoint.setAsyncConnection(connection); + Connection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment); + endPoint.setConnection(connection); return connection; } } @@ -403,7 +403,7 @@ public class SPDYClient private static class ClientSPDYAsyncConnectionFactory implements AsyncConnectionFactory { @Override - public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment) + public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment) { SessionPromise sessionPromise = (SessionPromise)attachment; SPDYClient client = sessionPromise.client; @@ -414,7 +414,7 @@ public class SPDYClient Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor()); SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory); - endPoint.setAsyncConnection(connection); + endPoint.setConnection(connection); FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy(); @@ -433,7 +433,7 @@ public class SPDYClient { private final Factory factory; - public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory) + public ClientSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory) { super(endPoint, bufferPool, parser, factory.threadPool); this.factory = factory; diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java index 3500d26467c..1dd9dcc310b 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java @@ -30,8 +30,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.io.ssl.SslConnection; @@ -182,7 +182,7 @@ public class SPDYServerConnector extends SelectChannelConnector } @Override - protected AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint) + protected Connection newConnection(final SocketChannel channel, EndPoint endPoint) { if (sslContextFactory != null) { @@ -198,9 +198,9 @@ public class SPDYServerConnector extends SelectChannelConnector } }; - final AsyncEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); + final EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); NextProtoNegoServerAsyncConnection connection = new NextProtoNegoServerAsyncConnection(channel, sslEndPoint, this); - sslEndPoint.setAsyncConnection(connection); + sslEndPoint.setConnection(connection); getSelectorManager().connectionOpened(connection); NextProtoNego.put(engine, connection); @@ -210,8 +210,8 @@ public class SPDYServerConnector extends SelectChannelConnector else { AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory(); - AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, this); - endPoint.setAsyncConnection(connection); + Connection connection = connectionFactory.newAsyncConnection(channel, endPoint, this); + endPoint.setConnection(connection); return connection; } } @@ -265,10 +265,10 @@ public class SPDYServerConnector extends SelectChannelConnector this.initialWindowSize = initialWindowSize; } - public void replaceAsyncConnection(AsyncEndPoint endPoint, AsyncConnection connection) + public void replaceAsyncConnection(EndPoint endPoint, Connection connection) { - AsyncConnection oldConnection = endPoint.getAsyncConnection(); - endPoint.setAsyncConnection(connection); + Connection oldConnection = endPoint.getConnection(); + endPoint.setConnection(connection); getSelectorManager().connectionUpgraded(endPoint, oldConnection); } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java index 0908447e85e..5d611482454 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java @@ -17,8 +17,8 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.generator.Generator; @@ -52,7 +52,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory } @Override - public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment) + public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment) { CompressionFactory compressionFactory = new StandardCompressionFactory(); Parser parser = new Parser(compressionFactory.newDecompressor()); @@ -62,7 +62,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory ServerSessionFrameListener listener = provideServerSessionFrameListener(endPoint, attachment); SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector); - endPoint.setAsyncConnection(connection); + endPoint.setConnection(connection); FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version); @@ -77,7 +77,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory return connection; } - protected ServerSessionFrameListener provideServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment) + protected ServerSessionFrameListener provideServerSessionFrameListener(EndPoint endPoint, Object attachment) { return listener; } @@ -88,7 +88,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory private final SPDYServerConnector connector; private volatile boolean connected; - private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector) + private ServerSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector) { super(endPoint, bufferPool, parser, connector.getExecutor()); this.listener = listener; diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 5abb60be0c5..e7743e1c7ff 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -125,7 +125,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo // kill queued jobs and flush out idle jobs _jobs.clear(); - Runnable noop = new Runnable(){public void run(){}}; + Runnable noop = new Runnable(){@Override public void run(){}}; for (int i=_threadsIdle.get();i-->0;) _jobs.offer(noop); Thread.yield(); @@ -203,6 +203,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo * @see #getMaxThreads * @param maxThreads maximum number of threads. */ + @Override public void setMaxThreads(int maxThreads) { _maxThreads=maxThreads; @@ -216,6 +217,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo * @see #getMinThreads * @param minThreads minimum number of threads */ + @Override public void setMinThreads(int minThreads) { _minThreads=minThreads; @@ -297,6 +299,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo * @see #setMaxThreads * @return maximum number of threads. */ + @Override public int getMaxThreads() { return _maxThreads; @@ -308,6 +311,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo * @see #setMinThreads * @return minimum number of threads. */ + @Override public int getMinThreads() { return _minThreads; @@ -353,6 +357,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /* ------------------------------------------------------------ */ + @Override public boolean dispatch(Runnable job) { LOG.debug("{} dispatched {}",this,job); @@ -376,6 +381,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /* ------------------------------------------------------------ */ + @Override public void execute(Runnable job) { if (!dispatch(job)) @@ -386,6 +392,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /** * Blocks until the thread pool is {@link LifeCycle#stop stopped}. */ + @Override public void join() throws InterruptedException { synchronized (_joinLock) @@ -402,6 +409,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /** * @return The total number of threads currently in the pool */ + @Override public int getThreads() { return _threadsStarted.get(); @@ -411,6 +419,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /** * @return The number of idle threads in the pool */ + @Override public int getIdleThreads() { return _threadsIdle.get(); @@ -420,6 +429,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /** * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs */ + @Override public boolean isLowOnThreads() { return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); @@ -460,12 +470,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /* ------------------------------------------------------------ */ + @Override public String dump() { return AggregateLifeCycle.dump(this); } /* ------------------------------------------------------------ */ + @Override public void dump(Appendable out, String indent) throws IOException { List dump = new ArrayList(getMaxThreads()); @@ -491,6 +503,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { dump.add(new Dumpable() { + @Override public void dump(Appendable out, String indent) throws IOException { out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); @@ -498,6 +511,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); } + @Override public String dump() { return null; @@ -530,6 +544,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /* ------------------------------------------------------------ */ private Runnable _runnable = new Runnable() { + @Override public void run() { boolean shrink=false; @@ -584,7 +599,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { LOG.ignore(e); } - catch(Exception e) + catch(Throwable e) { LOG.warn(e); } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HandshakeConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HandshakeConnection.java index 5ea0b01b853..cc2986c62ba 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HandshakeConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HandshakeConnection.java @@ -22,9 +22,9 @@ import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import org.eclipse.jetty.io.AbstractAsyncConnection; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.FutureCallback; @@ -39,7 +39,7 @@ import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection; *

* Results in a {@link WebSocketAsyncConnection} on successful handshake. */ -public class HandshakeConnection extends AbstractAsyncConnection implements AsyncConnection +public class HandshakeConnection extends AbstractConnection implements Connection { public static final String COOKIE_DELIM = "\"\\\n\r\t\f\b%+ ;="; private final WebSocketClient.ConnectFuture future; @@ -47,7 +47,7 @@ public class HandshakeConnection extends AbstractAsyncConnection implements Asyn private String key; - public HandshakeConnection(AsyncEndPoint endp, Executor executor, ByteBufferPool bufferPool, WebSocketClient.ConnectFuture future) + public HandshakeConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, WebSocketClient.ConnectFuture future) { super(endp,executor); this.future = future; diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientAsyncConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientAsyncConnection.java index 58ebadbc4f3..07c328c508e 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientAsyncConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientAsyncConnection.java @@ -3,7 +3,7 @@ package org.eclipse.jetty.websocket.client.io; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.client.WebSocketClientFactory; @@ -13,7 +13,7 @@ public class WebSocketClientAsyncConnection extends WebSocketAsyncConnection { private final WebSocketClientFactory factory; - public WebSocketClientAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, + public WebSocketClientAsyncConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, WebSocketClientFactory factory) { super(endp,executor,scheduler,policy,bufferPool); diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java index 5feec1d8b9c..b6f8ce981ca 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java @@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; @@ -66,7 +66,7 @@ public class WebSocketClientSelectorManager extends SelectorManager return sslContextFactory; } - public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment) + public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment) { WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment; WebSocketClientFactory factory = confut.getFactory(); @@ -78,7 +78,7 @@ public class WebSocketClientSelectorManager extends SelectorManager ScheduledExecutorService scheduler = factory.getScheduler(); WebSocketAsyncConnection connection = new WebSocketClientAsyncConnection(endPoint,executor,scheduler,policy,bufferPool,factory); - endPoint.setAsyncConnection(connection); + endPoint.setConnection(connection); connection.getParser().setIncomingFramesHandler(websocket); // TODO: track open websockets? bind open websocket to connection? @@ -87,7 +87,7 @@ public class WebSocketClientSelectorManager extends SelectorManager } @Override - public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment) + public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) { WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment; @@ -97,7 +97,7 @@ public class WebSocketClientSelectorManager extends SelectorManager if ((sslContextFactory != null) && ("wss".equalsIgnoreCase(scheme))) { - final AtomicReference sslEndPointRef = new AtomicReference<>(); + final AtomicReference sslEndPointRef = new AtomicReference<>(); final AtomicReference attachmentRef = new AtomicReference<>(attachment); SSLEngine engine = newSSLEngine(sslContextFactory,channel); SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine) @@ -110,20 +110,20 @@ public class WebSocketClientSelectorManager extends SelectorManager super.onClose(); } }; - endPoint.setAsyncConnection(sslConnection); - AsyncEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); + endPoint.setConnection(sslConnection); + EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); sslEndPointRef.set(sslEndPoint); startHandshake(engine); - AsyncConnection connection = newAsyncConnection(channel,sslEndPoint,attachment); - endPoint.setAsyncConnection(connection); + Connection connection = newAsyncConnection(channel,sslEndPoint,attachment); + endPoint.setConnection(connection); return connection; } else { - AsyncConnection connection = newAsyncConnection(channel,endPoint,attachment); - endPoint.setAsyncConnection(connection); + Connection connection = newAsyncConnection(channel,endPoint,attachment); + endPoint.setConnection(connection); return connection; } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index abaaae7eb14..0a815976bf3 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -25,9 +25,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.eclipse.jetty.io.AbstractAsyncConnection; -import org.eclipse.jetty.io.AsyncConnection; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -46,9 +46,9 @@ import org.eclipse.jetty.websocket.protocol.Parser; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; /** - * Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link AsyncConnection} framework of jetty-io + * Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io */ -public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, OutgoingFrames +public abstract class WebSocketAsyncConnection extends AbstractConnection implements RawConnection, OutgoingFrames { private static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class); private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames"); @@ -64,7 +64,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i private boolean flushing; private AtomicLong writes; - public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool) + public WebSocketAsyncConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool) { super(endp,executor); this.policy = policy; @@ -111,7 +111,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i @Override public void disconnect(boolean onlyOutput) { - AsyncEndPoint endPoint = getEndPoint(); + EndPoint endPoint = getEndPoint(); // We need to gently close first, to allow // SSL close alerts to be sent by Jetty LOG.debug("Shutting down output {}",endPoint); @@ -275,7 +275,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i private int read(ByteBuffer buffer) { - AsyncEndPoint endPoint = getEndPoint(); + EndPoint endPoint = getEndPoint(); try { while (true) @@ -367,7 +367,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i private void write(ByteBuffer buffer, WebSocketAsyncConnection webSocketAsyncConnection, FrameBytes frameBytes) { - AsyncEndPoint endpoint = getEndPoint(); + EndPoint endpoint = getEndPoint(); if (LOG_FRAMES.isDebugEnabled()) { diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerAsyncConnection.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerAsyncConnection.java index b0f82bed8cd..322d1fe11e2 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerAsyncConnection.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerAsyncConnection.java @@ -3,7 +3,7 @@ package org.eclipse.jetty.websocket.server; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection; @@ -13,7 +13,7 @@ public class WebSocketServerAsyncConnection extends WebSocketAsyncConnection private final WebSocketServerFactory factory; private boolean connected; - public WebSocketServerAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, + public WebSocketServerAsyncConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, WebSocketServerFactory factory) { super(endp,executor,scheduler,policy,bufferPool); diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index 971798f4e85..ee6436c853c 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -33,7 +33,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.server.HttpConnection; @@ -353,7 +353,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock // Create connection HttpConnection http = HttpConnection.getCurrentConnection(); - AsyncEndPoint endp = http.getEndPoint(); + EndPoint endp = http.getEndPoint(); Executor executor = http.getConnector().getExecutor(); ByteBufferPool bufferPool = http.getConnector().getByteBufferPool(); WebSocketServerAsyncConnection connection = new WebSocketServerAsyncConnection(endp,executor,scheduler,websocket.getPolicy(),bufferPool,this); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java index 7b4f385f240..bc9ce75167d 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketLoadRFC6455Test.java @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit; import junit.framework.Assert; -import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.DefaultHandler; @@ -59,7 +59,7 @@ public class WebSocketLoadRFC6455Test private final BufferedReader input; private final int iterations; private final CountDownLatch latch; - private/* final */AsyncEndPoint _endp; + private/* final */EndPoint _endp; private final Generator _generator; private final Parser _parser; private final IncomingFrames _handler = new IncomingFrames()