diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index b46f3804dcf..017eef7fff9 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -36,8 +36,9 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override - protected void scheduleCompleteWrite() + protected boolean canFlush() { + return false; } }; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 05266030730..d1aa1019185 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -63,10 +63,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override - protected void scheduleCompleteWrite() + protected boolean canFlush() { _interestOps = _interestOps | SelectionKey.OP_WRITE; updateKey(); + return false; } }; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index e217f21bb60..756b739f088 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -15,7 +15,7 @@ import org.eclipse.jetty.util.Callback; /** * A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} * by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written. - * The abstract method {@link #scheduleCompleteWrite()} is called when not all content has been + * The abstract method {@link #canFlush()} is called when not all content has been * written after a call to flush and should organise for the {@link #completeWrite()} * method to be called when a subsequent call to flush should be able to make more progress. * @@ -52,8 +52,11 @@ abstract public class WriteFlusher _buffers=buffers; _context=context; _callback=callback; - scheduleCompleteWrite(); - _writing.set(true); // Needed as memory barrier + if(canFlush()) + completeWrite(); + else + _writing.set(true); // Needed as memory barrier + return; } } @@ -73,7 +76,7 @@ abstract public class WriteFlusher } /* ------------------------------------------------------------ */ - abstract protected void scheduleCompleteWrite(); + abstract protected boolean canFlush(); /* ------------------------------------------------------------ */ @@ -101,7 +104,7 @@ abstract public class WriteFlusher /* ------------------------------------------------------------ */ /** * Complete a write that has not completed and that called - * {@link #scheduleCompleteWrite()} to request a call to this + * {@link #canFlush()} to request a call to this * method when a call to {@link EndPoint#flush(ByteBuffer...)} * is likely to be able to progress. * @return true if a write was in progress @@ -113,19 +116,23 @@ abstract public class WriteFlusher try { - _buffers=compact(_buffers); - _endp.flush(_buffers); - - // Are we complete? - for (ByteBuffer b : _buffers) + retry: while(true) { - if (b.hasRemaining()) + _buffers=compact(_buffers); + _endp.flush(_buffers); + + // Are we complete? + for (ByteBuffer b : _buffers) { - scheduleCompleteWrite(); - return true; + if (b.hasRemaining()) + { + if (canFlush()) + continue retry; + return true; + } } + break; } - // we are complete and ready Callback callback=_callback; Object context=_context; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 349d74aa764..f1bc66f1ec3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -63,6 +63,8 @@ public class SslConnection extends AbstractAsyncConnection private ByteBuffer _netOut; private final boolean _netDirect=false; private final boolean _appDirect=false; + private SSLEngineResult _unwrapResult; + private SSLEngineResult _wrapResult; public SslConnection(ByteBufferPool byteBufferPool, Executor executor, AsyncEndPoint endPoint, SSLEngine sslEngine) { @@ -138,7 +140,6 @@ public class SslConnection extends AbstractAsyncConnection @Override public void onReadFail(Throwable cause) { - System.err.println("SSL onReadFail "+cause); super.onReadFail(cause); } @@ -157,7 +158,7 @@ public class SslConnection extends AbstractAsyncConnection public class SslEndPoint extends AbstractEndPoint implements AsyncEndPoint { private AsyncConnection _connection; - private boolean _fillWrap; + private boolean _fillWrite; private boolean _writing; private boolean _underflown; private boolean _ishut=false; @@ -169,18 +170,18 @@ public class SslConnection extends AbstractAsyncConnection { synchronized (SslEndPoint.this) { - LOG.debug("{} write.complete {}",SslConnection.this,_writing?(_fillWrap?"FW":"F"):(_fillWrap?"W":"")); + LOG.debug("{} write.complete {}",SslConnection.this,_writing?(_fillWrite?"FW":"F"):(_fillWrite?"W":"")); if (_netOut==null && !_netOut.hasRemaining()) { _bufferPool.release(_netOut); _netOut=null; } - + _writing=false; - if (_fillWrap) + if (_fillWrite) { - _fillWrap=false; + _fillWrite=false; _readInterest.readable(); } @@ -196,9 +197,9 @@ public class SslConnection extends AbstractAsyncConnection { LOG.debug("{} write.failed",SslConnection.this,x); _writing=false; - if (_fillWrap) + if (_fillWrite) { - _fillWrap=false; + _fillWrite=false; _readInterest.failed(x); } @@ -238,7 +239,7 @@ public class SslConnection extends AbstractAsyncConnection return true; // otherwise write the net data - _fillWrap=true; + _fillWrite=true; _writing=true; getEndPoint().write(null,_writeCallback,_netOut); } @@ -254,7 +255,7 @@ public class SslConnection extends AbstractAsyncConnection private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override - protected void scheduleCompleteWrite() + protected boolean canFlush() { synchronized (SslEndPoint.this) { @@ -268,6 +269,12 @@ public class SslConnection extends AbstractAsyncConnection else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP ) // we are actually read blocked in order to write scheduleOnReadable(); + else + { + // try the flush again + return true; + } + return false; } } }; @@ -325,12 +332,12 @@ public class SslConnection extends AbstractAsyncConnection // Let's try the SSL thang even if we have no net data because in that // case we want to fall through to the handshake handling int pos=BufferUtil.flipToFill(app_in); - SSLEngineResult result = _sslEngine.unwrap(_netIn,app_in); - LOG.debug("{} unwrap {}",SslConnection.this,result); + _unwrapResult = _sslEngine.unwrap(_netIn,app_in); + LOG.debug("{} unwrap {}",SslConnection.this,_unwrapResult); BufferUtil.flipToFlush(app_in,pos); // and deal with the results - switch(result.getStatus()) + switch(_unwrapResult.getStatus()) { case BUFFER_OVERFLOW: throw new IllegalStateException(); @@ -350,7 +357,7 @@ public class SslConnection extends AbstractAsyncConnection case NEED_WRAP: // we need to send some handshake data - _fillWrap=true; + _fillWrite=true; flush(BufferUtil.EMPTY_BUFFER); getEndPoint().close(); return -1; @@ -366,10 +373,10 @@ public class SslConnection extends AbstractAsyncConnection default: // if we produced bytes, we don't care about the handshake state - if (result.bytesProduced()>0) + if (_unwrapResult.bytesProduced()>0) { if (app_in==buffer) - return result.bytesProduced(); + return _unwrapResult.bytesProduced(); return BufferUtil.append(_appIn,buffer); } @@ -390,7 +397,7 @@ public class SslConnection extends AbstractAsyncConnection case NEED_WRAP: // TODO maybe just do the wrap here ourselves? // we need to send some handshake data - _fillWrap=true; + _fillWrite=true; flush(BufferUtil.EMPTY_BUFFER); continue; @@ -448,21 +455,19 @@ public class SslConnection extends AbstractAsyncConnection // We will need a network buffer if (_netOut==null) - _netOut=_bufferPool.acquire(_sslEngine.getSession().getPacketBufferSize(),_netDirect); - else - BufferUtil.compact(_netOut); - + _netOut=_bufferPool.acquire(_sslEngine.getSession().getPacketBufferSize()*2,_netDirect); while(true) { // do the funky SSL thang! + BufferUtil.compact(_netOut); int pos=BufferUtil.flipToFill(_netOut); - SSLEngineResult result=_sslEngine.wrap(appOuts,_netOut); - LOG.debug("{} wrap {}",SslConnection.this,result); + _wrapResult=_sslEngine.wrap(appOuts,_netOut); + LOG.debug("{} wrap {}",SslConnection.this,_wrapResult); BufferUtil.flipToFlush(_netOut,pos); // and deal with the results - switch(result.getStatus()) + switch(_wrapResult.getStatus()) { case CLOSED: if (appOuts.length==1 && appOuts[0]==BufferUtil.EMPTY_BUFFER) @@ -472,12 +477,17 @@ public class SslConnection extends AbstractAsyncConnection case BUFFER_UNDERFLOW: throw new IllegalStateException(); + case BUFFER_OVERFLOW: + if (LOG.isDebugEnabled()) + LOG.debug("{} OVERFLOW {}",this,BufferUtil.toDetailString(_netOut)); + + //$FALL-THROUGH$ default: // if we have net bytes, let's try to flush them if (BufferUtil.hasContent(_netOut)) { getEndPoint().flush(_netOut); - return result.bytesConsumed(); + return _wrapResult.bytesConsumed(); } // Dang! we have to deal with handshake state @@ -573,7 +583,7 @@ public class SslConnection extends AbstractAsyncConnection @Override public String toString() { - return String.format("%s{%s%s}",super.toString(),_readInterest.isInterested()?"R":"",_writeFlusher.isWriting()?"W":""); + return String.format("%s{%s%s%s}",super.toString(),_readInterest.isInterested()?"R":"",_writeFlusher.isWriting()?"W":"",_writing?"w":""); } } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 7361fb6b6f0..aa53f612a80 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -496,13 +496,13 @@ public class SelectChannelEndPointTest public void testStress() throws Exception { Socket client = newClient(); - client.setSoTimeout(60000); + client.setSoTimeout(30000); SocketChannel server = _connector.accept(); server.configureBlocking(false); _manager.accept(server); - int writes = 100000; + final int writes = 200000; final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET); @@ -546,17 +546,23 @@ public class SelectChannelEndPointTest } last=System.currentTimeMillis(); + //if (latch.getCount()%1000==0) + // System.out.println(writes-latch.getCount()); + latch.countDown(); } } catch(Throwable e) { + long now = System.currentTimeMillis(); System.err.println("count="+count); System.err.println("latch="+latch.getCount()); System.err.println("time="+(now-start)); System.err.println("last="+(now-last)); System.err.println("endp="+_lastEndp); + System.err.println("conn="+_lastEndp.getAsyncConnection()); + e.printStackTrace(); } } @@ -568,13 +574,25 @@ public class SelectChannelEndPointTest out.write(bytes); out.write(Integer.toString(i).getBytes(StringUtil.__ISO_8859_1_CHARSET)); out.write('\n'); - if (i%100==0) + if (i%1000==0) + { + //System.err.println(i+"/"+writes); out.flush(); + } Thread.yield(); } out.flush(); - assertTrue(latch.await(100,TimeUnit.SECONDS)); + long last=latch.getCount(); + while(!latch.await(5,TimeUnit.SECONDS)) + { + //System.err.println(latch.getCount()); + if (latch.getCount()==last) + Assert.fail(); + last=latch.getCount(); + } + + assertEquals(0,latch.getCount()); }