jetty-9 new jetty-io NIO.2 inspired design, mostly working but more work needed on SSL

This commit is contained in:
Greg Wilkins 2012-05-02 17:22:55 +02:00
parent c7f25b0928
commit c61497327a
34 changed files with 1369 additions and 957 deletions

View File

@ -232,7 +232,7 @@ public class HttpGenerator
return Result.NEED_BUFFER; return Result.NEED_BUFFER;
// Copy the content // Copy the content
_contentPrepared+=BufferUtil.flipPutFlip(content,buffer); _contentPrepared+=BufferUtil.append(content,buffer);
// are we full? // are we full?
if (BufferUtil.isFull(buffer)) if (BufferUtil.isFull(buffer))

View File

@ -0,0 +1,84 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractAsyncConnection implements AsyncConnection
{
private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class);
protected final AsyncEndPoint _endp;
private IOFuture.Callback _readCallback = new IOFuture.Callback()
{
@Override
public void onReady()
{
onReadable();
}
@Override
public void onFail(Throwable cause)
{
LOG.debug("FAILED: "+cause);
}
};
public AbstractAsyncConnection(AsyncEndPoint endp)
{
_endp=endp;
}
@Override
public AsyncEndPoint getEndPoint()
{
return _endp;
}
@Override
public void onIdleExpired(long idleForMs)
{
try
{
LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
if (_endp.isOutputShutdown())
_endp.close();
else
_endp.shutdownOutput();
}
catch(IOException e)
{
LOG.ignore(e);
try
{
_endp.close();
}
catch(IOException e2)
{
LOG.ignore(e2);
}
}
}
@Override
public IOFuture scheduleOnReadable()
{
IOFuture read=getEndPoint().read();
read.setCallback(_readCallback);
return read;
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -0,0 +1,65 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
public abstract class AbstractEndPoint implements EndPoint
{
private final long _created=System.currentTimeMillis();
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private int _maxIdleTime;
protected AbstractEndPoint(InetSocketAddress local,InetSocketAddress remote)
{
_local=local;
_remote=remote;
}
@Override
public long getCreatedTimeStamp()
{
return _created;
}
@Override
public int getMaxIdleTime()
{
return _maxIdleTime;
}
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
_maxIdleTime=timeMs;
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getLocalAddress()
{
return _local;
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getRemoteAddress()
{
return _remote;
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%x{%s<r-l>%s,o=%b,os=%b}",
getClass().getSimpleName(),
hashCode(),
getRemoteAddress(),
getLocalAddress(),
isOpen(),
isOutputShutdown());
}
}

View File

@ -14,17 +14,15 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
public interface AsyncConnection
public interface Connection
{ {
EndPoint getEndPoint(); AsyncEndPoint getEndPoint();
int getMaxIdleTime(); IOFuture scheduleOnReadable();
/** void onReadable();
* @return the timestamp at which the connection was created void onInputShutdown();
*/ void onClose();
long getCreatedTimeStamp(); void onIdleExpired(long idleForMs);
boolean isIdle();
} }

View File

@ -0,0 +1,125 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Future;
/* ------------------------------------------------------------ */
/**Asynchronous End Point
* <p>
* This extension of EndPoint provides asynchronous scheduling methods.
* The design of these has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because: they have
* some inefficiencies (eg buffers must be allocated before read); they have
* unrequired overheads due to their generic nature (passing of attachments
* and returning operation counts); there is no need to pass timeouts as
* {@link EndPoint#getMaxIdleTime() is used.
* <p>
* The intent of this API is that it can be used in either: a polling mode (like {@link Future})
* ; in a callback mode (like {@link CompletionHandler} mode; or blocking mod;e or a hybrid mode
* <h3>Blocking read</h3>
* <pre>
* endpoint.read().complete();
* endpoint.fill(buffer);
* </pre>
* <h3>Polling read</h3>
* <pre>
* IOFuture read = endpoint.read();
* ...
* if (read.isReady())
* endpoint.fill(buffer);
* </pre>
* <h3>Callback read</h3>
* <pre>
* endpoint.read().setHandler(new IOCallback()
* {
* public void onReady() { endpoint.fill(buffer); ... }
* public void onFail(IOException e) { ... }
* public void onTimeout() { ... }
* }
* </pre>
*
* <h3>Blocking write</h3>
* <pre>
* endpoint.write(buffer).complete();
* </pre>
* <h3>Polling write</h3>
* <pre>
* IOFuture write = endpoint.write(buffer);
* ...
* if (write.isReady())
* // do next write
* </pre>
* <h3>Callback write</h3>
* <pre>
* endpoint.write(buffer).setHandler(new IOCallback()
* {
* public void onReady() { ... }
* public void onFail(IOException e) { ... }
* public void onTimeout() { ... }
* }
* </pre>
* <h3>Hybrid write</h3>
* <pre>
* IOFuture write = endpoint.write(buffer);
* if (write.isReady())
* // write next
* else
* write.setHandler(new IOCallback()
* {
* public void onReady() { ... }
* public void onFail(IOException e) { ... }
* public void onTimeout() { ... }
* }
* </pre>
*
* <h2>Compatibility Notes</h2>
* Some Async IO APIs have the concept of setting read interest. With this
* API calling {@link #read()} is equivalent to setting read interest to true
* and calling {@link IOFuture#cancel()} is equivalent to setting read interest
* to false.
*/
public interface AsyncEndPoint extends EndPoint
{
/* ------------------------------------------------------------ */
AsyncConnection getAsyncConnection();
/* ------------------------------------------------------------ */
/** Schedule a read operation.
* <p>
* This method allows a {@link #fill(ByteBuffer)} operation to be scheduled
* with either blocking, polling or callback semantics.
* @return an {@link IOFuture} instance that will be ready when a call to {@link #fill(ByteBuffer)} will
* return immediately with data without blocking.
* @throws IllegalStateException if another read operation has been scheduled and has not timedout, been cancelled or is ready.
*/
IOFuture read() throws IllegalStateException;
/* ------------------------------------------------------------ */
/** Schedule a write operation.
* This method performs {@link #flush(ByteBuffer...)} operations and allows the completion of
* the entire write to be scheduled with blocking, polling or callback semantics.
* @param buffers One or more {@link ByteBuffer}s that will be flushed.
* @return an {@link IOFuture} instance that will be ready when all the data in the buffers passed has been consumed by
* one or more calls to {@link #flush(ByteBuffer)}.
*/
IOFuture write(ByteBuffer... buffers) throws IllegalStateException;
/* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness
*/
void setCheckForIdle(boolean check);
/* ------------------------------------------------------------ */
/** Get if the endpoint should be checked for idleness
*/
boolean isCheckForIdle();
/* ------------------------------------------------------------ */
/**
* @return Timestamp in ms since epoch of when the last data was
* filled or flushed from this endpoint.
*/
long getActivityTimestamp();
}

View File

@ -20,20 +20,17 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** ByteArrayEndPoint. /** ByteArrayEndPoint.
*
* *
*/ */
public class ByteArrayEndPoint implements EndPoint public class ByteArrayEndPoint extends AbstractEndPoint
{ {
protected byte[] _inBytes; protected byte[] _inBytes;
protected ByteBuffer _in; protected ByteBuffer _in;
protected ByteBuffer _out; protected ByteBuffer _out;
protected boolean _closed; protected boolean _closed;
protected boolean _growOutput; protected boolean _growOutput;
protected int _maxIdleTime;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -41,6 +38,7 @@ public class ByteArrayEndPoint implements EndPoint
*/ */
public ByteArrayEndPoint() public ByteArrayEndPoint()
{ {
super(null,null);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -49,6 +47,7 @@ public class ByteArrayEndPoint implements EndPoint
*/ */
public ByteArrayEndPoint(byte[] input, int outputSize) public ByteArrayEndPoint(byte[] input, int outputSize)
{ {
super(null,null);
_inBytes=input; _inBytes=input;
_in=ByteBuffer.wrap(input); _in=ByteBuffer.wrap(input);
_out=ByteBuffer.allocate(outputSize); _out=ByteBuffer.allocate(outputSize);
@ -100,16 +99,6 @@ public class ByteArrayEndPoint implements EndPoint
return !_closed; return !_closed;
} }
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.jetty.io.EndPoint#isInputShutdown()
*/
@Override
public boolean isInputShutdown()
{
return _closed;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* /*
* @see org.eclipse.jetty.io.EndPoint#isOutputShutdown() * @see org.eclipse.jetty.io.EndPoint#isOutputShutdown()
@ -130,16 +119,6 @@ public class ByteArrayEndPoint implements EndPoint
close(); close();
} }
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#shutdownInput()
*/
@Override
public void shutdownInput() throws IOException
{
close();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* /*
* @see org.eclipse.io.EndPoint#close() * @see org.eclipse.io.EndPoint#close()
@ -160,7 +139,7 @@ public class ByteArrayEndPoint implements EndPoint
if (_closed) if (_closed)
throw new IOException("CLOSED"); throw new IOException("CLOSED");
if (_in!=null) if (_in!=null)
return BufferUtil.flipPutFlip(_in,buffer); return BufferUtil.append(_in,buffer);
return 0; return 0;
} }
@ -217,20 +196,6 @@ public class ByteArrayEndPoint implements EndPoint
_out.clear(); _out.clear();
} }
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getLocalAddress()
{
return null;
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getRemoteAddress()
{
return null;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* /*
* @see org.eclipse.io.EndPoint#getConnection() * @see org.eclipse.io.EndPoint#getConnection()
@ -259,26 +224,5 @@ public class ByteArrayEndPoint implements EndPoint
_growOutput=growOutput; _growOutput=growOutput;
} }
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime()
*/
@Override
public int getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int)
*/
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
_maxIdleTime=timeMs;
}
} }

View File

@ -32,56 +32,24 @@ import org.eclipse.jetty.util.log.Logger;
* <p>Holds the channel and socket for an NIO endpoint. * <p>Holds the channel and socket for an NIO endpoint.
* *
*/ */
public class ChannelEndPoint implements EndPoint public class ChannelEndPoint extends AbstractEndPoint
{ {
private static final Logger LOG = Log.getLogger(ChannelEndPoint.class); private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
private final ByteChannel _channel; private final ByteChannel _channel;
private final Socket _socket; private final Socket _socket;
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private volatile int _maxIdleTime;
private volatile boolean _ishut; private volatile boolean _ishut;
private volatile boolean _oshut; private volatile boolean _oshut;
public ChannelEndPoint(ByteChannel channel) throws IOException public ChannelEndPoint(SocketChannel channel) throws IOException
{ {
super(); super((InetSocketAddress)channel.socket().getLocalSocketAddress(),
(InetSocketAddress)channel.socket().getRemoteSocketAddress() );
this._channel = channel; this._channel = channel;
_socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null; _socket=channel.socket();
if (_socket!=null) setMaxIdleTime(_socket.getSoTimeout());
{ _socket.setSoTimeout(0);
_local=(InetSocketAddress)_socket.getLocalSocketAddress();
_remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
_maxIdleTime=_socket.getSoTimeout();
}
else
{
_local=_remote=null;
}
}
protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException
{
this._channel = channel;
_maxIdleTime=maxIdleTime;
_socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
if (_socket!=null)
{
_local=(InetSocketAddress)_socket.getLocalSocketAddress();
_remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
_socket.setSoTimeout(_maxIdleTime);
}
else
{
_local=_remote=null;
}
}
public boolean isBlocking()
{
return !(_channel instanceof SelectableChannel) || ((SelectableChannel)_channel).isBlocking();
} }
/* /*
@ -131,7 +99,6 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc) /* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close() * @see org.eclipse.io.EndPoint#close()
*/ */
@Override
public void shutdownInput() throws IOException public void shutdownInput() throws IOException
{ {
shutdownChannelInput(); shutdownChannelInput();
@ -183,7 +150,6 @@ public class ChannelEndPoint implements EndPoint
return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown(); return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown();
} }
@Override
public boolean isInputShutdown() public boolean isInputShutdown()
{ {
return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown(); return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown();
@ -267,20 +233,6 @@ public class ChannelEndPoint implements EndPoint
return _channel; return _channel;
} }
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getLocalAddress()
{
return _local;
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getRemoteAddress()
{
return _remote;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public Object getTransport() public Object getTransport()
@ -294,23 +246,4 @@ public class ChannelEndPoint implements EndPoint
return _socket; return _socket;
} }
/* ------------------------------------------------------------ */
@Override
public int getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int)
*/
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
//if (_socket!=null && timeMs!=_maxIdleTime)
// _socket.setSoTimeout(timeMs>0?timeMs:0);
_maxIdleTime=timeMs;
}
} }

View File

@ -0,0 +1,82 @@
package org.eclipse.jetty.io;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.util.thread.ThreadPool;
public class CompleteIOFuture implements IOFuture
{
private final boolean _ready;
private final Throwable _cause;
public final static CompleteIOFuture COMPLETE=new CompleteIOFuture();
public CompleteIOFuture()
{
_ready=true;
_cause=null;
}
public CompleteIOFuture(Throwable cause)
{
_ready=false;
_cause=cause;
}
@Override
public boolean isReady() throws ExecutionException
{
if (_ready)
return true;
throw new ExecutionException(_cause);
}
@Override
public void cancel() throws UnsupportedOperationException
{
throw new UnsupportedOperationException();
}
@Override
public void await() throws ExecutionException
{
isReady();
}
@Override
public void setCallback(final Callback callback)
{
dispatch(new Runnable()
{
@Override
public void run()
{
if (_ready)
callback.onReady();
else
callback.onFail(_cause);
}
});
}
protected void dispatch(Runnable callback)
{
callback.run();
}
@Override
public boolean isComplete()
{
return true;
}
@Override
public String toString()
{
return String.format("CIOF@%x{r=%b,c=%s}",
hashCode(),
_ready,
_cause);
}
}

View File

@ -25,6 +25,26 @@ import java.nio.ByteBuffer;
*/ */
public interface EndPoint public interface EndPoint
{ {
/* ------------------------------------------------------------ */
/**
* @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
* if this <code>EndPoint</code> does not represent a network connection.
*/
InetSocketAddress getLocalAddress();
/* ------------------------------------------------------------ */
/**
* @return The remote Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
* if this <code>EndPoint</code> does not represent a network connection.
*/
InetSocketAddress getRemoteAddress();
/* ------------------------------------------------------------ */
boolean isOpen();
/* ------------------------------------------------------------ */
long getCreatedTimeStamp();
/** /**
* Shutdown any backing output stream associated with the endpoint * Shutdown any backing output stream associated with the endpoint
*/ */
@ -32,13 +52,6 @@ public interface EndPoint
boolean isOutputShutdown(); boolean isOutputShutdown();
/**
* Shutdown any backing input stream associated with the endpoint
*/
void shutdownInput() throws IOException;
boolean isInputShutdown();
/** /**
* Close any backing stream associated with the endpoint * Close any backing stream associated with the endpoint
*/ */
@ -68,38 +81,16 @@ public interface EndPoint
*/ */
int flush(ByteBuffer... buffer) throws IOException; int flush(ByteBuffer... buffer) throws IOException;
/* ------------------------------------------------------------ */
/**
* @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
* if this <code>EndPoint</code> does not represent a network connection.
*/
InetSocketAddress getLocalAddress();
/* ------------------------------------------------------------ */
/**
* @return The remote Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
* if this <code>EndPoint</code> does not represent a network connection.
*/
InetSocketAddress getRemoteAddress();
/* ------------------------------------------------------------ */
boolean isOpen();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return The underlying transport object (socket, channel, etc.) * @return The underlying transport object (socket, channel, etc.)
*/ */
Object getTransport(); Object getTransport();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Get the max idle time in ms. /** Get the max idle time in ms.
* <p>The max idle time is the time the endpoint can be idle before * <p>The max idle time is the time the endpoint can be idle before
* extraordinary handling takes place. This loosely corresponds to * extraordinary handling takes place.
* the {@link java.net.Socket#getSoTimeout()} for blocking connections,
* but {@link AsyncEndPoint} implementations must use other mechanisms
* to implement the max idle time.
* @return the max idle time in ms or if ms <= 0 implies an infinite timeout * @return the max idle time in ms or if ms <= 0 implies an infinite timeout
*/ */
int getMaxIdleTime(); int getMaxIdleTime();

View File

@ -0,0 +1,66 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/** Async IO Future interface.
* <p>
* This interface make the future status of an IO operation available via
* polling ({@link #isReady()}, blocking ({@link #await()} or callback ({@link #setCallback(Callback)}
*
*/
public interface IOFuture
{
/* ------------------------------------------------------------ */
/** Indicate if this Future is complete.
* If this future has completed by becoming ready, excepting or timeout.
* @return True if this future has completed by becoming ready, excepting or timeout.
*/
boolean isComplete();
/* ------------------------------------------------------------ */
/** Indicate the readyness of the IO system.
* For input, ready means that there is data
* ready to be consumed. For output ready means that the prior operation
* has completed and another may be initiated.
* @return True if the IO operation is ready.
* @throws ExecutionException If an exception occurs during the IO operation
*/
boolean isReady() throws ExecutionException;
/* ------------------------------------------------------------ */
/** Cancel the IO operation.
* @throws UnsupportedOperationException If the operation cannot be cancelled.
*/
void cancel() throws UnsupportedOperationException;
/* ------------------------------------------------------------ */
/** Wait until complete.
* <p>This call blocks the calling thread until this AsyncIO is ready or
* an exception or until a timeout due to {@link EndPoint#getMaxIdleTime()}.
* @throws InterruptedException if interrupted while blocking
* @throws ExecutionException If any exception occurs during the IO operation
*/
void await() throws InterruptedException, ExecutionException;
/* ------------------------------------------------------------ */
/** Set an IOCallback.
* Set an {@link Callback} instance to be called when the IO operation is ready or if
* there is a failure or timeout.
* @param callback
*/
void setCallback(Callback callback);
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
interface Callback
{
void onReady();
void onFail(Throwable cause);
}
}

View File

@ -0,0 +1,241 @@
package org.eclipse.jetty.io;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class RecycledIOFuture implements IOFuture
{
private final Lock _lock;
private final Condition _block;
private boolean _complete;
private boolean _ready;
private Throwable _cause;
private Callback _callback;
public RecycledIOFuture()
{
_lock = new ReentrantLock();
_block = _lock.newCondition();
}
public RecycledIOFuture(Lock lock)
{
_lock = lock;
_block = _lock.newCondition();
}
public RecycledIOFuture(boolean ready,Lock lock)
{
_ready=ready;
_complete=ready;
_lock = lock;
_block = _lock.newCondition();
}
public void fail(final Throwable cause)
{
_lock.lock();
try
{
if (_complete)
throw new IllegalStateException("complete",cause);
_cause=cause;
_complete=true;
if (_callback!=null)
dispatchFail();
_block.signal();
}
finally
{
_lock.unlock();
}
}
public void ready()
{
_lock.lock();
try
{
if (_complete)
throw new IllegalStateException();
_ready=true;
_complete=true;
if (_callback!=null)
dispatchReady();
_block.signal();
}
finally
{
_lock.unlock();
}
}
protected void cancelled()
{
_lock.lock();
try
{
if (_complete)
throw new IllegalStateException();
_ready=false;
_complete=true;
_block.signal();
}
finally
{
_lock.unlock();
}
}
public void recycle()
{
_lock.lock();
try
{
if (!_complete)
throw new IllegalStateException();
_ready=false;
_cause=null;
_complete=false;
_callback=null;
}
finally
{
_lock.unlock();
}
}
@Override
public boolean isComplete()
{
_lock.lock();
try
{
return _complete;
}
finally
{
_lock.unlock();
}
}
@Override
public boolean isReady() throws ExecutionException
{
_lock.lock();
try
{
if (_complete)
{
if (_ready)
return true;
throw new ExecutionException(_cause);
}
return false;
}
finally
{
_lock.unlock();
}
}
@Override
public void cancel() throws UnsupportedOperationException
{
throw new UnsupportedOperationException();
}
@Override
public void await() throws InterruptedException, ExecutionException
{
_lock.lock();
try
{
if (!_complete)
_block.await();
isReady();
}
finally
{
_lock.unlock();
}
}
@Override
public void setCallback(Callback callback)
{
_lock.lock();
try
{
if (_callback!=null)
throw new IllegalStateException();
_callback=callback;
if (_complete)
{
if (_ready)
dispatchReady();
else
dispatchFail();
}
}
finally
{
_lock.unlock();
}
}
protected void dispatch(Runnable callback)
{
callback.run();
}
private void dispatchReady()
{
final Callback callback=_callback;
_callback=null;
dispatch(new Runnable()
{
@Override
public void run()
{
callback.onReady();
}
});
}
private void dispatchFail()
{
final Callback callback=_callback;
final Throwable cause=_cause;
_callback=null;
dispatch(new Runnable()
{
@Override
public void run()
{
callback.onFail(cause);
}
});
}
@Override
public String toString()
{
return String.format("RIOF@%x{c=%b,r=%b,c=%s}",
hashCode(),
_complete,
_ready,
_cause);
}
}

View File

@ -18,6 +18,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -25,16 +26,18 @@ import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task; import org.eclipse.jetty.util.thread.Timeout.Task;
import static org.eclipse.jetty.io.CompleteIOFuture.COMPLETE;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* An Endpoint that can be scheduled by {@link SelectorManager}. * An Endpoint that can be scheduled by {@link SelectorManager}.
*/ */
public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint
{ {
public static final Logger LOG=Log.getLogger(SelectChannelEndPoint.class); public static final Logger LOG=Log.getLogger(SelectChannelEndPoint.class);
private final Lock _lock = new ReentrantLock(); private final Lock _lock = new ReentrantLock();
private final SelectorManager.SelectSet _selectSet; private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager; private final SelectorManager _manager;
private SelectionKey _key; private SelectionKey _key;
@ -52,22 +55,80 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
private volatile boolean _idlecheck; private volatile boolean _idlecheck;
private volatile long _lastNotIdleTimestamp; private volatile long _lastNotIdleTimestamp;
private volatile SelectableConnection _connection; private volatile AbstractAsyncConnection _connection;
private RecycledIOFuture _readFuture = new RecycledIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
_manager.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
_interestOps=_interestOps&~SelectionKey.OP_READ;
updateKey();
cancelled();
}
finally
{
_lock.unlock();
}
}
};
private ByteBuffer[] _writeBuffers;
private RecycledIOFuture _writeFuture = new RecycledIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
_manager.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
_interestOps=_interestOps&~SelectionKey.OP_WRITE;
updateKey();
cancelled();
}
finally
{
_lock.unlock();
}
}
};
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
throws IOException throws IOException
{ {
super(channel, maxIdleTime); super(channel);
_manager = selectSet.getManager(); _manager = selectSet.getManager();
_selectSet = selectSet; _selectSet = selectSet;
_open=true; _open=true;
_key = key; _key = key;
setMaxIdleTime(maxIdleTime);
setCheckForIdle(true); setCheckForIdle(true);
} }
/* ------------------------------------------------------------ */
@Override
public AsyncConnection getAsyncConnection()
{
return _connection;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SelectionKey getSelectionKey() public SelectionKey getSelectionKey()
{ {
@ -85,9 +146,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void setSelectableConnection(SelectableConnection connection) public void setAsyncConnection(AbstractAsyncConnection connection)
{ {
Connection old=getSelectableConnection(); AsyncConnection old=getAsyncConnection();
_connection=connection; _connection=connection;
if (old!=null && old!=connection) if (old!=null && old!=connection)
_manager.endPointUpgraded(this,old); _manager.endPointUpgraded(this,old);
@ -95,7 +156,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public long getLastNotIdleTimestamp() public long getActivityTimestamp()
{ {
return _lastNotIdleTimestamp; return _lastNotIdleTimestamp;
} }
@ -104,7 +165,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
/** Called by selectSet to schedule handling /** Called by selectSet to schedule handling
* *
*/ */
public void onSelected() throws IOException public void onSelected()
{ {
_lock.lock(); _lock.lock();
_selected=true; _selected=true;
@ -113,7 +174,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
// If there is no key, then do nothing // If there is no key, then do nothing
if (_key == null || !_key.isValid()) if (_key == null || !_key.isValid())
{ {
this.notifyAll(); // TODO wake ups?
return; return;
} }
@ -122,22 +183,15 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
_interestOps=0; _interestOps=0;
if (can_read) if (can_read)
{ _readFuture.ready();
Runnable task=getSelectableConnection().onReadable();
if (task!=null) if (can_write && _writeBuffers!=null)
_manager.dispatch(task); completeWrite();
}
if (can_write)
{
Runnable task=getSelectableConnection().onWriteable();
if (task!=null)
_manager.dispatch(task);
}
if (isInputShutdown() && !_ishutCalled) if (isInputShutdown() && !_ishutCalled)
{ {
_ishutCalled=true; _ishutCalled=true;
getSelectableConnection().onInputShutdown(); getAsyncConnection().onInputShutdown();
} }
} }
finally finally
@ -148,70 +202,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
} }
} }
/* ------------------------------------------------------------ */
@Override
public boolean isReadInterested()
{
_lock.lock();
try
{
return (_interestOps&SelectionKey.OP_READ)!=0;
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public void setReadInterested(boolean interested)
{
_lock.lock();
try
{
_interestOps=interested?(_interestOps|SelectionKey.OP_READ):(_interestOps&~SelectionKey.OP_READ);
if (!_selected)
updateKey();
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public boolean isWriteInterested()
{
_lock.lock();
try
{
return (_interestOps&SelectionKey.OP_READ)!=0;
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public void setWriteInterested(boolean interested)
{
_lock.lock();
try
{
_interestOps=interested?(_interestOps|SelectionKey.OP_WRITE):(_interestOps&~SelectionKey.OP_WRITE);
if (!_selected)
updateKey();
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void cancelTimeout(Task task) public void cancelTimeout(Task task)
@ -229,7 +219,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
@Override @Override
public void setCheckForIdle(boolean check) public void setCheckForIdle(boolean check)
{ {
_idlecheck=true; _idlecheck=check;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -246,9 +236,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void checkForIdle(long now) public void checkForIdleOrReadWriteTimeout(long now)
{ {
if (_idlecheck) if (_idlecheck || !_readFuture.isComplete() || !_writeFuture.isComplete())
{ {
long idleTimestamp=_lastNotIdleTimestamp; long idleTimestamp=_lastNotIdleTimestamp;
long max_idle_time=getMaxIdleTime(); long max_idle_time=getMaxIdleTime();
@ -259,18 +249,24 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
if (idleForMs>max_idle_time) if (idleForMs>max_idle_time)
{ {
onIdleExpired(idleForMs); _lock.lock();
_lastNotIdleTimestamp=now; try
}
}
}
}
/* ------------------------------------------------------------ */
@Override
public void onIdleExpired(long idleForMs)
{ {
getSelectableConnection().onIdleExpired(idleForMs); if (_idlecheck)
_connection.onIdleExpired(idleForMs);
if (!_readFuture.isComplete())
_readFuture.fail(new TimeoutException());
if (!_writeFuture.isComplete())
_writeFuture.fail(new TimeoutException());
}
finally
{
_lastNotIdleTimestamp=now;
_lock.unlock();
}
}
}
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -283,6 +279,96 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
return fill; return fill;
} }
/* ------------------------------------------------------------ */
@Override
public IOFuture read() throws IllegalStateException
{
_lock.lock();
try
{
if (!_readFuture.isComplete())
throw new IllegalStateException("previous read not complete");
_readFuture.recycle();
_interestOps=_interestOps|SelectionKey.OP_READ;
updateKey();
return _readFuture;
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public IOFuture write(ByteBuffer... buffers)
{
_lock.lock();
try
{
if (!_writeFuture.isComplete())
throw new IllegalStateException("previous write not complete");
flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
_writeBuffers=buffers;
_writeFuture.recycle();
_interestOps=_interestOps|SelectionKey.OP_WRITE;
updateKey();
return _writeFuture;
}
}
return COMPLETE;
}
catch(IOException e)
{
return new CompleteIOFuture(e);
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
private void completeWrite()
{
try
{
flush(_writeBuffers);
// Are we complete?
for (ByteBuffer b : _writeBuffers)
{
if (b.hasRemaining())
{
_interestOps=_interestOps|SelectionKey.OP_WRITE;
return;
}
}
// we are complete and ready
_writeFuture.ready();
}
catch(final IOException e)
{
_writeBuffers=null;
if (!_writeFuture.isComplete())
_writeFuture.fail(e);
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public int flush(ByteBuffer... buffers) throws IOException public int flush(ByteBuffer... buffers) throws IOException
@ -298,6 +384,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
* Updates selection key. This method schedules a call to doUpdateKey to do the keyChange * Updates selection key. This method schedules a call to doUpdateKey to do the keyChange
*/ */
private void updateKey() private void updateKey()
{
if (!_selected)
{ {
int current_ops=-1; int current_ops=-1;
if (getChannel().isOpen()) if (getChannel().isOpen())
@ -319,6 +407,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
_selectSet.wakeup(); _selectSet.wakeup();
} }
} }
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -457,7 +546,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
isOutputShutdown(), isOutputShutdown(),
_interestOps, _interestOps,
keyString, keyString,
getSelectableConnection()); getAsyncConnection());
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -466,12 +555,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
return _selectSet; return _selectSet;
} }
/* ------------------------------------------------------------ */
@Override
public SelectableConnection getSelectableConnection()
{
return _connection;
}
} }

View File

@ -1,228 +0,0 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class SelectableConnection implements Connection
{
private static final Logger LOG = Log.getLogger(SelectableConnection.class);
protected final Lock _lock=new ReentrantLock();
protected final SelectableEndPoint _endp;
private final long _createdTimeStamp;
private final Condition _readable=_lock.newCondition();
private final Condition _writeable=_lock.newCondition();
private Thread _readBlocked;
private Thread _writeBlocked;
private final Runnable _reader=new Runnable()
{
@Override
public void run()
{
try
{
doRead();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
private final Runnable _writer=new Runnable()
{
@Override
public void run()
{
try
{
doWrite();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
private volatile int _maxIdleTime=-1;
public SelectableConnection(SelectableEndPoint endp)
{
_endp=endp;
_createdTimeStamp = System.currentTimeMillis();
}
@Override
public EndPoint getEndPoint()
{
return _endp;
}
public SelectableEndPoint getSelectableEndPoint()
{
return _endp;
}
@Override
public long getCreatedTimeStamp()
{
return _createdTimeStamp;
}
public Runnable onReadable()
{
_lock.lock();
try
{
if (_readBlocked!=null)
_readable.signal();
else
return _reader;
}
finally
{
_lock.unlock();
}
return null;
}
public Runnable onWriteable()
{
_lock.lock();
try
{
if (_writeBlocked!=null)
_writeable.signal();
else
return _writer;
}
finally
{
_lock.unlock();
}
return null;
}
public boolean blockReadable()
{
_lock.lock();
boolean readable=false;
try
{
if (_readBlocked!=null)
throw new IllegalStateException("already blocked by "+_readBlocked);
_readBlocked=Thread.currentThread();
_endp.setReadInterested(true);
readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!readable)
_endp.setReadInterested(false);
_readBlocked=null;
_lock.unlock();
}
return readable;
}
public boolean blockWriteable()
{
_lock.lock();
boolean writeable=false;
try
{
if (_writeBlocked!=null)
throw new IllegalStateException("already blocked by "+_writeBlocked);
_writeBlocked=Thread.currentThread();
_endp.setWriteInterested(true);
writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!writeable)
_endp.setWriteInterested(false);
_writeBlocked=null;
_lock.unlock();
}
return writeable;
}
public void onIdleExpired(long idleForMs)
{
try
{
LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
if (_endp.isInputShutdown() || _endp.isOutputShutdown())
_endp.close();
else
_endp.shutdownOutput();
}
catch(IOException e)
{
LOG.ignore(e);
try
{
_endp.close();
}
catch(IOException e2)
{
LOG.ignore(e2);
}
}
}
protected void doRead()
{
throw new IllegalStateException();
}
protected void doWrite()
{
throw new IllegalStateException();
}
@Override
public int getMaxIdleTime()
{
int max=_maxIdleTime;
return (max==-1)?_endp.getMaxIdleTime():max;
}
public void setMaxIdleTime(int max)
{
_maxIdleTime=max;
}
@Override
public String toString()
{
return String.format("%s@%x rb=%s wb=%b", getClass().getSimpleName(), hashCode(),_readBlocked,_writeBlocked);
}
public void onInputShutdown() throws IOException
{
}
public void onClose()
{
}
}

View File

@ -1,38 +0,0 @@
package org.eclipse.jetty.io;
public interface SelectableEndPoint extends EndPoint
{
public abstract void setWriteInterested(boolean interested);
public abstract boolean isWriteInterested();
public abstract void setReadInterested(boolean interested);
public abstract boolean isReadInterested();
/* ------------------------------------------------------------ */
SelectableConnection getSelectableConnection();
/* ------------------------------------------------------------ */
/** Callback when idle.
* <p>An endpoint is idle if there has been no IO activity for
* {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
* @param idleForMs TODO
*/
public void onIdleExpired(long idleForMs);
/* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness
*/
public void setCheckForIdle(boolean check);
/* ------------------------------------------------------------ */
/** Get if the endpoint should be checked for idleness
*/
public boolean isCheckForIdle();
public long getLastNotIdleTimestamp();
public void checkForIdle(long now);
}

View File

@ -337,10 +337,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
protected abstract void endPointOpened(SelectChannelEndPoint endpoint); protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,Connection oldConnection); protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,AsyncConnection oldConnection);
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
public abstract SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment); public abstract AbstractAsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -700,7 +700,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{ {
for (SelectChannelEndPoint endp:_endPoints.keySet()) for (SelectChannelEndPoint endp:_endPoints.keySet())
{ {
endp.checkForIdle(idle_now); endp.checkForIdleOrReadWriteTimeout(idle_now);
} }
} }
public String toString() {return "Idle-"+super.toString();} public String toString() {return "Idle-"+super.toString();}
@ -839,7 +839,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{ {
LOG.debug("destroyEndPoint {}",endp); LOG.debug("destroyEndPoint {}",endp);
_endPoints.remove(endp); _endPoints.remove(endp);
SelectableConnection connection=endp.getSelectableConnection(); AsyncConnection connection=endp.getAsyncConnection();
if (connection!=null) if (connection!=null)
connection.onClose(); connection.onClose();
endPointClosed(endp); endPointClosed(endp);

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static org.eclipse.jetty.io.CompleteIOFuture.COMPLETE;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -40,27 +42,62 @@ import org.eclipse.jetty.util.log.Logger;
* it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to * it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to
* expose a source/sink of unencrypted data to another connection (eg HttpConnection). * expose a source/sink of unencrypted data to another connection (eg HttpConnection).
*/ */
public class SslConnection extends SelectableConnection public class SslConnection extends AbstractAsyncConnection
{ {
static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0); private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0);
private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>(); private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
private final Lock _lock = new ReentrantLock();
private final RecycledIOFuture _appReadFuture = new RecycledIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable callback)
{
if (_appReadTask!=null)
throw new IllegalStateException();
_appReadTask=callback;
}
};
private IOFuture.Callback _writeable = new IOFuture.Callback()
{
@Override
public void onReady()
{
_appEndPoint.completeWrite();
}
@Override
public void onFail(Throwable cause)
{
LOG.warn("FAILED: "+cause);
}
};
private final RecycledIOFuture _appWriteFuture = new RecycledIOFuture(true,_lock);
private Runnable _appReadTask;
private final SSLEngine _engine; private final SSLEngine _engine;
private final SSLSession _session; private final SSLSession _session;
private SelectableConnection _appConnection; private AbstractAsyncConnection _appConnection;
private final AppEndPoint _appEndPoint; private final AppEndPoint _appEndPoint;
private int _allocations; private int _allocations;
private SslBuffers _buffers; private SslBuffers _buffers;
private ByteBuffer _inNet; private ByteBuffer _inNet;
private ByteBuffer _inApp; private ByteBuffer _inApp;
private ByteBuffer _outNet; private ByteBuffer _outNet;
private SelectableEndPoint _endp; private AsyncEndPoint _endp;
private boolean _allowRenegotiate=true; private boolean _allowRenegotiate=true;
private boolean _handshook; private boolean _handshook;
private boolean _ishut; private boolean _eofIn;
private boolean _oshut; private boolean _oshut;
private IOFuture _netReadFuture;
private IOFuture _netWriteFuture;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* this is a half baked buffer pool /* this is a half baked buffer pool
@ -81,13 +118,13 @@ public class SslConnection extends SelectableConnection
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SslConnection(SSLEngine engine,SelectableEndPoint endp) public SslConnection(SSLEngine engine,AsyncEndPoint endp)
{ {
this(engine,endp,System.currentTimeMillis()); this(engine,endp,System.currentTimeMillis());
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SslConnection(SSLEngine engine,SelectableEndPoint endp, long timeStamp) public SslConnection(SSLEngine engine,AsyncEndPoint endp, long timeStamp)
{ {
super(endp); super(endp);
_engine=engine; _engine=engine;
@ -98,7 +135,7 @@ public class SslConnection extends SelectableConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void setAppConnection(SelectableConnection connection) public void setAppConnection(AbstractAsyncConnection connection)
{ {
_appConnection=connection; _appConnection=connection;
} }
@ -192,12 +229,7 @@ public class SslConnection extends SelectableConnection
_lock.unlock(); _lock.unlock();
} }
} }
/* ------------------------------------------------------------ */
@Override
public boolean isIdle()
{
return _appConnection.isIdle();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
@ -228,49 +260,35 @@ public class SslConnection extends SelectableConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void doRead() public void onReadable()
{ {
LOG.debug("do Read {}",_endp); LOG.debug("onReadable {}",this);
_lock.lock(); _lock.lock();
try try
{ {
_netReadFuture=null;
allocateBuffers(); allocateBuffers();
boolean progress=true; boolean progress=true;
while(progress) while(progress && _appReadTask==null)
{ {
progress=false; progress=false;
// Fill the input buffer with everything available // Read into the input network buffer
if (!BufferUtil.isFull(_inNet)) if (!BufferUtil.isFull(_inNet))
progress|=_endp.fill(_inNet)>0; {
int filled = _endp.fill(_inNet);
LOG.debug("filled {}",filled);
if (filled<0)
_eofIn=true;
else if (filled>0)
progress=true;
}
// process the data
progress|=process(null); progress|=process(null);
if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested())
{
_appEndPoint._readInterested=false;
progress=true;
Runnable task =_appConnection.onReadable();
if (task!=null)
{
// We have a task from the application connection. We could
// dispatch this to a thread, but we are likely just to return afterwards.
// So we unlock (so another thread can call doRead if the app blocks) and
// call the app ourselves.
try
{
_lock.unlock();
task.run();
}
finally
{
_lock.lock();
}
}
}
} }
} }
catch(IOException e) catch(IOException e)
@ -280,49 +298,31 @@ public class SslConnection extends SelectableConnection
finally finally
{ {
releaseBuffers(); releaseBuffers();
_endp.setReadInterested(_appEndPoint.isReadInterested()); if (!_appReadFuture.isComplete() && _netReadFuture==null)
_endp.setWriteInterested(BufferUtil.hasContent(_outNet)); _netReadFuture=scheduleOnReadable();
LOG.debug("done Read {}",_endp);
LOG.debug("!onReadable {} {}",this,_netReadFuture);
_lock.unlock(); _lock.unlock();
} }
}
/* ------------------------------------------------------------ */ // Run any ready callback from _appReadFuture in this thread.
@Override if (_appReadTask!=null)
public void doWrite()
{ {
_lock.lock(); Runnable task=_appReadTask;
try _appReadTask=null;
{
while (BufferUtil.hasContent(_outNet))
{
int written = _endp.flush(_outNet);
if (written>0 && _appEndPoint.isWriteInterested())
{
Runnable task =_appConnection.onWriteable();
if (task!=null)
task.run(); task.run();
} }
} }
}
catch(IOException e)
{
LOG.warn(e);
}
finally
{
if (BufferUtil.hasContent(_outNet))
_endp.setWriteInterested(true);
_lock.unlock();
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private boolean process(ByteBuffer appOut) throws IOException private boolean process(ByteBuffer appOut) throws IOException
{ {
boolean some_progress=false; boolean some_progress=false;
_lock.lock();
if (!_lock.tryLock())
throw new IllegalStateException();
try try
{ {
allocateBuffers(); allocateBuffers();
@ -346,12 +346,12 @@ public class SslConnection extends SelectableConnection
case NOT_HANDSHAKING: case NOT_HANDSHAKING:
{ {
// Try unwrapping some application data // Try unwrapping some application data
if (!BufferUtil.isFull(_inApp) && BufferUtil.hasContent(_inNet) && unwrap()) if (!BufferUtil.isFull(_inApp) && BufferUtil.hasContent(_inNet))
progress=true; progress|=unwrap();
// Try wrapping some application data // Try wrapping some application data
if (BufferUtil.hasContent(appOut) && !BufferUtil.isFull(_outNet) && wrap(appOut)) if (BufferUtil.hasContent(appOut) && !BufferUtil.isFull(_outNet))
progress=true; progress|=wrap(appOut);
} }
break; break;
@ -372,8 +372,8 @@ public class SslConnection extends SelectableConnection
// The SSL needs to send some handshake data to the other side // The SSL needs to send some handshake data to the other side
if (_handshook && !_allowRenegotiate) if (_handshook && !_allowRenegotiate)
_endp.close(); _endp.close();
else if (wrap(appOut)) else
progress=true; progress|=wrap(appOut);
} }
break; break;
@ -382,18 +382,17 @@ public class SslConnection extends SelectableConnection
// The SSL needs to receive some handshake data from the other side // The SSL needs to receive some handshake data from the other side
if (_handshook && !_allowRenegotiate) if (_handshook && !_allowRenegotiate)
_endp.close(); _endp.close();
else if (BufferUtil.isEmpty(_inNet) && _endp.isInputShutdown()) else if (BufferUtil.isEmpty(_inNet) && _eofIn)
_endp.close(); _endp.close();
else if (unwrap()) else
progress=true; progress|=unwrap();
} }
break; break;
} }
// pass on ishut/oshut state // pass on ishut/oshut state
if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inNet)) if (_endp.isOpen() && _eofIn && BufferUtil.isEmpty(_inNet))
_engine.closeInbound(); _engine.closeInbound();
if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outNet)) if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outNet))
_endp.shutdownOutput(); _endp.shutdownOutput();
@ -417,6 +416,9 @@ public class SslConnection extends SelectableConnection
private boolean wrap(final ByteBuffer outApp) throws IOException private boolean wrap(final ByteBuffer outApp) throws IOException
{ {
if (_netWriteFuture!=null && !_netWriteFuture.isComplete())
return false;
final SSLEngineResult result; final SSLEngineResult result;
int pos=BufferUtil.flipToFill(_outNet); int pos=BufferUtil.flipToFill(_outNet);
@ -461,15 +463,28 @@ public class SslConnection extends SelectableConnection
throw new IOException(result.toString()); throw new IOException(result.toString());
} }
int flushed = _endp.flush(_outNet); if (BufferUtil.hasContent(_outNet))
{
IOFuture write =_endp.write(_outNet);
if (write.isComplete())
return true;
return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0; _netWriteFuture=write;
_netWriteFuture.setCallback(_writeable);
}
return result.bytesConsumed()>0 || result.bytesProduced()>0 ;
} }
private boolean unwrap() throws IOException private boolean unwrap() throws IOException
{ {
if (BufferUtil.isEmpty(_inNet)) if (BufferUtil.isEmpty(_inNet))
{
if (_netReadFuture==null)
_netReadFuture=scheduleOnReadable();
LOG.debug("{} unwrap read {}",_session,_netReadFuture);
return false; return false;
}
final SSLEngineResult result; final SSLEngineResult result;
@ -501,9 +516,11 @@ public class SslConnection extends SelectableConnection
{ {
case BUFFER_UNDERFLOW: case BUFFER_UNDERFLOW:
// need to wait for more net data // need to wait for more net data
_inNet.compact().flip(); if (_eofIn)
if (_endp.isInputShutdown())
_inNet.clear().limit(0); _inNet.clear().limit(0);
else if (_netReadFuture==null)
_netReadFuture=scheduleOnReadable();
break; break;
case BUFFER_OVERFLOW: case BUFFER_OVERFLOW:
@ -527,8 +544,9 @@ public class SslConnection extends SelectableConnection
throw new IOException(result.toString()); throw new IOException(result.toString());
} }
//if (LOG.isDebugEnabled() && result.bytesProduced()>0) // If any bytes were produced and we have an app read waiting, make it ready.
// LOG.debug("{} unwrapped '{}'",_session,buffer); if (result.bytesProduced()>0 && !_appReadFuture.isComplete())
_appReadFuture.ready();
return result.bytesConsumed()>0 || result.bytesProduced()>0; return result.bytesConsumed()>0 || result.bytesProduced()>0;
} }
@ -540,7 +558,7 @@ public class SslConnection extends SelectableConnection
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SelectableEndPoint getAppEndPoint() public AsyncEndPoint getAppEndPoint()
{ {
return _appEndPoint; return _appEndPoint;
} }
@ -554,10 +572,14 @@ public class SslConnection extends SelectableConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public class AppEndPoint implements SelectableEndPoint public class AppEndPoint extends AbstractEndPoint implements AsyncEndPoint
{ {
boolean _readInterested; ByteBuffer[] _writeBuffers;
boolean _writeInterested;
AppEndPoint()
{
super(_endp.getLocalAddress(),_endp.getRemoteAddress());
}
public SSLEngine getSslEngine() public SSLEngine getSslEngine()
{ {
@ -600,30 +622,6 @@ public class SslConnection extends SelectableConnection
} }
} }
@Override
public void shutdownInput() throws IOException
{
LOG.debug("{} ssl endp.ishut!",_session);
// We do not do a closeInput here, as SSL does not support half close.
// isInputShutdown works it out itself from buffer state and underlying endpoint state.
}
@Override
public boolean isInputShutdown()
{
_lock.lock();
try
{
return _endp.isInputShutdown() &&
!(_inApp!=null&&BufferUtil.hasContent(_inApp)) &&
!(_inNet!=null&&BufferUtil.hasContent(_inNet));
}
finally
{
_lock.unlock();
}
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
@ -642,7 +640,7 @@ public class SslConnection extends SelectableConnection
process(null); process(null);
if (BufferUtil.hasContent(_inApp)) if (BufferUtil.hasContent(_inApp))
BufferUtil.flipPutFlip(_inApp,buffer); BufferUtil.append(_inApp,buffer);
} }
finally finally
{ {
@ -650,7 +648,7 @@ public class SslConnection extends SelectableConnection
} }
int filled=buffer.remaining()-size; int filled=buffer.remaining()-size;
if (filled==0 && isInputShutdown()) if (filled==0 && _eofIn)
return -1; return -1;
return filled; return filled;
} }
@ -689,17 +687,6 @@ public class SslConnection extends SelectableConnection
return _endp; return _endp;
} }
public void flush() throws IOException
{
process(null);
}
@Override
public void onIdleExpired(long idleForMs)
{
_endp.onIdleExpired(idleForMs);
}
@Override @Override
public void setCheckForIdle(boolean check) public void setCheckForIdle(boolean check)
{ {
@ -712,40 +699,6 @@ public class SslConnection extends SelectableConnection
return _endp.isCheckForIdle(); return _endp.isCheckForIdle();
} }
@Override
public InetSocketAddress getLocalAddress()
{
return _endp.getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return _endp.getRemoteAddress();
}
@Override
public int getMaxIdleTime()
{
return _endp.getMaxIdleTime();
}
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
_endp.setMaxIdleTime(timeMs);
}
@Override
public SelectableConnection getSelectableConnection()
{
return _appConnection;
}
public void setSelectableConnection(SelectableConnection connection)
{
_appConnection=(SelectableConnection)connection;
}
@Override @Override
public String toString() public String toString()
@ -759,23 +712,48 @@ public class SslConnection extends SelectableConnection
int i = inbound == null? -1 : inbound.remaining(); int i = inbound == null? -1 : inbound.remaining();
int o = outbound == null ? -1 : outbound.remaining(); int o = outbound == null ? -1 : outbound.remaining();
int u = unwrap == null ? -1 : unwrap.remaining(); int u = unwrap == null ? -1 : unwrap.remaining();
return String.format("SSL %s %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}", return String.format("SSL %s %s i/o/u=%d/%d/%d eof=%b oshut=%b {%s}",
super.toString(), super.toString(),
_engine.getHandshakeStatus(), _engine.getHandshakeStatus(),
i, o, u, i, o, u,
_ishut, _oshut, _eofIn, _oshut,
_appConnection); _appConnection);
} }
@Override @Override
public void setWriteInterested(boolean interested) public long getActivityTimestamp()
{ {
return _endp.getActivityTimestamp();
}
@Override
public long getCreatedTimeStamp()
{
return _endp.getCreatedTimeStamp();
}
@Override
public AsyncConnection getAsyncConnection()
{
return _appConnection;
}
@Override
public IOFuture read() throws IllegalStateException
{
LOG.debug("{} sslEndp.read()",_session);
_lock.lock(); _lock.lock();
try try
{ {
_writeInterested=interested; // Do we already have application input data?
if (interested) if (BufferUtil.hasContent(_inApp))
_endp.setWriteInterested(true); return COMPLETE;
// No, we need to schedule a network read
_appReadFuture.recycle();
scheduleOnReadable();
return _appReadFuture;
} }
finally finally
{ {
@ -784,20 +762,31 @@ public class SslConnection extends SelectableConnection
} }
@Override @Override
public boolean isWriteInterested() public IOFuture write(ByteBuffer... buffers)
{
return _writeInterested;
}
@Override
public void setReadInterested(boolean interested)
{ {
_lock.lock(); _lock.lock();
try try
{ {
_readInterested=interested; if (!_appWriteFuture.isComplete())
if (interested) throw new IllegalStateException("previous write not complete");
_endp.setReadInterested(true);
// Try to process all
for (ByteBuffer b : buffers)
{
process(b);
if (b.hasRemaining())
{
_writeBuffers=buffers;
_appWriteFuture.recycle();
return _appWriteFuture;
}
}
return COMPLETE;
}
catch (IOException e)
{
return new CompleteIOFuture(e);
} }
finally finally
{ {
@ -805,21 +794,32 @@ public class SslConnection extends SelectableConnection
} }
} }
@Override void completeWrite()
public boolean isReadInterested()
{ {
return _readInterested; _lock.lock();
} try
{
if (!_appWriteFuture.isComplete())
throw new IllegalStateException("previous write not complete");
@Override // Try to process all
public long getLastNotIdleTimestamp() for (ByteBuffer b : _writeBuffers)
{ {
return _endp.getLastNotIdleTimestamp(); process(b);
}
@Override if (b.hasRemaining())
public void checkForIdle(long now) return;
}
_appWriteFuture.ready();
}
catch (IOException e)
{ {
_appWriteFuture.fail(e);
}
finally
{
_lock.unlock();
}
} }
} }
} }

