From 3b066ca2aef2ccbd27e4ac6ca216d8551a1ece3f Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 18 Sep 2014 15:31:59 +1000 Subject: [PATCH] 444415 iterative WriteFlusher --- .../org/eclipse/jetty/io/WriteFlusher.java | 60 +++++++++-- .../eclipse/jetty/io/ssl/SslConnection.java | 46 ++++---- .../eclipse/jetty/io/WriteFlusherTest.java | 101 +++++++++++++++++- 3 files changed, 176 insertions(+), 31 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index b2daffb2187..b2910dda39c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -334,13 +334,13 @@ abstract public class WriteFlusher try { - boolean flushed=_endPoint.flush(buffers); - if (DEBUG) - LOG.debug("flushed {}", flushed); + buffers=flush(buffers); // if we are incomplete? - if (!flushed) + if (buffers!=null) { + if (DEBUG) + LOG.debug("flushed incomplete"); PendingState pending=new PendingState(buffers, callback); if (updateState(__WRITING,pending)) onIncompleteFlushed(); @@ -379,7 +379,7 @@ abstract public class WriteFlusher * {@link #onFail(Throwable)} or {@link #onClose()} */ public void completeWrite() - { + { if (DEBUG) LOG.debug("completeWrite: {}", this); @@ -396,13 +396,15 @@ abstract public class WriteFlusher { ByteBuffer[] buffers = pending.getBuffers(); - boolean flushed=_endPoint.flush(buffers); - if (DEBUG) - LOG.debug("flushed {}", flushed); + buffers=flush(buffers); // if we are incomplete? - if (!flushed) + if (buffers!=null) { + if (DEBUG) + LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers)); + if (buffers!=pending.getBuffers()) + pending=new PendingState(buffers, pending._callback); if (updateState(__COMPLETING,pending)) onIncompleteFlushed(); else @@ -426,6 +428,46 @@ abstract public class WriteFlusher } } + /* ------------------------------------------------------------ */ + /** Flush the buffers iteratively until no progress is made + * @param buffers The buffers to flush + * @return The unflushed buffers, or null if all flushed + * @throws IOException + */ + protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException + { + // try the simple direct flush first, which also ensures that any null buffer + // flushes are passed through (for commits etc.) + if (_endPoint.flush(buffers)) + return null; + + // We were not fully flushed, so let's try again iteratively while we can make + // some progress + boolean progress=true; + while(true) + { + // Compact buffers array? + int not_empty=0; + while(not_empty0) + buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length); + + if (!progress) + break; + + // try to flush the remainder + int r=buffers[0].remaining(); + if (_endPoint.flush(buffers)) + return null; + progress=r!=buffers[0].remaining(); + } + + return buffers; + } + /* ------------------------------------------------------------ */ /** Notify the flusher of a failure * @param cause The cause of the failure diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 7ac32a0ef34..4610b515252 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -346,12 +346,12 @@ public class SslConnection extends AbstractConnection @Override protected void onIncompleteFlush() - { + { // This means that the decrypted endpoint write method was called and not // all data could be wrapped. So either we need to write some encrypted data, // OR if we are handshaking we need to read some encrypted data OR // if neither then we should just try the flush again. - boolean flush = false; + boolean try_again = false; synchronized (DecryptedEndPoint.this) { if (DEBUG) @@ -372,10 +372,17 @@ public class SslConnection extends AbstractConnection } else { - flush = true; + // We can get here because the WriteFlusher might not see progress + // when it has just flushed the encrypted data, but not consumed anymore + // of the application buffers. This is mostly avoided by another iteration + // within DecryptedEndPoint flush(), but I cannot convince myself that + // this is never ever the case. + try_again = true; } } - if (flush) + + + if (try_again) { // If the output is closed, if (isOutputShutdown()) @@ -387,7 +394,9 @@ public class SslConnection extends AbstractConnection else { // try to flush what is pending - getWriteFlusher().completeWrite(); + // because this is a special case (see above) we could probably + // avoid the dispatch, but best to be sure + getExecutor().execute(_runCompletWrite); } } } @@ -715,18 +724,12 @@ public class SslConnection extends AbstractConnection BufferUtil.flipToFlush(_encryptedOutput, pos); if (wrapResult.bytesConsumed()>0) consumed+=wrapResult.bytesConsumed(); - - boolean allConsumed=true; - // clear empty buffers to prevent position creeping up the buffer - for (ByteBuffer b : appOuts) - { - if (BufferUtil.isEmpty(b)) - BufferUtil.clear(b); - else - allConsumed=false; - } - Status wrapResultStatus = wrapResult.getStatus(); + + boolean allConsumed=true; + for (ByteBuffer b : appOuts) + if (BufferUtil.hasContent(b)) + allConsumed=false; // and deal with the results returned from the sslEngineWrap switch (wrapResultStatus) @@ -779,13 +782,20 @@ public class SslConnection extends AbstractConnection // if we have net bytes, let's try to flush them if (BufferUtil.hasContent(_encryptedOutput)) - getEndPoint().flush(_encryptedOutput); + if (!getEndPoint().flush(_encryptedOutput)); + getEndPoint().flush(_encryptedOutput); // one retry // But we also might have more to do for the handshaking state. switch (handshakeStatus) { case NOT_HANDSHAKING: - // Return with the number of bytes consumed (which may be 0) + // If we have not consumed all and had just finished handshaking, then we may + // have just flushed the last handshake in the encrypted buffers, so we should + // try again. + if (!allConsumed && wrapResult.getHandshakeStatus()==HandshakeStatus.FINISHED && BufferUtil.isEmpty(_encryptedOutput)) + continue; + + // Return true if we consumed all the bytes and encrypted are all flushed return allConsumed && BufferUtil.isEmpty(_encryptedOutput); case NEED_TASK: diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index 20e08d1eb24..572ff79a6e3 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -30,10 +30,12 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; +import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,7 +43,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; @@ -509,7 +513,7 @@ public class WriteFlusherTest BufferUtil.flipToFill(byteBuffer); // pretend everything has been written writeCalledLatch.countDown(); failedCalledLatch.await(5, TimeUnit.SECONDS); - return null; + return Boolean.TRUE; } }); } @@ -522,7 +526,7 @@ public class WriteFlusherTest final CountDownLatch writeCalledLatch = new CountDownLatch(1); final CountDownLatch completeWrite = new CountDownLatch(1); - final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch)) + final WriteFlusher writeFlusher = new WriteFlusher(new EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(writeCalledLatch, failedCalledLatch)) { @Override protected void onIncompleteFlushed() @@ -555,12 +559,13 @@ public class WriteFlusherTest assertThat("callback complete has not been called", callback.isCompleted(), is(false)); } - private static class EndPointMock extends ByteArrayEndPoint + private static class EndPointConcurrentAccessToIncompleteWriteAndOnFailMock extends ByteArrayEndPoint { private final CountDownLatch writeCalledLatch; private final CountDownLatch failedCalledLatch; + private final AtomicBoolean stalled=new AtomicBoolean(false); - public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch) + public EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch) { this.writeCalledLatch = writeCalledLatch; this.failedCalledLatch = failedCalledLatch; @@ -574,6 +579,13 @@ public class WriteFlusherTest int oldPos = byteBuffer.position(); if (byteBuffer.remaining() == 2) { + // make sure we stall at least once + if (!stalled.get()) + { + stalled.set(true); + return false; + } + // make sure failed is called before we go on try { @@ -601,6 +613,87 @@ public class WriteFlusherTest } } + @Test + public void testIterationOnNonBlockedStall() throws Exception + { + final Exchanger exchange = new Exchanger<>(); + final AtomicInteger window = new AtomicInteger(10); + EndPointIterationOnNonBlockedStallMock endp=new EndPointIterationOnNonBlockedStallMock(window); + final WriteFlusher writeFlusher = new WriteFlusher(endp) + { + @Override + protected void onIncompleteFlushed() + { + executor.submit(new Runnable() + { + public void run() + { + try + { + while(window.get()==0) + window.addAndGet(exchange.exchange(0)); + completeWrite(); + } + catch(Throwable th) + { + th.printStackTrace(); + } + } + }); + + } + }; + + BlockingCallback callback = new BlockingCallback(); + writeFlusher.write(callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow.")); + exchange.exchange(0); + + Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("How now br")); + + exchange.exchange(1); + exchange.exchange(0); + + Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("o")); + + exchange.exchange(8); + callback.block(); + + Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("wn cow.")); + + } + + private static class EndPointIterationOnNonBlockedStallMock extends ByteArrayEndPoint + { + final AtomicInteger _window; + + public EndPointIterationOnNonBlockedStallMock(AtomicInteger window) + { + _window=window; + } + + @Override + public boolean flush(ByteBuffer... buffers) throws IOException + { + ByteBuffer byteBuffer = buffers[0]; + + if (_window.get()>0 && byteBuffer.hasRemaining()) + { + // consume 1 byte + byte one = byteBuffer.get(byteBuffer.position()); + if (super.flush(ByteBuffer.wrap(new byte[]{one}))) + { + _window.decrementAndGet(); + byteBuffer.position(byteBuffer.position()+1); + } + } + for (ByteBuffer b: buffers) + if (BufferUtil.hasContent(b)) + return false; + return true; + } + } + + private static class FailedCaller implements Callable { private final WriteFlusher writeFlusher;