jetty-9 SSL partially working

This commit is contained in:
Greg Wilkins 2012-04-04 13:47:46 +10:00
parent feca0aa672
commit dfab993fcb
28 changed files with 906 additions and 651 deletions

View File

@ -1,70 +0,0 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final long _timeStamp;
protected final EndPoint _endp;
public AbstractConnection(EndPoint endp)
{
_endp=endp;
_timeStamp = System.currentTimeMillis();
}
public AbstractConnection(EndPoint endp,long timestamp)
{
_endp=endp;
_timeStamp = timestamp;
}
@Override
public EndPoint getEndPoint()
{
return _endp;
}
public long getTimeStamp()
{
return _timeStamp;
}
public void onIdleExpired(long idleForMs)
{
try
{
LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
if (_endp.isInputShutdown() || _endp.isOutputShutdown())
_endp.close();
else
_endp.shutdownOutput();
}
catch(IOException e)
{
LOG.ignore(e);
try
{
_endp.close();
}
catch(IOException e2)
{
LOG.ignore(e2);
}
}
}
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -0,0 +1,226 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractSelectableConnection implements SelectableConnection
{
private static final Logger LOG = Log.getLogger(AbstractSelectableConnection.class);
protected final SelectableEndPoint _endp;
private final long _createdTimeStamp;
private final Lock _lock=new ReentrantLock();
private final Condition _readable=_lock.newCondition();
private final Condition _writeable=_lock.newCondition();
private boolean _readBlocked;
private boolean _writeBlocked;
private final Runnable _reader=new Runnable()
{
@Override
public void run()
{
try
{
doRead();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
private final Runnable _writer=new Runnable()
{
@Override
public void run()
{
try
{
doWrite();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
private volatile int _maxIdleTime=-1;
public AbstractSelectableConnection(SelectableEndPoint endp)
{
_endp=endp;
_createdTimeStamp = System.currentTimeMillis();
}
@Override
public EndPoint getEndPoint()
{
return _endp;
}
@Override
public SelectableEndPoint getSelectableEndPoint()
{
return _endp;
}
@Override
public long getCreatedTimeStamp()
{
return _createdTimeStamp;
}
@Override
public Runnable onReadable()
{
_lock.lock();
try
{
if (_readBlocked)
_readable.signalAll();
else
return _reader;
}
finally
{
_lock.unlock();
}
return null;
}
@Override
public Runnable onWriteable()
{
_lock.lock();
try
{
if (_writeBlocked)
_writeable.signalAll();
else
return _writer;
}
finally
{
_lock.unlock();
}
return null;
}
@Override
public boolean blockReadable()
{
_lock.lock();
boolean readable=false;
try
{
if (_readBlocked)
throw new IllegalStateException();
_readBlocked=true;
_endp.setReadInterested(true);
readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!readable)
_endp.setReadInterested(false);
_readBlocked=false;
_lock.unlock();
}
return readable;
}
@Override
public boolean blockWriteable()
{
_lock.lock();
boolean writeable=false;
try
{
if (_writeBlocked)
throw new IllegalStateException();
_writeBlocked=true;
_endp.setWriteInterested(true);
writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!writeable)
_endp.setWriteInterested(false);
_writeBlocked=false;
_lock.unlock();
}
return writeable;
}
@Override
public void onIdleExpired(long idleForMs)
{
try
{
LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
if (_endp.isInputShutdown() || _endp.isOutputShutdown())
_endp.close();
else
_endp.shutdownOutput();
}
catch(IOException e)
{
LOG.ignore(e);
try
{
_endp.close();
}
catch(IOException e2)
{
LOG.ignore(e2);
}
}
}
protected void doRead()
{
throw new IllegalStateException();
}
protected void doWrite()
{
throw new IllegalStateException();
}
@Override
public int getMaxIdleTime()
{
int max=_maxIdleTime;
return (max==-1)?_endp.getMaxIdleTime():max;
}
public void setMaxIdleTime(int max)
{
_maxIdleTime=max;
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -36,7 +35,6 @@ public class ByteArrayEndPoint implements EndPoint
protected boolean _growOutput; protected boolean _growOutput;
protected int _maxIdleTime; protected int _maxIdleTime;
protected Connection _connection; protected Connection _connection;
private boolean _idleCheck;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -97,6 +95,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#isOpen() * @see org.eclipse.io.EndPoint#isOpen()
*/ */
@Override
public boolean isOpen() public boolean isOpen()
{ {
return !_closed; return !_closed;
@ -106,6 +105,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.jetty.io.EndPoint#isInputShutdown() * @see org.eclipse.jetty.io.EndPoint#isInputShutdown()
*/ */
@Override
public boolean isInputShutdown() public boolean isInputShutdown()
{ {
return _closed; return _closed;
@ -115,6 +115,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.jetty.io.EndPoint#isOutputShutdown() * @see org.eclipse.jetty.io.EndPoint#isOutputShutdown()
*/ */
@Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {
return _closed; return _closed;
@ -124,6 +125,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#shutdownOutput() * @see org.eclipse.io.EndPoint#shutdownOutput()
*/ */
@Override
public void shutdownOutput() throws IOException public void shutdownOutput() throws IOException
{ {
close(); close();
@ -133,6 +135,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#shutdownInput() * @see org.eclipse.io.EndPoint#shutdownInput()
*/ */
@Override
public void shutdownInput() throws IOException public void shutdownInput() throws IOException
{ {
close(); close();
@ -142,6 +145,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#close() * @see org.eclipse.io.EndPoint#close()
*/ */
@Override
public void close() throws IOException public void close() throws IOException
{ {
_closed=true; _closed=true;
@ -151,6 +155,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer) * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
*/ */
@Override
public int fill(ByteBuffer buffer) throws IOException public int fill(ByteBuffer buffer) throws IOException
{ {
if (_closed) if (_closed)
@ -165,6 +170,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer) * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/ */
@Override
public int flush(ByteBuffer... buffers) throws IOException public int flush(ByteBuffer... buffers) throws IOException
{ {
if (_closed) if (_closed)
@ -230,6 +236,7 @@ public class ByteArrayEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#getConnection() * @see org.eclipse.io.EndPoint#getConnection()
*/ */
@Override
public Object getTransport() public Object getTransport()
{ {
return _inBytes; return _inBytes;
@ -257,6 +264,7 @@ public class ByteArrayEndPoint implements EndPoint
/** /**
* @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime() * @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime()
*/ */
@Override
public int getMaxIdleTime() public int getMaxIdleTime()
{ {
return _maxIdleTime; return _maxIdleTime;
@ -266,39 +274,12 @@ public class ByteArrayEndPoint implements EndPoint
/** /**
* @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int) * @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int)
*/ */
@Override
public void setMaxIdleTime(int timeMs) throws IOException public void setMaxIdleTime(int timeMs) throws IOException
{ {
_maxIdleTime=timeMs; _maxIdleTime=timeMs;
} }
@Override
public Connection getConnection()
{
return _connection;
}
@Override
public void setConnection(Connection connection)
{
_connection=connection;
}
@Override
public void onIdleExpired(long idleForMs)
{
}
@Override
public void setCheckForIdle(boolean check)
{
_idleCheck=check;
}
@Override
public boolean isCheckForIdle()
{
return _idleCheck;
}
} }

View File

@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -23,7 +23,7 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -43,9 +43,6 @@ public class ChannelEndPoint implements EndPoint
private volatile int _maxIdleTime; private volatile int _maxIdleTime;
private volatile boolean _ishut; private volatile boolean _ishut;
private volatile boolean _oshut; private volatile boolean _oshut;
private Connection _connection;
private boolean _idleCheck;
public ChannelEndPoint(ByteChannel channel) throws IOException public ChannelEndPoint(ByteChannel channel) throws IOException
{ {
@ -90,6 +87,7 @@ public class ChannelEndPoint implements EndPoint
/* /*
* @see org.eclipse.io.EndPoint#isOpen() * @see org.eclipse.io.EndPoint#isOpen()
*/ */
@Override
public boolean isOpen() public boolean isOpen()
{ {
return _channel.isOpen(); return _channel.isOpen();
@ -133,6 +131,7 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc) /* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close() * @see org.eclipse.io.EndPoint#close()
*/ */
@Override
public void shutdownInput() throws IOException public void shutdownInput() throws IOException
{ {
shutdownChannelInput(); shutdownChannelInput();
@ -172,16 +171,19 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc) /* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close() * @see org.eclipse.io.EndPoint#close()
*/ */
@Override
public void shutdownOutput() throws IOException public void shutdownOutput() throws IOException
{ {
shutdownChannelOutput(); shutdownChannelOutput();
} }
@Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {
return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown(); return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown();
} }
@Override
public boolean isInputShutdown() public boolean isInputShutdown()
{ {
return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown(); return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown();
@ -190,6 +192,7 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc) /* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close() * @see org.eclipse.io.EndPoint#close()
*/ */
@Override
public void close() throws IOException public void close() throws IOException
{ {
LOG.debug("close {}",this); LOG.debug("close {}",this);
@ -199,17 +202,15 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc) /* (non-Javadoc)
* @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer) * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
*/ */
@Override
public int fill(ByteBuffer buffer) throws IOException public int fill(ByteBuffer buffer) throws IOException
{ {
if (_ishut) if (_ishut)
return -1; return -1;
int pos=buffer.position(); int pos=BufferUtil.flipToFill(buffer);
try try
{ {
buffer.position(buffer.limit());
buffer.limit(buffer.capacity());
int filled = _channel.read(buffer); int filled = _channel.read(buffer);
if (filled==-1) if (filled==-1)
@ -217,23 +218,29 @@ public class ChannelEndPoint implements EndPoint
return filled; return filled;
} }
catch(IOException e)
{
LOG.debug(e);
shutdownInput();
return -1;
}
finally finally
{ {
buffer.limit(buffer.position()); BufferUtil.flipToFlush(buffer,pos);
buffer.position(pos);
} }
} }
/* (non-Javadoc) /* (non-Javadoc)
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer) * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/ */
@Override
public int flush(ByteBuffer... buffers) throws IOException public int flush(ByteBuffer... buffers) throws IOException
{ {
int len=0; int len=0;
if (_channel instanceof GatheringByteChannel) if (buffers.length==1)
{ len=_channel.write(buffers[0]);
len= (int)((GatheringByteChannel)_channel).write(buffers,0,2); else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
} len= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
else else
{ {
for (ByteBuffer b : buffers) for (ByteBuffer b : buffers)
@ -275,6 +282,7 @@ public class ChannelEndPoint implements EndPoint
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public Object getTransport() public Object getTransport()
{ {
return _channel; return _channel;
@ -287,6 +295,7 @@ public class ChannelEndPoint implements EndPoint
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public int getMaxIdleTime() public int getMaxIdleTime()
{ {
return _maxIdleTime; return _maxIdleTime;
@ -296,6 +305,7 @@ public class ChannelEndPoint implements EndPoint
/** /**
* @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int) * @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int)
*/ */
@Override
public void setMaxIdleTime(int timeMs) throws IOException public void setMaxIdleTime(int timeMs) throws IOException
{ {
//if (_socket!=null && timeMs!=_maxIdleTime) //if (_socket!=null && timeMs!=_maxIdleTime)
@ -303,34 +313,4 @@ public class ChannelEndPoint implements EndPoint
_maxIdleTime=timeMs; _maxIdleTime=timeMs;
} }
@Override
public Connection getConnection()
{
return _connection;
}
@Override
public void setConnection(Connection connection)
{
_connection=connection;
}
@Override
public void onIdleExpired(long idleForMs)
{
}
@Override
public void setCheckForIdle(boolean check)
{
_idleCheck=check;
}
@Override
public boolean isCheckForIdle()
{
return _idleCheck;
}
} }

View File

@ -11,42 +11,21 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.io.IOException;
import org.eclipse.jetty.io.EndPoint;
public interface Connection public interface Connection
{ {
EndPoint getEndPoint(); EndPoint getEndPoint();
void canRead();
void canWrite();
boolean isReadInterested();
boolean isWriteInterested();
void onInputShutdown() throws IOException;
/**
* Called when the connection is closed
*/
void onClose();
/**
* Called when the connection idle timeout expires
* @param idleForMs TODO
*/
void onIdleExpired(long idleForMs);
int getMaxIdleTime();
/** /**
* @return the timestamp at which the connection was created * @return the timestamp at which the connection was created
*/ */
long getTimeStamp(); long getCreatedTimeStamp();
boolean isIdle(); boolean isIdle();
} }

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.io.nio.Connection;
/** /**
@ -113,28 +112,5 @@ public interface EndPoint
void setMaxIdleTime(int timeMs) throws IOException; void setMaxIdleTime(int timeMs) throws IOException;
/* ------------------------------------------------------------ */
Connection getConnection();
/* ------------------------------------------------------------ */
void setConnection(Connection connection);
/* ------------------------------------------------------------ */
/** Callback when idle.
* <p>An endpoint is idle if there has been no IO activity for
* {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
* @param idleForMs TODO
*/
public void onIdleExpired(long idleForMs);
/* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness
*/
public void setCheckForIdle(boolean check);
/* ------------------------------------------------------------ */
/** Get if the endpoint should be checked for idleness
*/
public boolean isCheckForIdle();
} }

View File

@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -19,7 +19,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.List; import java.util.List;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;

View File

@ -11,17 +11,17 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task; import org.eclipse.jetty.util.thread.Timeout.Task;
@ -30,22 +30,29 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
/** /**
* An Endpoint that can be scheduled by {@link SelectorManager}. * An Endpoint that can be scheduled by {@link SelectorManager}.
*/ */
public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint
{ {
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
private final Lock _lock = new ReentrantLock();
private final SelectorManager.SelectSet _selectSet; private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager; private final SelectorManager _manager;
private SelectionKey _key; private SelectionKey _key;
private boolean _selected;
private boolean _changing;
/** The desired value for {@link SelectionKey#interestOps()} */ /** The desired value for {@link SelectionKey#interestOps()} */
private int _interestOps; private int _interestOps;
private boolean _ishutCalled;
/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
private boolean _open; private boolean _open;
private volatile long _idleTimestamp; private volatile boolean _idlecheck;
private volatile Connection _connection; private volatile long _lastNotIdleTimestamp;
private volatile SelectableConnection _connection;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
@ -78,18 +85,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void setConnection(Connection connection) public void setSelectableConnection(SelectableConnection connection)
{ {
Connection old=getConnection(); Connection old=getSelectableConnection();
_connection=connection; _connection=connection;
if (old!=null && old!=connection) if (old!=null && old!=connection)
_manager.endPointUpgraded(this,(Connection)old); _manager.endPointUpgraded(this,old);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public long getIdleTimestamp() @Override
public long getLastNotIdleTimestamp()
{ {
return _idleTimestamp; return _lastNotIdleTimestamp;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -98,9 +106,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
*/ */
public void selected() public void selected()
{ {
final boolean can_read; _lock.lock();
final boolean can_write; _selected=true;
synchronized (this) try
{ {
// If there is no key, then do nothing // If there is no key, then do nothing
if (_key == null || !_key.isValid()) if (_key == null || !_key.isValid())
@ -109,17 +117,101 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
return; return;
} }
can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0); boolean can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0);
can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0); boolean can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0);
_interestOps=0; _interestOps=0;
_key.interestOps(0);
}
if (can_read) if (can_read)
getConnection().canRead(); {
if (can_write) Runnable task=getSelectableConnection().onReadable();
getConnection().canWrite(); if (task!=null)
_manager.dispatch(task);
} }
if (can_write)
{
Runnable task=getSelectableConnection().onWriteable();
if (task!=null)
_manager.dispatch(task);
}
if (isInputShutdown() && !_ishutCalled)
{
_ishutCalled=true;
getSelectableConnection().onInputShutdown();
}
}
finally
{
doUpdateKey();
_selected=false;
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public boolean isReadInterested()
{
_lock.lock();
try
{
return (_interestOps&SelectionKey.OP_READ)!=0;
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public void setReadInterested(boolean interested)
{
_lock.lock();
try
{
_interestOps=interested?(_interestOps|SelectionKey.OP_READ):(_interestOps&~SelectionKey.OP_READ);
if (!_selected)
updateKey();
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public boolean isWriteInterested()
{
_lock.lock();
try
{
return (_interestOps&SelectionKey.OP_READ)!=0;
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public void setWriteInterested(boolean interested)
{
_lock.lock();
try
{
_interestOps=interested?(_interestOps|SelectionKey.OP_WRITE):(_interestOps&~SelectionKey.OP_WRITE);
if (!_selected)
updateKey();
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void cancelTimeout(Task task) public void cancelTimeout(Task task)
@ -134,28 +226,31 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public void setCheckForIdle(boolean check) public void setCheckForIdle(boolean check)
{ {
_idleTimestamp=check?System.currentTimeMillis():0; _idlecheck=true;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public boolean isCheckForIdle() public boolean isCheckForIdle()
{ {
return _idleTimestamp!=0; return _idlecheck;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void notIdle() protected void notIdle()
{ {
if (_idleTimestamp!=0) _lastNotIdleTimestamp=System.currentTimeMillis();
_idleTimestamp=System.currentTimeMillis();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void checkIdleTimestamp(long now) public void checkForIdle(long now)
{ {
long idleTimestamp=_idleTimestamp; if (_idlecheck)
{
long idleTimestamp=_lastNotIdleTimestamp;
long max_idle_time=getMaxIdleTime(); long max_idle_time=getMaxIdleTime();
if (idleTimestamp!=0 && max_idle_time>0) if (idleTimestamp!=0 && max_idle_time>0)
@ -165,15 +260,17 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
if (idleForMs>max_idle_time) if (idleForMs>max_idle_time)
{ {
onIdleExpired(idleForMs); onIdleExpired(idleForMs);
_idleTimestamp=now; _lastNotIdleTimestamp=now;
}
} }
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public void onIdleExpired(long idleForMs) public void onIdleExpired(long idleForMs)
{ {
getConnection().onIdleExpired(idleForMs); getSelectableConnection().onIdleExpired(idleForMs);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -199,23 +296,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Updates selection key. Adds operations types to the selection key as needed. No operations * Updates selection key. This method schedules a call to doUpdateKey to do the keyChange
* are removed as this is only done during dispatch. This method records the new key and
* schedules a call to doUpdateKey to do the keyChange
*/ */
public void updateKey() private void updateKey()
{
final boolean changed;
synchronized (this)
{ {
int current_ops=-1; int current_ops=-1;
if (getChannel().isOpen()) if (getChannel().isOpen())
{ {
Socket socket = getSocket();
boolean read_interest = getConnection().isReadInterested() && !socket.isInputShutdown();
boolean write_interest= getConnection().isWriteInterested() && !socket.isOutputShutdown();
_interestOps = (read_interest?SelectionKey.OP_READ:0)|(write_interest?SelectionKey.OP_WRITE:0);
try try
{ {
current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
@ -226,11 +313,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
LOG.ignore(e); LOG.ignore(e);
} }
} }
changed=_interestOps!=current_ops; if (_interestOps!=current_ops && !_changing)
}
if(changed)
{ {
_changing=true;
_selectSet.addChange(this); _selectSet.addChange(this);
_selectSet.wakeup(); _selectSet.wakeup();
} }
@ -243,8 +328,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
*/ */
void doUpdateKey() void doUpdateKey()
{ {
synchronized (this) _lock.lock();
try
{ {
_changing=false;
if (getChannel().isOpen()) if (getChannel().isOpen())
{ {
if (_interestOps>0) if (_interestOps>0)
@ -305,6 +392,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
_key = null; _key = null;
} }
} }
finally
{
_lock.unlock();
}
} }
@ -367,7 +458,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
isOutputShutdown(), isOutputShutdown(),
_interestOps, _interestOps,
keyString, keyString,
getConnection()); getSelectableConnection());
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -378,9 +469,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public Connection getConnection() public SelectableConnection getSelectableConnection()
{ {
return _connection; return _connection;
} }
} }

View File

@ -0,0 +1,28 @@
package org.eclipse.jetty.io;
public interface SelectableConnection extends Connection
{
SelectableEndPoint getSelectableEndPoint();
Runnable onReadable();
Runnable onWriteable();
public boolean blockReadable();
public boolean blockWriteable();
/**
* Called when the connection idle timeout expires
* @param idleForMs TODO
*/
void onIdleExpired(long idleForMs);
void onInputShutdown();
/**
* Called when the connection is closed
*/
void onClose();
}

View File

@ -0,0 +1,38 @@
package org.eclipse.jetty.io;
public interface SelectableEndPoint extends EndPoint
{
public abstract void setWriteInterested(boolean interested);
public abstract boolean isWriteInterested();
public abstract void setReadInterested(boolean interested);
public abstract boolean isReadInterested();
/* ------------------------------------------------------------ */
SelectableConnection getSelectableConnection();
/* ------------------------------------------------------------ */
/** Callback when idle.
* <p>An endpoint is idle if there has been no IO activity for
* {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
* @param idleForMs TODO
*/
public void onIdleExpired(long idleForMs);
/* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness
*/
public void setCheckForIdle(boolean check);
/* ------------------------------------------------------------ */
/** Get if the endpoint should be checked for idleness
*/
public boolean isCheckForIdle();
public long getLastNotIdleTimestamp();
public void checkForIdle(long now);
}

View File

@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
@ -338,10 +337,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
protected abstract void endPointOpened(SelectChannelEndPoint endpoint); protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected abstract void endPointUpgraded(EndPoint endpoint,Connection oldConnection); protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,Connection oldConnection);
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment); public abstract SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -472,7 +471,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
key = channel.register(selector,SelectionKey.OP_READ,att); key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint); key.attach(endpoint);
endpoint.selected();
} }
else if (channel.isOpen()) else if (channel.isOpen())
{ {
@ -487,7 +485,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
key = channel.register(selector,SelectionKey.OP_READ,null); key = channel.register(selector,SelectionKey.OP_READ,null);
SelectChannelEndPoint endpoint = createEndPoint(channel,key); SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint); key.attach(endpoint);
endpoint.selected();
} }
else if (change instanceof ChangeTask) else if (change instanceof ChangeTask)
{ {
@ -703,7 +700,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{ {
for (SelectChannelEndPoint endp:_endPoints.keySet()) for (SelectChannelEndPoint endp:_endPoints.keySet())
{ {
endp.checkIdleTimestamp(idle_now); endp.checkForIdle(idle_now);
} }
} }
public String toString() {return "Idle-"+super.toString();} public String toString() {return "Idle-"+super.toString();}
@ -842,6 +839,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{ {
LOG.debug("destroyEndPoint {}",endp); LOG.debug("destroyEndPoint {}",endp);
_endPoints.remove(endp); _endPoints.remove(endp);
SelectableConnection connection=endp.getSelectableConnection();
if (connection!=null)
connection.onClose();
endPointClosed(endp); endPointClosed(endp);
} }

View File

@ -11,11 +11,12 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
@ -24,8 +25,6 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException; import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -35,27 +34,27 @@ import org.eclipse.jetty.util.log.Logger;
* An AysyncConnection that acts as an interceptor between and EndPoint and another * An AysyncConnection that acts as an interceptor between and EndPoint and another
* Connection, that implements TLS encryption using an {@link SSLEngine}. * Connection, that implements TLS encryption using an {@link SSLEngine}.
* <p> * <p>
* The connector uses an {@link AsyncEndPoint} (like {@link SelectChannelEndPoint}) as * The connector uses an {@link EndPoint} (like {@link SelectChannelEndPoint}) as
* it's source/sink of encrypted data. It then provides {@link #getSslEndPoint()} to * it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to
* expose a source/sink of unencrypted data to another connection (eg HttpConnection). * expose a source/sink of unencrypted data to another connection (eg HttpConnection).
*/ */
public class SslConnection extends AbstractConnection public class SslConnection extends AbstractSelectableConnection
{ {
private final Logger _logger = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); private static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0); private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0);
private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>(); private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
private final SSLEngine _engine; private final SSLEngine _engine;
private final SSLSession _session; private final SSLSession _session;
private Connection _connection; private SelectableConnection _appConnection;
private final SslEndPoint _sslEndPoint; private final AppEndPoint _appEndPoint;
private int _allocations; private int _allocations;
private SslBuffers _buffers; private SslBuffers _buffers;
private ByteBuffer _inbound; private ByteBuffer _inNet;
private ByteBuffer _unwrapBuf; private ByteBuffer _inApp;
private ByteBuffer _outbound; private ByteBuffer _outNet;
private AsyncEndPoint _aEndp; private SelectableEndPoint _endp;
private boolean _allowRenegotiate=true; private boolean _allowRenegotiate=true;
private boolean _handshook; private boolean _handshook;
private boolean _ishut; private boolean _ishut;
@ -67,44 +66,46 @@ public class SslConnection extends AbstractConnection
*/ */
private static class SslBuffers private static class SslBuffers
{ {
final ByteBuffer _in; final ByteBuffer _inNet;
final ByteBuffer _out; final ByteBuffer _outNet;
final ByteBuffer _unwrap; final ByteBuffer _inApp;
SslBuffers(int packetSize, int appSize) SslBuffers(int packetSize, int appSize)
{ {
_in=BufferUtil.allocateDirect(packetSize); _inNet=BufferUtil.allocateDirect(packetSize);
_out=BufferUtil.allocateDirect(packetSize); _outNet=BufferUtil.allocateDirect(packetSize);
_unwrap=BufferUtil.allocate(appSize); _inApp=BufferUtil.allocate(appSize);
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SslConnection(SSLEngine engine,AsyncEndPoint endp) public SslConnection(SSLEngine engine,SelectableEndPoint endp)
{ {
this(engine,endp,System.currentTimeMillis()); this(engine,endp,System.currentTimeMillis());
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SslConnection(SSLEngine engine,AsyncEndPoint endp, long timeStamp) public SslConnection(SSLEngine engine,SelectableEndPoint endp, long timeStamp)
{ {
super(endp,timeStamp); super(endp);
_engine=engine; _engine=engine;
_session=_engine.getSession(); _session=_engine.getSession();
_aEndp=(AsyncEndPoint)endp; _endp=endp;
_sslEndPoint = newSslEndPoint(); _appEndPoint = newAppEndPoint();
}
/* ------------------------------------------------------------ */
public void setAppConnection(SelectableConnection connection)
{
_appConnection=connection;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected SslEndPoint newSslEndPoint() protected AppEndPoint newAppEndPoint()
{ {
return new SslEndPoint(); return new AppEndPoint();
}
/* ------------------------------------------------------------ */
public EndPoint getEndPoint()
{
return _aEndp;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -143,10 +144,10 @@ public class SslConnection extends AbstractConnection
{ {
_buffers=__buffers.get(); _buffers=__buffers.get();
if (_buffers==null) if (_buffers==null)
_buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2); _buffers=new SslBuffers(_session.getPacketBufferSize(),_session.getApplicationBufferSize());
_inbound=_buffers._in; _inNet=_buffers._inNet;
_outbound=_buffers._out; _outNet=_buffers._outNet;
_unwrapBuf=_buffers._unwrap; _inApp=_buffers._inApp;
__buffers.set(null); __buffers.set(null);
} }
} }
@ -161,16 +162,16 @@ public class SslConnection extends AbstractConnection
if (--_allocations==0) if (--_allocations==0)
{ {
if (_buffers!=null && if (_buffers!=null &&
_inbound.remaining()==0 && _inNet.remaining()==0 &&
_outbound.remaining()==0 && _outNet.remaining()==0 &&
_unwrapBuf.remaining()==0) _inApp.remaining()==0)
{ {
_inbound=null; _inNet=null;
_outbound=null; _outNet=null;
_unwrapBuf=null; _inApp=null;
_buffers._in.clear().limit(0); _buffers._inNet.clear().limit(0);
_buffers._out.clear().limit(0); _buffers._outNet.clear().limit(0);
_buffers._unwrap.clear().limit(0); _buffers._inApp.clear().limit(0);
__buffers.set(_buffers); __buffers.set(_buffers);
_buffers=null; _buffers=null;
@ -178,75 +179,18 @@ public class SslConnection extends AbstractConnection
} }
} }
} }
/* ------------------------------------------------------------ */
public void canRead() throws IOException
{
try
{
allocateBuffers();
boolean progress=true;
while (progress)
{
progress=false;
// If we are handshook let the delegate connection
if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
progress=process(null,null);
// handle the delegate connection
_connection.canRead();
_logger.debug("{} handle {} progress={}", _session, this, progress);
}
}
finally
{
releaseBuffers();
if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
{
_ishut=true;
try
{
_connection.onInputShutdown();
}
catch(Throwable x)
{
_logger.warn("onInputShutdown failed", x);
try{_sslEndPoint.close();}
catch(IOException e2){
_logger.ignore(e2);}
}
}
}
}
/* ------------------------------------------------------------ */
public void canWrite() throws IOException
{
// TODO
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public boolean isIdle() public boolean isIdle()
{ {
return _connection.isIdle(); return _appConnection.isIdle();
}
/* ------------------------------------------------------------ */
public boolean isReadInterested()
{
return _connection.isReadInterested();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public void onClose() public void onClose()
{ {
_connection.onClose(); _appConnection.onClose();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -255,71 +199,98 @@ public class SslConnection extends AbstractConnection
{ {
try try
{ {
_logger.debug("onIdleExpired {}ms on {}",idleForMs,this); LOG.debug("onIdleExpired {}ms on {}",idleForMs,this);
if (_endp.isOutputShutdown()) if (_endp.isOutputShutdown())
_sslEndPoint.close(); _appEndPoint.close();
else else
_sslEndPoint.shutdownOutput(); _appEndPoint.shutdownOutput();
} }
catch (IOException e) catch (IOException e)
{ {
_logger.warn(e); LOG.warn(e);
super.onIdleExpired(idleForMs); super.onIdleExpired(idleForMs);
} }
} }
/* ------------------------------------------------------------ */
public void onInputShutdown() throws IOException
{
/* ------------------------------------------------------------ */
@Override
public void doRead()
{
try
{
allocateBuffers();
boolean progress=true;
while(progress)
{
progress=false;
// Fill the input buffer with everything available
if (!BufferUtil.isFull(_inNet))
progress|=_endp.fill(_inNet)>0;
progress|=process(null);
if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested())
{
progress=true;
Runnable task =_appConnection.onReadable();
if (task!=null)
task.run();
}
}
}
catch(IOException e)
{
LOG.warn(e);
}
finally
{
releaseBuffers();
_endp.setReadInterested(_appEndPoint.isReadInterested());
_endp.setWriteInterested(BufferUtil.hasContent(_outNet));
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private synchronized boolean process(ByteBuffer toFill, ByteBuffer toFlush) throws IOException @Override
public void doWrite()
{
try
{
while (BufferUtil.hasContent(_outNet))
{
int written = _endp.flush(_outNet);
if (written>0 && _appEndPoint.isWriteInterested())
{
Runnable task =_appConnection.onWriteable();
if (task!=null)
task.run();
}
}
}
catch(IOException e)
{
LOG.warn(e);
}
finally
{
if (BufferUtil.hasContent(_outNet))
_endp.setWriteInterested(true);
}
}
/* ------------------------------------------------------------ */
private synchronized boolean process(ByteBuffer appOut) throws IOException
{ {
boolean some_progress=false; boolean some_progress=false;
try try
{ {
// We need buffers to progress
allocateBuffers();
// if we don't have a buffer to put received data into
if (toFill==null)
{
// use the unwrapbuffer to hold received data.
_unwrapBuf.compact().flip();
toFill=_unwrapBuf;
}
// Else if the fill buffer is too small for the SSL session
else if (toFill.capacity()<_session.getApplicationBufferSize())
{
// fill to the temporary unwrapBuffer
boolean progress=process(null,toFlush);
// if we received any data,
if (BufferUtil.hasContent(_unwrapBuf))
{
// transfer from temp buffer to fill buffer
BufferUtil.flipPutFlip(_unwrapBuf,toFill);
return true;
}
else
// return progress from recursive call
return progress;
}
// Else if there is some temporary data
else if (BufferUtil.hasContent(_unwrapBuf))
{
// transfer from temp buffer to fill buffer
BufferUtil.flipPutFlip(_unwrapBuf,toFill);
return true;
}
// If we are here, we have a buffer ready into which we can put some read data.
// If we have no data to flush, flush the empty buffer // If we have no data to flush, flush the empty buffer
if (toFlush==null) if (appOut==null)
toFlush=__ZERO_BUFFER; appOut=__ZERO_BUFFER;
// While we are making progress processing SSL engine // While we are making progress processing SSL engine
boolean progress=true; boolean progress=true;
@ -327,34 +298,6 @@ public class SslConnection extends AbstractConnection
{ {
progress=false; progress=false;
// Do any real IO
int filled=0,flushed=0;
try
{
// Read any available data
if (!BufferUtil.isFull(_inbound) && (filled=_endp.fill(_inbound))>0)
progress = true;
else
_inbound.compact().flip();
// flush any output data
if (BufferUtil.hasContent(_outbound) && (flushed=_endp.flush(_outbound))>0)
{
progress = true;
_outbound.compact().flip();
}
}
catch (IOException e)
{
_endp.close();
throw e;
}
finally
{
_logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.remaining(),flushed,_outbound.remaining());
}
// handle the current hand share status // handle the current hand share status
switch(_engine.getHandshakeStatus()) switch(_engine.getHandshakeStatus())
{ {
@ -364,11 +307,11 @@ public class SslConnection extends AbstractConnection
case NOT_HANDSHAKING: case NOT_HANDSHAKING:
{ {
// Try unwrapping some application data // Try unwrapping some application data
if (!BufferUtil.isFull(toFill) && BufferUtil.hasContent(_inbound) && unwrap(toFill)) if (!BufferUtil.isFull(_inApp) && BufferUtil.hasContent(_inNet) && unwrap())
progress=true; progress=true;
// Try wrapping some application data // Try wrapping some application data
if (BufferUtil.hasContent(toFlush) && !BufferUtil.isFull(_outbound) && wrap(toFlush)) if (BufferUtil.hasContent(appOut) && !BufferUtil.isFull(_outNet) && wrap(appOut))
progress=true; progress=true;
} }
break; break;
@ -382,7 +325,6 @@ public class SslConnection extends AbstractConnection
progress=true; progress=true;
task.run(); task.run();
} }
} }
break; break;
@ -391,7 +333,7 @@ public class SslConnection extends AbstractConnection
// The SSL needs to send some handshake data to the other side // The SSL needs to send some handshake data to the other side
if (_handshook && !_allowRenegotiate) if (_handshook && !_allowRenegotiate)
_endp.close(); _endp.close();
else if (wrap(toFlush)) else if (wrap(appOut))
progress=true; progress=true;
} }
break; break;
@ -401,55 +343,54 @@ public class SslConnection extends AbstractConnection
// The SSL needs to receive some handshake data from the other side // The SSL needs to receive some handshake data from the other side
if (_handshook && !_allowRenegotiate) if (_handshook && !_allowRenegotiate)
_endp.close(); _endp.close();
else if (BufferUtil.isEmpty(_inbound)&&filled==-1) else if (BufferUtil.isEmpty(_inNet) && _endp.isInputShutdown())
{ _endp.close();
// No more input coming else if (unwrap())
_endp.shutdownInput();
}
else if (unwrap(toFill))
progress=true; progress=true;
} }
break; break;
} }
// pass on ishut/oshut state // pass on ishut/oshut state
if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inbound)) if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inNet))
_engine.closeInbound(); _engine.closeInbound();
if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outbound)) if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outNet))
_endp.shutdownOutput(); _endp.shutdownOutput();
// remember if any progress has been made // remember if any progress has been made
some_progress|=progress; some_progress|=progress;
} }
// If we are reading into the temp buffer and it has some content, then we should be dispatched.
if (toFill==_unwrapBuf && BufferUtil.hasContent(_unwrapBuf))
_aEndp.asyncDispatch();
} }
finally finally
{ {
releaseBuffers();
if (some_progress) if (some_progress)
_progressed.set(true); _progressed.set(true);
} }
return some_progress; return some_progress;
} }
private synchronized boolean wrap(final ByteBuffer buffer) throws IOException private synchronized boolean wrap(final ByteBuffer outApp) throws IOException
{ {
final SSLEngineResult result; final SSLEngineResult result;
_outbound.compact(); int pos=BufferUtil.flipToFill(_outNet);
result=_engine.wrap(buffer,_outbound); try
if (_logger.isDebugEnabled()) {
_logger.debug("{} wrap {} {} consumed={} produced={}", result=_engine.wrap(outApp,_outNet);
}
finally
{
BufferUtil.flipToFlush(_outNet,pos);
}
if (LOG.isDebugEnabled())
LOG.debug("{} wrap {} {} consumed={} produced={}",
_session, _session,
result.getStatus(), result.getStatus(),
result.getHandshakeStatus(), result.getHandshakeStatus(),
result.bytesConsumed(), result.bytesConsumed(),
result.bytesProduced()); result.bytesProduced());
_outbound.flip();
switch(result.getStatus()) switch(result.getStatus())
{ {
@ -465,58 +406,64 @@ public class SslConnection extends AbstractConnection
break; break;
case CLOSED: case CLOSED:
_logger.debug("wrap CLOSE {} {}",this,result); LOG.debug("wrap CLOSE {} {}",this,result);
if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
_endp.close(); _endp.close();
break; break;
default: default:
_logger.debug("{} wrap default {}",_session,result); LOG.debug("{} wrap default {}",_session,result);
throw new IOException(result.toString()); throw new IOException(result.toString());
} }
return result.bytesConsumed()>0 || result.bytesProduced()>0; int flushed = _endp.flush(_outNet);
return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0;
} }
private synchronized boolean unwrap(final ByteBuffer buffer) throws IOException private synchronized boolean unwrap() throws IOException
{ {
if (BufferUtil.isEmpty(_inbound)) if (BufferUtil.isEmpty(_inNet))
return false; return false;
final SSLEngineResult result; final SSLEngineResult result;
int pos = BufferUtil.flipToFill(_inApp);
try try
{ {
buffer.compact(); result=_engine.unwrap(_inNet,_inApp);
result=_engine.unwrap(_inbound,buffer); }
buffer.flip(); catch(SSLException e)
{
LOG.debug(String.valueOf(_endp), e);
_endp.close();
throw e;
}
finally
{
BufferUtil.flipToFlush(_inApp,pos);
}
if (_logger.isDebugEnabled()) if (LOG.isDebugEnabled())
_logger.debug("{} unwrap {} {} consumed={} produced={}", LOG.debug("{} unwrap {} {} consumed={} produced={}",
_session, _session,
result.getStatus(), result.getStatus(),
result.getHandshakeStatus(), result.getHandshakeStatus(),
result.bytesConsumed(), result.bytesConsumed(),
result.bytesProduced()); result.bytesProduced());
}
catch(SSLException e)
{
_logger.debug(String.valueOf(_endp), e);
_endp.close();
throw e;
}
switch(result.getStatus()) switch(result.getStatus())
{ {
case BUFFER_UNDERFLOW: case BUFFER_UNDERFLOW:
_inbound.compact().flip(); // need to wait for more net data
_inNet.compact().flip();
if (_endp.isInputShutdown()) if (_endp.isInputShutdown())
_inbound.clear().limit(0); _inNet.clear().limit(0);
break; break;
case BUFFER_OVERFLOW: case BUFFER_OVERFLOW:
_logger.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound,buffer); // need to wait until more app data has been consumed.
LOG.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inNet,_inApp);
break; break;
case OK: case OK:
@ -525,13 +472,13 @@ public class SslConnection extends AbstractConnection
break; break;
case CLOSED: case CLOSED:
_logger.debug("unwrap CLOSE {} {}",this,result); LOG.debug("unwrap CLOSE {} {}",this,result);
if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
_endp.close(); _endp.close();
break; break;
default: default:
_logger.debug("{} wrap default {}",_session,result); LOG.debug("{} wrap default {}",_session,result);
throw new IOException(result.toString()); throw new IOException(result.toString());
} }
@ -542,42 +489,54 @@ public class SslConnection extends AbstractConnection
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public AsyncEndPoint getSslEndPoint() @Override
public void onInputShutdown()
{ {
return _sslEndPoint;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SelectableEndPoint getAppEndPoint()
{
return _appEndPoint;
}
/* ------------------------------------------------------------ */
@Override
public String toString() public String toString()
{ {
return String.format("%s %s", super.toString(), _sslEndPoint); return String.format("%s %s", super.toString(), _appEndPoint);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public class SslEndPoint implements AsyncEndPoint public class AppEndPoint implements SelectableEndPoint
{ {
boolean _readInterested=true;
boolean _writeInterested;
public SSLEngine getSslEngine() public SSLEngine getSslEngine()
{ {
return _engine; return _engine;
} }
public AsyncEndPoint getIoEndPoint() public EndPoint getIoEndPoint()
{ {
return _aEndp; return _endp;
} }
@Override
public void shutdownOutput() throws IOException public void shutdownOutput() throws IOException
{ {
synchronized (SslConnection.this) synchronized (SslConnection.this)
{ {
_logger.debug("{} ssl endp.oshut {}",_session,this); LOG.debug("{} ssl endp.oshut {}",_session,this);
_engine.closeOutbound(); _engine.closeOutbound();
_oshut=true; _oshut=true;
} }
flush(); flush();
} }
@Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {
synchronized (SslConnection.this) synchronized (SslConnection.this)
@ -586,34 +545,44 @@ public class SslConnection extends AbstractConnection
} }
} }
@Override
public void shutdownInput() throws IOException public void shutdownInput() throws IOException
{ {
_logger.debug("{} ssl endp.ishut!",_session); LOG.debug("{} ssl endp.ishut!",_session);
// We do not do a closeInput here, as SSL does not support half close. // We do not do a closeInput here, as SSL does not support half close.
// isInputShutdown works it out itself from buffer state and underlying endpoint state. // isInputShutdown works it out itself from buffer state and underlying endpoint state.
} }
@Override
public boolean isInputShutdown() public boolean isInputShutdown()
{ {
synchronized (SslConnection.this) synchronized (SslConnection.this)
{ {
return _endp.isInputShutdown() && return _endp.isInputShutdown() &&
!(_unwrapBuf!=null&&BufferUtil.hasContent(_unwrapBuf)) && !(_inApp!=null&&BufferUtil.hasContent(_inApp)) &&
!(_inbound!=null&&BufferUtil.hasContent(_inbound)); !(_inNet!=null&&BufferUtil.hasContent(_inNet));
} }
} }
@Override
public void close() throws IOException public void close() throws IOException
{ {
_logger.debug("{} ssl endp.close",_session); LOG.debug("{} ssl endp.close",_session);
_endp.close(); _endp.close();
} }
@Override
public int fill(ByteBuffer buffer) throws IOException public int fill(ByteBuffer buffer) throws IOException
{ {
int size=buffer.remaining(); int size=buffer.remaining();
process(buffer, null); synchronized (this)
{
if (!BufferUtil.hasContent(_inApp))
process(null);
if (BufferUtil.hasContent(_inApp))
BufferUtil.flipPutFlip(_inApp,buffer);
}
int filled=buffer.remaining()-size; int filled=buffer.remaining()-size;
if (filled==0 && isInputShutdown()) if (filled==0 && isInputShutdown())
@ -621,31 +590,35 @@ public class SslConnection extends AbstractConnection
return filled; return filled;
} }
@Override
public int flush(ByteBuffer... buffers) throws IOException public int flush(ByteBuffer... buffers) throws IOException
{ {
int len=0; int len=0;
for (ByteBuffer b : buffers) bufloop: for (ByteBuffer b : buffers)
{ {
if (b.hasRemaining()) while (b.hasRemaining())
{ {
int l = b.remaining(); int l = b.remaining();
process(null, b); if (!process(b))
break bufloop;
l=l-b.remaining(); l=l-b.remaining();
if (l>0) if (l>0)
len+=l; len+=l;
else else
break; break bufloop;
} }
} }
return len; return len;
} }
@Override
public boolean isOpen() public boolean isOpen()
{ {
return _endp.isOpen(); return _endp.isOpen();
} }
@Override
public Object getTransport() public Object getTransport()
{ {
return _endp; return _endp;
@ -653,67 +626,71 @@ public class SslConnection extends AbstractConnection
public void flush() throws IOException public void flush() throws IOException
{ {
process(null, null); process(null);
} }
@Override
public void onIdleExpired(long idleForMs) public void onIdleExpired(long idleForMs)
{ {
_aEndp.onIdleExpired(idleForMs); _endp.onIdleExpired(idleForMs);
} }
@Override
public void setCheckForIdle(boolean check) public void setCheckForIdle(boolean check)
{ {
_aEndp.setCheckForIdle(check); _endp.setCheckForIdle(check);
} }
@Override
public boolean isCheckForIdle() public boolean isCheckForIdle()
{ {
return _aEndp.isCheckForIdle(); return _endp.isCheckForIdle();
} }
@Override
public InetSocketAddress getLocalAddress() public InetSocketAddress getLocalAddress()
{ {
return _aEndp.getLocalAddress(); return _endp.getLocalAddress();
} }
@Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {
return _aEndp.getRemoteAddress(); return _endp.getRemoteAddress();
}
public boolean isBlocking()
{
return false;
} }
@Override
public int getMaxIdleTime() public int getMaxIdleTime()
{ {
return _aEndp.getMaxIdleTime(); return _endp.getMaxIdleTime();
} }
@Override
public void setMaxIdleTime(int timeMs) throws IOException public void setMaxIdleTime(int timeMs) throws IOException
{ {
_aEndp.setMaxIdleTime(timeMs); _endp.setMaxIdleTime(timeMs);
} }
public Connection getConnection() @Override
public SelectableConnection getSelectableConnection()
{ {
return _connection; return _appConnection;
} }
public void setConnection(Connection connection) public void setSelectableConnection(SelectableConnection connection)
{ {
_connection=(Connection)connection; _appConnection=(AbstractSelectableConnection)connection;
} }
@Override
public String toString() public String toString()
{ {
// Do NOT use synchronized (SslConnection.this) // Do NOT use synchronized (SslConnection.this)
// because it's very easy to deadlock when debugging is enabled. // because it's very easy to deadlock when debugging is enabled.
// We do a best effort to print the right toString() and that's it. // We do a best effort to print the right toString() and that's it.
ByteBuffer inbound = _inbound; ByteBuffer inbound = _inNet;
ByteBuffer outbound = _outbound; ByteBuffer outbound = _outNet;
ByteBuffer unwrap = _unwrapBuf; ByteBuffer unwrap = _inApp;
int i = inbound == null? -1 : inbound.remaining(); int i = inbound == null? -1 : inbound.remaining();
int o = outbound == null ? -1 : outbound.remaining(); int o = outbound == null ? -1 : outbound.remaining();
int u = unwrap == null ? -1 : unwrap.remaining(); int u = unwrap == null ? -1 : unwrap.remaining();
@ -721,7 +698,42 @@ public class SslConnection extends AbstractConnection
_engine.getHandshakeStatus(), _engine.getHandshakeStatus(),
i, o, u, i, o, u,
_ishut, _oshut, _ishut, _oshut,
_connection); _appConnection);
}
@Override
public void setWriteInterested(boolean interested)
{
_writeInterested=interested;
}
@Override
public boolean isWriteInterested()
{
return _writeInterested;
}
@Override
public void setReadInterested(boolean interested)
{
_readInterested=interested;
}
@Override
public boolean isReadInterested()
{
return _readInterested;
}
@Override
public long getLastNotIdleTimestamp()
{
return _endp.getLastNotIdleTimestamp();
}
@Override
public void checkForIdle(long now)
{
} }
} }
} }

View File

@ -11,12 +11,11 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.bio; package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;

View File

@ -1,4 +1,4 @@
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;

View File

@ -1,9 +1,9 @@
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.EndPointTest; import org.eclipse.jetty.io.ChannelEndPoint;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;

View File

@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertFalse;

View File

@ -1,4 +1,4 @@
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -11,6 +11,10 @@ import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocket;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert; import org.junit.Assert;
@ -41,14 +45,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
} }
@Override @Override
protected Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint) protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint)
{ {
SSLEngine engine = __sslCtxFactory.newSslEngine(); SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(false); engine.setUseClientMode(false);
SslConnection connection = new SslConnection(engine,endpoint); SslConnection connection = new SslConnection(engine,endpoint);
Connection delegate = super.newConnection(channel,connection.getSslEndPoint()); SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint());
connection.getSslEndPoint().setConnection(delegate); connection.setAppConnection(delegate);
return connection; return connection;
} }

View File

@ -1,4 +1,4 @@
package org.eclipse.jetty.io.nio; package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -20,8 +20,11 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.AbstractSelectableConnection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -32,7 +35,7 @@ import org.junit.Test;
public class SelectChannelEndPointTest public class SelectChannelEndPointTest
{ {
protected SelectChannelEndPoint _lastEndp; protected SelectableEndPoint _lastEndp;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected SelectorManager _manager = new SelectorManager() protected SelectorManager _manager = new SelectorManager()
@ -54,12 +57,12 @@ public class SelectChannelEndPointTest
} }
@Override @Override
protected void endPointUpgraded(EndPoint endpoint, Connection oldConnection) protected void endPointUpgraded(SelectChannelEndPoint endpoint, Connection oldConnection)
{ {
} }
@Override @Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) public SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment)
{ {
return SelectChannelEndPointTest.this.newConnection(channel,endpoint); return SelectChannelEndPointTest.this.newConnection(channel,endpoint);
} }
@ -68,7 +71,8 @@ public class SelectChannelEndPointTest
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{ {
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000); SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000);
endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
endp.setReadInterested(true);
_lastEndp=endp; _lastEndp=endp;
return endp; return endp;
} }
@ -99,88 +103,94 @@ public class SelectChannelEndPointTest
return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort()); return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort());
} }
protected Connection newConnection(SocketChannel channel, EndPoint endpoint) protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint)
{ {
return new TestConnection(endpoint); return new TestConnection(endpoint);
} }
public class TestConnection extends AbstractConnection implements Connection public class TestConnection extends AbstractSelectableConnection
{ {
ByteBuffer _in = BufferUtil.allocate(32*1024); ByteBuffer _in = BufferUtil.allocate(32*1024);
ByteBuffer _out = BufferUtil.allocate(32*1024); ByteBuffer _out = BufferUtil.allocate(32*1024);
public TestConnection(EndPoint endp) public TestConnection(SelectableEndPoint endp)
{ {
super(endp); super(endp);
} }
public void canRead() @Override
public void doRead()
{
try
{ {
boolean progress=true; boolean progress=true;
while(progress) while(progress)
{ {
progress=false; progress=false;
_in.compact().flip();
if (!BufferUtil.isFull(_in) && _endp.fill(_in)>0)
{
progress=true;
}
// Fill the input buffer with everything available
if (!BufferUtil.isFull(_in))
progress|=_endp.fill(_in)>0;
// If the tests wants to block, then block
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt && blockReadable())
progress|=_endp.fill(_in)>0;
while (_blockAt>0 && _in.remaining()>0 && _in.remaining()<_blockAt) // Copy to the out buffer
{
// ((AsyncEndPoint)_endp).blockReadable(10000);
if (!BufferUtil.isFull(_in) && _endp.fill(_in)>0)
progress=true;
}
if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0) if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0)
progress=true; progress=true;
// Try non blocking write
if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0) if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0)
// Try blocking write
while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out))
{
blockWriteable();
if (_endp.flush(_out)>0)
progress=true; progress=true;
}
}
}
catch(IOException e)
{
e.printStackTrace();
}
finally
{
_endp.setReadInterested(true);
}
}
_out.compact().flip();
if (BufferUtil.isEmpty(_out) && _endp.isInputShutdown()) @Override
public void onInputShutdown()
{
try
{
if (BufferUtil.isEmpty(_out))
_endp.shutdownOutput(); _endp.shutdownOutput();
} }
} catch(IOException e)
public void canWrite()
{ {
e.printStackTrace();
}
} }
@Override
public void onClose()
{
}
@Override
public boolean isIdle() public boolean isIdle()
{ {
return false; return false;
} }
@Override
public boolean isReadInterested()
{
return true;
}
@Override
public EndPoint getEndPoint()
{
return _endp;
}
public void onClose()
{
// System.err.println("onClose");
}
public void onInputShutdown() throws IOException
{
// System.err.println("onInputShutdown");
}
} }
@Test @Test
public void testEcho() throws Exception public void testEcho() throws Exception
{ {
@ -310,7 +320,7 @@ public class SelectChannelEndPointTest
int specifiedTimeout = 400; int specifiedTimeout = 400;
client.setSoTimeout(specifiedTimeout); client.setSoTimeout(specifiedTimeout);
// Write 8 and cause block for 10 // Write 8 and cause block waiting for 10
_blockAt=10; _blockAt=10;
clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush(); clientOutputStream.flush();
@ -327,7 +337,7 @@ public class SelectChannelEndPointTest
catch(SocketTimeoutException e) catch(SocketTimeoutException e)
{ {
int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue(); int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue();
System.err.println("blocked for " + elapsed+ "ms"); // System.err.println("blocked for " + elapsed+ "ms");
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4)); Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4));
} }

View File

@ -28,9 +28,9 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Buffers.Type; import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -1142,12 +1142,10 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void connectionClosed(Connection connection) protected void connectionClosed(Connection connection)
{ {
connection.onClose();
if (_statsStartedAt.get() == -1) if (_statsStartedAt.get() == -1)
return; return;
long duration = System.currentTimeMillis() - connection.getTimeStamp(); long duration = System.currentTimeMillis() - connection.getCreatedTimeStamp();
int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0;
_requestStats.set(requests); _requestStats.set(requests);
_connectionStats.decrement(); _connectionStats.decrement();

View File

@ -27,17 +27,17 @@ import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.Action; import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractSelectableConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
/** /**
*/ */
public abstract class HttpConnection extends AbstractConnection public abstract class HttpConnection extends AbstractSelectableConnection
{ {
private static final Logger LOG = Log.getLogger(HttpConnection.class); private static final Logger LOG = Log.getLogger(HttpConnection.class);

View File

@ -20,7 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteArrayEndPoint; import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.nio.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;

View File

@ -20,8 +20,9 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager; import org.eclipse.jetty.io.SelectableEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -449,7 +450,7 @@ public class ConnectHandler extends HandlerWrapper
} }
@Override @Override
protected void endPointClosed(SelectChannelEndPoint endpoint) protected void endPointClosed(SelectableEndPoint endpoint)
{ {
} }

View File

@ -21,9 +21,10 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.io.nio.NetworkTrafficSelectChannelEndPoint; import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager; import org.eclipse.jetty.io.SelectableEndPoint;
import org.eclipse.jetty.io.SelectorManager;
/** /**
* <p>A specialized version of {@link SelectChannelConnector} that supports {@link NetworkTrafficListener}s.</p> * <p>A specialized version of {@link SelectChannelConnector} that supports {@link NetworkTrafficListener}s.</p>
@ -60,7 +61,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
} }
@Override @Override
protected void endPointClosed(SelectChannelEndPoint endpoint) protected void endPointClosed(SelectableEndPoint endpoint)
{ {
super.endPointClosed(endpoint); super.endPointClosed(endpoint);
((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed(); ((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed();

View File

@ -21,11 +21,12 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.Connection; import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectableEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager; import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.ThreadPool;
@ -255,7 +256,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
} }
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
protected void endPointClosed(SelectChannelEndPoint endpoint) protected void endPointClosed(SelectableEndPoint endpoint)
{ {
connectionClosed(endpoint.getConnection()); connectionClosed(endpoint.getConnection());
} }
@ -282,7 +283,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
} }
@Override @Override
protected void endPointClosed(final SelectChannelEndPoint endpoint) protected void endPointClosed(final SelectableEndPoint endpoint)
{ {
SelectChannelConnector.this.endPointClosed(endpoint); SelectChannelConnector.this.endPointClosed(endpoint);
} }

View File

@ -28,7 +28,7 @@ import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.BuffersFactory; import org.eclipse.jetty.io.BuffersFactory;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.nio.SslConnection; import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
@ -98,7 +98,7 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
request.setScheme(HttpScheme.HTTPS); request.setScheme(HttpScheme.HTTPS);
super.customize(endpoint,request); super.customize(endpoint,request);
SslConnection.SslEndPoint sslEndpoint=(SslConnection.SslEndPoint)endpoint; SslConnection.AppEndPoint sslEndpoint=(SslConnection.AppEndPoint)endpoint;
SSLEngine sslEngine=sslEndpoint.getSslEngine(); SSLEngine sslEngine=sslEndpoint.getSslEngine();
SSLSession sslSession=sslEngine.getSession(); SSLSession sslSession=sslEngine.getSession();
@ -548,8 +548,8 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
{ {
SSLEngine engine = createSSLEngine(channel); SSLEngine engine = createSSLEngine(channel);
SslConnection connection = newSslConnection(endpoint, engine); SslConnection connection = newSslConnection(endpoint, engine);
Connection delegate = newPlainConnection(channel, connection.getSslEndPoint()); Connection delegate = newPlainConnection(channel, connection.getAppEndPoint());
connection.getSslEndPoint().setConnection(delegate); connection.getAppEndPoint().setConnection(delegate);
connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate()); connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate());
return connection; return connection;
} }

View File

@ -31,7 +31,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SslConnection; import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.junit.Assert; import org.junit.Assert;
@ -149,8 +149,8 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
// Get the server side endpoint // Get the server side endpoint
EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS); EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS);
if (endp instanceof SslConnection.SslEndPoint) if (endp instanceof SslConnection.AppEndPoint)
endp=((SslConnection.SslEndPoint)endp).getEndpoint(); endp=((SslConnection.AppEndPoint)endp).getEndpoint();
// read the response // read the response
String result=IO.toString(is); String result=IO.toString(is);

View File

@ -6,14 +6,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectableEndPoint;
import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.junit.Test; import org.junit.Test;
public class SelectChannelAsyncContextTest extends LocalAsyncContextTest public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
{ {
volatile SelectChannelEndPoint _endp; volatile SelectableEndPoint _endp;
@Override @Override
protected Connector initConnector() protected Connector initConnector()
@ -24,7 +24,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
public void customize(EndPoint endpoint, Request request) throws IOException public void customize(EndPoint endpoint, Request request) throws IOException
{ {
super.customize(endpoint,request); super.customize(endpoint,request);
_endp=(SelectChannelEndPoint)endpoint; _endp=(SelectableEndPoint)endpoint;
} }
}; };
@ -54,7 +54,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
try try
{ {
TimeUnit.MILLISECONDS.sleep(200); TimeUnit.MILLISECONDS.sleep(200);
SelectChannelEndPoint endp=_endp; SelectableEndPoint endp=_endp;
if (endp!=null && endp.isOpen()) if (endp!=null && endp.isOpen())
endp.asyncDispatch(); endp.asyncDispatch();
} }

View File

@ -268,7 +268,7 @@ public class BufferUtil
/** /**
* Put data from one buffer into another, avoiding over/under flows * Put data from one buffer into another, avoiding over/under flows
* @param from Buffer to take bytes from in flush mode * @param from Buffer to take bytes from in flush mode
* @param to Buffer to put bytes to in flush mode. The buffer is flipped before and after the put. * @param to Buffer to put bytes to in flush mode. The buffer is flipToFill before the put and flipToFlush after.
* @return number of bytes moved * @return number of bytes moved
*/ */
public static int flipPutFlip(ByteBuffer from, ByteBuffer to) public static int flipPutFlip(ByteBuffer from, ByteBuffer to)
@ -649,12 +649,12 @@ public class BufferUtil
for (int i=0;i<buffer.position();i++) for (int i=0;i<buffer.position();i++)
{ {
char c=(char)buffer.get(i); char c=(char)buffer.get(i);
if (c>=' ') if (c>=' ' && c<=127)
buf.append(c); buf.append(c);
else if (c=='\r'||c=='\n') else if (c=='\r'||c=='\n')
buf.append('|'); buf.append('|');
else else
buf.append('?'); buf.append('\ufffd');
if (i==16&&buffer.position()>32) if (i==16&&buffer.position()>32)
{ {
buf.append("..."); buf.append("...");
@ -665,12 +665,12 @@ public class BufferUtil
for (int i=buffer.position();i<buffer.limit();i++) for (int i=buffer.position();i<buffer.limit();i++)
{ {
char c=(char)buffer.get(i); char c=(char)buffer.get(i);
if (c>=' ') if (c>=' ' && c<=127)
buf.append(c); buf.append(c);
else if (c=='\r'||c=='\n') else if (c=='\r'||c=='\n')
buf.append('|'); buf.append('|');
else else
buf.append('?'); buf.append('\ufffd');
if (i==buffer.position()+16&&buffer.limit()>buffer.position()+32) if (i==buffer.position()+16&&buffer.limit()>buffer.position()+32)
{ {
buf.append("..."); buf.append("...");
@ -683,12 +683,12 @@ public class BufferUtil
for (int i=limit;i<buffer.capacity();i++) for (int i=limit;i<buffer.capacity();i++)
{ {
char c=(char)buffer.get(i); char c=(char)buffer.get(i);
if (c>=' ') if (c>=' ' && c<=127)
buf.append(c); buf.append(c);
else if (c=='\r'||c=='\n') else if (c=='\r'||c=='\n')
buf.append('|'); buf.append('|');
else else
buf.append('?'); buf.append('\ufffd');
if (i==limit+16&&buffer.capacity()>limit+32) if (i==limit+16&&buffer.capacity()>limit+32)
{ {
buf.append("..."); buf.append("...");