jetty-9 new simple SslConnection - passing tests even with stress

This commit is contained in:
Greg Wilkins 2012-05-31 15:15:07 +02:00
parent b9e28ba51e
commit 887e27531f
5 changed files with 83 additions and 46 deletions

View File

@ -36,8 +36,9 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
protected void scheduleCompleteWrite() protected boolean canFlush()
{ {
return false;
} }
}; };

View File

@ -63,10 +63,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
protected void scheduleCompleteWrite() protected boolean canFlush()
{ {
_interestOps = _interestOps | SelectionKey.OP_WRITE; _interestOps = _interestOps | SelectionKey.OP_WRITE;
updateKey(); updateKey();
return false;
} }
}; };

View File

@ -15,7 +15,7 @@ import org.eclipse.jetty.util.Callback;
/** /**
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} * A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written. * by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #scheduleCompleteWrite()} is called when not all content has been * The abstract method {@link #canFlush()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()} * written after a call to flush and should organise for the {@link #completeWrite()}
* method to be called when a subsequent call to flush should be able to make more progress. * method to be called when a subsequent call to flush should be able to make more progress.
* *
@ -52,8 +52,11 @@ abstract public class WriteFlusher
_buffers=buffers; _buffers=buffers;
_context=context; _context=context;
_callback=callback; _callback=callback;
scheduleCompleteWrite(); if(canFlush())
completeWrite();
else
_writing.set(true); // Needed as memory barrier _writing.set(true); // Needed as memory barrier
return; return;
} }
} }
@ -73,7 +76,7 @@ abstract public class WriteFlusher
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
abstract protected void scheduleCompleteWrite(); abstract protected boolean canFlush();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -101,7 +104,7 @@ abstract public class WriteFlusher
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Complete a write that has not completed and that called * Complete a write that has not completed and that called
* {@link #scheduleCompleteWrite()} to request a call to this * {@link #canFlush()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)} * method when a call to {@link EndPoint#flush(ByteBuffer...)}
* is likely to be able to progress. * is likely to be able to progress.
* @return true if a write was in progress * @return true if a write was in progress
@ -112,6 +115,8 @@ abstract public class WriteFlusher
return false; return false;
try try
{
retry: while(true)
{ {
_buffers=compact(_buffers); _buffers=compact(_buffers);
_endp.flush(_buffers); _endp.flush(_buffers);
@ -121,11 +126,13 @@ abstract public class WriteFlusher
{ {
if (b.hasRemaining()) if (b.hasRemaining())
{ {
scheduleCompleteWrite(); if (canFlush())
continue retry;
return true; return true;
} }
} }
break;
}
// we are complete and ready // we are complete and ready
Callback callback=_callback; Callback callback=_callback;
Object context=_context; Object context=_context;

View File

@ -63,6 +63,8 @@ public class SslConnection extends AbstractAsyncConnection
private ByteBuffer _netOut; private ByteBuffer _netOut;
private final boolean _netDirect=false; private final boolean _netDirect=false;
private final boolean _appDirect=false; private final boolean _appDirect=false;
private SSLEngineResult _unwrapResult;
private SSLEngineResult _wrapResult;
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, AsyncEndPoint endPoint, SSLEngine sslEngine) public SslConnection(ByteBufferPool byteBufferPool, Executor executor, AsyncEndPoint endPoint, SSLEngine sslEngine)
{ {
@ -138,7 +140,6 @@ public class SslConnection extends AbstractAsyncConnection
@Override @Override
public void onReadFail(Throwable cause) public void onReadFail(Throwable cause)
{ {
System.err.println("SSL onReadFail "+cause);
super.onReadFail(cause); super.onReadFail(cause);
} }
@ -157,7 +158,7 @@ public class SslConnection extends AbstractAsyncConnection
public class SslEndPoint extends AbstractEndPoint implements AsyncEndPoint public class SslEndPoint extends AbstractEndPoint implements AsyncEndPoint
{ {
private AsyncConnection _connection; private AsyncConnection _connection;
private boolean _fillWrap; private boolean _fillWrite;
private boolean _writing; private boolean _writing;
private boolean _underflown; private boolean _underflown;
private boolean _ishut=false; private boolean _ishut=false;
@ -169,7 +170,7 @@ public class SslConnection extends AbstractAsyncConnection
{ {
synchronized (SslEndPoint.this) synchronized (SslEndPoint.this)
{ {
LOG.debug("{} write.complete {}",SslConnection.this,_writing?(_fillWrap?"FW":"F"):(_fillWrap?"W":"")); LOG.debug("{} write.complete {}",SslConnection.this,_writing?(_fillWrite?"FW":"F"):(_fillWrite?"W":""));
if (_netOut==null && !_netOut.hasRemaining()) if (_netOut==null && !_netOut.hasRemaining())
{ {
@ -178,9 +179,9 @@ public class SslConnection extends AbstractAsyncConnection
} }
_writing=false; _writing=false;
if (_fillWrap) if (_fillWrite)
{ {
_fillWrap=false; _fillWrite=false;
_readInterest.readable(); _readInterest.readable();
} }
@ -196,9 +197,9 @@ public class SslConnection extends AbstractAsyncConnection
{ {
LOG.debug("{} write.failed",SslConnection.this,x); LOG.debug("{} write.failed",SslConnection.this,x);
_writing=false; _writing=false;
if (_fillWrap) if (_fillWrite)
{ {
_fillWrap=false; _fillWrite=false;
_readInterest.failed(x); _readInterest.failed(x);
} }
@ -238,7 +239,7 @@ public class SslConnection extends AbstractAsyncConnection
return true; return true;
// otherwise write the net data // otherwise write the net data
_fillWrap=true; _fillWrite=true;
_writing=true; _writing=true;
getEndPoint().write(null,_writeCallback,_netOut); getEndPoint().write(null,_writeCallback,_netOut);
} }
@ -254,7 +255,7 @@ public class SslConnection extends AbstractAsyncConnection
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
protected void scheduleCompleteWrite() protected boolean canFlush()
{ {
synchronized (SslEndPoint.this) synchronized (SslEndPoint.this)
{ {
@ -268,6 +269,12 @@ public class SslConnection extends AbstractAsyncConnection
else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP ) else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP )
// we are actually read blocked in order to write // we are actually read blocked in order to write
scheduleOnReadable(); scheduleOnReadable();
else
{
// try the flush again
return true;
}
return false;
} }
} }
}; };
@ -325,12 +332,12 @@ public class SslConnection extends AbstractAsyncConnection
// Let's try the SSL thang even if we have no net data because in that // Let's try the SSL thang even if we have no net data because in that
// case we want to fall through to the handshake handling // case we want to fall through to the handshake handling
int pos=BufferUtil.flipToFill(app_in); int pos=BufferUtil.flipToFill(app_in);
SSLEngineResult result = _sslEngine.unwrap(_netIn,app_in); _unwrapResult = _sslEngine.unwrap(_netIn,app_in);
LOG.debug("{} unwrap {}",SslConnection.this,result); LOG.debug("{} unwrap {}",SslConnection.this,_unwrapResult);
BufferUtil.flipToFlush(app_in,pos); BufferUtil.flipToFlush(app_in,pos);
// and deal with the results // and deal with the results
switch(result.getStatus()) switch(_unwrapResult.getStatus())
{ {
case BUFFER_OVERFLOW: case BUFFER_OVERFLOW:
throw new IllegalStateException(); throw new IllegalStateException();
@ -350,7 +357,7 @@ public class SslConnection extends AbstractAsyncConnection
case NEED_WRAP: case NEED_WRAP:
// we need to send some handshake data // we need to send some handshake data
_fillWrap=true; _fillWrite=true;
flush(BufferUtil.EMPTY_BUFFER); flush(BufferUtil.EMPTY_BUFFER);
getEndPoint().close(); getEndPoint().close();
return -1; return -1;
@ -366,10 +373,10 @@ public class SslConnection extends AbstractAsyncConnection
default: default:
// if we produced bytes, we don't care about the handshake state // if we produced bytes, we don't care about the handshake state
if (result.bytesProduced()>0) if (_unwrapResult.bytesProduced()>0)
{ {
if (app_in==buffer) if (app_in==buffer)
return result.bytesProduced(); return _unwrapResult.bytesProduced();
return BufferUtil.append(_appIn,buffer); return BufferUtil.append(_appIn,buffer);
} }
@ -390,7 +397,7 @@ public class SslConnection extends AbstractAsyncConnection
case NEED_WRAP: case NEED_WRAP:
// TODO maybe just do the wrap here ourselves? // TODO maybe just do the wrap here ourselves?
// we need to send some handshake data // we need to send some handshake data
_fillWrap=true; _fillWrite=true;
flush(BufferUtil.EMPTY_BUFFER); flush(BufferUtil.EMPTY_BUFFER);
continue; continue;
@ -448,21 +455,19 @@ public class SslConnection extends AbstractAsyncConnection
// We will need a network buffer // We will need a network buffer
if (_netOut==null) if (_netOut==null)
_netOut=_bufferPool.acquire(_sslEngine.getSession().getPacketBufferSize(),_netDirect); _netOut=_bufferPool.acquire(_sslEngine.getSession().getPacketBufferSize()*2,_netDirect);
else
BufferUtil.compact(_netOut);
while(true) while(true)
{ {
// do the funky SSL thang! // do the funky SSL thang!
BufferUtil.compact(_netOut);
int pos=BufferUtil.flipToFill(_netOut); int pos=BufferUtil.flipToFill(_netOut);
SSLEngineResult result=_sslEngine.wrap(appOuts,_netOut); _wrapResult=_sslEngine.wrap(appOuts,_netOut);
LOG.debug("{} wrap {}",SslConnection.this,result); LOG.debug("{} wrap {}",SslConnection.this,_wrapResult);
BufferUtil.flipToFlush(_netOut,pos); BufferUtil.flipToFlush(_netOut,pos);
// and deal with the results // and deal with the results
switch(result.getStatus()) switch(_wrapResult.getStatus())
{ {
case CLOSED: case CLOSED:
if (appOuts.length==1 && appOuts[0]==BufferUtil.EMPTY_BUFFER) if (appOuts.length==1 && appOuts[0]==BufferUtil.EMPTY_BUFFER)
@ -472,12 +477,17 @@ public class SslConnection extends AbstractAsyncConnection
case BUFFER_UNDERFLOW: case BUFFER_UNDERFLOW:
throw new IllegalStateException(); throw new IllegalStateException();
case BUFFER_OVERFLOW:
if (LOG.isDebugEnabled())
LOG.debug("{} OVERFLOW {}",this,BufferUtil.toDetailString(_netOut));
//$FALL-THROUGH$
default: default:
// if we have net bytes, let's try to flush them // if we have net bytes, let's try to flush them
if (BufferUtil.hasContent(_netOut)) if (BufferUtil.hasContent(_netOut))
{ {
getEndPoint().flush(_netOut); getEndPoint().flush(_netOut);
return result.bytesConsumed(); return _wrapResult.bytesConsumed();
} }
// Dang! we have to deal with handshake state // Dang! we have to deal with handshake state
@ -573,7 +583,7 @@ public class SslConnection extends AbstractAsyncConnection
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s{%s%s}",super.toString(),_readInterest.isInterested()?"R":"",_writeFlusher.isWriting()?"W":""); return String.format("%s{%s%s%s}",super.toString(),_readInterest.isInterested()?"R":"",_writeFlusher.isWriting()?"W":"",_writing?"w":"");
} }
} }
} }

View File

@ -496,13 +496,13 @@ public class SelectChannelEndPointTest
public void testStress() throws Exception public void testStress() throws Exception
{ {
Socket client = newClient(); Socket client = newClient();
client.setSoTimeout(60000); client.setSoTimeout(30000);
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.accept(server); _manager.accept(server);
int writes = 100000; final int writes = 200000;
final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET);
byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET); byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET);
@ -546,17 +546,23 @@ public class SelectChannelEndPointTest
} }
last=System.currentTimeMillis(); last=System.currentTimeMillis();
//if (latch.getCount()%1000==0)
// System.out.println(writes-latch.getCount());
latch.countDown(); latch.countDown();
} }
} }
catch(Throwable e) catch(Throwable e)
{ {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
System.err.println("count="+count); System.err.println("count="+count);
System.err.println("latch="+latch.getCount()); System.err.println("latch="+latch.getCount());
System.err.println("time="+(now-start)); System.err.println("time="+(now-start));
System.err.println("last="+(now-last)); System.err.println("last="+(now-last));
System.err.println("endp="+_lastEndp); System.err.println("endp="+_lastEndp);
System.err.println("conn="+_lastEndp.getAsyncConnection());
e.printStackTrace(); e.printStackTrace();
} }
} }
@ -568,13 +574,25 @@ public class SelectChannelEndPointTest
out.write(bytes); out.write(bytes);
out.write(Integer.toString(i).getBytes(StringUtil.__ISO_8859_1_CHARSET)); out.write(Integer.toString(i).getBytes(StringUtil.__ISO_8859_1_CHARSET));
out.write('\n'); out.write('\n');
if (i%100==0) if (i%1000==0)
{
//System.err.println(i+"/"+writes);
out.flush(); out.flush();
}
Thread.yield(); Thread.yield();
} }
out.flush(); out.flush();
assertTrue(latch.await(100,TimeUnit.SECONDS)); long last=latch.getCount();
while(!latch.await(5,TimeUnit.SECONDS))
{
//System.err.println(latch.getCount());
if (latch.getCount()==last)
Assert.fail();
last=latch.getCount();
}
assertEquals(0,latch.getCount());
} }