jetty-9 merged Async and non-Async classes for EndPoint and Connection

This commit is contained in:
Greg Wilkins 2012-08-02 16:08:49 +10:00
parent 2da412009e
commit b52930be14
47 changed files with 773 additions and 813 deletions

View File

@ -23,26 +23,26 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>A convenience base implementation of {@link AsyncConnection}.</p>
* <p>This class uses the capabilities of the {@link AsyncEndPoint} API to provide a
* <p>A convenience base implementation of {@link Connection}.</p>
* <p>This class uses the capabilities of the {@link EndPoint} API to provide a
* more traditional style of async reading. A call to {@link #fillInterested()}
* will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* as appropriate.</p>
*/
public abstract class AbstractAsyncConnection implements AsyncConnection
public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class);
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final AtomicBoolean _readInterested = new AtomicBoolean();
private final AsyncEndPoint _endp;
private final EndPoint _endp;
private final Callback<Void> _readCallback;
public AbstractAsyncConnection(AsyncEndPoint endp, Executor executor)
public AbstractConnection(EndPoint endp, Executor executor)
{
this(endp, executor, false);
}
public AbstractAsyncConnection(AsyncEndPoint endp, Executor executor, final boolean executeOnlyFailure)
public AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnlyFailure)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
@ -72,7 +72,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), AbstractAsyncConnection.this.hashCode());
return String.format("%s@%x", getClass().getSimpleName(), AbstractConnection.this.hashCode());
}
};
}
@ -139,7 +139,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
}
@Override
public AsyncEndPoint getEndPoint()
public EndPoint getEndPoint()
{
return _endp;
}

View File

@ -1,14 +1,23 @@
package org.eclipse.jetty.io;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractEndPoint implements EndPoint
{
private static final Logger LOG = Log.getLogger(AbstractEndPoint.class);
private final long _created=System.currentTimeMillis();
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private volatile long _idleTimeout;
private volatile long _idleTimestamp=System.currentTimeMillis();
private volatile Connection _connection;
protected AbstractEndPoint(InetSocketAddress local,InetSocketAddress remote)
@ -58,6 +67,30 @@ public abstract class AbstractEndPoint implements EndPoint
_idleTimestamp=System.currentTimeMillis();
}
@Override
public Connection getConnection()
{
return _connection;
}
@Override
public void setConnection(Connection connection)
{
_connection = connection;
}
@Override
public void onOpen()
{
LOG.debug("onOpen {}",this);
}
@Override
public void onClose()
{
LOG.debug("onClose {}",this);
}
@Override
public String toString()
{

View File

@ -1,172 +0,0 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint, Runnable
{
private static final Logger LOG = Log.getLogger(AsyncByteArrayEndPoint.class);
private final ReadInterest _readInterest = new ReadInterest()
{
@Override
protected boolean needsFill() throws IOException
{
if (_closed)
throw new ClosedChannelException();
return _in == null || BufferUtil.hasContent(_in);
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
protected void onIncompleteFlushed()
{
// Don't need to do anything here as takeOutput does the signalling.
}
};
private final AtomicReference<Future<?>> _timeout = new AtomicReference<>();
private final ScheduledExecutorService _scheduler;
private volatile AsyncConnection _connection;
public AsyncByteArrayEndPoint(ScheduledExecutorService scheduler, long idleTimeout)
{
_scheduler = scheduler;
setIdleTimeout(idleTimeout);
}
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, byte[] input, int outputSize)
{
super(input, outputSize);
_scheduler = timer;
setIdleTimeout(idleTimeout);
}
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, String input, int outputSize)
{
super(input, outputSize);
_scheduler = timer;
setIdleTimeout(idleTimeout);
}
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
scheduleIdleTimeout(idleTimeout);
}
private void scheduleIdleTimeout(long delay)
{
Future<?> newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(this, delay, TimeUnit.MILLISECONDS) : null;
Future<?> oldTimeout = _timeout.getAndSet(newTimeout);
if (oldTimeout != null)
oldTimeout.cancel(false);
}
@Override
public void run()
{
if (isOpen())
{
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
if (idleLeft < 0)
{
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout);
}
}
@Override
public void setInput(ByteBuffer in)
{
super.setInput(in);
if (in == null || BufferUtil.hasContent(in))
_readInterest.readable();
}
@Override
public ByteBuffer takeOutput()
{
ByteBuffer b = super.takeOutput();
_writeFlusher.completeWrite();
return b;
}
@Override
public void setOutput(ByteBuffer out)
{
super.setOutput(out);
_writeFlusher.completeWrite();
}
@Override
public void reset()
{
_readInterest.close();
_writeFlusher.close();
super.reset();
}
@Override
public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException
{
_readInterest.register(context, callback);
}
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
{
_writeFlusher.write(context, callback, buffers);
}
@Override
public AsyncConnection getAsyncConnection()
{
return _connection;
}
@Override
public void setAsyncConnection(AsyncConnection connection)
{
_connection = connection;
}
@Override
public void onOpen()
{
}
@Override
public void onClose()
{
}
}

View File

@ -1,129 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExecutorCallback;
import org.eclipse.jetty.util.FutureCallback;
/**
* <p>{@link AsyncEndPoint} add asynchronous scheduling methods to {@link EndPoint}.</p>
* <p>The design of these has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because they have
* some inefficiencies.</p>
* <p>This class will frequently be used in conjunction with some of the utility
* implementations of {@link Callback}, such as {@link FutureCallback} and
* {@link ExecutorCallback}. Examples are:</p>
*
* <h3>Blocking Read</h3>
* <p>A FutureCallback can be used to block until an endpoint is ready to be filled
* from:
* <blockquote><pre>
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.fillInterested("ContextObj",future);
* ...
* String context = future.get(); // This blocks
* int filled=endpoint.fill(mybuffer);
* </pre></blockquote></p>
*
* <h3>Dispatched Read</h3>
* <p>By using a different callback, the read can be done asynchronously in its own dispatched thread:
* <blockquote><pre>
* endpoint.fillInterested("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* {
* public void onCompleted(String context)
* {
* int filled=endpoint.fill(mybuffer);
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* });
* </pre></blockquote></p>
* <p>The executor callback can also be customized to not dispatch in some circumstances when
* it knows it can use the callback thread and does not need to dispatch.</p>
*
* <h3>Blocking Write</h3>
* <p>The write contract is that the callback complete is not called until all data has been
* written or there is a failure. For blocking this looks like:
* <blockquote><pre>
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
* String context = future.get(); // This blocks
* </pre></blockquote></p>
*
* <h3>Dispatched Write</h3>
* <p>Note also that multiple buffers may be passed in write so that gather writes
* can be done:
* <blockquote><pre>
* endpoint.write("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* {
* public void onCompleted(String context)
* {
* int filled=endpoint.fill(mybuffer);
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* },headerBuffer,contentBuffer);
* </pre></blockquote></p>
*/
public interface AsyncEndPoint extends EndPoint
{
/**
* <p>Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.</p>
*
* @param context the context to return via the callback
* @param callback the callback to call when an error occurs or we are readable.
* @throws ReadPendingException if another read operation is concurrent.
*/
<C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException;
/**
* <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* all the data has been flushed or an error occurs.</p>
*
* @param context the context to return via the callback
* @param callback the callback to call when an error occurs or the write completed.
* @param buffers one or more {@link ByteBuffer}s that will be flushed.
* @throws WritePendingException if another write operation is concurrent.
*/
<C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException;
/**
* @return the {@link AsyncConnection} associated with this {@link AsyncEndPoint}
* @see #setAsyncConnection(AsyncConnection)
*/
AsyncConnection getAsyncConnection();
/**
* @param connection the {@link AsyncConnection} associated with this {@link AsyncEndPoint}
* @see #getAsyncConnection()
*/
void setAsyncConnection(AsyncConnection connection);
/**
* <p>Callback method invoked when this {@link AsyncEndPoint} is opened.</p>
* @see #onClose()
*/
void onOpen();
/**
* <p>Callback method invoked when this {@link AsyncEndPoint} is close.</p>
* @see #onOpen()
*/
void onClose();
}

View File