View File

@ -17,14 +17,14 @@ public class BufferUtilTest
ByteBuffer from=BufferUtil.toBuffer("12345"); ByteBuffer from=BufferUtil.toBuffer("12345");
BufferUtil.clear(to); BufferUtil.clear(to);
assertEquals(5,BufferUtil.flipPutFlip(from,to)); assertEquals(5,BufferUtil.append(from,to));
assertTrue(BufferUtil.isEmpty(from)); assertTrue(BufferUtil.isEmpty(from));
assertEquals("12345",BufferUtil.toString(to)); assertEquals("12345",BufferUtil.toString(to));
from=BufferUtil.toBuffer("XX67890ZZ"); from=BufferUtil.toBuffer("XX67890ZZ");
from.position(2); from.position(2);
assertEquals(5,BufferUtil.flipPutFlip(from,to)); assertEquals(5,BufferUtil.append(from,to));
assertEquals(2,from.remaining()); assertEquals(2,from.remaining());
assertEquals("1234567890",BufferUtil.toString(to)); assertEquals("1234567890",BufferUtil.toString(to));
} }
@ -37,14 +37,14 @@ public class BufferUtilTest
ByteBuffer from=BufferUtil.toBuffer("12345"); ByteBuffer from=BufferUtil.toBuffer("12345");
BufferUtil.clear(to); BufferUtil.clear(to);
assertEquals(5,BufferUtil.flipPutFlip(from,to)); assertEquals(5,BufferUtil.append(from,to));
assertTrue(BufferUtil.isEmpty(from)); assertTrue(BufferUtil.isEmpty(from));
assertEquals("12345",BufferUtil.toString(to)); assertEquals("12345",BufferUtil.toString(to));
from=BufferUtil.toBuffer("XX67890ZZ"); from=BufferUtil.toBuffer("XX67890ZZ");
from.position(2); from.position(2);
assertEquals(5,BufferUtil.flipPutFlip(from,to)); assertEquals(5,BufferUtil.append(from,to));
assertEquals(2,from.remaining()); assertEquals(2,from.remaining());
assertEquals("1234567890",BufferUtil.toString(to)); assertEquals("1234567890",BufferUtil.toString(to));
} }

