jetty-9 slow progress on SSL robustness

This commit is contained in:
Greg Wilkins 2012-05-04 17:59:32 +02:00
parent ad108c42c9
commit 6e62ab9bbe
9 changed files with 205 additions and 150 deletions

View File

@ -15,20 +15,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class);
protected final AsyncEndPoint _endp;
private IOFuture.Callback _readCallback = new IOFuture.Callback()
{
@Override
public void onReady()
{
onReadable();
}
@Override
public void onFail(Throwable cause)
{
onReadFail(cause);
}
};
private final IOFuture.Callback _readCallback = new ReadCallback();
public AbstractAsyncConnection(AsyncEndPoint endp)
@ -89,4 +76,27 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
private class ReadCallback implements IOFuture.Callback
{
@Override
public void onReady()
{
onReadable();
}
@Override
public void onFail(Throwable cause)
{
onReadFail(cause);
}
@Override
public String toString()
{
return String.format("AAC$ReadCB@%x",hashCode());
}
}
}

View File

@ -81,9 +81,8 @@ public class CompletedIOFuture implements IOFuture
@Override
public String toString()
{
return String.format("CIOF@%x{r=%b,c=%s}",
return String.format("CIOF@%x{%s}",
hashCode(),
_ready,
_cause);
_ready?"R":_cause);
}
}

View File

@ -265,11 +265,10 @@ public class DispatchedIOFuture implements IOFuture
@Override
public String toString()
{
return String.format("RIOF@%x{c=%b,r=%b,c=%s}",
return String.format("DIOF@%x{%s,%s}",
hashCode(),
_complete,
_ready,
_cause);
_complete?(_ready?"R":_cause):"-",
_callback==null?"-":_callback);
}
public static void rethrow(ExecutionException e) throws IOException

View File

@ -34,7 +34,9 @@ final class RunnableIOFuture extends DispatchedIOFuture
public void run()
{
takeTask().run();
Runnable task=takeTask();
if (task!=null)
task.run();
}
public boolean isDispatched()

View File

