jetty-9 new simple SslConnection - still with problems

This commit is contained in:
Greg Wilkins 2012-05-31 11:48:47 +02:00
parent 117d7e5ad8
commit b8e2c65fa8
2 changed files with 52 additions and 27 deletions

View File

@ -22,6 +22,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException; import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AbstractAsyncConnection; import org.eclipse.jetty.io.AbstractAsyncConnection;
@ -127,7 +128,9 @@ public class SslConnection extends AbstractAsyncConnection
// do all the filling, unwrapping ,wrapping and flushing // do all the filling, unwrapping ,wrapping and flushing
if (_appEndPoint._readInterest.isInterested()) if (_appEndPoint._readInterest.isInterested())
_appEndPoint._readInterest.readable(); _appEndPoint._readInterest.readable();
else if (_appEndPoint._writeFlusher.isWriting())
else if (_appEndPoint._writeFlusher.isWriting() && _sslEngine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
_appEndPoint._writeFlusher.completeWrite(); _appEndPoint._writeFlusher.completeWrite();
} }
@ -147,7 +150,7 @@ public class SslConnection extends AbstractAsyncConnection
{ {
private AsyncConnection _connection; private AsyncConnection _connection;
private boolean _fillWrap; private boolean _fillWrap;
private boolean _flushing; private boolean _writing;
private boolean _ishut=false; private boolean _ishut=false;
private final Callback<Void> _writeCallback = new Callback<Void>(){ private final Callback<Void> _writeCallback = new Callback<Void>(){
@ -157,7 +160,7 @@ public class SslConnection extends AbstractAsyncConnection
{ {
synchronized (SslEndPoint.this) synchronized (SslEndPoint.this)
{ {
LOG.debug("{} write.complete {}",SslConnection.this,_flushing?(_fillWrap?"FW":"F"):(_fillWrap?"W":"")); LOG.debug("{} write.complete {}",SslConnection.this,_writing?(_fillWrap?"FW":"F"):(_fillWrap?"W":""));
if (_netOut==null && !_netOut.hasRemaining()) if (_netOut==null && !_netOut.hasRemaining())
{ {
@ -165,7 +168,7 @@ public class SslConnection extends AbstractAsyncConnection
_netOut=null; _netOut=null;
} }
_flushing=false; _writing=false;
if (_fillWrap) if (_fillWrap)
{ {
_fillWrap=false; _fillWrap=false;
@ -183,7 +186,7 @@ public class SslConnection extends AbstractAsyncConnection
synchronized (SslEndPoint.this) synchronized (SslEndPoint.this)
{ {
LOG.debug("{} write.failed",SslConnection.this,x); LOG.debug("{} write.failed",SslConnection.this,x);
_flushing=false; _writing=false;
if (_fillWrap) if (_fillWrap)
{ {
_fillWrap=false; _fillWrap=false;
@ -202,14 +205,26 @@ public class SslConnection extends AbstractAsyncConnection
@Override @Override
protected boolean readInterested() throws IOException protected boolean readInterested() throws IOException
{ {
if (BufferUtil.hasContent(_appIn)||BufferUtil.hasContent(_netIn)) synchronized (SslEndPoint.this)
return true; {
if (BufferUtil.hasContent(_appIn)||BufferUtil.hasContent(_netIn))
return true;
// TODO handle the case where we need to wrap some more. // Are we actually write blocked?
if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_WRAP && BufferUtil.hasContent(_netOut) )
scheduleOnReadable(); {
return false; // we must be blocked trying to write before we can read, so
} // let's write the netdata
_fillWrap=true;
getEndPoint().write(null,_writeCallback,_netOut);
}
else
// Normal readable callback
scheduleOnReadable();
return false;
}
}
}; };
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
@ -217,11 +232,18 @@ public class SslConnection extends AbstractAsyncConnection
@Override @Override
protected void scheduleCompleteWrite() protected void scheduleCompleteWrite()
{ {
if (BufferUtil.hasContent(_netOut)) synchronized (SslEndPoint.this)
getEndPoint().write(null,_writeCallback,_netOut); {
else // If we have pending output data,
// TODO handle the case where we need to unwrap some more. if (BufferUtil.hasContent(_netOut))
throw new IllegalStateException(); { // write it
_writing=true;
getEndPoint().write(null,_writeCallback,_netOut);
}
else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP )
// we are actually read blocked in order to write
scheduleOnReadable();
}
} }
}; };
@ -356,6 +378,12 @@ public class SslConnection extends AbstractAsyncConnection
} }
} }
} }
catch(SSLException e)
{
getEndPoint().close();
LOG.debug(e);
throw new EofException(e);
}
catch(Exception e) catch(Exception e)
{ {
getEndPoint().close(); getEndPoint().close();
@ -383,7 +411,7 @@ public class SslConnection extends AbstractAsyncConnection
LOG.debug("{} flush",SslConnection.this); LOG.debug("{} flush",SslConnection.this);
try try
{ {
if (_flushing) if (_writing)
return 0; return 0;
// We will need a network buffer // We will need a network buffer
@ -412,12 +440,9 @@ public class SslConnection extends AbstractAsyncConnection
case BUFFER_UNDERFLOW: case BUFFER_UNDERFLOW:
throw new IllegalStateException(); throw new IllegalStateException();
case BUFFER_OVERFLOW:
return 0;
default: default:
// if we produced bytes, let's just flush them // if we have net bytes, let's try to flush them
if (result.bytesProduced()>0) if (BufferUtil.hasContent(_netOut))
{ {
getEndPoint().flush(_netOut); getEndPoint().flush(_netOut);
return result.bytesConsumed(); return result.bytesConsumed();

View File

@ -199,13 +199,13 @@ public class SelectChannelEndPointTest
e2.printStackTrace(); e2.printStackTrace();
} }
} }
catch(InterruptedException e) catch(InterruptedException|EofException e)
{ {
// e.printStackTrace(); SelectChannelEndPoint.LOG.ignore(e);
} }
catch(Exception e) catch(Exception e)
{ {
e.printStackTrace(); SelectChannelEndPoint.LOG.warn(e);
} }
finally finally
{ {
@ -503,7 +503,7 @@ public class SelectChannelEndPointTest
server.configureBlocking(false); server.configureBlocking(false);
_manager.accept(server); _manager.accept(server);
int writes = 100000; int writes = 1000000;
final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET);
byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET); byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET);