jetty-9 flush returns boolean to say completely flushed or not

This commit is contained in:
Greg Wilkins 2012-09-13 08:11:05 +10:00
parent 427a01855c
commit b3e8f91026
10 changed files with 82 additions and 99 deletions

View File

@ -159,12 +159,6 @@ public abstract class AbstractEndPoint implements EndPoint
_writeFlusher.write(context, callback, buffers);
}
@Override
public boolean isBufferingOutput()
{
return false;
}
protected abstract void onIncompleteFlush();
protected abstract boolean needsFill() throws IOException;

View File

@ -321,14 +321,15 @@ public class ByteArrayEndPoint extends AbstractEndPoint
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/
@Override
public int flush(ByteBuffer... buffers) throws IOException
public boolean flush(ByteBuffer... buffers) throws IOException
{
if (_closed)
throw new IOException("CLOSED");
if (_oshut)
throw new IOException("OSHUT");
int flushed=0;
boolean flushed=true;
boolean idle=true;
for (ByteBuffer b : buffers)
{
@ -345,13 +346,17 @@ public class ByteArrayEndPoint extends AbstractEndPoint
}
}
flushed+=BufferUtil.flipPutFlip(b,_out);
if (BufferUtil.flipPutFlip(b,_out)>0)
idle=false;
if (BufferUtil.hasContent(b))
{
flushed=false;
break;
}
}
if (flushed>0)
}
if (!idle)
notIdle();
return flushed;
}

View File

@ -154,7 +154,7 @@ public class ChannelEndPoint extends AbstractEndPoint
}
@Override
public int flush(ByteBuffer... buffers) throws IOException
public boolean flush(ByteBuffer... buffers) throws IOException
{
int flushed=0;
try
@ -172,7 +172,7 @@ public class ChannelEndPoint extends AbstractEndPoint
int l=_channel.write(b);
if (l>0)
flushed+=l;
else
if (b.hasRemaining())
break;
}
}
@ -183,16 +183,23 @@ public class ChannelEndPoint extends AbstractEndPoint
{
throw new EofException(e);
}
boolean all_flushed=true;
if (flushed>0)
{
notIdle();
// clear empty buffers to prevent position creeping up the buffer
for (ByteBuffer b : buffers)
{
if (BufferUtil.isEmpty(b))
BufferUtil.clear(b);
else
all_flushed=false;
}
return flushed;
}
return all_flushed;
}
public ByteChannel getChannel()

View File

@ -170,11 +170,12 @@ public interface EndPoint extends Closeable
* Flush data from the passed header/buffer to this endpoint. As many bytes as can be consumed
* are taken from the header/buffer position up until the buffer limit. The header/buffers position
* is updated to indicate how many bytes have been consumed.
* @return True IFF all the buffers have been consumed and the endpoint has flushed the data to its
* destination (ie is not buffering any data).
*
* @return the number of bytes written
* @throws EofException If the endpoint is closed or output is shutdown.
*/
int flush(ByteBuffer... buffer) throws IOException;
boolean flush(ByteBuffer... buffer) throws IOException;
/* ------------------------------------------------------------ */
/**
@ -242,9 +243,4 @@ public interface EndPoint extends Closeable
void onClose();
/**
* @return True if the endpoint is buffering output.
*/
boolean isBufferingOutput();
}

View File

@ -49,23 +49,22 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
}
@Override
public int flush(ByteBuffer... buffers) throws IOException
public boolean flush(ByteBuffer... buffers) throws IOException
{
int written=0;
boolean flushed=true;
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int position = b.position();
int l = super.flush(b);
flushed|=super.flush(b);
int l=b.position()-position;
notifyOutgoing(b, position, l);
if (l==0)
if (!flushed)
break;
else
written+=l;
}
}
return written;
return flushed;
}

View File