@ -40,6 +40,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
private final DispatchedIOFuture _readFuture = new InterestedFuture(SelectionKey.OP_READ,true,_lock);
private final DispatchedIOFuture _writeFuture = new InterestedFuture(SelectionKey.OP_WRITE,true,_lock);
private SelectionKey _key;
private boolean _selected;
@ -54,56 +57,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private volatile boolean _idlecheck;
private volatile AbstractAsyncConnection _connection;
private DispatchedIOFuture _readFuture = new DispatchedIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
_manager.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
_interestOps=_interestOps&~SelectionKey.OP_READ;
updateKey();
cancelled();
}
finally
{
_lock.unlock();
}
}
};
private ByteBuffer[] _writeBuffers;
private DispatchedIOFuture _writeFuture = new DispatchedIOFuture(true,_lock)
{
@Override
protected void dispatch(Runnable task)
{
_manager.dispatch(task);
}
@Override
public void cancel()
{
_lock.lock();
try
{
_interestOps=_interestOps&~SelectionKey.OP_WRITE;
updateKey();
cancelled();
}
finally
{
_lock.unlock();
}
}
};
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
@ -475,6 +429,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
@Override
public void close() throws IOException
{
_lock.lock();
try
{
super.close();
@ -486,6 +441,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
finally
{
updateKey();
_lock.unlock();
}
}
@ -518,7 +474,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s}-{%s}",
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",
hashCode(),
getRemoteAddress(),
getLocalAddress(),
@ -527,6 +483,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
isOutputShutdown(),
_interestOps,
keyString,
_readFuture,
_writeFuture,
getAsyncConnection());
}
@ -536,6 +494,44 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
return _selectSet;
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class InterestedFuture extends DispatchedIOFuture
{
final int _interest;
private InterestedFuture(int interest,boolean ready, Lock lock)
{
super(ready,lock);
_interest=interest;
}
@Override
protected void dispatch(Runnable task)
{
if (!_manager.dispatch(task))
{
LOG.warn("Dispatch failed: i="+_interest);
throw new IllegalStateException();
}
}
@Override
public void cancel()
{
_lock.lock();
try
{
_interestOps=_interestOps&~_interest;
updateKey();
cancelled();
}
finally
{
_lock.unlock();
}
}
}
}

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.util.log.Logger;
public class SslConnection extends AbstractAsyncConnection
{
static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.ssl");
private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0);
private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
@ -238,31 +239,24 @@ public class SslConnection extends AbstractAsyncConnection
@Override
public void onIdleExpired(long idleForMs)
{
try
{
LOG.debug("onIdleExpired {}ms on {}",idleForMs,this);
if (_endp.isOutputShutdown())
_appEndPoint.close();
else
_appEndPoint.shutdownOutput();
}
catch (IOException e)
{
LOG.warn(e);
super.onIdleExpired(idleForMs);
}
System.err.println("LAST "+(System.currentTimeMillis()-_last));
_appConnection.onIdleExpired(idleForMs);
}
long _last;
/* ------------------------------------------------------------ */
@Override
public void onReadable()
{
LOG.debug("onReadable {}",this);
_lock.lock();
try
{
System.err.println("onReadable");
_last=System.currentTimeMillis();
LOG.debug("onReadable {}",this);
_netReadFuture=null;
allocateBuffers();
@ -292,17 +286,18 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
releaseBuffers();
if (!_appReadFuture.isComplete() && _netReadFuture==null)
if (!_appReadFuture.isComplete() && _netReadFuture==null && !BufferUtil.isFull(_inNet))
_netReadFuture=scheduleOnReadable();
LOG.debug("!onReadable {} {}",this,_netReadFuture);
_lock.unlock();
// Run any ready callback from _appReadFuture in this thread.
_appReadFuture.run();
_appWriteFuture.run();
}
// Run any ready callback from _appReadFuture in this thread.
if (_appReadFuture.isDispatched())
_appReadFuture.run();
}
/* ------------------------------------------------------------ */
@ -414,6 +409,10 @@ public class SslConnection extends AbstractAsyncConnection
}
finally
{
// Has the net data consumed allowed us to release net backpressure?
if (BufferUtil.compact(_inNet) && !_appReadFuture.isComplete() && _netReadFuture==null)
_netReadFuture=scheduleOnReadable();
releaseBuffers();
_lock.unlock();
}
@ -605,6 +604,9 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
_lock.unlock();
_appReadFuture.run();
_appWriteFuture.run();
}
}
@ -654,7 +656,10 @@ public class SslConnection extends AbstractAsyncConnection
process(null);
if (BufferUtil.hasContent(_inApp))
{
BufferUtil.append(_inApp,buffer);
BufferUtil.compact(_inApp);
}
}
finally
{
@ -670,23 +675,31 @@ public class SslConnection extends AbstractAsyncConnection
@Override
public int flush(ByteBuffer... buffers) throws IOException
{
int len=0;
bufloop: for (ByteBuffer b : buffers)
_lock.lock();
try
{
while (b.hasRemaining())
int len=0;
bufloop: for (ByteBuffer b : buffers)
{
int l = b.remaining();
if (!process(b))
break bufloop;
l=l-b.remaining();
while (b.hasRemaining())
{
int l = b.remaining();
if (!process(b))
break bufloop;
l=l-b.remaining();
if (l>0)
len+=l;
else
break bufloop;
if (l>0)
len+=l;
else
break bufloop;
}
}
return len;
}
finally
{
_lock.unlock();
}
return len;
}
@Override
@ -726,11 +739,12 @@ public class SslConnection extends AbstractAsyncConnection
int i = inbound == null? -1 : inbound.remaining();
int o = outbound == null ? -1 : outbound.remaining();
int u = unwrap == null ? -1 : unwrap.remaining();
return String.format("SSL %s %s i/o/u=%d/%d/%d ep.ishut=%b oshut=%b {%s}",
return String.format("SSL%s[%s,i/o/u=%d/%d/%d,ep.ishut=%b,oshut=%b,r=%s,w=%s}-{%s}",
super.toString(),
_engine.getHandshakeStatus(),
i, o, u,
_endp.isInputShutdown(), _oshut,
_appReadFuture,_appWriteFuture,
_appConnection);
}
@ -806,6 +820,8 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
_lock.unlock();
_appReadFuture.run();
_appWriteFuture.run();
}
}
@ -831,9 +847,8 @@ public class SslConnection extends AbstractAsyncConnection
finally
{
_lock.unlock();
if (_appWriteFuture.isDispatched())
_appWriteFuture.run();
_appReadFuture.run();
_appWriteFuture.run();
}
}
}

View File

@ -194,6 +194,6 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
@Override
public void testStress() throws Exception
{
// super.testStress();
super.testStress();
}
}

View File

