jetty-9 progress on SslConnection tests

This commit is contained in:
Greg Wilkins 2012-04-05 23:17:30 +10:00
parent e7dcd16757
commit 737db225e3
7 changed files with 148 additions and 36 deletions

View File

@ -32,7 +32,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
*/ */
public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint
{ {
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); public static final Logger LOG=Log.getLogger(SelectChannelEndPoint.class);
private final Lock _lock = new ReentrantLock(); private final Lock _lock = new ReentrantLock();
private final SelectorManager.SelectSet _selectSet; private final SelectorManager.SelectSet _selectSet;

View File

@ -14,12 +14,12 @@ public abstract class SelectableConnection implements Connection
{ {
private static final Logger LOG = Log.getLogger(SelectableConnection.class); private static final Logger LOG = Log.getLogger(SelectableConnection.class);
protected final Lock _lock=new ReentrantLock();
protected final SelectableEndPoint _endp; protected final SelectableEndPoint _endp;
private final long _createdTimeStamp; private final long _createdTimeStamp;
private final Lock _lock=new ReentrantLock();
private final Condition _readable=_lock.newCondition(); private final Condition _readable=_lock.newCondition();
private final Condition _writeable=_lock.newCondition(); private final Condition _writeable=_lock.newCondition();
private boolean _readBlocked; private Thread _readBlocked;
private boolean _writeBlocked; private boolean _writeBlocked;
private final Runnable _reader=new Runnable() private final Runnable _reader=new Runnable()
@ -83,7 +83,7 @@ public abstract class SelectableConnection implements Connection
_lock.lock(); _lock.lock();
try try
{ {
if (_readBlocked) if (_readBlocked!=null)
_readable.signalAll(); _readable.signalAll();
else else
return _reader; return _reader;
@ -118,9 +118,15 @@ public abstract class SelectableConnection implements Connection
boolean readable=false; boolean readable=false;
try try
{ {
if (_readBlocked) if (_readBlocked!=null)
{
System.err.println("Already blocked by "+_readBlocked);
for (StackTraceElement e :_readBlocked.getStackTrace())
System.err.println(" at "+e);
throw new IllegalStateException(); throw new IllegalStateException();
_readBlocked=true; }
_readBlocked=Thread.currentThread();
_endp.setReadInterested(true); _endp.setReadInterested(true);
readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS); readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS);
} }
@ -132,7 +138,7 @@ public abstract class SelectableConnection implements Connection
{ {
if (!readable) if (!readable)
_endp.setReadInterested(false); _endp.setReadInterested(false);
_readBlocked=false; _readBlocked=null;
_lock.unlock(); _lock.unlock();
} }
return readable; return readable;
@ -214,7 +220,7 @@ public abstract class SelectableConnection implements Connection
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x", getClass().getSimpleName(), hashCode()); return String.format("%s@%x rb=%s wb=%b", getClass().getSimpleName(), hashCode(),_readBlocked,_writeBlocked);
} }
public void onInputShutdown() throws IOException public void onInputShutdown() throws IOException

View File

@ -49,7 +49,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
*/ */
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
{ {
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); public static final Logger LOG=Log.getLogger(SelectorManager.class);
private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();

View File

@ -18,6 +18,8 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult;
@ -40,7 +42,7 @@ import org.eclipse.jetty.util.log.Logger;
*/ */
public class SslConnection extends SelectableConnection public class SslConnection extends SelectableConnection
{ {
private static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0); private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0);
@ -59,7 +61,6 @@ public class SslConnection extends SelectableConnection
private boolean _handshook; private boolean _handshook;
private boolean _ishut; private boolean _ishut;
private boolean _oshut; private boolean _oshut;
private final AtomicBoolean _progressed = new AtomicBoolean();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* this is a half baked buffer pool /* this is a half baked buffer pool
@ -136,7 +137,9 @@ public class SslConnection extends SelectableConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void allocateBuffers() private void allocateBuffers()
{ {
synchronized (this) // TODO remove this lock if always called with lock held?
_lock.lock();
try
{ {
if (_allocations++==0) if (_allocations++==0)
{ {
@ -152,12 +155,18 @@ public class SslConnection extends SelectableConnection
} }
} }
} }
finally
{
_lock.unlock();
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void releaseBuffers() private void releaseBuffers()
{ {
synchronized (this) // TODO remove this lock if always called with lock held?
_lock.lock();
try
{ {
if (--_allocations==0) if (--_allocations==0)
{ {
@ -178,6 +187,10 @@ public class SslConnection extends SelectableConnection
} }
} }
} }
finally
{
_lock.unlock();
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
@ -217,6 +230,9 @@ public class SslConnection extends SelectableConnection
@Override @Override
public void doRead() public void doRead()
{ {
LOG.debug("do Read {}",_endp);
_lock.lock();
try try
{ {
allocateBuffers(); allocateBuffers();
@ -234,11 +250,27 @@ public class SslConnection extends SelectableConnection
if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested()) if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested())
{ {
_appEndPoint._readInterested=false;
progress=true; progress=true;
Runnable task =_appConnection.onReadable(); Runnable task =_appConnection.onReadable();
if (task!=null) if (task!=null)
{
// We have a task from the application connection. We could
// dispatch this to a thread, but we are likely just to return afterwards.
// So we unlock (so another thread can call doRead if the app blocks) and
// call the app ourselves.
try
{
_lock.unlock();
task.run(); task.run();
} }
finally
{
_lock.lock();
}
}
}
} }
} }
catch(IOException e) catch(IOException e)
@ -250,6 +282,8 @@ public class SslConnection extends SelectableConnection
releaseBuffers(); releaseBuffers();
_endp.setReadInterested(_appEndPoint.isReadInterested()); _endp.setReadInterested(_appEndPoint.isReadInterested());
_endp.setWriteInterested(BufferUtil.hasContent(_outNet)); _endp.setWriteInterested(BufferUtil.hasContent(_outNet));
LOG.debug("done Read {}",_endp);
_lock.unlock();
} }
} }
@ -257,6 +291,7 @@ public class SslConnection extends SelectableConnection
@Override @Override
public void doWrite() public void doWrite()
{ {
_lock.lock();
try try
{ {
while (BufferUtil.hasContent(_outNet)) while (BufferUtil.hasContent(_outNet))
@ -279,15 +314,19 @@ public class SslConnection extends SelectableConnection
{ {
if (BufferUtil.hasContent(_outNet)) if (BufferUtil.hasContent(_outNet))
_endp.setWriteInterested(true); _endp.setWriteInterested(true);
_lock.unlock();
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private synchronized boolean process(ByteBuffer appOut) throws IOException private boolean process(ByteBuffer appOut) throws IOException
{ {
boolean some_progress=false; boolean some_progress=false;
_lock.lock();
try try
{ {
allocateBuffers();
// If we have no data to flush, flush the empty buffer // If we have no data to flush, flush the empty buffer
if (appOut==null) if (appOut==null)
appOut=__ZERO_BUFFER; appOut=__ZERO_BUFFER;
@ -362,15 +401,21 @@ public class SslConnection extends SelectableConnection
some_progress|=progress; some_progress|=progress;
} }
} }
catch(SSLException e)
{
LOG.warn(e.toString());
LOG.debug(e);
_endp.close();
}
finally finally
{ {
if (some_progress) releaseBuffers();
_progressed.set(true); _lock.unlock();
} }
return some_progress; return some_progress;
} }
private synchronized boolean wrap(final ByteBuffer outApp) throws IOException private boolean wrap(final ByteBuffer outApp) throws IOException
{ {
final SSLEngineResult result; final SSLEngineResult result;
@ -421,7 +466,7 @@ public class SslConnection extends SelectableConnection
return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0; return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0;
} }
private synchronized boolean unwrap() throws IOException private boolean unwrap() throws IOException
{ {
if (BufferUtil.isEmpty(_inNet)) if (BufferUtil.isEmpty(_inNet))
return false; return false;
@ -511,7 +556,7 @@ public class SslConnection extends SelectableConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public class AppEndPoint implements SelectableEndPoint public class AppEndPoint implements SelectableEndPoint
{ {
boolean _readInterested=true; boolean _readInterested;
boolean _writeInterested; boolean _writeInterested;
public SSLEngine getSslEngine() public SSLEngine getSslEngine()
@ -527,22 +572,32 @@ public class SslConnection extends SelectableConnection
@Override @Override
public void shutdownOutput() throws IOException public void shutdownOutput() throws IOException
{ {
synchronized (SslConnection.this) _lock.lock();
try
{ {
LOG.debug("{} ssl endp.oshut {}",_session,this); LOG.debug("{} ssl endp.oshut {}",_session,this);
_engine.closeOutbound(); _engine.closeOutbound();
_oshut=true; _oshut=true;
} }
finally
{
_lock.unlock();
}
flush(); flush();
} }
@Override @Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {
synchronized (SslConnection.this) _lock.lock();
try
{ {
return _oshut||!isOpen()||_engine.isOutboundDone(); return _oshut||!isOpen()||_engine.isOutboundDone();
} }
finally
{
_lock.unlock();
}
} }
@Override @Override
@ -556,12 +611,17 @@ public class SslConnection extends SelectableConnection
@Override @Override
public boolean isInputShutdown() public boolean isInputShutdown()
{ {
synchronized (SslConnection.this) _lock.lock();
try
{ {
return _endp.isInputShutdown() && return _endp.isInputShutdown() &&
!(_inApp!=null&&BufferUtil.hasContent(_inApp)) && !(_inApp!=null&&BufferUtil.hasContent(_inApp)) &&
!(_inNet!=null&&BufferUtil.hasContent(_inNet)); !(_inNet!=null&&BufferUtil.hasContent(_inNet));
} }
finally
{
_lock.unlock();
}
} }
@Override @Override
@ -575,7 +635,8 @@ public class SslConnection extends SelectableConnection
public int fill(ByteBuffer buffer) throws IOException public int fill(ByteBuffer buffer) throws IOException
{ {
int size=buffer.remaining(); int size=buffer.remaining();
synchronized (this) _lock.lock();
try
{ {
if (!BufferUtil.hasContent(_inApp)) if (!BufferUtil.hasContent(_inApp))
process(null); process(null);
@ -583,6 +644,10 @@ public class SslConnection extends SelectableConnection
if (BufferUtil.hasContent(_inApp)) if (BufferUtil.hasContent(_inApp))
BufferUtil.flipPutFlip(_inApp,buffer); BufferUtil.flipPutFlip(_inApp,buffer);
} }
finally
{
_lock.unlock();
}
int filled=buffer.remaining()-size; int filled=buffer.remaining()-size;
if (filled==0 && isInputShutdown()) if (filled==0 && isInputShutdown())
@ -685,7 +750,7 @@ public class SslConnection extends SelectableConnection
@Override @Override
public String toString() public String toString()
{ {
// Do NOT use synchronized (SslConnection.this) // Do NOT use _lock.lock();try
// because it's very easy to deadlock when debugging is enabled. // because it's very easy to deadlock when debugging is enabled.
// We do a best effort to print the right toString() and that's it. // We do a best effort to print the right toString() and that's it.
ByteBuffer inbound = _inNet; ByteBuffer inbound = _inNet;
@ -694,7 +759,8 @@ public class SslConnection extends SelectableConnection
int i = inbound == null? -1 : inbound.remaining(); int i = inbound == null? -1 : inbound.remaining();
int o = outbound == null ? -1 : outbound.remaining(); int o = outbound == null ? -1 : outbound.remaining();
int u = unwrap == null ? -1 : unwrap.remaining(); int u = unwrap == null ? -1 : unwrap.remaining();
return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}", return String.format("SSL %s %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}",
super.toString(),
_engine.getHandshakeStatus(), _engine.getHandshakeStatus(),
i, o, u, i, o, u,
_ishut, _oshut, _ishut, _oshut,
@ -703,8 +769,18 @@ public class SslConnection extends SelectableConnection
@Override @Override
public void setWriteInterested(boolean interested) public void setWriteInterested(boolean interested)
{
_lock.lock();
try
{ {
_writeInterested=interested; _writeInterested=interested;
if (interested)
_endp.setWriteInterested(true);
}
finally
{
_lock.unlock();
}
} }
@Override @Override
@ -715,8 +791,18 @@ public class SslConnection extends SelectableConnection
@Override @Override
public void setReadInterested(boolean interested) public void setReadInterested(boolean interested)
{
_lock.lock();
try
{ {
_readInterested=interested; _readInterested=interested;
if (interested)
_endp.setReadInterested(true);
}
finally
{
_lock.unlock();
}
} }
@Override @Override

View File

@ -19,6 +19,7 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -53,6 +54,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint()); SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint());
connection.setAppConnection(delegate); connection.setAppConnection(delegate);
connection.getAppEndPoint().setReadInterested(endpoint.isReadInterested());
return connection; return connection;
} }
@ -64,17 +66,23 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
} }
@Test @Ignore
@Override @Override
public void testShutdown() throws Exception public void testShutdown() throws Exception
{ {
// SSL does not do half closes // SSL does not do half closes
} }
@Override
public void testBlockIn() throws Exception
{
super.testBlockIn();
}
@Test @Test
public void testTcpClose() throws Exception public void testTcpClose() throws Exception
{ {
// This test replaces SSLSocket() with a very manual SSL client // This test replaces SSLSocket() with a very manual SSL client
// so we can close TCP underneath SSL. // so we can close TCP underneath SSL.
@ -183,6 +191,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
} }
@Test @Test
@Override
public void testStress() throws Exception public void testStress() throws Exception
{ {
super.testStress(); super.testStress();

View File

@ -14,6 +14,7 @@ import java.io.PrintStream;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
@ -71,8 +72,8 @@ public class SelectChannelEndPointTest
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{ {
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000); SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000);
endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
endp.setReadInterested(true); endp.setReadInterested(true);
endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
_lastEndp=endp; _lastEndp=endp;
return endp; return endp;
} }
@ -141,16 +142,20 @@ public class SelectChannelEndPointTest
// Try non blocking write // Try non blocking write
if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0) if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0)
progress=true;
// Try blocking write // Try blocking write
while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out)) while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out) && blockWriteable())
{ {
blockWriteable();
if (_endp.flush(_out)>0) if (_endp.flush(_out)>0)
progress=true; progress=true;
} }
} }
} }
catch(ClosedChannelException e)
{
System.err.println(e);
}
catch(IOException e) catch(IOException e)
{ {
e.printStackTrace(); e.printStackTrace();
@ -317,7 +322,7 @@ public class SelectChannelEndPointTest
OutputStream clientOutputStream = client.getOutputStream(); OutputStream clientOutputStream = client.getOutputStream();
InputStream clientInputStream = client.getInputStream(); InputStream clientInputStream = client.getInputStream();
int specifiedTimeout = 400; int specifiedTimeout = SslConnection.LOG.isDebugEnabled()?2000:400;
client.setSoTimeout(specifiedTimeout); client.setSoTimeout(specifiedTimeout);
// Write 8 and cause block waiting for 10 // Write 8 and cause block waiting for 10
@ -325,6 +330,7 @@ public class SelectChannelEndPointTest
clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush(); clientOutputStream.flush();
_lastEndp.setMaxIdleTime(10*specifiedTimeout);
Thread.sleep(2 * specifiedTimeout); Thread.sleep(2 * specifiedTimeout);
// No echo as blocking for 10 // No echo as blocking for 10

View File

@ -551,12 +551,17 @@ public abstract class HttpProcessor
throw new SocketTimeoutException(">"+getMaxIdleTime()+"ms"); throw new SocketTimeoutException(">"+getMaxIdleTime()+"ms");
try try
{ {
setReadInterested(true);
_inputQ.wait(timeout); _inputQ.wait(timeout);
} }
catch(InterruptedException e) catch(InterruptedException e)
{ {
LOG.ignore(e); LOG.ignore(e);
} }
finally
{
setReadInterested(false);
}
content=_inputQ.peekUnsafe(); content=_inputQ.peekUnsafe();
} }
} }