@ -16,9 +16,16 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
@ -30,12 +37,35 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{
public final static InetSocketAddress NOIP=new InetSocketAddress(0);
private final AtomicReference<Future<?>> _timeout = new AtomicReference<>();
private final ScheduledExecutorService _scheduler;
protected ByteBuffer _in;
protected ByteBuffer _out;
protected boolean _ishut;
protected boolean _oshut;
protected boolean _closed;
protected boolean _growOutput;
private final ReadInterest _readInterest = new ReadInterest()
{
@Override
protected boolean needsFill() throws IOException
{
if (_closed)
throw new ClosedChannelException();
return _in == null || BufferUtil.hasContent(_in);
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
protected void onIncompleteFlushed()
{
// Don't need to do anything here as takeOutput does the signalling.
}
};
/* ------------------------------------------------------------ */
/**
@ -43,9 +73,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteArrayEndPoint()
{
super(NOIP,NOIP);
_in=BufferUtil.EMPTY_BUFFER;
_out=BufferUtil.allocate(1024);
this(null,0,null,null);
}
/* ------------------------------------------------------------ */
@ -54,9 +82,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteArrayEndPoint(byte[] input, int outputSize)
{
super(NOIP,NOIP);
_in=input==null?null:ByteBuffer.wrap(input);
_out=BufferUtil.allocate(outputSize);
this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
}
/* ------------------------------------------------------------ */
@ -65,10 +91,35 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteArrayEndPoint(String input, int outputSize)
{
super(NOIP,NOIP);
setInput(input);
_out=BufferUtil.allocate(outputSize);
this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
}
public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs)
{
this(timer,idleTimeoutMs,null,null);
}
public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs, byte[] input, int outputSize)
{
this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
}
public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs, String input, int outputSize)
{
this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
}
public ByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output)
{
super(NOIP,NOIP);
_in=input==null?BufferUtil.EMPTY_BUFFER:input;
_out=output==null?BufferUtil.allocate(1024):output;
_scheduler = timer;
setIdleTimeout(idleTimeoutMs);
}
/* ------------------------------------------------------------ */
@ -95,6 +146,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void setInput(ByteBuffer in)
{
_in = in;
if (in == null || BufferUtil.hasContent(in))
_readInterest.readable();
}
/* ------------------------------------------------------------ */
@ -135,6 +188,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{
ByteBuffer b=_out;
_out=BufferUtil.allocate(b.capacity());
_writeFlusher.completeWrite();
return b;
}
@ -164,6 +218,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void setOutput(ByteBuffer out)
{
_out = out;
_writeFlusher.completeWrite();
}
/* ------------------------------------------------------------ */
@ -195,7 +250,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
}
/* ------------------------------------------------------------ */
private void shutdownInput() throws IOException
private void shutdownInput()
{
_ishut=true;
if (_oshut)
@ -222,7 +277,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void close()
{
_closed=true;
// TODO: for sbordet to fix - onClose(); Moved invocation to AsycnByteArrayEndPoint for now (GW)
}
/* ------------------------------------------------------------ */
@ -299,6 +353,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public void reset()
{
_readInterest.close();
_writeFlusher.close();
_ishut=false;
_oshut=false;
_closed=false;
@ -334,5 +390,73 @@ public class ByteArrayEndPoint extends AbstractEndPoint
_growOutput=growOutput;
}
/* ------------------------------------------------------------ */
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
scheduleIdleTimeout(idleTimeout);
}
/* ------------------------------------------------------------ */
private void scheduleIdleTimeout(long delay)
{
if (delay>0 && _scheduler==null)
throw new IllegalStateException();
Future<?> newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(_timeoutTask, delay, TimeUnit.MILLISECONDS) : null;
Future<?> oldTimeout = _timeout.getAndSet(newTimeout);
if (oldTimeout != null)
oldTimeout.cancel(false);
}
/* ------------------------------------------------------------ */
private final Runnable _timeoutTask = new Runnable()
{
@Override
public void run()
{
if (isOpen())
{
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
if (idleLeft < 0)
{
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout);
}
}
};
/* ------------------------------------------------------------ */
@Override
public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException
{
_readInterest.register(context, callback);
}
/* ------------------------------------------------------------ */
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
{
_writeFlusher.write(context, callback, buffers);
}
}

View File

@ -22,9 +22,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadPendingException;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -194,4 +197,16 @@ public class ChannelEndPoint extends AbstractEndPoint
{
return _socket;
}
@Override
public <C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException
{
throw new UnsupportedOperationException();
}
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException
{
throw new UnsupportedOperationException();
}
}

View File

@ -16,27 +16,27 @@ package org.eclipse.jetty.io;
import org.eclipse.jetty.util.Callback;
/**
* <p>An {@link AsyncConnection} is associated to an {@link AsyncEndPoint} so that I/O events
* happening on the {@link AsyncEndPoint} can be processed by the {@link AsyncConnection}.</p>
* <p>A typical implementation of {@link AsyncConnection} overrides {@link #onOpen()} to
* {@link AsyncEndPoint#fillInterested(Object, Callback) set read interest} on the {@link AsyncEndPoint},
* and when the {@link AsyncEndPoint} signals read readyness, this {@link AsyncConnection} can
* <p>An {@link Connection} is associated to an {@link EndPoint} so that I/O events
* happening on the {@link EndPoint} can be processed by the {@link Connection}.</p>
* <p>A typical implementation of {@link Connection} overrides {@link #onOpen()} to
* {@link EndPoint#fillInterested(Object, Callback) set read interest} on the {@link EndPoint},
* and when the {@link EndPoint} signals read readyness, this {@link Connection} can
* read bytes from the network and interpret them.</p>
*/
public interface AsyncConnection
public interface Connection
{
/**
* <p>Callback method invoked when this {@link AsyncConnection} is opened.</p>
* <p>Callback method invoked when this {@link Connection} is opened.</p>
*/
void onOpen();
/**
* <p>Callback method invoked when this {@link AsyncConnection} is closed.</p>
* <p>Callback method invoked when this {@link Connection} is closed.</p>
*/
void onClose();
/**
* @return the {@link AsyncEndPoint} associated with this {@link AsyncConnection}
* @return the {@link EndPoint} associated with this {@link Connection}
*/
AsyncEndPoint getEndPoint();
EndPoint getEndPoint();
}

View File

@ -17,12 +17,78 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExecutorCallback;
import org.eclipse.jetty.util.FutureCallback;
/**
*
* A transport EndPoint
*
* <h3>Asynchronous Methods</h3>
* <p>The asynchronous scheduling methods of {@link EndPoint}
* has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because they have
* some inefficiencies.</p>
* <p>This class will frequently be used in conjunction with some of the utility
* implementations of {@link Callback}, such as {@link FutureCallback} and
* {@link ExecutorCallback}. Examples are:</p>
*
* <h3>Blocking Read</h3>
* <p>A FutureCallback can be used to block until an endpoint is ready to be filled
* from:
* <blockquote><pre>
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.fillInterested("ContextObj",future);
* ...
* String context = future.get(); // This blocks
* int filled=endpoint.fill(mybuffer);
* </pre></blockquote></p>
*
* <h3>Dispatched Read</h3>
* <p>By using a different callback, the read can be done asynchronously in its own dispatched thread:
* <blockquote><pre>
* endpoint.fillInterested("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* {
* public void onCompleted(String context)
* {
* int filled=endpoint.fill(mybuffer);
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* });
* </pre></blockquote></p>
* <p>The executor callback can also be customized to not dispatch in some circumstances when
* it knows it can use the callback thread and does not need to dispatch.</p>
*
* <h3>Blocking Write</h3>
* <p>The write contract is that the callback complete is not called until all data has been
* written or there is a failure. For blocking this looks like:
* <blockquote><pre>
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
* String context = future.get(); // This blocks
* </pre></blockquote></p>
*
* <h3>Dispatched Write</h3>
* <p>Note also that multiple buffers may be passed in write so that gather writes
* can be done:
* <blockquote><pre>
* endpoint.write("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* {
* public void onCompleted(String context)
* {
* int filled=endpoint.fill(mybuffer);
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* },headerBuffer,contentBuffer);
* </pre></blockquote></p>
*/
public interface EndPoint extends Closeable
{
@ -126,5 +192,48 @@ public interface EndPoint extends Closeable
void setIdleTimeout(long idleTimeout);
/**
* <p>Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.</p>
*
* @param context the context to return via the callback
* @param callback the callback to call when an error occurs or we are readable.
* @throws ReadPendingException if another read operation is concurrent.
*/
<C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException;
/**
* <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* all the data has been flushed or an error occurs.</p>
*
* @param context the context to return via the callback
* @param callback the callback to call when an error occurs or the write completed.
* @param buffers one or more {@link ByteBuffer}s that will be flushed.
* @throws WritePendingException if another write operation is concurrent.
*/
<C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException;
/**
* @return the {@link Connection} associated with this {@link EndPoint}
* @see #setConnection(Connection)
*/
Connection getConnection();
/**
* @param connection the {@link Connection} associated with this {@link EndPoint}
* @see #getConnection()
*/
void setConnection(Connection connection);
/**
* <p>Callback method invoked when this {@link EndPoint} is opened.</p>
* @see #onClose()
*/
void onOpen();
/**
* <p>Callback method invoked when this {@link EndPoint} is close.</p>
* @see #onOpen()
*/
void onClose();
}

View File

@ -10,7 +10,7 @@ import org.eclipse.jetty.util.Callback;
/* ------------------------------------------------------------ */
/**
* A Utility class to help implement {@link AsyncEndPoint#fillInterested(Object, Callback)}
* A Utility class to help implement {@link EndPoint#fillInterested(Object, Callback)}
* by keeping state and calling the context and callback objects.
*
*/

View File