@ -7,6 +7,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -21,6 +22,7 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AbstractAsyncConnection;
@ -119,6 +121,7 @@ public class SelectChannelEndPointTest
{
ByteBuffer _in = BufferUtil.allocate(32*1024);
ByteBuffer _out = BufferUtil.allocate(32*1024);
long _last=-1;
public TestConnection(AsyncEndPoint endp)
{
@ -126,10 +129,12 @@ public class SelectChannelEndPointTest
}
@Override
public void onReadable()
public synchronized void onReadable()
{
System.err.println("APP onReadable");
try
{
_last=System.currentTimeMillis();
_endp.setCheckForIdle(false);
boolean progress=true;
while(progress)
@ -137,18 +142,17 @@ public class SelectChannelEndPointTest
progress=false;
// Fill the input buffer with everything available
if (!BufferUtil.isFull(_in))
{
int filled=_endp.fill(_in);
if (filled>0)
progress=true;
}
if (BufferUtil.isFull(_in))
throw new IllegalStateException("FULL "+BufferUtil.toDetailString(_in));
int filled=_endp.fill(_in);
if (filled>0)
progress=true;
// If the tests wants to block, then block
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt)
{
_endp.read().block();
int filled=_endp.fill(_in);
filled=_endp.fill(_in);
progress|=filled>0;
}
@ -162,21 +166,15 @@ public class SelectChannelEndPointTest
ByteBuffer out=_out.duplicate();
BufferUtil.clear(_out);
for (int i=0;i<_writeCount;i++)
{
_endp.write(out.asReadOnlyBuffer()).block();
}
progress=true;
}
// are we done?
if (BufferUtil.isEmpty(_out) && _endp.isInputShutdown())
if (_endp.isInputShutdown())
_endp.shutdownOutput();
}
}
catch(ClosedChannelException e)
{
// System.err.println(e);
}
catch(ExecutionException e)
{
// Timeout does not close, so echo exception then shutdown
@ -190,10 +188,6 @@ public class SelectChannelEndPointTest
e2.printStackTrace();
}
}
catch(InterruptedException e)
{
// System.err.println(e);
}
catch(Exception e)
{
e.printStackTrace();
@ -208,10 +202,28 @@ public class SelectChannelEndPointTest
}
}
@Override
public void onIdleExpired(long idleForMs)
{
System.err.println("IDLE "+idleForMs);
System.err.println("last "+(System.currentTimeMillis()-_last));
System.err.println("ENDP "+_endp);
System.err.println("tran "+_endp.getTransport());
System.err.println();
super.onIdleExpired(idleForMs);
}
@Override
public void onClose()
{
}
@Override
public String toString()
{
return String.format("%s{}",
super.toString());
}
}
@ -365,7 +377,6 @@ public class SelectChannelEndPointTest
catch(SocketTimeoutException e)
{
int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue();
// System.err.println("blocked for " + elapsed+ "ms");
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4));
}
@ -491,7 +502,7 @@ public class SelectChannelEndPointTest
public void testStress() throws Exception
{
Socket client = newClient();
client.setSoTimeout(30000);
client.setSoTimeout(60000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
@ -501,17 +512,25 @@ public class SelectChannelEndPointTest
final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET);
byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET);
BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream());
final CountDownLatch latch = new CountDownLatch(writes);
final InputStream in = new BufferedInputStream(client.getInputStream());
final long start = System.currentTimeMillis();
client.getOutputStream().write(bytes);
client.getOutputStream().write(count);
client.getOutputStream().flush();
out.write(bytes);
out.write(count);
out.flush();
while (_lastEndp==null)
Thread.sleep(10);
_lastEndp.setMaxIdleTime(5000);
new Thread()
{
public void run()
{
Thread.currentThread().setPriority(MAX_PRIORITY);
long last=-1;
int count=-1;
try
{
while (latch.getCount()>0)
@ -524,36 +543,42 @@ public class SelectChannelEndPointTest
assertEquals(0xff&b0,b);
}
count=0;
int b=in.read();
while(b>0 && b!='\n')
{
count=count*10+(b-'0');
b=in.read();
}
last=System.currentTimeMillis();
latch.countDown();
}
}
catch(Throwable e)
{
long now = System.currentTimeMillis();
System.err.println("count="+count);
System.err.println("latch="+latch.getCount());
System.err.println("time="+(System.currentTimeMillis()-start));
System.err.println("time="+(now-start));
System.err.println("last="+(now-last));
System.err.println("endp="+_lastEndp);
e.printStackTrace();
}
}
}.start();
PrintStream print = new PrintStream(client.getOutputStream());
// Write client to server
for (int i=1;i<writes;i++)
{
print.write(bytes);
print.print(i);
print.print('\n');
out.write(bytes);
out.write(Integer.toString(i).getBytes(StringUtil.__ISO_8859_1_CHARSET));
out.write('\n');
if (i%100==0)
print.flush();
out.flush();
Thread.yield();
}
client.getOutputStream().flush();
out.flush();
assertTrue(latch.await(100,TimeUnit.SECONDS));
}

View File

@ -17,6 +17,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@ -240,10 +241,13 @@ public class BufferUtil
/* ------------------------------------------------------------ */
/** Compact the buffer
* @param buffer
* @return true if the compact made a full buffer have space
*/
public static void compact(ByteBuffer buffer)
public static boolean compact(ByteBuffer buffer)
{
boolean full=buffer.limit()==buffer.capacity();
buffer.compact().flip();
return full && buffer.limit()<buffer.capacity();
}
/* ------------------------------------------------------------ */
@ -263,6 +267,8 @@ public class BufferUtil
{
to.put(from);
put=remaining;
from.position(0);
from.limit(0);
}
else if (from.hasArray())
{
@ -659,7 +665,10 @@ public class BufferUtil
StringBuilder buf = new StringBuilder();
buf.append(buffer.getClass().getSimpleName());
buf.append("@");
buf.append(Integer.toHexString(buffer.hashCode()));
if (buffer.hasArray())
buf.append(Integer.toHexString(buffer.array().hashCode()));
else
buf.append("?");
buf.append("[p=");
buf.append(buffer.position());
buf.append(",l=");