444415 iterative WriteFlusher

This commit is contained in:
Greg Wilkins 2014-09-18 15:31:59 +10:00
parent 88f4e5ab86
commit 3b066ca2ae
3 changed files with 176 additions and 31 deletions

View File

@ -334,13 +334,13 @@ abstract public class WriteFlusher
try
{
boolean flushed=_endPoint.flush(buffers);
if (DEBUG)
LOG.debug("flushed {}", flushed);
buffers=flush(buffers);
// if we are incomplete?
if (!flushed)
if (buffers!=null)
{
if (DEBUG)
LOG.debug("flushed incomplete");
PendingState pending=new PendingState(buffers, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
@ -379,7 +379,7 @@ abstract public class WriteFlusher
* {@link #onFail(Throwable)} or {@link #onClose()}
*/
public void completeWrite()
{
{
if (DEBUG)
LOG.debug("completeWrite: {}", this);
@ -396,13 +396,15 @@ abstract public class WriteFlusher
{
ByteBuffer[] buffers = pending.getBuffers();
boolean flushed=_endPoint.flush(buffers);
if (DEBUG)
LOG.debug("flushed {}", flushed);
buffers=flush(buffers);
// if we are incomplete?
if (!flushed)
if (buffers!=null)
{
if (DEBUG)
LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
if (buffers!=pending.getBuffers())
pending=new PendingState(buffers, pending._callback);
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
else
@ -426,6 +428,46 @@ abstract public class WriteFlusher
}
}
/* ------------------------------------------------------------ */
/** Flush the buffers iteratively until no progress is made
* @param buffers The buffers to flush
* @return The unflushed buffers, or null if all flushed
* @throws IOException
*/
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
// try the simple direct flush first, which also ensures that any null buffer
// flushes are passed through (for commits etc.)
if (_endPoint.flush(buffers))
return null;
// We were not fully flushed, so let's try again iteratively while we can make
// some progress
boolean progress=true;
while(true)
{
// Compact buffers array?
int not_empty=0;
while(not_empty<buffers.length && BufferUtil.isEmpty(buffers[not_empty]))
not_empty++;
if (not_empty==buffers.length)
return null;
if (not_empty>0)
buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
if (!progress)
break;
// try to flush the remainder
int r=buffers[0].remaining();
if (_endPoint.flush(buffers))
return null;
progress=r!=buffers[0].remaining();
}
return buffers;
}
/* ------------------------------------------------------------ */
/** Notify the flusher of a failure
* @param cause The cause of the failure

View File

@ -346,12 +346,12 @@ public class SslConnection extends AbstractConnection
@Override
protected void onIncompleteFlush()
{
{
// This means that the decrypted endpoint write method was called and not
// all data could be wrapped. So either we need to write some encrypted data,
// OR if we are handshaking we need to read some encrypted data OR
// if neither then we should just try the flush again.
boolean flush = false;
boolean try_again = false;
synchronized (DecryptedEndPoint.this)
{
if (DEBUG)
@ -372,10 +372,17 @@ public class SslConnection extends AbstractConnection
}
else
{
flush = true;
// We can get here because the WriteFlusher might not see progress
// when it has just flushed the encrypted data, but not consumed anymore
// of the application buffers. This is mostly avoided by another iteration
// within DecryptedEndPoint flush(), but I cannot convince myself that
// this is never ever the case.
try_again = true;
}
}
if (flush)
if (try_again)
{
// If the output is closed,
if (isOutputShutdown())
@ -387,7 +394,9 @@ public class SslConnection extends AbstractConnection
else
{
// try to flush what is pending
getWriteFlusher().completeWrite();
// because this is a special case (see above) we could probably
// avoid the dispatch, but best to be sure
getExecutor().execute(_runCompletWrite);
}
}
}
@ -715,18 +724,12 @@ public class SslConnection extends AbstractConnection
BufferUtil.flipToFlush(_encryptedOutput, pos);
if (wrapResult.bytesConsumed()>0)
consumed+=wrapResult.bytesConsumed();
boolean allConsumed=true;
// clear empty buffers to prevent position creeping up the buffer
for (ByteBuffer b : appOuts)
{
if (BufferUtil.isEmpty(b))
BufferUtil.clear(b);
else
allConsumed=false;
}
Status wrapResultStatus = wrapResult.getStatus();
boolean allConsumed=true;
for (ByteBuffer b : appOuts)
if (BufferUtil.hasContent(b))
allConsumed=false;
// and deal with the results returned from the sslEngineWrap
switch (wrapResultStatus)
@ -779,13 +782,20 @@ public class SslConnection extends AbstractConnection
// if we have net bytes, let's try to flush them
if (BufferUtil.hasContent(_encryptedOutput))
getEndPoint().flush(_encryptedOutput);
if (!getEndPoint().flush(_encryptedOutput));
getEndPoint().flush(_encryptedOutput); // one retry
// But we also might have more to do for the handshaking state.
switch (handshakeStatus)
{
case NOT_HANDSHAKING:
// Return with the number of bytes consumed (which may be 0)
// If we have not consumed all and had just finished handshaking, then we may
// have just flushed the last handshake in the encrypted buffers, so we should
// try again.
if (!allConsumed && wrapResult.getHandshakeStatus()==HandshakeStatus.FINISHED && BufferUtil.isEmpty(_encryptedOutput))
continue;
// Return true if we consumed all the bytes and encrypted are all flushed
return allConsumed && BufferUtil.isEmpty(_encryptedOutput);
case NEED_TASK:

View File

@ -30,10 +30,12 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -41,7 +43,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
@ -509,7 +513,7 @@ public class WriteFlusherTest
BufferUtil.flipToFill(byteBuffer); // pretend everything has been written
writeCalledLatch.countDown();
failedCalledLatch.await(5, TimeUnit.SECONDS);
return null;
return Boolean.TRUE;
}
});
}
@ -522,7 +526,7 @@ public class WriteFlusherTest
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch))
final WriteFlusher writeFlusher = new WriteFlusher(new EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(writeCalledLatch, failedCalledLatch))
{
@Override
protected void onIncompleteFlushed()
@ -555,12 +559,13 @@ public class WriteFlusherTest
assertThat("callback complete has not been called", callback.isCompleted(), is(false));
}
private static class EndPointMock extends ByteArrayEndPoint
private static class EndPointConcurrentAccessToIncompleteWriteAndOnFailMock extends ByteArrayEndPoint
{
private final CountDownLatch writeCalledLatch;
private final CountDownLatch failedCalledLatch;
private final AtomicBoolean stalled=new AtomicBoolean(false);
public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
public EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
{
this.writeCalledLatch = writeCalledLatch;
this.failedCalledLatch = failedCalledLatch;
@ -574,6 +579,13 @@ public class WriteFlusherTest
int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2)
{
// make sure we stall at least once
if (!stalled.get())
{
stalled.set(true);
return false;
}
// make sure failed is called before we go on
try
{
@ -601,6 +613,87 @@ public class WriteFlusherTest
}
}
@Test
public void testIterationOnNonBlockedStall() throws Exception
{
final Exchanger<Integer> exchange = new Exchanger<>();
final AtomicInteger window = new AtomicInteger(10);
EndPointIterationOnNonBlockedStallMock endp=new EndPointIterationOnNonBlockedStallMock(window);
final WriteFlusher writeFlusher = new WriteFlusher(endp)
{
@Override
protected void onIncompleteFlushed()
{
executor.submit(new Runnable()
{
public void run()
{
try
{
while(window.get()==0)
window.addAndGet(exchange.exchange(0));
completeWrite();
}
catch(Throwable th)
{
th.printStackTrace();
}
}
});
}
};
BlockingCallback callback = new BlockingCallback();
writeFlusher.write(callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow."));
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("How now br"));
exchange.exchange(1);
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("o"));
exchange.exchange(8);
callback.block();
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("wn cow."));
}
private static class EndPointIterationOnNonBlockedStallMock extends ByteArrayEndPoint
{
final AtomicInteger _window;
public EndPointIterationOnNonBlockedStallMock(AtomicInteger window)
{
_window=window;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
ByteBuffer byteBuffer = buffers[0];
if (_window.get()>0 && byteBuffer.hasRemaining())
{
// consume 1 byte
byte one = byteBuffer.get(byteBuffer.position());
if (super.flush(ByteBuffer.wrap(new byte[]{one})))
{
_window.decrementAndGet();
byteBuffer.position(byteBuffer.position()+1);
}
}
for (ByteBuffer b: buffers)
if (BufferUtil.hasContent(b))
return false;
return true;
}
}
private static class FailedCaller implements Callable<FutureCallback>
{
private final WriteFlusher writeFlusher;