@ -32,7 +32,7 @@ import org.eclipse.jetty.util.log.Logger;
/**
* An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, SelectorManager.SelectableAsyncEndPoint
public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
{
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
@ -45,8 +45,38 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
checkIdleTimeout();
}
};
private final Runnable _updateTask = new Runnable()
{
@Override
public void run()
{
try
{
if (getChannel().isOpen())
{
int oldInterestOps = _key.interestOps();
int newInterestOps = _interestOps;
if (newInterestOps != oldInterestOps)
setKeyInterests(oldInterestOps, newInterestOps);
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
close();
}
catch (Exception x)
{
LOG.warn("Ignoring key update for " + this, x);
close();
}
}
};
/**
* true if {@link ManagedSelector#destroyEndPoint(AsyncEndPoint)} has not been called
* true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
*/
private final AtomicBoolean _open = new AtomicBoolean();
private final ReadInterest _readInterest = new ReadInterest()
@ -68,7 +98,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
private final SelectorManager.ManagedSelector _selector;
private final SelectionKey _key;
private final ScheduledExecutorService _scheduler;
private volatile AsyncConnection _connection;
/**
* The desired value for {@link SelectionKey#interestOps()}
*/
@ -131,16 +160,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
}
@Override
public AsyncConnection getAsyncConnection()
public void setConnection(Connection connection)
{
return _connection;
}
@Override
public void setAsyncConnection(AsyncConnection connection)
{
AsyncConnection old = getAsyncConnection();
_connection = connection;
// TODO should this be on AbstractEndPoint?
Connection old = getConnection();
super.setConnection(connection);
if (old != null && old != connection)
_selector.getSelectorManager().connectionUpgraded(this, old);
}
@ -210,7 +234,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{
_interestOps = newInterestOps;
LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
_selector.submit(this);
_selector.submit(_updateTask);
}
else
{
@ -218,30 +242,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
}
}
@Override
public void run()
{
try
{
if (getChannel().isOpen())
{
int oldInterestOps = _key.interestOps();
int newInterestOps = _interestOps;
if (newInterestOps != oldInterestOps)
setKeyInterests(oldInterestOps, newInterestOps);
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
close();
}
catch (Exception x)
{
LOG.warn("Ignoring key update for " + this, x);
close();
}
}
private void setKeyInterests(int oldInterestOps, int newInterestOps)
{
@ -260,12 +260,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
@Override
public void onOpen()
{
super.onOpen();
_open.compareAndSet(false, true);
}
@Override
public void onClose()
{
super.onClose();
_writeFlusher.close();
_readInterest.close();
}
@ -290,6 +292,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
}
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",
hashCode(), getRemoteAddress(), getLocalAddress(), isOpen(), isInputShutdown(),
isOutputShutdown(), _interestOps, keyString, _readInterest, _writeFlusher, getAsyncConnection());
isOutputShutdown(), _interestOps, keyString, _readInterest, _writeFlusher, getConnection());
}
}

View File