View File

@ -36,10 +36,8 @@ public abstract class EndPointTest<T extends EndPoint>
// Client and server are open // Client and server are open
assertTrue(c.client.isOpen()); assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown()); assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown()); assertFalse(c.server.isOutputShutdown());
// Server sends response and closes output // Server sends response and closes output
@ -48,10 +46,8 @@ public abstract class EndPointTest<T extends EndPoint>
// client server are open, server is oshut // client server are open, server is oshut
assertTrue(c.client.isOpen()); assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown()); assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown()); assertTrue(c.server.isOutputShutdown());
// Client reads response // Client reads response
@ -62,10 +58,8 @@ public abstract class EndPointTest<T extends EndPoint>
// Client and server are open, server is oshut // Client and server are open, server is oshut
assertTrue(c.client.isOpen()); assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown()); assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown()); assertTrue(c.server.isOutputShutdown());
// Client reads -1 // Client reads -1
@ -75,10 +69,8 @@ public abstract class EndPointTest<T extends EndPoint>
// Client and server are open, server is oshut, client is ishut // Client and server are open, server is oshut, client is ishut
assertTrue(c.client.isOpen()); assertTrue(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown()); assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown()); assertTrue(c.server.isOutputShutdown());
// Client shutsdown output, which is a close because already ishut // Client shutsdown output, which is a close because already ishut
@ -86,10 +78,8 @@ public abstract class EndPointTest<T extends EndPoint>
// Client is closed. Server is open and oshut // Client is closed. Server is open and oshut
assertFalse(c.client.isOpen()); assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown()); assertTrue(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown()); assertTrue(c.server.isOutputShutdown());
// Server reads close // Server reads close
@ -99,10 +89,8 @@ public abstract class EndPointTest<T extends EndPoint>
// Client and Server are closed // Client and Server are closed
assertFalse(c.client.isOpen()); assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown()); assertTrue(c.client.isOutputShutdown());
assertFalse(c.server.isOpen()); assertFalse(c.server.isOpen());
assertTrue(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown()); assertTrue(c.server.isOutputShutdown());
} }
@ -121,38 +109,30 @@ public abstract class EndPointTest<T extends EndPoint>
assertEquals("request",BufferUtil.toString(buffer)); assertEquals("request",BufferUtil.toString(buffer));
assertTrue(c.client.isOpen()); assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown()); assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown()); assertFalse(c.server.isOutputShutdown());
c.client.close(); c.client.close();
assertFalse(c.client.isOpen()); assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown()); assertTrue(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown()); assertFalse(c.server.isOutputShutdown());
len = c.server.fill(buffer); len = c.server.fill(buffer);
assertEquals(-1,len); assertEquals(-1,len);
assertFalse(c.client.isOpen()); assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown()); assertTrue(c.client.isOutputShutdown());
assertTrue(c.server.isOpen()); assertTrue(c.server.isOpen());
assertTrue(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown()); assertFalse(c.server.isOutputShutdown());
c.server.shutdownOutput(); c.server.shutdownOutput();
assertFalse(c.client.isOpen()); assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown()); assertTrue(c.client.isOutputShutdown());
assertFalse(c.server.isOpen()); assertFalse(c.server.isOpen());
assertTrue(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown()); assertTrue(c.server.isOutputShutdown());
} }

