diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
deleted file mode 100644
index 336d78a0033..00000000000
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.eclipse.jetty.io;
-
-import java.io.IOException;
-
-import org.eclipse.jetty.io.nio.Connection;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-
-
-public abstract class AbstractConnection implements Connection
-{
- private static final Logger LOG = Log.getLogger(AbstractConnection.class);
-
- private final long _timeStamp;
- protected final EndPoint _endp;
-
- public AbstractConnection(EndPoint endp)
- {
- _endp=endp;
- _timeStamp = System.currentTimeMillis();
- }
-
- public AbstractConnection(EndPoint endp,long timestamp)
- {
- _endp=endp;
- _timeStamp = timestamp;
- }
-
-
- @Override
- public EndPoint getEndPoint()
- {
- return _endp;
- }
-
- public long getTimeStamp()
- {
- return _timeStamp;
- }
-
- public void onIdleExpired(long idleForMs)
- {
- try
- {
- LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
- if (_endp.isInputShutdown() || _endp.isOutputShutdown())
- _endp.close();
- else
- _endp.shutdownOutput();
- }
- catch(IOException e)
- {
- LOG.ignore(e);
-
- try
- {
- _endp.close();
- }
- catch(IOException e2)
- {
- LOG.ignore(e2);
- }
- }
- }
-
- public String toString()
- {
- return String.format("%s@%x", getClass().getSimpleName(), hashCode());
- }
-}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java
new file mode 100644
index 00000000000..4dc1a189f6f
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java
@@ -0,0 +1,226 @@
+package org.eclipse.jetty.io;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+
+public abstract class AbstractSelectableConnection implements SelectableConnection
+{
+ private static final Logger LOG = Log.getLogger(AbstractSelectableConnection.class);
+
+ protected final SelectableEndPoint _endp;
+ private final long _createdTimeStamp;
+ private final Lock _lock=new ReentrantLock();
+ private final Condition _readable=_lock.newCondition();
+ private final Condition _writeable=_lock.newCondition();
+ private boolean _readBlocked;
+ private boolean _writeBlocked;
+
+ private final Runnable _reader=new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ doRead();
+ }
+ catch(Throwable th)
+ {
+ LOG.warn(th);
+ }
+ }
+ };
+ private final Runnable _writer=new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ doWrite();
+ }
+ catch(Throwable th)
+ {
+ LOG.warn(th);
+ }
+ }
+ };
+
+ private volatile int _maxIdleTime=-1;
+
+ public AbstractSelectableConnection(SelectableEndPoint endp)
+ {
+ _endp=endp;
+ _createdTimeStamp = System.currentTimeMillis();
+ }
+
+ @Override
+ public EndPoint getEndPoint()
+ {
+ return _endp;
+ }
+
+ @Override
+ public SelectableEndPoint getSelectableEndPoint()
+ {
+ return _endp;
+ }
+
+ @Override
+ public long getCreatedTimeStamp()
+ {
+ return _createdTimeStamp;
+ }
+
+ @Override
+ public Runnable onReadable()
+ {
+ _lock.lock();
+ try
+ {
+ if (_readBlocked)
+ _readable.signalAll();
+ else
+ return _reader;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ return null;
+ }
+
+ @Override
+ public Runnable onWriteable()
+ {
+ _lock.lock();
+ try
+ {
+ if (_writeBlocked)
+ _writeable.signalAll();
+ else
+ return _writer;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ return null;
+ }
+
+ @Override
+ public boolean blockReadable()
+ {
+ _lock.lock();
+ boolean readable=false;
+ try
+ {
+ if (_readBlocked)
+ throw new IllegalStateException();
+ _readBlocked=true;
+ _endp.setReadInterested(true);
+ readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS);
+ }
+ catch(InterruptedException e)
+ {
+ LOG.ignore(e);
+ }
+ finally
+ {
+ if (!readable)
+ _endp.setReadInterested(false);
+ _readBlocked=false;
+ _lock.unlock();
+ }
+ return readable;
+ }
+
+ @Override
+ public boolean blockWriteable()
+ {
+ _lock.lock();
+ boolean writeable=false;
+ try
+ {
+ if (_writeBlocked)
+ throw new IllegalStateException();
+ _writeBlocked=true;
+ _endp.setWriteInterested(true);
+ writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS);
+ }
+ catch(InterruptedException e)
+ {
+ LOG.ignore(e);
+ }
+ finally
+ {
+ if (!writeable)
+ _endp.setWriteInterested(false);
+ _writeBlocked=false;
+ _lock.unlock();
+ }
+ return writeable;
+ }
+
+ @Override
+ public void onIdleExpired(long idleForMs)
+ {
+ try
+ {
+ LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
+ if (_endp.isInputShutdown() || _endp.isOutputShutdown())
+ _endp.close();
+ else
+ _endp.shutdownOutput();
+ }
+ catch(IOException e)
+ {
+ LOG.ignore(e);
+
+ try
+ {
+ _endp.close();
+ }
+ catch(IOException e2)
+ {
+ LOG.ignore(e2);
+ }
+ }
+ }
+
+ protected void doRead()
+ {
+ throw new IllegalStateException();
+ }
+
+ protected void doWrite()
+ {
+ throw new IllegalStateException();
+ }
+
+
+ @Override
+ public int getMaxIdleTime()
+ {
+ int max=_maxIdleTime;
+ return (max==-1)?_endp.getMaxIdleTime():max;
+ }
+
+ public void setMaxIdleTime(int max)
+ {
+ _maxIdleTime=max;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s@%x", getClass().getSimpleName(), hashCode());
+ }
+}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
index a45cc47a97f..5147668895f 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
@@ -17,7 +17,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.BufferUtil;
@@ -36,7 +35,6 @@ public class ByteArrayEndPoint implements EndPoint
protected boolean _growOutput;
protected int _maxIdleTime;
protected Connection _connection;
- private boolean _idleCheck;
/* ------------------------------------------------------------ */
/**
@@ -97,6 +95,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#isOpen()
*/
+ @Override
public boolean isOpen()
{
return !_closed;
@@ -106,6 +105,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.jetty.io.EndPoint#isInputShutdown()
*/
+ @Override
public boolean isInputShutdown()
{
return _closed;
@@ -115,6 +115,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.jetty.io.EndPoint#isOutputShutdown()
*/
+ @Override
public boolean isOutputShutdown()
{
return _closed;
@@ -124,6 +125,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#shutdownOutput()
*/
+ @Override
public void shutdownOutput() throws IOException
{
close();
@@ -133,6 +135,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#shutdownInput()
*/
+ @Override
public void shutdownInput() throws IOException
{
close();
@@ -142,6 +145,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#close()
*/
+ @Override
public void close() throws IOException
{
_closed=true;
@@ -151,6 +155,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
*/
+ @Override
public int fill(ByteBuffer buffer) throws IOException
{
if (_closed)
@@ -165,6 +170,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/
+ @Override
public int flush(ByteBuffer... buffers) throws IOException
{
if (_closed)
@@ -230,6 +236,7 @@ public class ByteArrayEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#getConnection()
*/
+ @Override
public Object getTransport()
{
return _inBytes;
@@ -257,6 +264,7 @@ public class ByteArrayEndPoint implements EndPoint
/**
* @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime()
*/
+ @Override
public int getMaxIdleTime()
{
return _maxIdleTime;
@@ -266,39 +274,12 @@ public class ByteArrayEndPoint implements EndPoint
/**
* @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int)
*/
+ @Override
public void setMaxIdleTime(int timeMs) throws IOException
{
_maxIdleTime=timeMs;
}
- @Override
- public Connection getConnection()
- {
- return _connection;
- }
-
- @Override
- public void setConnection(Connection connection)
- {
- _connection=connection;
- }
-
- @Override
- public void onIdleExpired(long idleForMs)
- {
- }
-
- @Override
- public void setCheckForIdle(boolean check)
- {
- _idleCheck=check;
- }
-
- @Override
- public boolean isCheckForIdle()
- {
- return _idleCheck;
- }
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java
similarity index 90%
rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java
rename to jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java
index 61bb3c59d61..c15c94758e3 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java
@@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -23,7 +23,7 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
-import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -43,9 +43,6 @@ public class ChannelEndPoint implements EndPoint
private volatile int _maxIdleTime;
private volatile boolean _ishut;
private volatile boolean _oshut;
- private Connection _connection;
- private boolean _idleCheck;
-
public ChannelEndPoint(ByteChannel channel) throws IOException
{
@@ -90,6 +87,7 @@ public class ChannelEndPoint implements EndPoint
/*
* @see org.eclipse.io.EndPoint#isOpen()
*/
+ @Override
public boolean isOpen()
{
return _channel.isOpen();
@@ -133,6 +131,7 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
+ @Override
public void shutdownInput() throws IOException
{
shutdownChannelInput();
@@ -172,16 +171,19 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
+ @Override
public void shutdownOutput() throws IOException
{
shutdownChannelOutput();
}
+ @Override
public boolean isOutputShutdown()
{
return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown();
}
+ @Override
public boolean isInputShutdown()
{
return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown();
@@ -190,6 +192,7 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
+ @Override
public void close() throws IOException
{
LOG.debug("close {}",this);
@@ -199,17 +202,15 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
*/
+ @Override
public int fill(ByteBuffer buffer) throws IOException
{
if (_ishut)
return -1;
- int pos=buffer.position();
+ int pos=BufferUtil.flipToFill(buffer);
try
{
- buffer.position(buffer.limit());
- buffer.limit(buffer.capacity());
-
int filled = _channel.read(buffer);
if (filled==-1)
@@ -217,23 +218,29 @@ public class ChannelEndPoint implements EndPoint
return filled;
}
+ catch(IOException e)
+ {
+ LOG.debug(e);
+ shutdownInput();
+ return -1;
+ }
finally
{
- buffer.limit(buffer.position());
- buffer.position(pos);
+ BufferUtil.flipToFlush(buffer,pos);
}
}
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/
+ @Override
public int flush(ByteBuffer... buffers) throws IOException
{
int len=0;
- if (_channel instanceof GatheringByteChannel)
- {
- len= (int)((GatheringByteChannel)_channel).write(buffers,0,2);
- }
+ if (buffers.length==1)
+ len=_channel.write(buffers[0]);
+ else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
+ len= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
else
{
for (ByteBuffer b : buffers)
@@ -275,6 +282,7 @@ public class ChannelEndPoint implements EndPoint
}
/* ------------------------------------------------------------ */
+ @Override
public Object getTransport()
{
return _channel;
@@ -287,6 +295,7 @@ public class ChannelEndPoint implements EndPoint
}
/* ------------------------------------------------------------ */
+ @Override
public int getMaxIdleTime()
{
return _maxIdleTime;
@@ -296,6 +305,7 @@ public class ChannelEndPoint implements EndPoint
/**
* @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int)
*/
+ @Override
public void setMaxIdleTime(int timeMs) throws IOException
{
//if (_socket!=null && timeMs!=_maxIdleTime)
@@ -303,34 +313,4 @@ public class ChannelEndPoint implements EndPoint
_maxIdleTime=timeMs;
}
-
- @Override
- public Connection getConnection()
- {
- return _connection;
- }
-
- @Override
- public void setConnection(Connection connection)
- {
- _connection=connection;
- }
-
- @Override
- public void onIdleExpired(long idleForMs)
- {
- }
-
- @Override
- public void setCheckForIdle(boolean check)
- {
- _idleCheck=check;
- }
-
- @Override
- public boolean isCheckForIdle()
- {
- return _idleCheck;
- }
-
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/Connection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java
similarity index 63%
rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/Connection.java
rename to jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java
index 8b3c7e871c4..c4de0328ed0 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/Connection.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java
@@ -11,42 +11,21 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
-import java.io.IOException;
-import org.eclipse.jetty.io.EndPoint;
public interface Connection
{
-
EndPoint getEndPoint();
- void canRead();
- void canWrite();
-
- boolean isReadInterested();
- boolean isWriteInterested();
-
-
- void onInputShutdown() throws IOException;
-
- /**
- * Called when the connection is closed
- */
- void onClose();
-
- /**
- * Called when the connection idle timeout expires
- * @param idleForMs TODO
- */
- void onIdleExpired(long idleForMs);
+ int getMaxIdleTime();
/**
* @return the timestamp at which the connection was created
*/
- long getTimeStamp();
+ long getCreatedTimeStamp();
boolean isIdle();
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
index f075a5f7016..b1019fdfb70 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
@@ -17,7 +17,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import org.eclipse.jetty.io.nio.Connection;
/**
@@ -113,28 +112,5 @@ public interface EndPoint
void setMaxIdleTime(int timeMs) throws IOException;
- /* ------------------------------------------------------------ */
- Connection getConnection();
- /* ------------------------------------------------------------ */
- void setConnection(Connection connection);
-
-
- /* ------------------------------------------------------------ */
- /** Callback when idle.
- *
An endpoint is idle if there has been no IO activity for
- * {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
- * @param idleForMs TODO
- */
- public void onIdleExpired(long idleForMs);
-
- /* ------------------------------------------------------------ */
- /** Set if the endpoint should be checked for idleness
- */
- public void setCheckForIdle(boolean check);
-
- /* ------------------------------------------------------------ */
- /** Get if the endpoint should be checked for idleness
- */
- public boolean isCheckForIdle();
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/NetworkTrafficSelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java
similarity index 97%
rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/NetworkTrafficSelectChannelEndPoint.java
rename to jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java
index 09c34cc661d..46b3a2a144e 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/NetworkTrafficSelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java
@@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -19,7 +19,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
-import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
similarity index 66%
rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java
rename to jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index 8693a84f771..9dd159a0c71 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -11,17 +11,17 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import java.io.IOException;
-import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
+import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task;
@@ -30,22 +30,29 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
/**
* An Endpoint that can be scheduled by {@link SelectorManager}.
*/
-public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
+public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint
{
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
+ private final Lock _lock = new ReentrantLock();
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
private SelectionKey _key;
+ private boolean _selected;
+ private boolean _changing;
+
/** The desired value for {@link SelectionKey#interestOps()} */
private int _interestOps;
+ private boolean _ishutCalled;
+
/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
private boolean _open;
- private volatile long _idleTimestamp;
- private volatile Connection _connection;
+ private volatile boolean _idlecheck;
+ private volatile long _lastNotIdleTimestamp;
+ private volatile SelectableConnection _connection;
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
@@ -78,18 +85,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
/* ------------------------------------------------------------ */
- public void setConnection(Connection connection)
+ public void setSelectableConnection(SelectableConnection connection)
{
- Connection old=getConnection();
+ Connection old=getSelectableConnection();
_connection=connection;
if (old!=null && old!=connection)
- _manager.endPointUpgraded(this,(Connection)old);
+ _manager.endPointUpgraded(this,old);
}
-
+
/* ------------------------------------------------------------ */
- public long getIdleTimestamp()
+ @Override
+ public long getLastNotIdleTimestamp()
{
- return _idleTimestamp;
+ return _lastNotIdleTimestamp;
}
/* ------------------------------------------------------------ */
@@ -98,9 +106,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
*/
public void selected()
{
- final boolean can_read;
- final boolean can_write;
- synchronized (this)
+ _lock.lock();
+ _selected=true;
+ try
{
// If there is no key, then do nothing
if (_key == null || !_key.isValid())
@@ -108,19 +116,103 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
this.notifyAll();
return;
}
-
- can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0);
- can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0);
+
+ boolean can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0);
+ boolean can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0);
_interestOps=0;
- _key.interestOps(0);
+
+ if (can_read)
+ {
+ Runnable task=getSelectableConnection().onReadable();
+ if (task!=null)
+ _manager.dispatch(task);
+ }
+ if (can_write)
+ {
+ Runnable task=getSelectableConnection().onWriteable();
+ if (task!=null)
+ _manager.dispatch(task);
+ }
+
+ if (isInputShutdown() && !_ishutCalled)
+ {
+ _ishutCalled=true;
+ getSelectableConnection().onInputShutdown();
+ }
+ }
+ finally
+ {
+ doUpdateKey();
+ _selected=false;
+ _lock.unlock();
}
-
- if (can_read)
- getConnection().canRead();
- if (can_write)
- getConnection().canWrite();
}
+ /* ------------------------------------------------------------ */
+ @Override
+ public boolean isReadInterested()
+ {
+ _lock.lock();
+ try
+ {
+ return (_interestOps&SelectionKey.OP_READ)!=0;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ @Override
+ public void setReadInterested(boolean interested)
+ {
+ _lock.lock();
+ try
+ {
+ _interestOps=interested?(_interestOps|SelectionKey.OP_READ):(_interestOps&~SelectionKey.OP_READ);
+ if (!_selected)
+ updateKey();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ @Override
+ public boolean isWriteInterested()
+ {
+ _lock.lock();
+ try
+ {
+ return (_interestOps&SelectionKey.OP_READ)!=0;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ @Override
+ public void setWriteInterested(boolean interested)
+ {
+ _lock.lock();
+ try
+ {
+ _interestOps=interested?(_interestOps|SelectionKey.OP_WRITE):(_interestOps&~SelectionKey.OP_WRITE);
+ if (!_selected)
+ updateKey();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+
/* ------------------------------------------------------------ */
public void cancelTimeout(Task task)
{
@@ -134,46 +226,51 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
}
/* ------------------------------------------------------------ */
+ @Override
public void setCheckForIdle(boolean check)
{
- _idleTimestamp=check?System.currentTimeMillis():0;
+ _idlecheck=true;
}
/* ------------------------------------------------------------ */
+ @Override
public boolean isCheckForIdle()
{
- return _idleTimestamp!=0;
+ return _idlecheck;
}
/* ------------------------------------------------------------ */
protected void notIdle()
{
- if (_idleTimestamp!=0)
- _idleTimestamp=System.currentTimeMillis();
+ _lastNotIdleTimestamp=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
- public void checkIdleTimestamp(long now)
+ public void checkForIdle(long now)
{
- long idleTimestamp=_idleTimestamp;
- long max_idle_time=getMaxIdleTime();
-
- if (idleTimestamp!=0 && max_idle_time>0)
+ if (_idlecheck)
{
- long idleForMs=now-idleTimestamp;
+ long idleTimestamp=_lastNotIdleTimestamp;
+ long max_idle_time=getMaxIdleTime();
- if (idleForMs>max_idle_time)
+ if (idleTimestamp!=0 && max_idle_time>0)
{
- onIdleExpired(idleForMs);
- _idleTimestamp=now;
+ long idleForMs=now-idleTimestamp;
+
+ if (idleForMs>max_idle_time)
+ {
+ onIdleExpired(idleForMs);
+ _lastNotIdleTimestamp=now;
+ }
}
}
}
/* ------------------------------------------------------------ */
+ @Override
public void onIdleExpired(long idleForMs)
{
- getConnection().onIdleExpired(idleForMs);
+ getSelectableConnection().onIdleExpired(idleForMs);
}
/* ------------------------------------------------------------ */
@@ -199,38 +296,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
/* ------------------------------------------------------------ */
/**
- * Updates selection key. Adds operations types to the selection key as needed. No operations
- * are removed as this is only done during dispatch. This method records the new key and
- * schedules a call to doUpdateKey to do the keyChange
+ * Updates selection key. This method schedules a call to doUpdateKey to do the keyChange
*/
- public void updateKey()
+ private void updateKey()
{
- final boolean changed;
- synchronized (this)
+ int current_ops=-1;
+ if (getChannel().isOpen())
{
- int current_ops=-1;
- if (getChannel().isOpen())
+ try
{
- Socket socket = getSocket();
- boolean read_interest = getConnection().isReadInterested() && !socket.isInputShutdown();
- boolean write_interest= getConnection().isWriteInterested() && !socket.isOutputShutdown();
-
- _interestOps = (read_interest?SelectionKey.OP_READ:0)|(write_interest?SelectionKey.OP_WRITE:0);
- try
- {
- current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
- }
- catch(Exception e)
- {
- _key=null;
- LOG.ignore(e);
- }
+ current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
+ }
+ catch(Exception e)
+ {
+ _key=null;
+ LOG.ignore(e);
}
- changed=_interestOps!=current_ops;
}
-
- if(changed)
+ if (_interestOps!=current_ops && !_changing)
{
+ _changing=true;
_selectSet.addChange(this);
_selectSet.wakeup();
}
@@ -243,8 +328,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
*/
void doUpdateKey()
{
- synchronized (this)
+ _lock.lock();
+ try
{
+ _changing=false;
if (getChannel().isOpen())
{
if (_interestOps>0)
@@ -305,6 +392,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
_key = null;
}
}
+ finally
+ {
+ _lock.unlock();
+ }
}
@@ -328,7 +419,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
updateKey();
}
}
-
+
/* ------------------------------------------------------------ */
@Override
public String toString()
@@ -367,7 +458,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
isOutputShutdown(),
_interestOps,
keyString,
- getConnection());
+ getSelectableConnection());
}
/* ------------------------------------------------------------ */
@@ -375,12 +466,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
{
return _selectSet;
}
-
+
/* ------------------------------------------------------------ */
@Override
- public Connection getConnection()
+ public SelectableConnection getSelectableConnection()
{
return _connection;
}
+
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java
new file mode 100644
index 00000000000..26362565db2
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java
@@ -0,0 +1,28 @@
+package org.eclipse.jetty.io;
+
+public interface SelectableConnection extends Connection
+{
+ SelectableEndPoint getSelectableEndPoint();
+
+ Runnable onReadable();
+ Runnable onWriteable();
+
+ public boolean blockReadable();
+
+ public boolean blockWriteable();
+
+ /**
+ * Called when the connection idle timeout expires
+ * @param idleForMs TODO
+ */
+ void onIdleExpired(long idleForMs);
+
+ void onInputShutdown();
+
+ /**
+ * Called when the connection is closed
+ */
+ void onClose();
+
+
+}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java
new file mode 100644
index 00000000000..fcdc3cb85e1
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java
@@ -0,0 +1,38 @@
+package org.eclipse.jetty.io;
+
+public interface SelectableEndPoint extends EndPoint
+{
+ public abstract void setWriteInterested(boolean interested);
+
+ public abstract boolean isWriteInterested();
+
+ public abstract void setReadInterested(boolean interested);
+
+ public abstract boolean isReadInterested();
+
+ /* ------------------------------------------------------------ */
+ SelectableConnection getSelectableConnection();
+
+ /* ------------------------------------------------------------ */
+ /** Callback when idle.
+ *
An endpoint is idle if there has been no IO activity for
+ * {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
+ * @param idleForMs TODO
+ */
+ public void onIdleExpired(long idleForMs);
+
+ /* ------------------------------------------------------------ */
+ /** Set if the endpoint should be checked for idleness
+ */
+ public void setCheckForIdle(boolean check);
+
+ /* ------------------------------------------------------------ */
+ /** Get if the endpoint should be checked for idleness
+ */
+ public boolean isCheckForIdle();
+
+ public long getLastNotIdleTimestamp();
+
+ public void checkForIdle(long now);
+
+}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
similarity index 98%
rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java
rename to jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
index b15abb4a6ce..6523505d672 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
@@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
@@ -338,10 +337,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */
- protected abstract void endPointUpgraded(EndPoint endpoint,Connection oldConnection);
+ protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,Connection oldConnection);
/* ------------------------------------------------------------------------------- */
- public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment);
+ public abstract SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment);
/* ------------------------------------------------------------ */
/**
@@ -472,7 +471,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
- endpoint.selected();
}
else if (channel.isOpen())
{
@@ -487,7 +485,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
key = channel.register(selector,SelectionKey.OP_READ,null);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
- endpoint.selected();
}
else if (change instanceof ChangeTask)
{
@@ -703,7 +700,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
for (SelectChannelEndPoint endp:_endPoints.keySet())
{
- endp.checkIdleTimestamp(idle_now);
+ endp.checkForIdle(idle_now);
}
}
public String toString() {return "Idle-"+super.toString();}
@@ -842,6 +839,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
LOG.debug("destroyEndPoint {}",endp);
_endPoints.remove(endp);
+ SelectableConnection connection=endp.getSelectableConnection();
+ if (connection!=null)
+ connection.onClose();
endPointClosed(endp);
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java
similarity index 58%
rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java
rename to jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java
index d20d85240ed..61aadcb5ac1 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java
@@ -11,11 +11,12 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
@@ -24,8 +25,6 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
-import org.eclipse.jetty.io.AbstractConnection;
-import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -35,27 +34,27 @@ import org.eclipse.jetty.util.log.Logger;
* An AysyncConnection that acts as an interceptor between and EndPoint and another
* Connection, that implements TLS encryption using an {@link SSLEngine}.
*
- * The connector uses an {@link AsyncEndPoint} (like {@link SelectChannelEndPoint}) as
- * it's source/sink of encrypted data. It then provides {@link #getSslEndPoint()} to
+ * The connector uses an {@link EndPoint} (like {@link SelectChannelEndPoint}) as
+ * it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to
* expose a source/sink of unencrypted data to another connection (eg HttpConnection).
*/
-public class SslConnection extends AbstractConnection
+public class SslConnection extends AbstractSelectableConnection
{
- private final Logger _logger = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
+ private static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0);
private static final ThreadLocal __buffers = new ThreadLocal();
private final SSLEngine _engine;
private final SSLSession _session;
- private Connection _connection;
- private final SslEndPoint _sslEndPoint;
+ private SelectableConnection _appConnection;
+ private final AppEndPoint _appEndPoint;
private int _allocations;
private SslBuffers _buffers;
- private ByteBuffer _inbound;
- private ByteBuffer _unwrapBuf;
- private ByteBuffer _outbound;
- private AsyncEndPoint _aEndp;
+ private ByteBuffer _inNet;
+ private ByteBuffer _inApp;
+ private ByteBuffer _outNet;
+ private SelectableEndPoint _endp;
private boolean _allowRenegotiate=true;
private boolean _handshook;
private boolean _ishut;
@@ -67,44 +66,46 @@ public class SslConnection extends AbstractConnection
*/
private static class SslBuffers
{
- final ByteBuffer _in;
- final ByteBuffer _out;
- final ByteBuffer _unwrap;
+ final ByteBuffer _inNet;
+ final ByteBuffer _outNet;
+ final ByteBuffer _inApp;
SslBuffers(int packetSize, int appSize)
{
- _in=BufferUtil.allocateDirect(packetSize);
- _out=BufferUtil.allocateDirect(packetSize);
- _unwrap=BufferUtil.allocate(appSize);
+ _inNet=BufferUtil.allocateDirect(packetSize);
+ _outNet=BufferUtil.allocateDirect(packetSize);
+ _inApp=BufferUtil.allocate(appSize);
}
+
}
-
+
/* ------------------------------------------------------------ */
- public SslConnection(SSLEngine engine,AsyncEndPoint endp)
+ public SslConnection(SSLEngine engine,SelectableEndPoint endp)
{
this(engine,endp,System.currentTimeMillis());
}
/* ------------------------------------------------------------ */
- public SslConnection(SSLEngine engine,AsyncEndPoint endp, long timeStamp)
+ public SslConnection(SSLEngine engine,SelectableEndPoint endp, long timeStamp)
{
- super(endp,timeStamp);
+ super(endp);
_engine=engine;
_session=_engine.getSession();
- _aEndp=(AsyncEndPoint)endp;
- _sslEndPoint = newSslEndPoint();
+ _endp=endp;
+ _appEndPoint = newAppEndPoint();
}
- /* ------------------------------------------------------------ */
- protected SslEndPoint newSslEndPoint()
- {
- return new SslEndPoint();
- }
/* ------------------------------------------------------------ */
- public EndPoint getEndPoint()
+ public void setAppConnection(SelectableConnection connection)
+ {
+ _appConnection=connection;
+ }
+
+ /* ------------------------------------------------------------ */
+ protected AppEndPoint newAppEndPoint()
{
- return _aEndp;
+ return new AppEndPoint();
}
/* ------------------------------------------------------------ */
@@ -143,10 +144,10 @@ public class SslConnection extends AbstractConnection
{
_buffers=__buffers.get();
if (_buffers==null)
- _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2);
- _inbound=_buffers._in;
- _outbound=_buffers._out;
- _unwrapBuf=_buffers._unwrap;
+ _buffers=new SslBuffers(_session.getPacketBufferSize(),_session.getApplicationBufferSize());
+ _inNet=_buffers._inNet;
+ _outNet=_buffers._outNet;
+ _inApp=_buffers._inApp;
__buffers.set(null);
}
}
@@ -161,16 +162,16 @@ public class SslConnection extends AbstractConnection
if (--_allocations==0)
{
if (_buffers!=null &&
- _inbound.remaining()==0 &&
- _outbound.remaining()==0 &&
- _unwrapBuf.remaining()==0)
+ _inNet.remaining()==0 &&
+ _outNet.remaining()==0 &&
+ _inApp.remaining()==0)
{
- _inbound=null;
- _outbound=null;
- _unwrapBuf=null;
- _buffers._in.clear().limit(0);
- _buffers._out.clear().limit(0);
- _buffers._unwrap.clear().limit(0);
+ _inNet=null;
+ _outNet=null;
+ _inApp=null;
+ _buffers._inNet.clear().limit(0);
+ _buffers._outNet.clear().limit(0);
+ _buffers._inApp.clear().limit(0);
__buffers.set(_buffers);
_buffers=null;
@@ -178,75 +179,18 @@ public class SslConnection extends AbstractConnection
}
}
}
-
- /* ------------------------------------------------------------ */
- public void canRead() throws IOException
- {
- try
- {
- allocateBuffers();
-
- boolean progress=true;
-
- while (progress)
- {
- progress=false;
-
- // If we are handshook let the delegate connection
- if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
- progress=process(null,null);
-
- // handle the delegate connection
- _connection.canRead();
-
- _logger.debug("{} handle {} progress={}", _session, this, progress);
- }
- }
- finally
- {
- releaseBuffers();
-
- if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
- {
- _ishut=true;
- try
- {
- _connection.onInputShutdown();
- }
- catch(Throwable x)
- {
- _logger.warn("onInputShutdown failed", x);
- try{_sslEndPoint.close();}
- catch(IOException e2){
- _logger.ignore(e2);}
- }
- }
- }
- }
-
- /* ------------------------------------------------------------ */
- public void canWrite() throws IOException
- {
- // TODO
- }
-
-
/* ------------------------------------------------------------ */
+ @Override
public boolean isIdle()
{
- return _connection.isIdle();
- }
-
- /* ------------------------------------------------------------ */
- public boolean isReadInterested()
- {
- return _connection.isReadInterested();
+ return _appConnection.isIdle();
}
/* ------------------------------------------------------------ */
+ @Override
public void onClose()
{
- _connection.onClose();
+ _appConnection.onClose();
}
/* ------------------------------------------------------------ */
@@ -255,71 +199,98 @@ public class SslConnection extends AbstractConnection
{
try
{
- _logger.debug("onIdleExpired {}ms on {}",idleForMs,this);
+ LOG.debug("onIdleExpired {}ms on {}",idleForMs,this);
if (_endp.isOutputShutdown())
- _sslEndPoint.close();
+ _appEndPoint.close();
else
- _sslEndPoint.shutdownOutput();
+ _appEndPoint.shutdownOutput();
}
catch (IOException e)
{
- _logger.warn(e);
+ LOG.warn(e);
super.onIdleExpired(idleForMs);
}
}
- /* ------------------------------------------------------------ */
- public void onInputShutdown() throws IOException
- {
+ /* ------------------------------------------------------------ */
+ @Override
+ public void doRead()
+ {
+ try
+ {
+ allocateBuffers();
+
+ boolean progress=true;
+ while(progress)
+ {
+ progress=false;
+
+ // Fill the input buffer with everything available
+ if (!BufferUtil.isFull(_inNet))
+ progress|=_endp.fill(_inNet)>0;
+
+ progress|=process(null);
+
+ if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested())
+ {
+ progress=true;
+ Runnable task =_appConnection.onReadable();
+ if (task!=null)
+ task.run();
+ }
+ }
+ }
+ catch(IOException e)
+ {
+ LOG.warn(e);
+ }
+ finally
+ {
+ releaseBuffers();
+ _endp.setReadInterested(_appEndPoint.isReadInterested());
+ _endp.setWriteInterested(BufferUtil.hasContent(_outNet));
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ @Override
+ public void doWrite()
+ {
+ try
+ {
+ while (BufferUtil.hasContent(_outNet))
+ {
+ int written = _endp.flush(_outNet);
+
+ if (written>0 && _appEndPoint.isWriteInterested())
+ {
+ Runnable task =_appConnection.onWriteable();
+ if (task!=null)
+ task.run();
+ }
+ }
+ }
+ catch(IOException e)
+ {
+ LOG.warn(e);
+ }
+ finally
+ {
+ if (BufferUtil.hasContent(_outNet))
+ _endp.setWriteInterested(true);
+ }
}
/* ------------------------------------------------------------ */
- private synchronized boolean process(ByteBuffer toFill, ByteBuffer toFlush) throws IOException
+ private synchronized boolean process(ByteBuffer appOut) throws IOException
{
boolean some_progress=false;
try
{
- // We need buffers to progress
- allocateBuffers();
-
- // if we don't have a buffer to put received data into
- if (toFill==null)
- {
- // use the unwrapbuffer to hold received data.
- _unwrapBuf.compact().flip();
- toFill=_unwrapBuf;
- }
- // Else if the fill buffer is too small for the SSL session
- else if (toFill.capacity()<_session.getApplicationBufferSize())
- {
- // fill to the temporary unwrapBuffer
- boolean progress=process(null,toFlush);
-
- // if we received any data,
- if (BufferUtil.hasContent(_unwrapBuf))
- {
- // transfer from temp buffer to fill buffer
- BufferUtil.flipPutFlip(_unwrapBuf,toFill);
- return true;
- }
- else
- // return progress from recursive call
- return progress;
- }
- // Else if there is some temporary data
- else if (BufferUtil.hasContent(_unwrapBuf))
- {
- // transfer from temp buffer to fill buffer
- BufferUtil.flipPutFlip(_unwrapBuf,toFill);
- return true;
- }
-
- // If we are here, we have a buffer ready into which we can put some read data.
-
// If we have no data to flush, flush the empty buffer
- if (toFlush==null)
- toFlush=__ZERO_BUFFER;
+ if (appOut==null)
+ appOut=__ZERO_BUFFER;
// While we are making progress processing SSL engine
boolean progress=true;
@@ -327,34 +298,6 @@ public class SslConnection extends AbstractConnection
{
progress=false;
- // Do any real IO
- int filled=0,flushed=0;
- try
- {
- // Read any available data
- if (!BufferUtil.isFull(_inbound) && (filled=_endp.fill(_inbound))>0)
- progress = true;
- else
- _inbound.compact().flip();
-
- // flush any output data
- if (BufferUtil.hasContent(_outbound) && (flushed=_endp.flush(_outbound))>0)
- {
- progress = true;
- _outbound.compact().flip();
- }
-
- }
- catch (IOException e)
- {
- _endp.close();
- throw e;
- }
- finally
- {
- _logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.remaining(),flushed,_outbound.remaining());
- }
-
// handle the current hand share status
switch(_engine.getHandshakeStatus())
{
@@ -364,11 +307,11 @@ public class SslConnection extends AbstractConnection
case NOT_HANDSHAKING:
{
// Try unwrapping some application data
- if (!BufferUtil.isFull(toFill) && BufferUtil.hasContent(_inbound) && unwrap(toFill))
+ if (!BufferUtil.isFull(_inApp) && BufferUtil.hasContent(_inNet) && unwrap())
progress=true;
// Try wrapping some application data
- if (BufferUtil.hasContent(toFlush) && !BufferUtil.isFull(_outbound) && wrap(toFlush))
+ if (BufferUtil.hasContent(appOut) && !BufferUtil.isFull(_outNet) && wrap(appOut))
progress=true;
}
break;
@@ -382,7 +325,6 @@ public class SslConnection extends AbstractConnection
progress=true;
task.run();
}
-
}
break;
@@ -391,7 +333,7 @@ public class SslConnection extends AbstractConnection
// The SSL needs to send some handshake data to the other side
if (_handshook && !_allowRenegotiate)
_endp.close();
- else if (wrap(toFlush))
+ else if (wrap(appOut))
progress=true;
}
break;
@@ -401,55 +343,54 @@ public class SslConnection extends AbstractConnection
// The SSL needs to receive some handshake data from the other side
if (_handshook && !_allowRenegotiate)
_endp.close();
- else if (BufferUtil.isEmpty(_inbound)&&filled==-1)
- {
- // No more input coming
- _endp.shutdownInput();
- }
- else if (unwrap(toFill))
+ else if (BufferUtil.isEmpty(_inNet) && _endp.isInputShutdown())
+ _endp.close();
+ else if (unwrap())
progress=true;
}
break;
}
// pass on ishut/oshut state
- if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inbound))
+ if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inNet))
_engine.closeInbound();
- if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outbound))
+ if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outNet))
_endp.shutdownOutput();
// remember if any progress has been made
some_progress|=progress;
}
-
- // If we are reading into the temp buffer and it has some content, then we should be dispatched.
- if (toFill==_unwrapBuf && BufferUtil.hasContent(_unwrapBuf))
- _aEndp.asyncDispatch();
}
finally
{
- releaseBuffers();
if (some_progress)
_progressed.set(true);
}
return some_progress;
}
- private synchronized boolean wrap(final ByteBuffer buffer) throws IOException
+ private synchronized boolean wrap(final ByteBuffer outApp) throws IOException
{
final SSLEngineResult result;
- _outbound.compact();
- result=_engine.wrap(buffer,_outbound);
- if (_logger.isDebugEnabled())
- _logger.debug("{} wrap {} {} consumed={} produced={}",
+ int pos=BufferUtil.flipToFill(_outNet);
+ try
+ {
+ result=_engine.wrap(outApp,_outNet);
+ }
+ finally
+ {
+ BufferUtil.flipToFlush(_outNet,pos);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} wrap {} {} consumed={} produced={}",
_session,
result.getStatus(),
result.getHandshakeStatus(),
result.bytesConsumed(),
result.bytesProduced());
- _outbound.flip();
switch(result.getStatus())
{
@@ -465,58 +406,64 @@ public class SslConnection extends AbstractConnection
break;
case CLOSED:
- _logger.debug("wrap CLOSE {} {}",this,result);
+ LOG.debug("wrap CLOSE {} {}",this,result);
if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
_endp.close();
break;
default:
- _logger.debug("{} wrap default {}",_session,result);
+ LOG.debug("{} wrap default {}",_session,result);
throw new IOException(result.toString());
}
- return result.bytesConsumed()>0 || result.bytesProduced()>0;
+ int flushed = _endp.flush(_outNet);
+
+ return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0;
}
- private synchronized boolean unwrap(final ByteBuffer buffer) throws IOException
+ private synchronized boolean unwrap() throws IOException
{
- if (BufferUtil.isEmpty(_inbound))
+ if (BufferUtil.isEmpty(_inNet))
return false;
final SSLEngineResult result;
+ int pos = BufferUtil.flipToFill(_inApp);
try
{
- buffer.compact();
- result=_engine.unwrap(_inbound,buffer);
- buffer.flip();
-
- if (_logger.isDebugEnabled())
- _logger.debug("{} unwrap {} {} consumed={} produced={}",
- _session,
- result.getStatus(),
- result.getHandshakeStatus(),
- result.bytesConsumed(),
- result.bytesProduced());
-
+ result=_engine.unwrap(_inNet,_inApp);
}
catch(SSLException e)
{
- _logger.debug(String.valueOf(_endp), e);
+ LOG.debug(String.valueOf(_endp), e);
_endp.close();
throw e;
}
-
+ finally
+ {
+ BufferUtil.flipToFlush(_inApp,pos);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} unwrap {} {} consumed={} produced={}",
+ _session,
+ result.getStatus(),
+ result.getHandshakeStatus(),
+ result.bytesConsumed(),
+ result.bytesProduced());
+
switch(result.getStatus())
{
case BUFFER_UNDERFLOW:
- _inbound.compact().flip();
+ // need to wait for more net data
+ _inNet.compact().flip();
if (_endp.isInputShutdown())
- _inbound.clear().limit(0);
+ _inNet.clear().limit(0);
break;
case BUFFER_OVERFLOW:
- _logger.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound,buffer);
+ // need to wait until more app data has been consumed.
+ LOG.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inNet,_inApp);
break;
case OK:
@@ -525,13 +472,13 @@ public class SslConnection extends AbstractConnection
break;
case CLOSED:
- _logger.debug("unwrap CLOSE {} {}",this,result);
+ LOG.debug("unwrap CLOSE {} {}",this,result);
if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
_endp.close();
break;
default:
- _logger.debug("{} wrap default {}",_session,result);
+ LOG.debug("{} wrap default {}",_session,result);
throw new IOException(result.toString());
}
@@ -542,42 +489,54 @@ public class SslConnection extends AbstractConnection
}
/* ------------------------------------------------------------ */
- public AsyncEndPoint getSslEndPoint()
+ @Override
+ public void onInputShutdown()
+ {
+ }
+
+ /* ------------------------------------------------------------ */
+ public SelectableEndPoint getAppEndPoint()
{
- return _sslEndPoint;
+ return _appEndPoint;
}
/* ------------------------------------------------------------ */
+ @Override
public String toString()
{
- return String.format("%s %s", super.toString(), _sslEndPoint);
+ return String.format("%s %s", super.toString(), _appEndPoint);
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
- public class SslEndPoint implements AsyncEndPoint
+ public class AppEndPoint implements SelectableEndPoint
{
+ boolean _readInterested=true;
+ boolean _writeInterested;
+
public SSLEngine getSslEngine()
{
return _engine;
}
- public AsyncEndPoint getIoEndPoint()
+ public EndPoint getIoEndPoint()
{
- return _aEndp;
+ return _endp;
}
+ @Override
public void shutdownOutput() throws IOException
{
synchronized (SslConnection.this)
{
- _logger.debug("{} ssl endp.oshut {}",_session,this);
+ LOG.debug("{} ssl endp.oshut {}",_session,this);
_engine.closeOutbound();
_oshut=true;
}
flush();
}
+ @Override
public boolean isOutputShutdown()
{
synchronized (SslConnection.this)
@@ -586,34 +545,44 @@ public class SslConnection extends AbstractConnection
}
}
+ @Override
public void shutdownInput() throws IOException
{
- _logger.debug("{} ssl endp.ishut!",_session);
+ LOG.debug("{} ssl endp.ishut!",_session);
// We do not do a closeInput here, as SSL does not support half close.
// isInputShutdown works it out itself from buffer state and underlying endpoint state.
}
+ @Override
public boolean isInputShutdown()
{
synchronized (SslConnection.this)
{
return _endp.isInputShutdown() &&
- !(_unwrapBuf!=null&&BufferUtil.hasContent(_unwrapBuf)) &&
- !(_inbound!=null&&BufferUtil.hasContent(_inbound));
+ !(_inApp!=null&&BufferUtil.hasContent(_inApp)) &&
+ !(_inNet!=null&&BufferUtil.hasContent(_inNet));
}
}
+ @Override
public void close() throws IOException
{
- _logger.debug("{} ssl endp.close",_session);
+ LOG.debug("{} ssl endp.close",_session);
_endp.close();
}
+ @Override
public int fill(ByteBuffer buffer) throws IOException
{
int size=buffer.remaining();
- process(buffer, null);
+ synchronized (this)
+ {
+ if (!BufferUtil.hasContent(_inApp))
+ process(null);
+ if (BufferUtil.hasContent(_inApp))
+ BufferUtil.flipPutFlip(_inApp,buffer);
+ }
int filled=buffer.remaining()-size;
if (filled==0 && isInputShutdown())
@@ -621,31 +590,35 @@ public class SslConnection extends AbstractConnection
return filled;
}
+ @Override
public int flush(ByteBuffer... buffers) throws IOException
{
int len=0;
- for (ByteBuffer b : buffers)
+ bufloop: for (ByteBuffer b : buffers)
{
- if (b.hasRemaining())
+ while (b.hasRemaining())
{
int l = b.remaining();
- process(null, b);
+ if (!process(b))
+ break bufloop;
l=l-b.remaining();
if (l>0)
len+=l;
else
- break;
+ break bufloop;
}
}
return len;
}
+ @Override
public boolean isOpen()
{
return _endp.isOpen();
}
+ @Override
public Object getTransport()
{
return _endp;
@@ -653,67 +626,71 @@ public class SslConnection extends AbstractConnection
public void flush() throws IOException
{
- process(null, null);
+ process(null);
}
+ @Override
public void onIdleExpired(long idleForMs)
{
- _aEndp.onIdleExpired(idleForMs);
+ _endp.onIdleExpired(idleForMs);
}
+ @Override
public void setCheckForIdle(boolean check)
{
- _aEndp.setCheckForIdle(check);
+ _endp.setCheckForIdle(check);
}
+ @Override
public boolean isCheckForIdle()
{
- return _aEndp.isCheckForIdle();
+ return _endp.isCheckForIdle();
}
+ @Override
public InetSocketAddress getLocalAddress()
{
- return _aEndp.getLocalAddress();
+ return _endp.getLocalAddress();
}
+ @Override
public InetSocketAddress getRemoteAddress()
{
- return _aEndp.getRemoteAddress();
- }
-
- public boolean isBlocking()
- {
- return false;
+ return _endp.getRemoteAddress();
}
+ @Override
public int getMaxIdleTime()
{
- return _aEndp.getMaxIdleTime();
+ return _endp.getMaxIdleTime();
}
+ @Override
public void setMaxIdleTime(int timeMs) throws IOException
{
- _aEndp.setMaxIdleTime(timeMs);
- }
-
- public Connection getConnection()
- {
- return _connection;
+ _endp.setMaxIdleTime(timeMs);
}
- public void setConnection(Connection connection)
+ @Override
+ public SelectableConnection getSelectableConnection()
{
- _connection=(Connection)connection;
+ return _appConnection;
}
+ public void setSelectableConnection(SelectableConnection connection)
+ {
+ _appConnection=(AbstractSelectableConnection)connection;
+ }
+
+ @Override
public String toString()
{
// Do NOT use synchronized (SslConnection.this)
// because it's very easy to deadlock when debugging is enabled.
// We do a best effort to print the right toString() and that's it.
- ByteBuffer inbound = _inbound;
- ByteBuffer outbound = _outbound;
- ByteBuffer unwrap = _unwrapBuf;
+ ByteBuffer inbound = _inNet;
+ ByteBuffer outbound = _outNet;
+ ByteBuffer unwrap = _inApp;
int i = inbound == null? -1 : inbound.remaining();
int o = outbound == null ? -1 : outbound.remaining();
int u = unwrap == null ? -1 : unwrap.remaining();
@@ -721,7 +698,42 @@ public class SslConnection extends AbstractConnection
_engine.getHandshakeStatus(),
i, o, u,
_ishut, _oshut,
- _connection);
+ _appConnection);
+ }
+
+ @Override
+ public void setWriteInterested(boolean interested)
+ {
+ _writeInterested=interested;
+ }
+
+ @Override
+ public boolean isWriteInterested()
+ {
+ return _writeInterested;
+ }
+
+ @Override
+ public void setReadInterested(boolean interested)
+ {
+ _readInterested=interested;
+ }
+
+ @Override
+ public boolean isReadInterested()
+ {
+ return _readInterested;
+ }
+
+ @Override
+ public long getLastNotIdleTimestamp()
+ {
+ return _endp.getLastNotIdleTimestamp();
+ }
+
+ @Override
+ public void checkForIdle(long now)
+ {
}
}
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StringEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/StringEndPoint.java
similarity index 96%
rename from jetty-io/src/main/java/org/eclipse/jetty/io/bio/StringEndPoint.java
rename to jetty-io/src/main/java/org/eclipse/jetty/io/StringEndPoint.java
index ea5f4fc717d..cda32506825 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StringEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/StringEndPoint.java
@@ -11,12 +11,11 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.bio;
+package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
-import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/BufferUtilTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java
similarity index 97%
rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/BufferUtilTest.java
rename to jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java
index 33062eec964..ae02dc36296 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/BufferUtilTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java
@@ -1,4 +1,4 @@
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/ChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java
similarity index 93%
rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/ChannelEndPointTest.java
rename to jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java
index 1d8d5b2c7eb..6c849ca0a09 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/ChannelEndPointTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java
@@ -1,9 +1,9 @@
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import org.eclipse.jetty.io.EndPointTest;
+import org.eclipse.jetty.io.ChannelEndPoint;
import org.junit.AfterClass;
import org.junit.BeforeClass;
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/NIOTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java
similarity index 99%
rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/NIOTest.java
rename to jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java
index 9366d8ce843..6ff25feb2aa 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/NIOTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java
@@ -11,7 +11,7 @@
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java
similarity index 93%
rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointSslTest.java
rename to jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java
index 66dbff1f51c..919f9f1e08e 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointSslTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java
@@ -1,4 +1,4 @@
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import java.io.File;
import java.io.IOException;
@@ -11,6 +11,10 @@ import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLSocket;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.SelectorManager;
+import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@@ -41,14 +45,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
}
@Override
- protected Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
+ protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint)
{
SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(false);
SslConnection connection = new SslConnection(engine,endpoint);
- Connection delegate = super.newConnection(channel,connection.getSslEndPoint());
- connection.getSslEndPoint().setConnection(delegate);
+ SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint());
+ connection.setAppConnection(delegate);
return connection;
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
similarity index 81%
rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java
rename to jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
index 9b1f5a3bba0..a4672478860 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
@@ -1,4 +1,4 @@
-package org.eclipse.jetty.io.nio;
+package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -20,8 +20,11 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.eclipse.jetty.io.AbstractConnection;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.AbstractSelectableConnection;
import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@@ -32,7 +35,7 @@ import org.junit.Test;
public class SelectChannelEndPointTest
{
- protected SelectChannelEndPoint _lastEndp;
+ protected SelectableEndPoint _lastEndp;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected SelectorManager _manager = new SelectorManager()
@@ -54,12 +57,12 @@ public class SelectChannelEndPointTest
}
@Override
- protected void endPointUpgraded(EndPoint endpoint, Connection oldConnection)
+ protected void endPointUpgraded(SelectChannelEndPoint endpoint, Connection oldConnection)
{
}
@Override
- public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
+ public SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment)
{
return SelectChannelEndPointTest.this.newConnection(channel,endpoint);
}
@@ -68,7 +71,8 @@ public class SelectChannelEndPointTest
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000);
- endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
+ endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
+ endp.setReadInterested(true);
_lastEndp=endp;
return endp;
}
@@ -99,88 +103,94 @@ public class SelectChannelEndPointTest
return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort());
}
- protected Connection newConnection(SocketChannel channel, EndPoint endpoint)
+ protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint)
{
return new TestConnection(endpoint);
}
- public class TestConnection extends AbstractConnection implements Connection
+ public class TestConnection extends AbstractSelectableConnection
{
ByteBuffer _in = BufferUtil.allocate(32*1024);
ByteBuffer _out = BufferUtil.allocate(32*1024);
- public TestConnection(EndPoint endp)
+ public TestConnection(SelectableEndPoint endp)
{
super(endp);
}
- public void canRead()
+ @Override
+ public void doRead()
{
- boolean progress=true;
- while(progress)
+ try
{
- progress=false;
- _in.compact().flip();
- if (!BufferUtil.isFull(_in) && _endp.fill(_in)>0)
+ boolean progress=true;
+ while(progress)
{
- progress=true;
- }
-
-
- while (_blockAt>0 && _in.remaining()>0 && _in.remaining()<_blockAt)
- {
- // ((AsyncEndPoint)_endp).blockReadable(10000);
- if (!BufferUtil.isFull(_in) && _endp.fill(_in)>0)
+ progress=false;
+
+ // Fill the input buffer with everything available
+ if (!BufferUtil.isFull(_in))
+ progress|=_endp.fill(_in)>0;
+ // If the tests wants to block, then block
+ while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt && blockReadable())
+ progress|=_endp.fill(_in)>0;
+
+ // Copy to the out buffer
+ if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0)
progress=true;
+
+ // Try non blocking write
+ if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0)
+
+ // Try blocking write
+ while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out))
+ {
+ blockWriteable();
+ if (_endp.flush(_out)>0)
+ progress=true;
+ }
}
-
- if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0)
- progress=true;
-
- if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0)
- progress=true;
-
- _out.compact().flip();
-
- if (BufferUtil.isEmpty(_out) && _endp.isInputShutdown())
- _endp.shutdownOutput();
+ }
+ catch(IOException e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ _endp.setReadInterested(true);
}
}
-
- public void canWrite()
+
+
+ @Override
+ public void onInputShutdown()
{
-
+ try
+ {
+ if (BufferUtil.isEmpty(_out))
+ _endp.shutdownOutput();
+ }
+ catch(IOException e)
+ {
+ e.printStackTrace();
+ }
}
-
+
+ @Override
+ public void onClose()
+ {
+ }
+
+ @Override
public boolean isIdle()
{
return false;
}
-
- @Override
- public boolean isReadInterested()
- {
- return true;
- }
-
- @Override
- public EndPoint getEndPoint()
- {
- return _endp;
- }
-
- public void onClose()
- {
- // System.err.println("onClose");
- }
-
- public void onInputShutdown() throws IOException
- {
- // System.err.println("onInputShutdown");
- }
-
+
+
}
+
@Test
public void testEcho() throws Exception
{
@@ -310,7 +320,7 @@ public class SelectChannelEndPointTest
int specifiedTimeout = 400;
client.setSoTimeout(specifiedTimeout);
- // Write 8 and cause block for 10
+ // Write 8 and cause block waiting for 10
_blockAt=10;
clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush();
@@ -327,7 +337,7 @@ public class SelectChannelEndPointTest
catch(SocketTimeoutException e)
{
int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue();
- System.err.println("blocked for " + elapsed+ "ms");
+ // System.err.println("blocked for " + elapsed+ "ms");
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4));
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
index e0d528221e5..b33ba39a3b7 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
@@ -28,9 +28,9 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Buffers.Type;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
-import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
@@ -1142,12 +1142,10 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
/* ------------------------------------------------------------ */
protected void connectionClosed(Connection connection)
{
- connection.onClose();
-
if (_statsStartedAt.get() == -1)
return;
- long duration = System.currentTimeMillis() - connection.getTimeStamp();
+ long duration = System.currentTimeMillis() - connection.getCreatedTimeStamp();
int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0;
_requestStats.set(requests);
_connectionStats.decrement();
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
index 220d83b0d5b..54b011aef79 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
@@ -27,17 +27,17 @@ import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.io.AbstractConnection;
+import org.eclipse.jetty.io.AbstractSelectableConnection;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
-import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
*/
-public abstract class HttpConnection extends AbstractConnection
+public abstract class HttpConnection extends AbstractSelectableConnection
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java
index 6b795adc127..c85b347dd12 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java
@@ -20,7 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteArrayEndPoint;
-import org.eclipse.jetty.io.nio.Connection;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java
index fec8221972d..c9c25ab7561 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java
@@ -20,8 +20,9 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
-import org.eclipse.jetty.io.nio.SelectorManager;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectableEndPoint;
+import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
@@ -449,7 +450,7 @@ public class ConnectHandler extends HandlerWrapper
}
@Override
- protected void endPointClosed(SelectChannelEndPoint endpoint)
+ protected void endPointClosed(SelectableEndPoint endpoint)
{
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java
index 072de4cf778..8129a9ddda2 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java
@@ -21,9 +21,10 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.io.NetworkTrafficListener;
-import org.eclipse.jetty.io.nio.NetworkTrafficSelectChannelEndPoint;
-import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
-import org.eclipse.jetty.io.nio.SelectorManager;
+import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectableEndPoint;
+import org.eclipse.jetty.io.SelectorManager;
/**
* A specialized version of {@link SelectChannelConnector} that supports {@link NetworkTrafficListener}s.
@@ -60,7 +61,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
}
@Override
- protected void endPointClosed(SelectChannelEndPoint endpoint)
+ protected void endPointClosed(SelectableEndPoint endpoint)
{
super.endPointClosed(endpoint);
((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed();
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java
index 9c620c54fc5..2dda923bae4 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java
@@ -21,11 +21,12 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.continuation.Continuation;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.nio.Connection;
-import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
-import org.eclipse.jetty.io.nio.SelectorManager;
-import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectableEndPoint;
+import org.eclipse.jetty.io.SelectorManager;
+import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.thread.ThreadPool;
@@ -255,7 +256,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
}
/* ------------------------------------------------------------------------------- */
- protected void endPointClosed(SelectChannelEndPoint endpoint)
+ protected void endPointClosed(SelectableEndPoint endpoint)
{
connectionClosed(endpoint.getConnection());
}
@@ -282,7 +283,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
}
@Override
- protected void endPointClosed(final SelectChannelEndPoint endpoint)
+ protected void endPointClosed(final SelectableEndPoint endpoint)
{
SelectChannelConnector.this.endPointClosed(endpoint);
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java
index 3029fb9b6a2..5fc717b3c36 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java
@@ -28,7 +28,7 @@ import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.BuffersFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException;
-import org.eclipse.jetty.io.nio.SslConnection;
+import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
@@ -98,7 +98,7 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
request.setScheme(HttpScheme.HTTPS);
super.customize(endpoint,request);
- SslConnection.SslEndPoint sslEndpoint=(SslConnection.SslEndPoint)endpoint;
+ SslConnection.AppEndPoint sslEndpoint=(SslConnection.AppEndPoint)endpoint;
SSLEngine sslEngine=sslEndpoint.getSslEngine();
SSLSession sslSession=sslEngine.getSession();
@@ -548,8 +548,8 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
{
SSLEngine engine = createSSLEngine(channel);
SslConnection connection = newSslConnection(endpoint, engine);
- Connection delegate = newPlainConnection(channel, connection.getSslEndPoint());
- connection.getSslEndPoint().setConnection(delegate);
+ Connection delegate = newPlainConnection(channel, connection.getAppEndPoint());
+ connection.getAppEndPoint().setConnection(delegate);
connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate());
return connection;
}
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java
index cf0b18df910..e85c20a474f 100644
--- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java
@@ -31,7 +31,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.nio.SslConnection;
+import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.junit.Assert;
@@ -149,8 +149,8 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
// Get the server side endpoint
EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS);
- if (endp instanceof SslConnection.SslEndPoint)
- endp=((SslConnection.SslEndPoint)endp).getEndpoint();
+ if (endp instanceof SslConnection.AppEndPoint)
+ endp=((SslConnection.AppEndPoint)endp).getEndpoint();
// read the response
String result=IO.toString(is);
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java
index 3bd3cf3920e..091a2c325b6 100644
--- a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java
@@ -6,14 +6,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectableEndPoint;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.IO;
import org.junit.Test;
public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
{
- volatile SelectChannelEndPoint _endp;
+ volatile SelectableEndPoint _endp;
@Override
protected Connector initConnector()
@@ -24,7 +24,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
public void customize(EndPoint endpoint, Request request) throws IOException
{
super.customize(endpoint,request);
- _endp=(SelectChannelEndPoint)endpoint;
+ _endp=(SelectableEndPoint)endpoint;
}
};
@@ -54,7 +54,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest
try
{
TimeUnit.MILLISECONDS.sleep(200);
- SelectChannelEndPoint endp=_endp;
+ SelectableEndPoint endp=_endp;
if (endp!=null && endp.isOpen())
endp.asyncDispatch();
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java
index 89ff23650db..c6aedaaacc2 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java
@@ -268,7 +268,7 @@ public class BufferUtil
/**
* Put data from one buffer into another, avoiding over/under flows
* @param from Buffer to take bytes from in flush mode
- * @param to Buffer to put bytes to in flush mode. The buffer is flipped before and after the put.
+ * @param to Buffer to put bytes to in flush mode. The buffer is flipToFill before the put and flipToFlush after.
* @return number of bytes moved
*/
public static int flipPutFlip(ByteBuffer from, ByteBuffer to)
@@ -649,12 +649,12 @@ public class BufferUtil
for (int i=0;i=' ')
+ if (c>=' ' && c<=127)
buf.append(c);
else if (c=='\r'||c=='\n')
buf.append('|');
else
- buf.append('?');
+ buf.append('\ufffd');
if (i==16&&buffer.position()>32)
{
buf.append("...");
@@ -665,12 +665,12 @@ public class BufferUtil
for (int i=buffer.position();i=' ')
+ if (c>=' ' && c<=127)
buf.append(c);
else if (c=='\r'||c=='\n')
buf.append('|');
else
- buf.append('?');
+ buf.append('\ufffd');
if (i==buffer.position()+16&&buffer.limit()>buffer.position()+32)
{
buf.append("...");
@@ -683,12 +683,12 @@ public class BufferUtil
for (int i=limit;i=' ')
+ if (c>=' ' && c<=127)
buf.append(c);
else if (c=='\r'||c=='\n')
buf.append('|');
else
- buf.append('?');
+ buf.append('\ufffd');
if (i==limit+16&&buffer.capacity()>limit+32)
{
buf.append("...");