@ -297,12 +297,12 @@ abstract public class WriteFlusher
try
{
_endPoint.flush(buffers);
boolean flushed=_endPoint.flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
{
if (BufferUtil.hasContent(b))
if (!flushed||BufferUtil.hasContent(b))
{
PendingState<?> pending=new PendingState<>(buffers, context, callback);
if (updateState(__WRITING,pending))
@ -313,17 +313,6 @@ abstract public class WriteFlusher
}
}
// Handle buffering endpoint
if (_endPoint.isBufferingOutput())
{
PendingState<?> pending=new PendingState<>(buffers, context, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
else
fail(new PendingState<>(buffers, context, callback));
return;
}
// If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__WRITING,__IDLE))
ignoreFail();
@ -368,12 +357,12 @@ abstract public class WriteFlusher
{
ByteBuffer[] buffers = pending.getBuffers();
_endPoint.flush(buffers);
boolean flushed=_endPoint.flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
{
if (BufferUtil.hasContent(b))
if (!flushed || BufferUtil.hasContent(b))
{
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
@ -383,16 +372,6 @@ abstract public class WriteFlusher
}
}
// Handle buffering endpoint
if (_endPoint.isBufferingOutput())
{
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
else
fail(pending);
return;
}
// If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__COMPLETING,__IDLE))
ignoreFail();

View File

@ -396,12 +396,6 @@ public class SslConnection extends AbstractConnection
}
}
@Override
public boolean isBufferingOutput()
{
return BufferUtil.hasContent(_encryptedOutput);
}
public SslConnection getSslConnection()
{
return SslConnection.this;
@ -613,7 +607,7 @@ public class SslConnection extends AbstractConnection
}
@Override
public synchronized int flush(ByteBuffer... appOuts) throws IOException
public synchronized boolean flush(ByteBuffer... appOuts) throws IOException
{
// TODO remove this when we are certain it is OK
if (Thread.currentThread().getName().contains("selector"))
@ -631,7 +625,7 @@ public class SslConnection extends AbstractConnection
try
{
if (_cannotAcceptMoreAppDataToFlush)
return 0;
return false;
// We will need a network buffer
if (_encryptedOutput == null)
@ -647,13 +641,16 @@ public class SslConnection extends AbstractConnection
LOG.debug("{} wrap {}", SslConnection.this, wrapResult);
BufferUtil.flipToFlush(_encryptedOutput, pos);
if (wrapResult.bytesConsumed()>0)
{
consumed+=wrapResult.bytesConsumed();
boolean all_consumed=true;
// clear empty buffers to prevent position creeping up the buffer
for (ByteBuffer b : appOuts)
{
if (BufferUtil.isEmpty(b))
BufferUtil.clear(b);
else
all_consumed=false;
}
// and deal with the results returned from the sslEngineWrap
@ -669,15 +666,11 @@ public class SslConnection extends AbstractConnection
// the write has progressed normally and let a subsequent call to flush (or WriteFlusher#onIncompleteFlushed)
// to finish writing the close handshake. The caller will find out about the close on a subsequent flush or fill.
if (BufferUtil.hasContent(_encryptedOutput))
return consumed;
return false;
}
// If we were flushing because of a fill needing to wrap, return normally and it will handle the closed state.
if (appOuts[0]==__FILL_CALLED_FLUSH)
return consumed;
// otherwise we have written, and the caller will close the underlying connection
return consumed;
return all_consumed;
case BUFFER_UNDERFLOW:
throw new IllegalStateException();
@ -695,7 +688,7 @@ public class SslConnection extends AbstractConnection
{
case NOT_HANDSHAKING:
// Return with the number of bytes consumed (which may be 0)
return consumed;
return all_consumed&&BufferUtil.isEmpty(_encryptedOutput);
case NEED_TASK:
// run the task and continue
@ -715,7 +708,7 @@ public class SslConnection extends AbstractConnection
_flushRequiresFillToProgress = true;
fill(__FLUSH_CALLED_FILL);
}
return consumed;
return all_consumed&&BufferUtil.isEmpty(_encryptedOutput);
case FINISHED:
throw new IllegalStateException();

View File

@ -115,23 +115,20 @@ public class ByteArrayEndPointTest
ByteArrayEndPoint endp = new ByteArrayEndPoint((byte[])null,15);
endp.setGrowOutput(true);
assertEquals(11,endp.flush(BufferUtil.toBuffer("some output")));
assertEquals(true,endp.flush(BufferUtil.toBuffer("some output")));
assertEquals("some output",endp.getOutputString());
assertEquals(10,endp.flush(BufferUtil.toBuffer(" some more")));
assertEquals(true,endp.flush(BufferUtil.toBuffer(" some more")));
assertEquals("some output some more",endp.getOutputString());
assertEquals(0,endp.flush());
assertEquals(true,endp.flush());
assertEquals("some output some more",endp.getOutputString());
assertEquals(0,endp.flush(BufferUtil.EMPTY_BUFFER));
assertEquals(true,endp.flush(BufferUtil.EMPTY_BUFFER));
assertEquals("some output some more",endp.getOutputString());
assertEquals(9,endp.flush(BufferUtil.EMPTY_BUFFER,BufferUtil.toBuffer(" and"),BufferUtil.toBuffer(" more")));
assertEquals(true,endp.flush(BufferUtil.EMPTY_BUFFER,BufferUtil.toBuffer(" and"),BufferUtil.toBuffer(" more")));
assertEquals("some output some more and more",endp.getOutputString());
}
@Test
@ -142,13 +139,13 @@ public class ByteArrayEndPointTest
endp.setOutput(BufferUtil.allocate(10));
ByteBuffer data = BufferUtil.toBuffer("Some more data.");
assertEquals(10,endp.flush(data));
assertEquals(false,endp.flush(data));
assertEquals("Some more ",endp.getOutputString());
assertEquals("data.",BufferUtil.toString(data));
assertEquals("Some more ",endp.takeOutputString());
assertEquals(5,endp.flush(data));
assertEquals(true,endp.flush(data));
assertEquals("data.",BufferUtil.toString(endp.takeOutput()));
}

View File

@ -348,8 +348,10 @@ public class WriteFlusherTest
}
@Test
public void testConcurrentAccessToWriteAndOnFail() throws IOException, InterruptedException, ExecutionException
public void testConcurrentAccessToWriteAndOnFail() throws Exception
{
// TODO review this test - It was changed for the boolean flush return, but not really well inspected
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCompleteLatch = new CountDownLatch(1);
@ -375,14 +377,28 @@ public class WriteFlusherTest
executor.submit(new Writer(writeFlusher, callback));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)).get();
// callback failed is NOT called because in WRITING state failed() doesn't know about the callback. However
// either the write succeeds or we get an IOException which will call callback.failed()
assertThat("callback failed", callback.isFailed(), is(false));
assertThat("write complete", writeCompleteLatch.await(5, TimeUnit.SECONDS), is(true));
// in this testcase we more or less emulate that the write has successfully finished and we return from
// EndPoint.flush() back to WriteFlusher.write(). Then someone calls failed. So the callback should have been
// completed.
try
{
callback.get(5,TimeUnit.SECONDS);
assertThat("callback completed", callback.isCompleted(), is(true));
assertThat("callback failed", callback.isFailed(), is(false));
}
catch(ExecutionException e)
{
// ignored because failure is expected
assertThat("callback failed", callback.isFailed(), is(true));
}
assertThat("callback completed", callback.isDone(), is(true));
}
private class ExposingStateCallback extends FutureCallback
@ -437,7 +453,7 @@ public class WriteFlusherTest
flushCalledLatch.countDown();
// make sure we stay here, so write is called twice at the same time
Thread.sleep(5000);
return null;
return Boolean.TRUE;
}
});
@ -528,7 +544,7 @@ public class WriteFlusherTest
}
@Override
public int flush(ByteBuffer... buffers) throws IOException
public boolean flush(ByteBuffer... buffers) throws IOException
{
writeCalledLatch.countDown();
ByteBuffer byteBuffer = buffers[0];
@ -549,13 +565,16 @@ public class WriteFlusherTest
else if (byteBuffer.remaining() == 3)
{
byteBuffer.position(1); // pretend writing one byte
return 1;
}
else
{
byteBuffer.position(byteBuffer.limit());
}
return byteBuffer.limit() - oldPos;
for (ByteBuffer b: buffers)
if (BufferUtil.hasContent(b))
return false;
return true;
}
}

View File

@ -60,12 +60,6 @@ public class EmptyEndPoint implements EndPoint
oshut = true;
}
@Override
public boolean isBufferingOutput()
{
return false;
}
@Override
public boolean isOutputShutdown()
{
@ -91,9 +85,9 @@ public class EmptyEndPoint implements EndPoint
}
@Override
public int flush(ByteBuffer... buffer) throws IOException
public boolean flush(ByteBuffer... buffer) throws IOException
{
return 0;
return false;
}
@Override