View File

@ -11,7 +11,7 @@ import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocket;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SslConnection; import org.eclipse.jetty.io.SslConnection;
@ -46,15 +46,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
} }
@Override @Override
protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint) protected AbstractAsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
{ {
SSLEngine engine = __sslCtxFactory.newSslEngine(); SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(false); engine.setUseClientMode(false);
SslConnection connection = new SslConnection(engine,endpoint); SslConnection connection = new SslConnection(engine,endpoint);
SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint()); AbstractAsyncConnection delegate = super.newConnection(channel,connection.getAppEndPoint());
connection.setAppConnection(delegate); connection.setAppConnection(delegate);
connection.getAppEndPoint().setReadInterested(endpoint.isReadInterested());
return connection; return connection;
} }
@ -73,12 +72,6 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
// SSL does not do half closes // SSL does not do half closes
} }
@Override
public void testBlockIn() throws Exception
{
super.testBlockIn();
}
@Test @Test
public void testTcpClose() throws Exception public void testTcpClose() throws Exception

View File

@ -19,10 +19,11 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.SelectableConnection; import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SelectorManager;
@ -36,7 +37,7 @@ import org.junit.Test;
public class SelectChannelEndPointTest public class SelectChannelEndPointTest
{ {
protected SelectableEndPoint _lastEndp; protected volatile AsyncEndPoint _lastEndp;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected SelectorManager _manager = new SelectorManager() protected SelectorManager _manager = new SelectorManager()
@ -58,22 +59,22 @@ public class SelectChannelEndPointTest
} }
@Override @Override
protected void endPointUpgraded(SelectChannelEndPoint endpoint, Connection oldConnection) protected void endPointUpgraded(SelectChannelEndPoint endpoint, AsyncConnection oldConnection)
{ {
} }
@Override @Override
public SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment) public AbstractAsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment)
{ {
return SelectChannelEndPointTest.this.newConnection(channel,endpoint); AbstractAsyncConnection connection = SelectChannelEndPointTest.this.newConnection(channel,endpoint);
return connection;
} }
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{ {
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000); SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000);
endp.setReadInterested(true); endp.setAsyncConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
_lastEndp=endp; _lastEndp=endp;
return endp; return endp;
} }
@ -81,10 +82,13 @@ public class SelectChannelEndPointTest
// Must be volatile or the test may fail spuriously // Must be volatile or the test may fail spuriously
private volatile int _blockAt=0; private volatile int _blockAt=0;
private volatile int _writeCount=1;
@Before @Before
public void startManager() throws Exception public void startManager() throws Exception
{ {
_writeCount=1;
_lastEndp=null;
_connector = ServerSocketChannel.open(); _connector = ServerSocketChannel.open();
_connector.socket().bind(null); _connector.socket().bind(null);
_threadPool.start(); _threadPool.start();
@ -104,26 +108,29 @@ public class SelectChannelEndPointTest
return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort()); return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort());
} }
protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint) protected AbstractAsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
{ {
return new TestConnection(endpoint); AbstractAsyncConnection connection = new TestConnection(endpoint);
connection.scheduleOnReadable();
return connection;
} }
public class TestConnection extends SelectableConnection public class TestConnection extends AbstractAsyncConnection
{ {
ByteBuffer _in = BufferUtil.allocate(32*1024); ByteBuffer _in = BufferUtil.allocate(32*1024);
ByteBuffer _out = BufferUtil.allocate(32*1024); ByteBuffer _out = BufferUtil.allocate(32*1024);
public TestConnection(SelectableEndPoint endp) public TestConnection(AsyncEndPoint endp)
{ {
super(endp); super(endp);
} }
@Override @Override
public void doRead() public void onReadable()
{ {
try try
{ {
_endp.setCheckForIdle(false);
boolean progress=true; boolean progress=true;
while(progress) while(progress)
{ {
@ -132,37 +139,64 @@ public class SelectChannelEndPointTest
// Fill the input buffer with everything available // Fill the input buffer with everything available
if (!BufferUtil.isFull(_in)) if (!BufferUtil.isFull(_in))
progress|=_endp.fill(_in)>0; progress|=_endp.fill(_in)>0;
// If the tests wants to block, then block // If the tests wants to block, then block
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt && blockReadable()) while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt)
{
_endp.read().await();
progress|=_endp.fill(_in)>0; progress|=_endp.fill(_in)>0;
}
// Copy to the out buffer // Copy to the out buffer
if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0) if (BufferUtil.hasContent(_in) && BufferUtil.append(_in,_out)>0)
progress=true; progress=true;
// Try non blocking write // Blocking writes
if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0) if (BufferUtil.hasContent(_out))
progress=true;
// Try blocking write
while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out) && blockWriteable())
{ {
if (_endp.flush(_out)>0) ByteBuffer out=_out.duplicate();
BufferUtil.clear(_out);
for (int i=0;i<_writeCount;i++)
{
_endp.write(out.asReadOnlyBuffer()).await();
}
progress=true; progress=true;
} }
} }
} }
catch(ClosedChannelException e) catch(ClosedChannelException e)
{ {
System.err.println(e); // System.err.println(e);
} }
catch(IOException e) catch(ExecutionException e)
{
// Timeout does not close, so echo exception then shutdown
try
{
// System.err.println("Execution Exception! "+e);
_endp.write(BufferUtil.toBuffer("Timeout: "+BufferUtil.toString(_in))).await();
_endp.shutdownOutput();
}
catch(Exception e2)
{
e2.printStackTrace();
}
}
catch(InterruptedException e)
{
// System.err.println(e);
}
catch(Exception e)
{ {
e.printStackTrace(); e.printStackTrace();
} }
finally finally
{ {
_endp.setReadInterested(true); if (_endp.isOpen())
{
_endp.setCheckForIdle(true);
scheduleOnReadable();
}
} }
} }
@ -186,12 +220,6 @@ public class SelectChannelEndPointTest
{ {
} }
@Override
public boolean isIdle()
{
return false;
}
} }
@ -201,7 +229,7 @@ public class SelectChannelEndPointTest
{ {
Socket client = newClient(); Socket client = newClient();
client.setSoTimeout(500); client.setSoTimeout(60000);
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
@ -220,6 +248,7 @@ public class SelectChannelEndPointTest
} }
// wait for read timeout // wait for read timeout
client.setSoTimeout(500);
long start=System.currentTimeMillis(); long start=System.currentTimeMillis();
try try
{ {
@ -310,7 +339,7 @@ public class SelectChannelEndPointTest
@Test @Test
public void testBlockIn() throws Exception public void testBlockRead() throws Exception
{ {
Socket client = newClient(); Socket client = newClient();
@ -330,6 +359,8 @@ public class SelectChannelEndPointTest
clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush(); clientOutputStream.flush();
while(_lastEndp==null);
_lastEndp.setMaxIdleTime(10*specifiedTimeout); _lastEndp.setMaxIdleTime(10*specifiedTimeout);
Thread.sleep(2 * specifiedTimeout); Thread.sleep(2 * specifiedTimeout);
@ -407,6 +438,63 @@ public class SelectChannelEndPointTest
} }
@Test
public void testBlockedReadIdle() throws Exception
{
Socket client = newClient();
OutputStream clientOutputStream = client.getOutputStream();
client.setSoTimeout(5000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.register(server);
// Write client to server
clientOutputStream.write("HelloWorld".getBytes("UTF-8"));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b>0);
assertEquals(c,(char)b);
}
// Set Max idle
_lastEndp.setMaxIdleTime(500);
// Write 8 and cause block waiting for 10
_blockAt=10;
clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush();
// read until idle shutdown received
long start=System.currentTimeMillis();
int b=client.getInputStream().read();
assertEquals('T',b);
long idle=System.currentTimeMillis()-start;
assertTrue(idle>400);
assertTrue(idle<2000);
for (char c : "imeout: 12345678".toCharArray())
{
b = client.getInputStream().read();
assertTrue(b>0);
assertEquals(c,(char)b);
}
// But endpoint is still open.
assertTrue(_lastEndp.isOpen());
// Wait for another idle callback
Thread.sleep(2000);
// endpoint is closed.
assertFalse(_lastEndp.isOpen());
}
@Test @Test
public void testStress() throws Exception public void testStress() throws Exception
@ -478,4 +566,51 @@ public class SelectChannelEndPointTest
assertTrue(latch.await(100,TimeUnit.SECONDS)); assertTrue(latch.await(100,TimeUnit.SECONDS));
} }
@Test
public void testWriteBlock() throws Exception
{
Socket client = newClient();
client.setSoTimeout(10000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.register(server);
// Write client to server
_writeCount=10000;
String data="Now is the time for all good men to come to the aid of the party";
client.getOutputStream().write(data.getBytes("UTF-8"));
for (int i=0;i<_writeCount;i++)
{
// Verify echo server to client
for (int j=0;j<data.length();j++)
{
char c=data.charAt(j);
int b = client.getInputStream().read();
assertTrue(b>0);
assertEquals("test-"+i+"/"+j,c,(char)b);
}
if (i==0)
_lastEndp.setMaxIdleTime(60000);
if (i%100==0)
TimeUnit.MILLISECONDS.sleep(10);
}
client.close();
int i=0;
while (server.isOpen())
{
assert(i++<10);
Thread.sleep(10);
}
}
} }