@ -44,7 +44,7 @@ import org.eclipse.jetty.util.log.Logger;
* <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
* simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
* <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
* {@link AsyncEndPoint}s and {@link AsyncConnection}s.</p>
* {@link EndPoint}s and {@link Connection}s.</p>
*/
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
{
@ -152,7 +152,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param endpoint the endpoint being opened
*/
protected void endPointOpened(AsyncEndPoint endpoint)
protected void endPointOpened(EndPoint endpoint)
{
endpoint.onOpen();
}
@ -162,7 +162,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param endpoint the endpoint being closed
*/
protected void endPointClosed(AsyncEndPoint endpoint)
protected void endPointClosed(EndPoint endpoint)
{
endpoint.onClose();
}
@ -172,7 +172,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param connection the connection just opened
*/
public void connectionOpened(AsyncConnection connection)
public void connectionOpened(Connection connection)
{
connection.onOpen();
}
@ -182,7 +182,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param connection the connection just closed
*/
public void connectionClosed(AsyncConnection connection)
public void connectionClosed(Connection connection)
{
connection.onClose();
}
@ -193,10 +193,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @param endpoint the endpoint holding the new connection
* @param oldConnection the previous connection
*/
public void connectionUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection)
public void connectionUpgraded(EndPoint endpoint, Connection oldConnection)
{
connectionClosed(oldConnection);
connectionOpened(endpoint.getAsyncConnection());
connectionOpened(endpoint.getConnection());
}
/**
@ -213,7 +213,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
/**
* <p>Factory method to create {@link AsyncEndPoint}.</p>
* <p>Factory method to create {@link EndPoint}.</p>
* <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
* or {@link #accept(SocketChannel)}.</p>
*
@ -222,12 +222,12 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @param selectionKey the selection key
* @return a new endpoint
* @throws IOException if the endPoint cannot be created
* @see #newConnection(SocketChannel, AsyncEndPoint, Object)
* @see #newConnection(SocketChannel, EndPoint, Object)
*/
protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
/**
* <p>Factory method to create {@link AsyncConnection}.</p>
* <p>Factory method to create {@link Connection}.</p>
*
* @param channel the channel associated to the connection
* @param endpoint the endpoint
@ -236,7 +236,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @throws IOException
* @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
*/
public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) throws IOException;
public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
@Override
public String dump()
@ -254,7 +254,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
* <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
* happen for registered channels. When events happen, it notifies the {@link AsyncEndPoint} associated
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.</p>
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
@ -413,9 +413,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableAsyncEndPoint)
if (attachment instanceof SelectableEndPoint)
{
((SelectableAsyncEndPoint)attachment).onSelected();
((SelectableEndPoint)attachment).onSelected();
}
else if (key.isConnectable())
{
@ -427,7 +427,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (connected)
{
key.interestOps(0);
AsyncEndPoint endpoint = createEndPoint(channel, key);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
}
else
@ -482,21 +482,21 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_selector.wakeup();
}
private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
{
AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey);
EndPoint endPoint = newEndPoint(channel, this, selectionKey);
endPointOpened(endPoint);
AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setAsyncConnection(asyncConnection);
Connection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setConnection(asyncConnection);
connectionOpened(asyncConnection);
LOG.debug("Created {}", endPoint);
return endPoint;
}
public void destroyEndPoint(AsyncEndPoint endPoint)
public void destroyEndPoint(EndPoint endPoint)
{
LOG.debug("Destroyed {}", endPoint);
connectionClosed(endPoint.getAsyncConnection());
connectionClosed(endPoint.getConnection());
endPointClosed(endPoint);
}
@ -608,7 +608,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
try
{
SelectionKey key = _channel.register(_selector, 0, null);
AsyncEndPoint endpoint = createEndPoint(_channel, key);
EndPoint endpoint = createEndPoint(_channel, key);
key.attach(endpoint);
}
catch (IOException x)
@ -685,10 +685,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
/**
* A {@link SelectableAsyncEndPoint} is an {@link AsyncEndPoint} that wish to be notified of
* A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of
* non-blocking events by the {@link ManagedSelector}.
*/
public interface SelectableAsyncEndPoint extends AsyncEndPoint
public interface SelectableEndPoint extends EndPoint
{
/**
* <p>Callback method invoked when a read or write events has been detected by the {@link ManagedSelector}

View File

@ -13,7 +13,7 @@ import org.eclipse.jetty.util.Callback;
/* ------------------------------------------------------------ */
/**
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* A Utility class to help implement {@link EndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #onIncompleteFlushed()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()}

View File

@ -24,10 +24,10 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ReadInterest;
@ -40,11 +40,11 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* An AsyncConnection that acts as an intercepter between an AsyncEndPoint providing SSL encrypted data
* and another consumer of an AsyncEndPoint (typically an {@link AsyncConnection} like HttpConnection) that
* An AsyncConnection that acts as an intercepter between an EndPoint providing SSL encrypted data
* and another consumer of an EndPoint (typically an {@link Connection} like HttpConnection) that
* wants unencrypted data.
* <p>
* The connector uses an {@link AsyncEndPoint} (typically {@link SelectChannelEndPoint}) as
* The connector uses an {@link EndPoint} (typically {@link SelectChannelEndPoint}) as
* it's source/sink of encrypted data. It then provides an endpoint via {@link #getDecryptedEndPoint()} to
* expose a source/sink of unencrypted data to another connection (eg HttpConnection).
* <p>
@ -52,15 +52,15 @@ import org.eclipse.jetty.util.log.Logger;
* asynchronous callbacks, and active methods that do schedule asynchronous callbacks.
* <p>
* The passive methods are {@link DecryptedEndPoint#fill(ByteBuffer)} and {@link DecryptedEndPoint#flush(ByteBuffer...)}. They make best
* effort attempts to progress the connection using only calls to the encrypted {@link AsyncEndPoint#fill(ByteBuffer)} and {@link AsyncEndPoint#flush(ByteBuffer...)}
* effort attempts to progress the connection using only calls to the encrypted {@link EndPoint#fill(ByteBuffer)} and {@link EndPoint#flush(ByteBuffer...)}
* methods. They will never block nor schedule any readInterest or write callbacks. If a fill/flush cannot progress either because
* of network congestion or waiting for an SSL handshake message, then the fill/flush will simply return with zero bytes filled/flushed.
* Specifically, if a flush cannot proceed because it needs to receive a handshake message, then the flush will attempt to fill bytes from the
* encrypted endpoint, but if insufficient bytes are read it will NOT call {@link AsyncEndPoint#fillInterested(Object, Callback)}.
* encrypted endpoint, but if insufficient bytes are read it will NOT call {@link EndPoint#fillInterested(Object, Callback)}.
* <p>
* It is only the active methods : {@link DecryptedEndPoint#fillInterested(Object, Callback)} and
* {@link DecryptedEndPoint#write(Object, Callback, ByteBuffer...)} that may schedule callbacks by calling the encrypted
* {@link AsyncEndPoint#fillInterested(Object, Callback)} and {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* {@link EndPoint#fillInterested(Object, Callback)} and {@link EndPoint#write(Object, Callback, ByteBuffer...)}
* methods. For normal data handling, the decrypted fillInterest method will result in an encrypted fillInterest and a decrypted
* write will result in an encrypted write. However, due to SSL handshaking requirements, it is also possible for a decrypted fill
* to call the encrypted write and for the decrypted flush to call the encrypted fillInterested methods.
@ -70,7 +70,7 @@ import org.eclipse.jetty.util.log.Logger;
* be called again and make another best effort attempt to progress the connection.
*
*/
public class SslConnection extends AbstractAsyncConnection
public class SslConnection extends AbstractConnection
{
private static final Logger LOG = Log.getLogger(SslConnection.class);
private final ByteBufferPool _bufferPool;
@ -82,7 +82,7 @@ public class SslConnection extends AbstractAsyncConnection
private final boolean _encryptedDirectBuffers = false;
private final boolean _decryptedDirectBuffers = false;
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, AsyncEndPoint endPoint, SSLEngine sslEngine)
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine)
{
super(endPoint, executor, true);
this._bufferPool = byteBufferPool;
@ -95,7 +95,7 @@ public class SslConnection extends AbstractAsyncConnection
return _sslEngine;
}
public AsyncEndPoint getDecryptedEndPoint()
public EndPoint getDecryptedEndPoint()
{
return _decryptedEndPoint;
}
@ -113,7 +113,7 @@ public class SslConnection extends AbstractAsyncConnection
if (_sslEngine.getUseClientMode())
_decryptedEndPoint.write(null, new Callback.Empty<>(), BufferUtil.EMPTY_BUFFER);
getDecryptedEndPoint().getAsyncConnection().onOpen();
getDecryptedEndPoint().getConnection().onOpen();
}
catch (SSLException x)
{
@ -187,25 +187,14 @@ public class SslConnection extends AbstractAsyncConnection
}
/* ------------------------------------------------------------ */
public class DecryptedEndPoint extends AbstractEndPoint implements AsyncEndPoint
public class DecryptedEndPoint extends AbstractEndPoint implements EndPoint
{
private AsyncConnection _connection;
private boolean _fillRequiresFlushToProgress;
private boolean _flushRequiresFillToProgress;
private boolean _cannotAcceptMoreAppDataToFlush;
private boolean _needToFillMoreDataToProgress;
private boolean _ishut = false;
@Override
public void onOpen()
{
}
@Override
public void onClose()
{
}
private final Callback<Void> _writeCallback = new Callback<Void>()
{
@ -553,7 +542,7 @@ public class SslConnection extends AbstractAsyncConnection
// or busy handshaking, then zero bytes may be taken from appOuts and this method
// will return 0 (even if some handshake bytes were flushed and filled).
// it is the applications responsibility to call flush again - either in a busy loop
// or better yet by using AsyncEndPoint#write to do the flushing.
// or better yet by using EndPoint#write to do the flushing.
LOG.debug("{} flush enter {}", SslConnection.this, Arrays.toString(appOuts));
try
@ -714,18 +703,6 @@ public class SslConnection extends AbstractAsyncConnection
return _ishut;
}
@Override
public AsyncConnection getAsyncConnection()
{
return _connection;
}
@Override
public void setAsyncConnection(AsyncConnection connection)
{
_connection = connection;
}
@Override
public String toString()
{

View File

@ -1,192 +0,0 @@
package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AsyncByteArrayEndPointTest
{
private ScheduledExecutorService _scheduler;
@Before
public void before()
{
_scheduler = Executors.newSingleThreadScheduledExecutor();
}
@After
public void after()
{
_scheduler.shutdownNow();
}
@Test
public void testReadable() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000);
endp.setInput("test input");
ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<String> fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(10, endp.fill(buffer));
assertEquals("test input", BufferUtil.toString(buffer));
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer));
endp.setInput(" more");
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(5, endp.fill(buffer));
assertEquals("test input more", BufferUtil.toString(buffer));
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer));
endp.setInput((ByteBuffer)null);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(-1, endp.fill(buffer));
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(-1, endp.fill(buffer));
endp.close();
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone());
try
{
fcb.get();
fail();
}
catch (ExecutionException e)
{
assertThat(e.toString(), containsString("Closed"));
}
}
@Test
public void testWrite() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000, (byte[])null, 15);
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(10));
ByteBuffer data = BufferUtil.toBuffer("Data.");
ByteBuffer more = BufferUtil.toBuffer(" Some more.");
FutureCallback<String> fcb = new FutureCallback<>();
endp.write("CTX", fcb, data);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals("Data.", endp.getOutputString());
fcb = new FutureCallback<>();
endp.write("CTX", fcb, more);
assertFalse(fcb.isDone());
assertEquals("Data. Some", endp.getOutputString());
assertEquals("Data. Some", endp.takeOutputString());
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(" more.", endp.getOutputString());
}
@Test
public void testIdle() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 500);
endp.setInput("test");
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(5));
// no idle check
assertTrue(endp.isOpen());
Thread.sleep(1000);
assertTrue(endp.isOpen());
// normal read
ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<Void> fcb = new FutureCallback<>();
endp.fillInterested(null, fcb);
assertTrue(fcb.isDone());
assertEquals(null, fcb.get());
assertEquals(4, endp.fill(buffer));
assertEquals("test", BufferUtil.toString(buffer));
// read timeout
fcb = new FutureCallback<>();
endp.fillInterested(null, fcb);
long start = System.currentTimeMillis();
try
{
fcb.get();
fail();
}
catch (ExecutionException t)
{
assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class));
}
assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L));
assertTrue(endp.isOpen());
// write timeout
fcb = new FutureCallback<>();
start = System.currentTimeMillis();
endp.write(null, fcb, BufferUtil.toBuffer("This is too long"));
try
{
fcb.get();
fail();
}
catch (ExecutionException t)
{
assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class));
}
assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L));
assertTrue(endp.isOpen());
// Still no idle close
Thread.sleep(1000);
assertTrue(endp.isOpen());
// shutdown out
endp.shutdownOutput();
// idle close
Thread.sleep(1000);
assertFalse(endp.isOpen());
}
}

View File

@ -2,17 +2,41 @@ package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ByteArrayEndPointTest
{
private ScheduledExecutorService _scheduler;
@Before
public void before()
{
_scheduler = Executors.newSingleThreadScheduledExecutor();
}
@After
public void after()
{
_scheduler.shutdownNow();
}
@Test
public void testFill() throws Exception
{
@ -104,6 +128,159 @@ public class ByteArrayEndPointTest
assertEquals("data.",BufferUtil.toString(endp.takeOutput()));
}
@Test
public void testReadable() throws Exception
{
ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000);
endp.setInput("test input");
ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<String> fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(10, endp.fill(buffer));
assertEquals("test input", BufferUtil.toString(buffer));
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer));
endp.setInput(" more");
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(5, endp.fill(buffer));
assertEquals("test input more", BufferUtil.toString(buffer));
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer));
endp.setInput((ByteBuffer)null);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(-1, endp.fill(buffer));
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(-1, endp.fill(buffer));
endp.close();
fcb = new FutureCallback<>();
endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone());
try
{
fcb.get();
fail();
}
catch (ExecutionException e)
{
assertThat(e.toString(), containsString("Closed"));
}
}
@Test
public void testWrite() throws Exception
{
ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000, (byte[])null, 15);
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(10));
ByteBuffer data = BufferUtil.toBuffer("Data.");
ByteBuffer more = BufferUtil.toBuffer(" Some more.");
FutureCallback<String> fcb = new FutureCallback<>();
endp.write("CTX", fcb, data);
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals("Data.", endp.getOutputString());
fcb = new FutureCallback<>();
endp.write("CTX", fcb, more);
assertFalse(fcb.isDone());
assertEquals("Data. Some", endp.getOutputString());
assertEquals("Data. Some", endp.takeOutputString());
assertTrue(fcb.isDone());
assertEquals("CTX", fcb.get());
assertEquals(" more.", endp.getOutputString());
}
@Test
public void testIdle() throws Exception
{
ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 500);
endp.setInput("test");
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(5));
// no idle check
assertTrue(endp.isOpen());
Thread.sleep(1000);
assertTrue(endp.isOpen());
// normal read
ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<Void> fcb = new FutureCallback<>();
endp.fillInterested(null, fcb);
assertTrue(fcb.isDone());
assertEquals(null, fcb.get());
assertEquals(4, endp.fill(buffer));
assertEquals("test", BufferUtil.toString(buffer));
// read timeout
fcb = new FutureCallback<>();
endp.fillInterested(null, fcb);
long start = System.currentTimeMillis();
try
{
fcb.get();
fail();
}
catch (ExecutionException t)
{
assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class));
}
assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L));
assertTrue(endp.isOpen());
// write timeout
fcb = new FutureCallback<>();
start = System.currentTimeMillis();
endp.write(null, fcb, BufferUtil.toBuffer("This is too long"));
try
{
fcb.get();
fail();
}
catch (ExecutionException t)
{
assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class));
}
assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L));
assertTrue(endp.isOpen());
// Still no idle close
Thread.sleep(1000);
assertTrue(endp.isOpen());
// shutdown out
endp.shutdownOutput();
// idle close
Thread.sleep(1000);
assertFalse(endp.isOpen());
}
}

