From d57a11668dd8998c4fb38a58e4f1354fb2526d0f Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 3 Aug 2016 17:22:31 +1000 Subject: [PATCH] Issue #798 Sporadic async IO failure --- .../org/eclipse/jetty/server/HttpOutput.java | 79 ++++++++++--------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index ac599b8c445..8b6e11798de 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -146,6 +146,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable _state.set(OutputState.OPEN); } + private boolean isLastContentToWrite(int len) + { + _written+=len; + return _channel.getResponse().isAllContentWritten(_written); + } + public boolean isAllContentWritten() { return _channel.getResponse().isAllContentWritten(_written); @@ -327,9 +333,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void write(byte[] b, int off, int len) throws IOException { - _written+=len; - boolean complete=_channel.getResponse().isAllContentWritten(_written); - // Async or Blocking ? while(true) { @@ -347,7 +350,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable continue; // Should we aggregate? - if (!complete && len<=_commitSize) + boolean last = isLastContentToWrite(len); + if (!last && len<=_commitSize) { if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); @@ -369,7 +373,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable } // Do the asynchronous writing from the callback - new AsyncWrite(b,off,len,complete).iterate(); + new AsyncWrite(b,off,len,last).iterate(); return; case PENDING: @@ -392,7 +396,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Should we aggregate? int capacity = getBufferSize(); - if (!complete && len<=_commitSize) + boolean last = isLastContentToWrite(len); + if (!last && len<=_commitSize) { if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers()); @@ -412,10 +417,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { - write(_aggregate, complete && len==0); + write(_aggregate, last && len==0); // should we fill aggregate again from the buffer? - if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate)) + if (len>0 && !last && len<=_commitSize && len<=BufferUtil.space(_aggregate)) { BufferUtil.append(_aggregate, b, off, len); return; @@ -438,22 +443,19 @@ public class HttpOutput extends ServletOutputStream implements Runnable view.limit(l+Math.min(len,getBufferSize())); view.position(l); } - write(view,complete); + write(view,last); } - else if (complete) + else if (last) { write(BufferUtil.EMPTY_BUFFER,true); } - if (complete) + if (last) closed(); } public void write(ByteBuffer buffer) throws IOException { - _written+=buffer.remaining(); - boolean complete=_channel.getResponse().isAllContentWritten(_written); - // Async or Blocking ? while(true) { @@ -471,7 +473,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable continue; // Do the asynchronous writing from the callback - new AsyncWrite(buffer,complete).iterate(); + boolean last = isLastContentToWrite(buffer.remaining()); + new AsyncWrite(buffer,last).iterate(); return; case PENDING: @@ -493,18 +496,19 @@ public class HttpOutput extends ServletOutputStream implements Runnable // handle blocking write int len=BufferUtil.length(buffer); + boolean last = isLastContentToWrite(len); // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) - write(_aggregate, complete && len==0); + write(_aggregate, last && len==0); // write any remaining content in the buffer directly if (len>0) - write(buffer, complete); - else if (complete) + write(buffer, last); + else if (last) write(BufferUtil.EMPTY_BUFFER, true); - if (complete) + if (last) closed(); } @@ -966,6 +970,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable private abstract class AsyncICB extends IteratingCallback { + final boolean _last; + + AsyncICB(boolean last) + { + _last=last; + } + @Override protected void onCompleteSuccess() { @@ -982,6 +993,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable case UNREADY: if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY)) continue; + if (_last) + closed(); if (_channel.getState().onWritePossible()) _channel.execute(_channel); break; @@ -1011,6 +1024,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable public AsyncFlush() { + super(false); } @Override @@ -1038,21 +1052,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable { private final ByteBuffer _buffer; private final ByteBuffer _slice; - private final boolean _complete; private final int _len; protected volatile boolean _completed; - public AsyncWrite(byte[] b, int off, int len, boolean complete) + public AsyncWrite(byte[] b, int off, int len, boolean last) { + super(last); _buffer=ByteBuffer.wrap(b, off, len); _len=len; // always use a view for large byte arrays to avoid JVM pooling large direct buffers _slice=_len