View File

@ -29,7 +29,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Buffers.Type; import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
@ -1167,7 +1167,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void connectionOpened(Connection connection) protected void connectionOpened(AsyncConnection connection)
{ {
if (_statsStartedAt.get() == -1) if (_statsStartedAt.get() == -1)
return; return;
@ -1176,13 +1176,13 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void connectionUpgraded(Connection oldConnection, Connection newConnection) protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection)
{ {
_requestStats.set((oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getHttpChannel().getRequests():0); _requestStats.set((oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getHttpChannel().getRequests():0);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void connectionClosed(Connection connection) protected void connectionClosed(AsyncConnection connection)
{ {
if (_statsStartedAt.get() == -1) if (_statsStartedAt.get() == -1)
return; return;

View File

@ -239,9 +239,6 @@ public interface Connector extends LifeCycle
*/ */
void setMaxIdleTime(int ms); void setMaxIdleTime(int ms);
/* ------------------------------------------------------------ */
int getLowResourceMaxIdleTime();
void setLowResourceMaxIdleTime(int ms);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -357,25 +354,4 @@ public interface Connector extends LifeCycle
public long getStatsOnMs(); public long getStatsOnMs();
/* ------------------------------------------------------------ */
/** Check if low on resources.
* For most connectors, low resources is measured by calling
* {@link ThreadPool#isLowOnThreads()} on the connector threadpool
* or the server threadpool if there is no connector threadpool.
* <p>
* For blocking connectors, low resources is used to trigger
* usage of {@link #getLowResourceMaxIdleTime()} for the timeout
* of an idle connection.
* <p>
* for non-blocking connectors, the number of connections is
* used instead of this method, to select the timeout of an
* idle connection.
* <p>
* For all connectors, low resources is used to trigger the
* usage of {@link #getLowResourceMaxIdleTime()} for read and
* write operations.
*
* @return true if this connector is low on resources.
*/
public boolean isLowResources();
} }

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.UncheckedPrintWriter; import org.eclipse.jetty.io.UncheckedPrintWriter;
@ -75,7 +75,7 @@ public abstract class HttpChannel
private int _requests; private int _requests;
private final Server _server; private final Server _server;
private final Connection _connection; private final AsyncConnection _connection;
private final HttpURI _uri; private final HttpURI _uri;
private final HttpFields _requestFields; private final HttpFields _requestFields;
@ -111,7 +111,7 @@ public abstract class HttpChannel
/** Constructor /** Constructor
* *
*/ */
public HttpChannel(Server server,Connection connection) public HttpChannel(Server server,AsyncConnection connection)
{ {
_server = server; _server = server;
_connection = connection; _connection = connection;
@ -124,7 +124,7 @@ public abstract class HttpChannel
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public Connection getConnection() public AsyncConnection getConnection()
{ {
return _connection; return _connection;
} }
@ -452,7 +452,7 @@ public abstract class HttpChannel
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @see org.eclipse.jetty.io.Connection#isSuspended() * @see org.eclipse.jetty.io.AsyncConnection#isSuspended()
*/ */
public boolean isSuspended() public boolean isSuspended()
{ {

View File

@ -23,10 +23,10 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.HttpGenerator.Action; import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.SelectableConnection; import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.SelectableEndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -34,7 +34,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
/** /**
*/ */
public class HttpConnection extends SelectableConnection public class HttpConnection extends AbstractAsyncConnection
{ {
private static final Logger LOG = Log.getLogger(HttpConnection.class); private static final Logger LOG = Log.getLogger(HttpConnection.class);
@ -71,7 +71,7 @@ public class HttpConnection extends SelectableConnection
/** Constructor /** Constructor
* *
*/ */
public HttpConnection(Connector connector, SelectableEndPoint endpoint, Server server) public HttpConnection(Connector connector, AsyncEndPoint endpoint, Server server)
{ {
super(endpoint); super(endpoint);
_connector = connector; _connector = connector;
@ -145,23 +145,6 @@ public class HttpConnection extends SelectableConnection
return _generator; return _generator;
} }
/* ------------------------------------------------------------ */
@Override
public boolean isIdle()
{
return _parser.isIdle() && _generator.isIdle();
}
/* ------------------------------------------------------------ */
@Override
public int getMaxIdleTime()
{
if (_connector.isLowResources() && _endp.getMaxIdleTime()==_connector.getMaxIdleTime())
return _connector.getLowResourceMaxIdleTime();
if (_endp.getMaxIdleTime()>0)
return _endp.getMaxIdleTime();
return _connector.getMaxIdleTime();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
@ -176,9 +159,9 @@ public class HttpConnection extends SelectableConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void doRead() public void onReadable()
{ {
Connection connection = this; AsyncConnection connection = this;
boolean progress=true; boolean progress=true;
try try
@ -201,7 +184,7 @@ public class HttpConnection extends SelectableConnection
if (BufferUtil.hasContent(_requestBuffer) && _parser.parseNext(_requestBuffer)) if (BufferUtil.hasContent(_requestBuffer) && _parser.parseNext(_requestBuffer))
{ {
// don't check for idle while dispatched (unless blocking IO is done). // don't check for idle while dispatched (unless blocking IO is done).
getSelectableEndPoint().setCheckForIdle(false); getEndPoint().setCheckForIdle(false);
try try
{ {
_channel.handleRequest(); _channel.handleRequest();
@ -211,7 +194,7 @@ public class HttpConnection extends SelectableConnection
// If we are not suspended // If we are not suspended
if (!_channel.getRequest().getAsyncContinuation().isAsyncStarted()) if (!_channel.getRequest().getAsyncContinuation().isAsyncStarted())
// reenable idle checking unless request is suspended // reenable idle checking unless request is suspended
getSelectableEndPoint().setCheckForIdle(true); getEndPoint().setCheckForIdle(true);
} }
} }
@ -236,7 +219,7 @@ public class HttpConnection extends SelectableConnection
// look for a switched connection instance? // look for a switched connection instance?
if (_channel.getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) if (_channel.getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{ {
Connection switched=(Connection)_channel.getRequest().getAttribute("org.eclipse.jetty.io.Connection"); AsyncConnection switched=(AsyncConnection)_channel.getRequest().getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null) if (switched!=null)
connection=switched; connection=switched;
} }
@ -426,41 +409,30 @@ public class HttpConnection extends SelectableConnection
switch(_toFlush) switch(_toFlush)
{ {
case 10: case 10:
_endp.flush(_responseHeader,_responseBuffer); _endp.write(_responseHeader,_responseBuffer);
_toFlush=(BufferUtil.hasContent(_responseHeader)?8:0)+(BufferUtil.hasContent(_responseBuffer)?2:0);
break; break;
case 9: case 9:
_endp.flush(_responseHeader,_content); _endp.write(_responseHeader,_content);
_toFlush=(BufferUtil.hasContent(_responseHeader)?8:0)+(BufferUtil.hasContent(_content)?1:0);
if (_toFlush==0)
_content=null; _content=null;
break; break;
case 8: case 8:
_endp.flush(_responseHeader); _endp.write(_responseHeader);
_toFlush=(BufferUtil.hasContent(_responseHeader)?8:0);
break; break;
case 6: case 6:
_endp.flush(_chunk,_responseBuffer); _endp.write(_chunk,_responseBuffer);
_toFlush=(BufferUtil.hasContent(_chunk)?4:0)+(BufferUtil.hasContent(_responseBuffer)?2:0);
break; break;
case 5: case 5:
_endp.flush(_chunk,_content); _endp.write(_chunk,_content);
_toFlush=(BufferUtil.hasContent(_chunk)?4:0)+(BufferUtil.hasContent(_content)?1:0);
if (_toFlush==0)
_content=null; _content=null;
break; break;
case 4: case 4:
_endp.flush(_chunk); _endp.write(_chunk);
_toFlush=(BufferUtil.hasContent(_chunk)?4:0);
break; break;
case 2: case 2:
_endp.flush(_responseBuffer); _endp.write(_responseBuffer);
_toFlush=(BufferUtil.hasContent(_responseBuffer)?2:0);
break; break;
case 1: case 1:
_endp.flush(_content); _endp.write(_content);
_toFlush=(BufferUtil.hasContent(_content)?1:0);
if (_toFlush==0)
_content=null; _content=null;
break; break;
case 0: case 0:
@ -486,14 +458,21 @@ public class HttpConnection extends SelectableConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void onInputShutdown() throws IOException public void onInputShutdown()
{ {
// If we don't have a committed response and we are not suspended // If we don't have a committed response and we are not suspended
if (_generator.isIdle() && !_channel.getRequest().getAsyncContinuation().isSuspended()) if (_generator.isIdle() && !_channel.getRequest().getAsyncContinuation().isSuspended())
{ {
// then no more can happen, so close. // then no more can happen, so close.
try
{
_endp.close(); _endp.close();
} }
catch (IOException e)
{
LOG.debug(e);
}
}
// Make idle parser seek EOF // Make idle parser seek EOF
if (_parser.isIdle()) if (_parser.isIdle())
@ -513,7 +492,7 @@ public class HttpConnection extends SelectableConnection
@Override @Override
public long getMaxIdleTime() public long getMaxIdleTime()
{ {
return HttpConnection.this.getMaxIdleTime(); return getEndPoint().getMaxIdleTime();
} }
@Override @Override
@ -661,7 +640,7 @@ public class HttpConnection extends SelectableConnection
try try
{ {
// Wait until we can read // Wait until we can read
blockReadable(); getEndPoint().blockReadable();
// We will need a buffer to read into // We will need a buffer to read into
if (_requestBuffer==null) if (_requestBuffer==null)

View File

@ -20,7 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteArrayEndPoint; import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -107,7 +107,7 @@ public class LocalConnector extends AbstractConnector
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(_requestsBuffer.asArray(), 1024) ByteArrayEndPoint endPoint = new ByteArrayEndPoint(_requestsBuffer.asArray(), 1024)
{ {
@Override @Override
public void setConnection(Connection connection) public void setConnection(AsyncConnection connection)
{ {
if (getConnection()!=null && connection!=getConnection()) if (getConnection()!=null && connection!=getConnection())
connectionUpgraded(getConnection(),connection); connectionUpgraded(getConnection(),connection);
@ -127,8 +127,8 @@ public class LocalConnector extends AbstractConnector
{ {
while (true) while (true)
{ {
final Connection con = endPoint.getConnection(); final AsyncConnection con = endPoint.getConnection();
final Connection next = con.handle(); final AsyncConnection next = con.handle();
if (next!=con) if (next!=con)
{ {
endPoint.setConnection(next); endPoint.setConnection(next);

View File

@ -21,7 +21,7 @@ import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectableEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
@ -350,7 +350,7 @@ public class ConnectHandler extends HandlerWrapper
{ {
} }
private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, AsyncConnection connection) throws IOException
{ {
// Set the new connection as request attribute and change the status to 101 // Set the new connection as request attribute and change the status to 101
// so that Jetty understands that it has to upgrade the connection // so that Jetty understands that it has to upgrade the connection
@ -428,7 +428,7 @@ public class ConnectHandler extends HandlerWrapper
} }
@Override @Override
public Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{ {
ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment; ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment;
proxyToServer.setTimeStamp(System.currentTimeMillis()); proxyToServer.setTimeStamp(System.currentTimeMillis());
@ -450,19 +450,19 @@ public class ConnectHandler extends HandlerWrapper
} }
@Override @Override
protected void endPointClosed(SelectableEndPoint endpoint) protected void endPointClosed(AsyncEndPoint endpoint)
{ {
} }
@Override @Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) protected void endPointUpgraded(ConnectedEndPoint endpoint, AsyncConnection oldConnection)
{ {
} }
} }
public class ProxyToServerConnection implements Connection public class ProxyToServerConnection implements AsyncConnection
{ {
private final CountDownLatch _ready = new CountDownLatch(1); private final CountDownLatch _ready = new CountDownLatch(1);
private final ByteBuffer _buffer = new IndirectNIOBuffer(1024); private final ByteBuffer _buffer = new IndirectNIOBuffer(1024);
@ -487,7 +487,7 @@ public class ConnectHandler extends HandlerWrapper
return builder.append(")").toString(); return builder.append(")").toString();
} }
public Connection handle() throws IOException public AsyncConnection handle() throws IOException
{ {
_logger.debug("{}: begin reading from server", this); _logger.debug("{}: begin reading from server", this);
try try
@ -677,7 +677,7 @@ public class ConnectHandler extends HandlerWrapper
} }
} }
public class ClientToProxyConnection implements Connection public class ClientToProxyConnection implements AsyncConnection
{ {
private final ByteBuffer _buffer = new IndirectNIOBuffer(1024); private final ByteBuffer _buffer = new IndirectNIOBuffer(1024);
private final ConcurrentMap<String, Object> _context; private final ConcurrentMap<String, Object> _context;
@ -704,7 +704,7 @@ public class ConnectHandler extends HandlerWrapper
return builder.append(")").toString(); return builder.append(")").toString();
} }
public Connection handle() throws IOException public AsyncConnection handle() throws IOException
{ {
_logger.debug("{}: begin reading from client", this); _logger.debug("{}: begin reading from client", this);
try try

View File

@ -23,7 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint; import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectableEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SelectorManager;
/** /**
@ -55,13 +55,13 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key) throws IOException
{ {
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, _maxIdleTime, listeners); NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, _maxIdleTime, listeners);
endPoint.setSelectableConnection(selectSet.getManager().newConnection(channel,endPoint, key.attachment())); endPoint.setAsyncConnection(selectSet.getManager().newConnection(channel,endPoint, key.attachment()));
endPoint.notifyOpened(); endPoint.notifyOpened();
return endPoint; return endPoint;
} }
@Override @Override
protected void endPointClosed(SelectableEndPoint endpoint) protected void endPointClosed(AsyncEndPoint endpoint)
{ {
super.endPointClosed(endpoint); super.endPointClosed(endpoint);
((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed(); ((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed();

View File

@ -21,11 +21,11 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectableConnection; import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.SelectableEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
@ -244,18 +244,18 @@ public class SelectChannelConnector extends AbstractNIOConnector
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{ {
SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); endp.setAsyncConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
return endp; return endp;
} }
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
protected void endPointClosed(SelectableEndPoint endpoint) protected void endPointClosed(AsyncEndPoint endpoint)
{ {
connectionClosed(endpoint.getSelectableConnection()); connectionClosed(endpoint.getSelectableConnection());
} }
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
protected SelectableConnection newConnection(SocketChannel channel,final SelectableEndPoint endpoint) protected AbstractAsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint)
{ {
return new HttpConnection(SelectChannelConnector.this,endpoint,getServer()); return new HttpConnection(SelectChannelConnector.this,endpoint,getServer());
} }
@ -289,13 +289,13 @@ public class SelectChannelConnector extends AbstractNIOConnector
} }
@Override @Override
protected void endPointUpgraded(SelectChannelEndPoint endpoint, Connection oldConnection) protected void endPointUpgraded(SelectChannelEndPoint endpoint, AsyncConnection oldConnection)
{ {
connectionUpgraded(oldConnection,endpoint.getSelectableConnection()); connectionUpgraded(oldConnection,endpoint.getSelectableConnection());
} }
@Override @Override
public SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment) public AbstractAsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment)
{ {
return SelectChannelConnector.this.newConnection(channel,endpoint); return SelectChannelConnector.this.newConnection(channel,endpoint);
} }

View File

@ -542,13 +542,13 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
@Override @Override
protected Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint) protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
{ {
try try
{ {
SSLEngine engine = createSSLEngine(channel); SSLEngine engine = createSSLEngine(channel);
SslConnection connection = newSslConnection(endpoint, engine); SslConnection connection = newSslConnection(endpoint, engine);
Connection delegate = newPlainConnection(channel, connection.getAppEndPoint()); AsyncConnection delegate = newPlainConnection(channel, connection.getAppEndPoint());
connection.getAppEndPoint().setConnection(delegate); connection.getAppEndPoint().setConnection(delegate);
connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate()); connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate());
return connection; return connection;
@ -559,7 +559,7 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
} }
} }
protected Connection newPlainConnection(SocketChannel channel, AsyncEndPoint endPoint) protected AsyncConnection newPlainConnection(SocketChannel channel, AsyncEndPoint endPoint)
{ {
return super.newConnection(channel, endPoint); return super.newConnection(channel, endPoint);
} }

View File

@ -59,7 +59,7 @@ public class AbstractConnectorTest
_server = new Server(); _server = new Server();
_connector = new SelectChannelConnector() _connector = new SelectChannelConnector()
{ {
public void connectionClosed(Connection connection) public void connectionClosed(AsyncConnection connection)
{ {
super.connectionClosed(connection); super.connectionClosed(connection);
_closed.countDown(); _closed.countDown();

View File

@ -76,7 +76,7 @@ public class HttpWriterTest
AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,generator,null) AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,generator,null)
{ {
@Override @Override
public Connection handle() throws IOException public AsyncConnection handle() throws IOException
{ {
return null; return null;
} }
@ -169,7 +169,7 @@ public class HttpWriterTest
AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,hb,null) AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,hb,null)
{ {
@Override @Override
public Connection handle() throws IOException public AsyncConnection handle() throws IOException
{ {
return null; return null;
} }

View File

@ -622,7 +622,7 @@ public class ResponseTest
} }
@Override @Override
public Connection handle() throws IOException public AsyncConnection handle() throws IOException
{ {
return this; return this;
} }

View File

@ -6,14 +6,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectableEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.junit.Test; import org.junit.Test;
public class SelectChannelAsyncContextTest extends LocalAsyncContextTest public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
{ {
volatile SelectableEndPoint _endp; volatile AsyncEndPoint _endp;
@Override @Override
protected Connector initConnector() protected Connector initConnector()
@ -24,7 +24,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
public void customize(EndPoint endpoint, Request request) throws IOException public void customize(EndPoint endpoint, Request request) throws IOException
{ {
super.customize(endpoint,request); super.customize(endpoint,request);
_endp=(SelectableEndPoint)endpoint; _endp=(AsyncEndPoint)endpoint;
} }
}; };
@ -54,7 +54,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
try try
{ {
TimeUnit.MILLISECONDS.sleep(200); TimeUnit.MILLISECONDS.sleep(200);
SelectableEndPoint endp=_endp; AsyncEndPoint endp=_endp;
if (endp!=null && endp.isOpen()) if (endp!=null && endp.isOpen())
endp.asyncDispatch(); endp.asyncDispatch();
} }

View File

@ -271,7 +271,7 @@ public class BufferUtil
* @param to Buffer to put bytes to in flush mode. The buffer is flipToFill before the put and flipToFlush after. * @param to Buffer to put bytes to in flush mode. The buffer is flipToFill before the put and flipToFlush after.
* @return number of bytes moved * @return number of bytes moved
*/ */
public static int flipPutFlip(ByteBuffer from, ByteBuffer to) public static int append(ByteBuffer from, ByteBuffer to)
{ {
int pos= flipToFill(to); int pos= flipToFill(to);
try try
@ -636,6 +636,9 @@ public class BufferUtil
return "null"; return "null";
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder();
buf.append(buffer.getClass().getSimpleName());
buf.append("@");
buf.append(Integer.toHexString(buffer.hashCode()));
buf.append("[p="); buf.append("[p=");
buf.append(buffer.position()); buf.append(buffer.position());
buf.append(",l="); buf.append(",l=");