View File

@ -62,7 +62,7 @@ public class SelectChannelEndPointInterestsTest
}
@Override
protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, scheduler, 60000)
{
@ -76,9 +76,9 @@ public class SelectChannelEndPointInterestsTest
}
@Override
public AsyncConnection newConnection(SocketChannel channel, final AsyncEndPoint endPoint, Object attachment)
public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment)
{
return new AbstractAsyncConnection(endPoint, threadPool)
return new AbstractConnection(endPoint, threadPool)
{
@Override
public void onOpen()
@ -109,7 +109,7 @@ public class SelectChannelEndPointInterestsTest
init(new Interested()
{
@Override
public void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection)
public void onFillable(EndPoint endPoint, AbstractConnection connection)
{
ByteBuffer input = BufferUtil.allocate(2);
int read = fill(endPoint, input);
@ -143,7 +143,7 @@ public class SelectChannelEndPointInterestsTest
writeBlocked.set(true);
}
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
private int fill(EndPoint endPoint, ByteBuffer buffer)
{
try
{
@ -191,7 +191,7 @@ public class SelectChannelEndPointInterestsTest
private interface Interested
{
void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection);
void onFillable(EndPoint endPoint, AbstractConnection connection);
void onIncompleteFlush();
}

View File

@ -53,14 +53,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
}
@Override
protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
protected Connection newConnection(SocketChannel channel, EndPoint endpoint)
{
SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(false);
SslConnection sslConnection = new SslConnection(__byteBufferPool, _threadPool, endpoint, engine);
AsyncConnection appConnection = super.newConnection(channel,sslConnection.getDecryptedEndPoint());
sslConnection.getDecryptedEndPoint().setAsyncConnection(appConnection);
Connection appConnection = super.newConnection(channel,sslConnection.getDecryptedEndPoint());
sslConnection.getDecryptedEndPoint().setConnection(appConnection);
_manager.connectionOpened(appConnection);
return sslConnection;

View File

@ -51,7 +51,7 @@ import org.junit.Test;
public class SelectChannelEndPointTest
{
protected CountDownLatch _lastEndPointLatch;
protected volatile AsyncEndPoint _lastEndPoint;
protected volatile EndPoint _lastEndPoint;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();
@ -64,7 +64,7 @@ public class SelectChannelEndPointTest
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
return SelectChannelEndPointTest.this.newConnection(channel, endpoint);
}
@ -108,18 +108,18 @@ public class SelectChannelEndPointTest
return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort());
}
protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
protected Connection newConnection(SocketChannel channel, EndPoint endpoint)
{
return new TestConnection(endpoint);
}
public class TestConnection extends AbstractAsyncConnection
public class TestConnection extends AbstractConnection
{
ByteBuffer _in = BufferUtil.allocate(32 * 1024);
ByteBuffer _out = BufferUtil.allocate(32 * 1024);
long _last = -1;
public TestConnection(AsyncEndPoint endp)
public TestConnection(EndPoint endp)
{
super(endp, _threadPool);
}
@ -134,7 +134,7 @@ public class SelectChannelEndPointTest
@Override
public synchronized void onFillable()
{
AsyncEndPoint _endp = getEndPoint();
EndPoint _endp = getEndPoint();
try
{
_last = System.currentTimeMillis();
@ -547,7 +547,7 @@ public class SelectChannelEndPointTest
System.err.println("time=" + (now - start));
System.err.println("last=" + (now - last));
System.err.println("endp=" + _lastEndPoint);
System.err.println("conn=" + _lastEndPoint.getAsyncConnection());
System.err.println("conn=" + _lastEndPoint.getConnection());
e.printStackTrace();
}

View File

@ -37,7 +37,7 @@ public class SslConnectionTest
private static SslContextFactory __sslCtxFactory=new SslContextFactory();
private static ByteBufferPool __byteBufferPool = new StandardByteBufferPool();
protected volatile AsyncEndPoint _lastEndp;
protected volatile EndPoint _lastEndp;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();
@ -50,14 +50,14 @@ public class SslConnectionTest
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(false);
SslConnection sslConnection = new SslConnection(__byteBufferPool, _threadPool, endpoint, engine);
AsyncConnection appConnection = new TestConnection(sslConnection.getDecryptedEndPoint());
sslConnection.getDecryptedEndPoint().setAsyncConnection(appConnection);
Connection appConnection = new TestConnection(sslConnection.getDecryptedEndPoint());
sslConnection.getDecryptedEndPoint().setConnection(appConnection);
connectionOpened(appConnection);
return sslConnection;
@ -105,11 +105,11 @@ public class SslConnectionTest
_connector.close();
}
public class TestConnection extends AbstractAsyncConnection
public class TestConnection extends AbstractConnection
{
ByteBuffer _in = BufferUtil.allocate(8*1024);
public TestConnection(AsyncEndPoint endp)
public TestConnection(EndPoint endp)
{
super(endp, _threadPool);
}
@ -130,7 +130,7 @@ public class SslConnectionTest
@Override
public synchronized void onFillable()
{
AsyncEndPoint endp = getEndPoint();
EndPoint endp = getEndPoint();
try
{
boolean progress=true;

View File

@ -22,8 +22,8 @@ import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
@ -179,7 +179,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _httpConfig;
}
protected AsyncConnection newConnection(AsyncEndPoint endp) throws IOException
protected Connection newConnection(EndPoint endp) throws IOException
{
// TODO make this a plugable configurable connection factory for HTTP, HTTPS, SPDY & Websocket
@ -188,8 +188,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
SSLEngine engine = _sslContextFactory.createSSLEngine(endp.getRemoteAddress());
SslConnection ssl_connection = new SslConnection(getByteBufferPool(), getExecutor(), endp, engine);
AsyncConnection http_connection = new HttpConnection(_httpConfig,this,ssl_connection.getDecryptedEndPoint());
ssl_connection.getDecryptedEndPoint().setAsyncConnection(http_connection);
Connection http_connection = new HttpConnection(_httpConfig,this,ssl_connection.getDecryptedEndPoint());
ssl_connection.getDecryptedEndPoint().setConnection(http_connection);
return ssl_connection;
}
return new HttpConnection(_httpConfig,this,endp);
@ -389,19 +389,19 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
_name = name;
}
protected void connectionOpened(AsyncConnection connection)
protected void connectionOpened(Connection connection)
{
_stats.connectionOpened();
}
protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection)
protected void connectionUpgraded(Connection oldConnection, Connection newConnection)
{
long duration = System.currentTimeMillis() - oldConnection.getEndPoint().getCreatedTimeStamp();
int requests = (oldConnection instanceof HttpConnection) ? ((HttpConnection)oldConnection).getHttpChannel().getRequests() : 0;
_stats.connectionUpgraded(duration, requests, requests);
}
protected void connectionClosed(AsyncConnection connection)
protected void connectionClosed(Connection connection)
{
long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
// TODO: remove casts to HttpConnection

View File

@ -37,8 +37,8 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.UncheckedPrintWriter;
import org.eclipse.jetty.util.BufferUtil;
@ -72,7 +72,7 @@ public abstract class HttpChannel
private final Server _server;
private final AsyncConnection _connection;
private final Connection _connection;
private final HttpURI _uri;
private final ChannelEventHandler _handler = new ChannelEventHandler();
@ -100,7 +100,7 @@ public abstract class HttpChannel
/* ------------------------------------------------------------ */
public HttpChannel(Server server,AsyncConnection connection,HttpInput input)
public HttpChannel(Server server,Connection connection,HttpInput input)
{
_server = server;
_connection = connection;
@ -127,7 +127,7 @@ public abstract class HttpChannel
}
/* ------------------------------------------------------------ */
public AsyncEndPoint getEndPoint()
public EndPoint getEndPoint()
{
return getConnection().getEndPoint();
}
@ -190,7 +190,7 @@ public abstract class HttpChannel
}
/* ------------------------------------------------------------ */
public AsyncConnection getConnection()
public Connection getConnection()
{
return _connection;
}

View File

@ -24,9 +24,9 @@ import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
@ -37,7 +37,7 @@ import org.eclipse.jetty.util.log.Logger;
/**
* A Connection that handles the HTTP protocol
*/
public class HttpConnection extends AbstractAsyncConnection
public class HttpConnection extends AbstractConnection
{
public static final Logger LOG = Log.getLogger(HttpConnection.class);
@ -75,7 +75,7 @@ public class HttpConnection extends AbstractAsyncConnection
}
/* ------------------------------------------------------------ */
public HttpConnection(HttpConfiguration config, Connector connector, AsyncEndPoint endpoint)
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endpoint)
{
super(endpoint,connector.getExecutor());
@ -195,7 +195,7 @@ public class HttpConnection extends AbstractAsyncConnection
/* ------------------------------------------------------------ */
/** Parse and handle HTTP messages.
* <p>
* This method is normally called as the {@link AbstractAsyncConnection} onReadable callback.
* This method is normally called as the {@link AbstractConnection} onReadable callback.
* However, it can also be called {@link HttpChannelOverHttp#completed()} if there is unconsumed
* data in the _requestBuffer, as a result of resuming a suspended request when there is a pipelined
* request already read into the buffer.
@ -279,7 +279,7 @@ public class HttpConnection extends AbstractAsyncConnection
}
// return if the connection has been changed
if (getEndPoint().getAsyncConnection()!=this)
if (getEndPoint().getConnection()!=this)
return;
}
else if (_headerBytes>= _httpConfig.getRequestHeaderSize())
@ -437,11 +437,11 @@ public class HttpConnection extends AbstractAsyncConnection
// Handle connection upgrades
if (getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
AsyncConnection connection=(AsyncConnection)getRequest().getAttribute(UPGRADE_CONNECTION_ATTR);
Connection connection=(Connection)getRequest().getAttribute(UPGRADE_CONNECTION_ATTR);
if (connection!=null)
{
LOG.debug("Upgrade from {} to {}",this,connection);
getEndPoint().setAsyncConnection(connection);
getEndPoint().setConnection(connection);
HttpConnection.this.reset();
return;
}

View File

@ -22,8 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AsyncByteArrayEndPoint;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@ -158,15 +158,15 @@ public class LocalConnector extends AbstractConnector
{
LOG.debug("accepting {}",acceptorID);
LocalEndPoint endp = _connects.take();
AsyncConnection connection=newConnection(endp);
endp.setAsyncConnection(connection);
Connection connection=newConnection(endp);
endp.setConnection(connection);
endp.onOpen();
connection.onOpen();
connectionOpened(connection);
}
public class LocalEndPoint extends AsyncByteArrayEndPoint
public class LocalEndPoint extends ByteArrayEndPoint
{
private CountDownLatch _closed = new CountDownLatch(1);
@ -191,8 +191,8 @@ public class LocalConnector extends AbstractConnector
super.close();
if (was_open)
{
connectionClosed(getAsyncConnection());
getAsyncConnection().onClose();
connectionClosed(getConnection());
getConnection().onClose();
onClose();
}
}

View File

@ -24,8 +24,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
@ -237,9 +237,9 @@ public class SelectChannelConnector extends AbstractNetConnector
return new SelectChannelEndPoint(channel,selectSet,key, getScheduler(), getIdleTimeout());
}
protected void endPointClosed(AsyncEndPoint endpoint)
protected void endPointClosed(EndPoint endpoint)
{
connectionClosed(endpoint.getAsyncConnection());
connectionClosed(endpoint.getConnection());
}
/* ------------------------------------------------------------ */
@ -257,25 +257,25 @@ public class SelectChannelConnector extends AbstractNetConnector
}
@Override
protected void endPointClosed(AsyncEndPoint endpoint)
protected void endPointClosed(EndPoint endpoint)
{
SelectChannelConnector.this.connectionClosed(endpoint.getAsyncConnection());
SelectChannelConnector.this.connectionClosed(endpoint.getConnection());
super.endPointClosed(endpoint);
}
@Override
protected void endPointOpened(AsyncEndPoint endpoint)
protected void endPointOpened(EndPoint endpoint)
{
// TODO handle max connections and low resources
super.endPointOpened(endpoint);
SelectChannelConnector.this.connectionOpened(endpoint.getAsyncConnection());
SelectChannelConnector.this.connectionOpened(endpoint.getConnection());
}
@Override
public void connectionUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection)
public void connectionUpgraded(EndPoint endpoint, Connection oldConnection)
{
super.connectionUpgraded(endpoint, oldConnection);
SelectChannelConnector.this.connectionUpgraded(oldConnection, endpoint.getAsyncConnection());
SelectChannelConnector.this.connectionUpgraded(oldConnection, endpoint.getConnection());
}
@Override
@ -285,7 +285,7 @@ public class SelectChannelConnector extends AbstractNetConnector
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) throws IOException
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
return SelectChannelConnector.this.newConnection(endpoint);
}

