From 737db225e39ec88167868d0ca6c4ef05eace2d89 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 5 Apr 2012 23:17:30 +1000 Subject: [PATCH] jetty-9 progress on SslConnection tests --- .../jetty/io/SelectChannelEndPoint.java | 2 +- .../jetty/io/SelectableConnection.java | 20 ++- .../org/eclipse/jetty/io/SelectorManager.java | 2 +- .../org/eclipse/jetty/io/SslConnection.java | 124 +++++++++++++++--- .../io/SelectChannelEndPointSslTest.java | 13 +- .../jetty/io/SelectChannelEndPointTest.java | 18 ++- .../eclipse/jetty/server/HttpProcessor.java | 5 + 7 files changed, 148 insertions(+), 36 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 86d832cdea8..ec6b462d4e7 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -32,7 +32,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task; */ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint { - public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); + public static final Logger LOG=Log.getLogger(SelectChannelEndPoint.class); private final Lock _lock = new ReentrantLock(); private final SelectorManager.SelectSet _selectSet; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java index c1b1a5ed895..9a266a5d449 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java @@ -14,12 +14,12 @@ public abstract class SelectableConnection implements Connection { private static final Logger LOG = Log.getLogger(SelectableConnection.class); + protected final Lock _lock=new ReentrantLock(); protected final SelectableEndPoint _endp; private final long _createdTimeStamp; - private final Lock _lock=new ReentrantLock(); private final Condition _readable=_lock.newCondition(); private final Condition _writeable=_lock.newCondition(); - private boolean _readBlocked; + private Thread _readBlocked; private boolean _writeBlocked; private final Runnable _reader=new Runnable() @@ -83,7 +83,7 @@ public abstract class SelectableConnection implements Connection _lock.lock(); try { - if (_readBlocked) + if (_readBlocked!=null) _readable.signalAll(); else return _reader; @@ -118,9 +118,15 @@ public abstract class SelectableConnection implements Connection boolean readable=false; try { - if (_readBlocked) + if (_readBlocked!=null) + { + System.err.println("Already blocked by "+_readBlocked); + for (StackTraceElement e :_readBlocked.getStackTrace()) + System.err.println(" at "+e); + throw new IllegalStateException(); - _readBlocked=true; + } + _readBlocked=Thread.currentThread(); _endp.setReadInterested(true); readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS); } @@ -132,7 +138,7 @@ public abstract class SelectableConnection implements Connection { if (!readable) _endp.setReadInterested(false); - _readBlocked=false; + _readBlocked=null; _lock.unlock(); } return readable; @@ -214,7 +220,7 @@ public abstract class SelectableConnection implements Connection @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x rb=%s wb=%b", getClass().getSimpleName(), hashCode(),_readBlocked,_writeBlocked); } public void onInputShutdown() throws IOException diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 6523505d672..f2b377bd2d0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -49,7 +49,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task; */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { - public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); + public static final Logger LOG=Log.getLogger(SelectorManager.class); private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index 968519d7f15..ad4a3cd7c59 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -18,6 +18,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; @@ -40,7 +42,7 @@ import org.eclipse.jetty.util.log.Logger; */ public class SslConnection extends SelectableConnection { - private static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); + static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0); @@ -59,7 +61,6 @@ public class SslConnection extends SelectableConnection private boolean _handshook; private boolean _ishut; private boolean _oshut; - private final AtomicBoolean _progressed = new AtomicBoolean(); /* ------------------------------------------------------------ */ /* this is a half baked buffer pool @@ -136,7 +137,9 @@ public class SslConnection extends SelectableConnection /* ------------------------------------------------------------ */ private void allocateBuffers() { - synchronized (this) + // TODO remove this lock if always called with lock held? + _lock.lock(); + try { if (_allocations++==0) { @@ -152,12 +155,18 @@ public class SslConnection extends SelectableConnection } } } + finally + { + _lock.unlock(); + } } /* ------------------------------------------------------------ */ private void releaseBuffers() { - synchronized (this) + // TODO remove this lock if always called with lock held? + _lock.lock(); + try { if (--_allocations==0) { @@ -178,6 +187,10 @@ public class SslConnection extends SelectableConnection } } } + finally + { + _lock.unlock(); + } } /* ------------------------------------------------------------ */ @Override @@ -217,6 +230,9 @@ public class SslConnection extends SelectableConnection @Override public void doRead() { + LOG.debug("do Read {}",_endp); + + _lock.lock(); try { allocateBuffers(); @@ -234,10 +250,26 @@ public class SslConnection extends SelectableConnection if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested()) { + _appEndPoint._readInterested=false; progress=true; Runnable task =_appConnection.onReadable(); + if (task!=null) - task.run(); + { + // We have a task from the application connection. We could + // dispatch this to a thread, but we are likely just to return afterwards. + // So we unlock (so another thread can call doRead if the app blocks) and + // call the app ourselves. + try + { + _lock.unlock(); + task.run(); + } + finally + { + _lock.lock(); + } + } } } } @@ -250,6 +282,8 @@ public class SslConnection extends SelectableConnection releaseBuffers(); _endp.setReadInterested(_appEndPoint.isReadInterested()); _endp.setWriteInterested(BufferUtil.hasContent(_outNet)); + LOG.debug("done Read {}",_endp); + _lock.unlock(); } } @@ -257,6 +291,7 @@ public class SslConnection extends SelectableConnection @Override public void doWrite() { + _lock.lock(); try { while (BufferUtil.hasContent(_outNet)) @@ -279,15 +314,19 @@ public class SslConnection extends SelectableConnection { if (BufferUtil.hasContent(_outNet)) _endp.setWriteInterested(true); + _lock.unlock(); } } /* ------------------------------------------------------------ */ - private synchronized boolean process(ByteBuffer appOut) throws IOException + private boolean process(ByteBuffer appOut) throws IOException { boolean some_progress=false; + _lock.lock(); try { + allocateBuffers(); + // If we have no data to flush, flush the empty buffer if (appOut==null) appOut=__ZERO_BUFFER; @@ -362,15 +401,21 @@ public class SslConnection extends SelectableConnection some_progress|=progress; } } + catch(SSLException e) + { + LOG.warn(e.toString()); + LOG.debug(e); + _endp.close(); + } finally { - if (some_progress) - _progressed.set(true); + releaseBuffers(); + _lock.unlock(); } return some_progress; } - private synchronized boolean wrap(final ByteBuffer outApp) throws IOException + private boolean wrap(final ByteBuffer outApp) throws IOException { final SSLEngineResult result; @@ -421,7 +466,7 @@ public class SslConnection extends SelectableConnection return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0; } - private synchronized boolean unwrap() throws IOException + private boolean unwrap() throws IOException { if (BufferUtil.isEmpty(_inNet)) return false; @@ -511,7 +556,7 @@ public class SslConnection extends SelectableConnection /* ------------------------------------------------------------ */ public class AppEndPoint implements SelectableEndPoint { - boolean _readInterested=true; + boolean _readInterested; boolean _writeInterested; public SSLEngine getSslEngine() @@ -527,22 +572,32 @@ public class SslConnection extends SelectableConnection @Override public void shutdownOutput() throws IOException { - synchronized (SslConnection.this) + _lock.lock(); + try { LOG.debug("{} ssl endp.oshut {}",_session,this); _engine.closeOutbound(); _oshut=true; } + finally + { + _lock.unlock(); + } flush(); } @Override public boolean isOutputShutdown() { - synchronized (SslConnection.this) + _lock.lock(); + try { return _oshut||!isOpen()||_engine.isOutboundDone(); } + finally + { + _lock.unlock(); + } } @Override @@ -556,12 +611,17 @@ public class SslConnection extends SelectableConnection @Override public boolean isInputShutdown() { - synchronized (SslConnection.this) + _lock.lock(); + try { return _endp.isInputShutdown() && !(_inApp!=null&&BufferUtil.hasContent(_inApp)) && !(_inNet!=null&&BufferUtil.hasContent(_inNet)); } + finally + { + _lock.unlock(); + } } @Override @@ -575,7 +635,8 @@ public class SslConnection extends SelectableConnection public int fill(ByteBuffer buffer) throws IOException { int size=buffer.remaining(); - synchronized (this) + _lock.lock(); + try { if (!BufferUtil.hasContent(_inApp)) process(null); @@ -583,6 +644,10 @@ public class SslConnection extends SelectableConnection if (BufferUtil.hasContent(_inApp)) BufferUtil.flipPutFlip(_inApp,buffer); } + finally + { + _lock.unlock(); + } int filled=buffer.remaining()-size; if (filled==0 && isInputShutdown()) @@ -685,7 +750,7 @@ public class SslConnection extends SelectableConnection @Override public String toString() { - // Do NOT use synchronized (SslConnection.this) + // Do NOT use _lock.lock();try // because it's very easy to deadlock when debugging is enabled. // We do a best effort to print the right toString() and that's it. ByteBuffer inbound = _inNet; @@ -694,7 +759,8 @@ public class SslConnection extends SelectableConnection int i = inbound == null? -1 : inbound.remaining(); int o = outbound == null ? -1 : outbound.remaining(); int u = unwrap == null ? -1 : unwrap.remaining(); - return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}", + return String.format("SSL %s %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}", + super.toString(), _engine.getHandshakeStatus(), i, o, u, _ishut, _oshut, @@ -704,7 +770,17 @@ public class SslConnection extends SelectableConnection @Override public void setWriteInterested(boolean interested) { - _writeInterested=interested; + _lock.lock(); + try + { + _writeInterested=interested; + if (interested) + _endp.setWriteInterested(true); + } + finally + { + _lock.unlock(); + } } @Override @@ -716,7 +792,17 @@ public class SslConnection extends SelectableConnection @Override public void setReadInterested(boolean interested) { - _readInterested=interested; + _lock.lock(); + try + { + _readInterested=interested; + if (interested) + _endp.setReadInterested(true); + } + finally + { + _lock.unlock(); + } } @Override diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index 919f9f1e08e..068bc2f5e17 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -19,6 +19,7 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; @@ -53,6 +54,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint()); connection.setAppConnection(delegate); + connection.getAppEndPoint().setReadInterested(endpoint.isReadInterested()); return connection; } @@ -64,17 +66,23 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest } - @Test + @Ignore @Override public void testShutdown() throws Exception { // SSL does not do half closes } + @Override + public void testBlockIn() throws Exception + { + super.testBlockIn(); + } + + @Test public void testTcpClose() throws Exception { - // This test replaces SSLSocket() with a very manual SSL client // so we can close TCP underneath SSL. @@ -183,6 +191,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest } @Test + @Override public void testStress() throws Exception { super.testStress(); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 095be5c5e7f..100187a7745 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -14,6 +14,7 @@ import java.io.PrintStream; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -71,8 +72,8 @@ public class SelectChannelEndPointTest protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000); - endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); endp.setReadInterested(true); + endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); _lastEndp=endp; return endp; } @@ -141,16 +142,20 @@ public class SelectChannelEndPointTest // Try non blocking write if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0) - + progress=true; + // Try blocking write - while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out)) + while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out) && blockWriteable()) { - blockWriteable(); if (_endp.flush(_out)>0) progress=true; } } } + catch(ClosedChannelException e) + { + System.err.println(e); + } catch(IOException e) { e.printStackTrace(); @@ -317,7 +322,7 @@ public class SelectChannelEndPointTest OutputStream clientOutputStream = client.getOutputStream(); InputStream clientInputStream = client.getInputStream(); - int specifiedTimeout = 400; + int specifiedTimeout = SslConnection.LOG.isDebugEnabled()?2000:400; client.setSoTimeout(specifiedTimeout); // Write 8 and cause block waiting for 10 @@ -325,8 +330,9 @@ public class SelectChannelEndPointTest clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.flush(); + _lastEndp.setMaxIdleTime(10*specifiedTimeout); Thread.sleep(2 * specifiedTimeout); - + // No echo as blocking for 10 long start=System.currentTimeMillis(); try diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpProcessor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpProcessor.java index 0f31c9e1c19..293dda1ace2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpProcessor.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpProcessor.java @@ -551,12 +551,17 @@ public abstract class HttpProcessor throw new SocketTimeoutException(">"+getMaxIdleTime()+"ms"); try { + setReadInterested(true); _inputQ.wait(timeout); } catch(InterruptedException e) { LOG.ignore(e); } + finally + { + setReadInterested(false); + } content=_inputQ.peekUnsafe(); } }