View File

@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint;
@ -88,7 +88,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
}
@Override
protected void endPointClosed(AsyncEndPoint endpoint)
protected void endPointClosed(EndPoint endpoint)
{
super.endPointClosed(endpoint);
((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed();

View File

@ -150,7 +150,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
// Get the server side endpoint
EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS);
if (endp instanceof SslConnection.DecryptedEndPoint)
endp=((SslConnection.DecryptedEndPoint)endp).getAsyncConnection().getEndPoint();
endp=((SslConnection.DecryptedEndPoint)endp).getConnection().getEndPoint();
// read the response
String result=IO.toString(is);
@ -223,7 +223,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
// Get the server side endpoint
EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS);
if (endp instanceof SslConnection.DecryptedEndPoint)
endp=((SslConnection.DecryptedEndPoint)endp).getAsyncConnection().getEndPoint();
endp=((SslConnection.DecryptedEndPoint)endp).getConnection().getEndPoint();
// read the response
String result=IO.toString(is);

View File

@ -38,9 +38,10 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncByteArrayEndPoint;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.session.HashSessionIdManager;
@ -71,9 +72,9 @@ public class ResponseTest
_server.start();
_timer=new ScheduledThreadPoolExecutor(1);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,5000);
AbstractEndPoint endp = new ByteArrayEndPoint(_timer,5000);
HttpInput input = new HttpInput();
AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor()
Connection connection = new AbstractConnection(endp,new Executor()
{
@Override
public void execute(Runnable command)

View File

@ -28,8 +28,8 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.util.log.Log;
@ -62,14 +62,14 @@ public class SelectChannelStatisticsTest
_connector = new SelectChannelConnector(_server)
{
@Override
protected void endPointClosed(AsyncEndPoint endpoint)
protected void endPointClosed(EndPoint endpoint)
{
//System.err.println("Endpoint closed "+endpoint);
super.endPointClosed(endpoint);
}
@Override
public void connectionClosed(AsyncConnection connection)
public void connectionClosed(Connection connection)
{
//System.err.println("Connection closed "+connection);
super.connectionClosed(connection);

View File

@ -26,8 +26,8 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.After;
import org.junit.Assert;
@ -46,7 +46,7 @@ public class SlowClientWithPipelinedRequestTest
connector = new SelectChannelConnector(server)
{
@Override
protected AsyncConnection newConnection(AsyncEndPoint endpoint)
protected Connection newConnection(EndPoint endpoint)
{
return new HttpConnection(getHttpConfig(),this,endpoint)
{

View File

@ -431,8 +431,8 @@ public class ContextHandlerTest
private static final class WriterHandler extends AbstractHandler
{
boolean error;
Throwable throwable;
volatile boolean error;
volatile Throwable throwable;
public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException

View File

@ -31,7 +31,7 @@ import javax.servlet.http.HttpServletResponse;
import junit.framework.TestCase;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SelectChannelConnector;
@ -43,7 +43,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
*/
public class SSLCloseTest extends TestCase
{
private static AsyncEndPoint __endp;
private static EndPoint __endp;
private static class CredulousTM implements TrustManager, X509TrustManager
{
public X509Certificate[] getAcceptedIssuers()

View File

@ -15,10 +15,10 @@ package org.eclipse.jetty.spdy;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
public interface AsyncConnectionFactory
{
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment);
public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment);
}

View File

@ -19,14 +19,14 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
public class EmptyAsyncEndPoint implements AsyncEndPoint
public class EmptyEndPoint implements EndPoint
{
private boolean checkForIdle;
private AsyncConnection connection;
private Connection connection;
private boolean oshut;
private boolean closed;
private long maxIdleTime;
@ -38,13 +38,13 @@ public class EmptyAsyncEndPoint implements AsyncEndPoint
}
@Override
public AsyncConnection getAsyncConnection()
public Connection getConnection()
{
return connection;
}
@Override
public void setAsyncConnection(AsyncConnection connection)
public void setConnection(Connection connection)
{
this.connection = connection;
}

View File

@ -18,15 +18,15 @@ import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection implements NextProtoNego.ClientProvider
public class NextProtoNegoClientAsyncConnection extends AbstractConnection implements NextProtoNego.ClientProvider
{
private final Logger logger = Log.getLogger(getClass());
private final SocketChannel channel;
@ -34,7 +34,7 @@ public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection
private final SPDYClient client;
private volatile boolean completed;
public NextProtoNegoClientAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment, Executor executor, SPDYClient client)
public NextProtoNegoClientAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment, Executor executor, SPDYClient client)
{
super(endPoint, executor);
this.channel = channel;
@ -86,8 +86,8 @@ public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection
public void unsupported()
{
// Server does not support NPN, but this is a SPDY client, so hardcode SPDY
AsyncEndPoint endPoint = getEndPoint();
AsyncConnection connection = client.getDefaultAsyncConnectionFactory().newAsyncConnection(channel, endPoint, attachment);
EndPoint endPoint = getEndPoint();
Connection connection = client.getDefaultAsyncConnectionFactory().newAsyncConnection(channel, endPoint, attachment);
client.replaceAsyncConnection(endPoint, connection);
completed = true;
}
@ -98,8 +98,8 @@ public class NextProtoNegoClientAsyncConnection extends AbstractAsyncConnection
String protocol = client.selectProtocol(protocols);
if (protocol == null)
return null;
AsyncEndPoint endPoint = getEndPoint();
AsyncConnection connection = client.getAsyncConnectionFactory(protocol).newAsyncConnection(channel, endPoint, attachment);
EndPoint endPoint = getEndPoint();
Connection connection = client.getAsyncConnectionFactory(protocol).newAsyncConnection(channel, endPoint, attachment);
client.replaceAsyncConnection(endPoint, connection);
completed = true;
return protocol;

View File

@ -17,22 +17,22 @@ import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.List;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class NextProtoNegoServerAsyncConnection extends AbstractAsyncConnection implements NextProtoNego.ServerProvider
public class NextProtoNegoServerAsyncConnection extends AbstractConnection implements NextProtoNego.ServerProvider
{
private final Logger logger = Log.getLogger(getClass());
private final SocketChannel channel;
private final SPDYServerConnector connector;
private volatile boolean completed;
public NextProtoNegoServerAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, SPDYServerConnector connector)
public NextProtoNegoServerAsyncConnection(SocketChannel channel, EndPoint endPoint, SPDYServerConnector connector)
{
super(endPoint, connector.getExecutor());
this.channel = channel;
@ -77,8 +77,8 @@ public class NextProtoNegoServerAsyncConnection extends AbstractAsyncConnection
public void unsupported()
{
AsyncConnectionFactory asyncConnectionFactory = connector.getDefaultAsyncConnectionFactory();
AsyncEndPoint endPoint = getEndPoint();
AsyncConnection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector);
EndPoint endPoint = getEndPoint();
Connection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector);
connector.replaceAsyncConnection(endPoint, connection);
completed = true;
}
@ -93,8 +93,8 @@ public class NextProtoNegoServerAsyncConnection extends AbstractAsyncConnection
public void protocolSelected(String protocol)
{
AsyncConnectionFactory asyncConnectionFactory = connector.getAsyncConnectionFactory(protocol);
AsyncEndPoint endPoint = getEndPoint();
AsyncConnection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector);
EndPoint endPoint = getEndPoint();
Connection connection = asyncConnectionFactory.newAsyncConnection(channel, endPoint, connector);
connector.replaceAsyncConnection(endPoint, connection);
completed = true;
}

View File

@ -17,8 +17,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.spdy.parser.Parser;
@ -26,7 +26,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class SPDYAsyncConnection extends AbstractAsyncConnection implements Controller<StandardSession.FrameBytes>, IdleListener
public class SPDYAsyncConnection extends AbstractConnection implements Controller<StandardSession.FrameBytes>, IdleListener
{
private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class);
private final ByteBufferPool bufferPool;
@ -34,7 +34,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
private volatile ISession session;
private volatile boolean idle = false;
public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
public SPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
{
super(endPoint, executor);
this.bufferPool = bufferPool;
@ -61,7 +61,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
protected int read(ByteBuffer buffer)
{
AsyncEndPoint endPoint = getEndPoint();
EndPoint endPoint = getEndPoint();
while (true)
{
int filled = fill(endPoint, buffer);
@ -81,7 +81,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
}
}
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
private int fill(EndPoint endPoint, ByteBuffer buffer)
{
try
{
@ -99,7 +99,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
@Override
public int write(ByteBuffer buffer, final Callback<StandardSession.FrameBytes> callback, StandardSession.FrameBytes context)
{
AsyncEndPoint endPoint = getEndPoint();
EndPoint endPoint = getEndPoint();
int remaining = buffer.remaining();
endPoint.write(context, callback, buffer);
return remaining - buffer.remaining();
@ -108,7 +108,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
@Override
public void close(boolean onlyOutput)
{
AsyncEndPoint endPoint = getEndPoint();
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
logger.debug("Shutting down output {}", endPoint);

View File

@ -31,8 +31,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
@ -184,10 +184,10 @@ public class SPDYClient
return FlowControlStrategyFactory.newFlowControlStrategy(version);
}
public void replaceAsyncConnection(AsyncEndPoint endPoint, AsyncConnection connection)
public void replaceAsyncConnection(EndPoint endPoint, Connection connection)
{
AsyncConnection oldConnection = endPoint.getAsyncConnection();
endPoint.setAsyncConnection(connection);
Connection oldConnection = endPoint.getConnection();
endPoint.setConnection(connection);
factory.selector.connectionUpgraded(endPoint, oldConnection);
}
@ -306,14 +306,14 @@ public class SPDYClient
{
@Override
protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
SessionPromise attachment = (SessionPromise)key.attachment();
long clientIdleTimeout = attachment.client.getIdleTimeout();
if (clientIdleTimeout < 0)
clientIdleTimeout = idleTimeout;
AsyncEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, scheduler, clientIdleTimeout);
EndPoint result = new SelectChannelEndPoint(channel, selectSet, key, scheduler, clientIdleTimeout);
return result;
}
@ -325,7 +325,7 @@ public class SPDYClient
}
@Override
public AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint, final Object attachment)
public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment)
{
SessionPromise sessionPromise = (SessionPromise)attachment;
final SPDYClient client = sessionPromise.client;
@ -345,9 +345,9 @@ public class SPDYClient
}
};
AsyncEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
NextProtoNegoClientAsyncConnection connection = new NextProtoNegoClientAsyncConnection(channel, sslEndPoint, attachment, client.factory.threadPool, client);
sslEndPoint.setAsyncConnection(connection);
sslEndPoint.setConnection(connection);
connectionOpened(connection);
NextProtoNego.put(engine, connection);
@ -357,8 +357,8 @@ public class SPDYClient
else
{
AsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment);
endPoint.setAsyncConnection(connection);
Connection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment);
endPoint.setConnection(connection);
return connection;
}
}
@ -403,7 +403,7 @@ public class SPDYClient
private static class ClientSPDYAsyncConnectionFactory implements AsyncConnectionFactory
{
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment)
{
SessionPromise sessionPromise = (SessionPromise)attachment;
SPDYClient client = sessionPromise.client;
@ -414,7 +414,7 @@ public class SPDYClient
Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
endPoint.setAsyncConnection(connection);
endPoint.setConnection(connection);
FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
@ -433,7 +433,7 @@ public class SPDYClient
{
private final Factory factory;
public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
public ClientSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
{
super(endPoint, bufferPool, parser, factory.threadPool);
this.factory = factory;

View File

@ -30,8 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
@ -182,7 +182,7 @@ public class SPDYServerConnector extends SelectChannelConnector
}
@Override
protected AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint)
protected Connection newConnection(final SocketChannel channel, EndPoint endPoint)
{
if (sslContextFactory != null)
{
@ -198,9 +198,9 @@ public class SPDYServerConnector extends SelectChannelConnector
}
};
final AsyncEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
final EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
NextProtoNegoServerAsyncConnection connection = new NextProtoNegoServerAsyncConnection(channel, sslEndPoint, this);
sslEndPoint.setAsyncConnection(connection);
sslEndPoint.setConnection(connection);
getSelectorManager().connectionOpened(connection);
NextProtoNego.put(engine, connection);
@ -210,8 +210,8 @@ public class SPDYServerConnector extends SelectChannelConnector
else
{
AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, this);
endPoint.setAsyncConnection(connection);
Connection connection = connectionFactory.newAsyncConnection(channel, endPoint, this);
endPoint.setConnection(connection);
return connection;
}
}
@ -265,10 +265,10 @@ public class SPDYServerConnector extends SelectChannelConnector
this.initialWindowSize = initialWindowSize;
}
public void replaceAsyncConnection(AsyncEndPoint endPoint, AsyncConnection connection)
public void replaceAsyncConnection(EndPoint endPoint, Connection connection)
{
AsyncConnection oldConnection = endPoint.getAsyncConnection();
endPoint.setAsyncConnection(connection);
Connection oldConnection = endPoint.getConnection();
endPoint.setConnection(connection);
getSelectorManager().connectionUpgraded(endPoint, oldConnection);
}

View File

@ -17,8 +17,8 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.generator.Generator;
@ -52,7 +52,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment)
{
CompressionFactory compressionFactory = new StandardCompressionFactory();
Parser parser = new Parser(compressionFactory.newDecompressor());
@ -62,7 +62,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
ServerSessionFrameListener listener = provideServerSessionFrameListener(endPoint, attachment);
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector);
endPoint.setAsyncConnection(connection);
endPoint.setConnection(connection);
FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version);
@ -77,7 +77,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
return connection;
}
protected ServerSessionFrameListener provideServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
protected ServerSessionFrameListener provideServerSessionFrameListener(EndPoint endPoint, Object attachment)
{
return listener;
}
@ -88,7 +88,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
private final SPDYServerConnector connector;
private volatile boolean connected;
private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
private ServerSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
{
super(endPoint, bufferPool, parser, connector.getExecutor());
this.listener = listener;

View File

@ -125,7 +125,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
// kill queued jobs and flush out idle jobs
_jobs.clear();
Runnable noop = new Runnable(){public void run(){}};
Runnable noop = new Runnable(){@Override public void run(){}};
for (int i=_threadsIdle.get();i-->0;)
_jobs.offer(noop);
Thread.yield();
@ -203,6 +203,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
* @see #getMaxThreads
* @param maxThreads maximum number of threads.
*/
@Override
public void setMaxThreads(int maxThreads)
{
_maxThreads=maxThreads;
@ -216,6 +217,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
* @see #getMinThreads
* @param minThreads minimum number of threads
*/
@Override
public void setMinThreads(int minThreads)
{
_minThreads=minThreads;
@ -297,6 +299,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
* @see #setMaxThreads
* @return maximum number of threads.
*/
@Override
public int getMaxThreads()
{
return _maxThreads;
@ -308,6 +311,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
* @see #setMinThreads
* @return minimum number of threads.
*/
@Override
public int getMinThreads()
{
return _minThreads;
@ -353,6 +357,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
/* ------------------------------------------------------------ */
@Override
public boolean dispatch(Runnable job)
{
LOG.debug("{} dispatched {}",this,job);
@ -376,6 +381,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
/* ------------------------------------------------------------ */
@Override
public void execute(Runnable job)
{
if (!dispatch(job))
@ -386,6 +392,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* Blocks until the thread pool is {@link LifeCycle#stop stopped}.
*/
@Override
public void join() throws InterruptedException
{
synchronized (_joinLock)
@ -402,6 +409,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* @return The total number of threads currently in the pool
*/
@Override
public int getThreads()
{
return _threadsStarted.get();
@ -411,6 +419,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* @return The number of idle threads in the pool
*/
@Override
public int getIdleThreads()
{
return _threadsIdle.get();
@ -420,6 +429,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
*/
@Override
public boolean isLowOnThreads()
{
return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
@ -460,12 +470,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/* ------------------------------------------------------------ */
@Override
public String dump()
{
return AggregateLifeCycle.dump(this);
}
/* ------------------------------------------------------------ */
@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Object> dump = new ArrayList<Object>(getMaxThreads());
@ -491,6 +503,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{
dump.add(new Dumpable()
{
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
@ -498,6 +511,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
}
@Override
public String dump()
{
return null;
@ -530,6 +544,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/* ------------------------------------------------------------ */
private Runnable _runnable = new Runnable()
{
@Override
public void run()
{
boolean shrink=false;
@ -584,7 +599,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{
LOG.ignore(e);
}
catch(Exception e)
catch(Throwable e)
{
LOG.warn(e);
}

View File

@ -22,9 +22,9 @@ import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.FutureCallback;
@ -39,7 +39,7 @@ import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
* <p>
* Results in a {@link WebSocketAsyncConnection} on successful handshake.
*/
public class HandshakeConnection extends AbstractAsyncConnection implements AsyncConnection
public class HandshakeConnection extends AbstractConnection implements Connection
{
public static final String COOKIE_DELIM = "\"\\\n\r\t\f\b%+ ;=";
private final WebSocketClient.ConnectFuture future;
@ -47,7 +47,7 @@ public class HandshakeConnection extends AbstractAsyncConnection implements Asyn
private String key;
public HandshakeConnection(AsyncEndPoint endp, Executor executor, ByteBufferPool bufferPool, WebSocketClient.ConnectFuture future)
public HandshakeConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, WebSocketClient.ConnectFuture future)
{
super(endp,executor);
this.future = future;

View File

@ -3,7 +3,7 @@ package org.eclipse.jetty.websocket.client.io;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
@ -13,7 +13,7 @@ public class WebSocketClientAsyncConnection extends WebSocketAsyncConnection
{
private final WebSocketClientFactory factory;
public WebSocketClientAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy,
public WebSocketClientAsyncConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy,
ByteBufferPool bufferPool, WebSocketClientFactory factory)
{
super(endp,executor,scheduler,policy,bufferPool);

View File

@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
@ -66,7 +66,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
return sslContextFactory;
}
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment)
{
WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment;
WebSocketClientFactory factory = confut.getFactory();
@ -78,7 +78,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
ScheduledExecutorService scheduler = factory.getScheduler();
WebSocketAsyncConnection connection = new WebSocketClientAsyncConnection(endPoint,executor,scheduler,policy,bufferPool,factory);
endPoint.setAsyncConnection(connection);
endPoint.setConnection(connection);
connection.getParser().setIncomingFramesHandler(websocket);
// TODO: track open websockets? bind open websocket to connection?
@ -87,7 +87,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment)
{
WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment;
@ -97,7 +97,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
if ((sslContextFactory != null) && ("wss".equalsIgnoreCase(scheme)))
{
final AtomicReference<AsyncEndPoint> sslEndPointRef = new AtomicReference<>();
final AtomicReference<EndPoint> sslEndPointRef = new AtomicReference<>();
final AtomicReference<Object> attachmentRef = new AtomicReference<>(attachment);
SSLEngine engine = newSSLEngine(sslContextFactory,channel);
SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine)
@ -110,20 +110,20 @@ public class WebSocketClientSelectorManager extends SelectorManager
super.onClose();
}
};
endPoint.setAsyncConnection(sslConnection);
AsyncEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
endPoint.setConnection(sslConnection);
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
sslEndPointRef.set(sslEndPoint);
startHandshake(engine);
AsyncConnection connection = newAsyncConnection(channel,sslEndPoint,attachment);
endPoint.setAsyncConnection(connection);
Connection connection = newAsyncConnection(channel,sslEndPoint,attachment);
endPoint.setConnection(connection);
return connection;
}
else
{
AsyncConnection connection = newAsyncConnection(channel,endPoint,attachment);
endPoint.setAsyncConnection(connection);
Connection connection = newAsyncConnection(channel,endPoint,attachment);
endPoint.setConnection(connection);
return connection;
}
}

View File

@ -25,9 +25,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -46,9 +46,9 @@ import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link AsyncConnection} framework of jetty-io
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
*/
public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, OutgoingFrames
public abstract class WebSocketAsyncConnection extends AbstractConnection implements RawConnection, OutgoingFrames
{
private static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class);
private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames");
@ -64,7 +64,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
private boolean flushing;
private AtomicLong writes;
public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
public WebSocketAsyncConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor);
this.policy = policy;
@ -111,7 +111,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
@Override
public void disconnect(boolean onlyOutput)
{
AsyncEndPoint endPoint = getEndPoint();
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
LOG.debug("Shutting down output {}",endPoint);
@ -275,7 +275,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
private int read(ByteBuffer buffer)
{
AsyncEndPoint endPoint = getEndPoint();
EndPoint endPoint = getEndPoint();
try
{
while (true)
@ -367,7 +367,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
private <C> void write(ByteBuffer buffer, WebSocketAsyncConnection webSocketAsyncConnection, FrameBytes<C> frameBytes)
{
AsyncEndPoint endpoint = getEndPoint();
EndPoint endpoint = getEndPoint();
if (LOG_FRAMES.isDebugEnabled())
{

View File

@ -3,7 +3,7 @@ package org.eclipse.jetty.websocket.server;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
@ -13,7 +13,7 @@ public class WebSocketServerAsyncConnection extends WebSocketAsyncConnection
private final WebSocketServerFactory factory;
private boolean connected;
public WebSocketServerAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy,
public WebSocketServerAsyncConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy,
ByteBufferPool bufferPool, WebSocketServerFactory factory)
{
super(endp,executor,scheduler,policy,bufferPool);

View File

@ -33,7 +33,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.server.HttpConnection;
@ -353,7 +353,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
// Create connection
HttpConnection http = HttpConnection.getCurrentConnection();
AsyncEndPoint endp = http.getEndPoint();
EndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().getExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
WebSocketServerAsyncConnection connection = new WebSocketServerAsyncConnection(endp,executor,scheduler,websocket.getPolicy(),bufferPool,this);

View File

@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
@ -59,7 +59,7 @@ public class WebSocketLoadRFC6455Test
private final BufferedReader input;
private final int iterations;
private final CountDownLatch latch;
private/* final */AsyncEndPoint _endp;
private/* final */EndPoint _endp;
private final Generator _generator;
private final Parser _parser;
private final IncomingFrames _handler = new IncomingFrames()