Issue #798 Sporadic async IO failure

This commit is contained in:
Greg Wilkins 2016-08-03 17:22:31 +10:00
parent a1885afff8
commit d57a11668d
1 changed files with 42 additions and 37 deletions

View File

@ -146,6 +146,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_state.set(OutputState.OPEN); _state.set(OutputState.OPEN);
} }
private boolean isLastContentToWrite(int len)
{
_written+=len;
return _channel.getResponse().isAllContentWritten(_written);
}
public boolean isAllContentWritten() public boolean isAllContentWritten()
{ {
return _channel.getResponse().isAllContentWritten(_written); return _channel.getResponse().isAllContentWritten(_written);
@ -327,9 +333,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void write(byte[] b, int off, int len) throws IOException public void write(byte[] b, int off, int len) throws IOException
{ {
_written+=len;
boolean complete=_channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ? // Async or Blocking ?
while(true) while(true)
{ {
@ -347,7 +350,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
continue; continue;
// Should we aggregate? // Should we aggregate?
if (!complete && len<=_commitSize) boolean last = isLastContentToWrite(len);
if (!last && len<=_commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
@ -369,7 +373,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
// Do the asynchronous writing from the callback // Do the asynchronous writing from the callback
new AsyncWrite(b,off,len,complete).iterate(); new AsyncWrite(b,off,len,last).iterate();
return; return;
case PENDING: case PENDING:
@ -392,7 +396,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Should we aggregate? // Should we aggregate?
int capacity = getBufferSize(); int capacity = getBufferSize();
if (!complete && len<=_commitSize) boolean last = isLastContentToWrite(len);
if (!last && len<=_commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers()); _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
@ -412,10 +417,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
write(_aggregate, complete && len==0); write(_aggregate, last && len==0);
// should we fill aggregate again from the buffer? // should we fill aggregate again from the buffer?
if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate)) if (len>0 && !last && len<=_commitSize && len<=BufferUtil.space(_aggregate))
{ {
BufferUtil.append(_aggregate, b, off, len); BufferUtil.append(_aggregate, b, off, len);
return; return;
@ -438,22 +443,19 @@ public class HttpOutput extends ServletOutputStream implements Runnable
view.limit(l+Math.min(len,getBufferSize())); view.limit(l+Math.min(len,getBufferSize()));
view.position(l); view.position(l);
} }
write(view,complete); write(view,last);
} }
else if (complete) else if (last)
{ {
write(BufferUtil.EMPTY_BUFFER,true); write(BufferUtil.EMPTY_BUFFER,true);
} }
if (complete) if (last)
closed(); closed();
} }
public void write(ByteBuffer buffer) throws IOException public void write(ByteBuffer buffer) throws IOException
{ {
_written+=buffer.remaining();
boolean complete=_channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ? // Async or Blocking ?
while(true) while(true)
{ {
@ -471,7 +473,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
continue; continue;
// Do the asynchronous writing from the callback // Do the asynchronous writing from the callback
new AsyncWrite(buffer,complete).iterate(); boolean last = isLastContentToWrite(buffer.remaining());
new AsyncWrite(buffer,last).iterate();
return; return;
case PENDING: case PENDING:
@ -493,18 +496,19 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// handle blocking write // handle blocking write
int len=BufferUtil.length(buffer); int len=BufferUtil.length(buffer);
boolean last = isLastContentToWrite(len);
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
write(_aggregate, complete && len==0); write(_aggregate, last && len==0);
// write any remaining content in the buffer directly // write any remaining content in the buffer directly
if (len>0) if (len>0)
write(buffer, complete); write(buffer, last);
else if (complete) else if (last)
write(BufferUtil.EMPTY_BUFFER, true); write(BufferUtil.EMPTY_BUFFER, true);
if (complete) if (last)
closed(); closed();
} }
@ -966,6 +970,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private abstract class AsyncICB extends IteratingCallback private abstract class AsyncICB extends IteratingCallback
{ {
final boolean _last;
AsyncICB(boolean last)
{
_last=last;
}
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {
@ -982,6 +993,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case UNREADY: case UNREADY:
if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY)) if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
continue; continue;
if (_last)
closed();
if (_channel.getState().onWritePossible()) if (_channel.getState().onWritePossible())
_channel.execute(_channel); _channel.execute(_channel);
break; break;
@ -1011,6 +1024,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public AsyncFlush() public AsyncFlush()
{ {
super(false);
} }
@Override @Override
@ -1038,21 +1052,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
private final ByteBuffer _buffer; private final ByteBuffer _buffer;
private final ByteBuffer _slice; private final ByteBuffer _slice;
private final boolean _complete;
private final int _len; private final int _len;
protected volatile boolean _completed; protected volatile boolean _completed;
public AsyncWrite(byte[] b, int off, int len, boolean complete) public AsyncWrite(byte[] b, int off, int len, boolean last)
{ {
super(last);
_buffer=ByteBuffer.wrap(b, off, len); _buffer=ByteBuffer.wrap(b, off, len);
_len=len; _len=len;
// always use a view for large byte arrays to avoid JVM pooling large direct buffers // always use a view for large byte arrays to avoid JVM pooling large direct buffers
_slice=_len<getBufferSize()?null:_buffer.duplicate(); _slice=_len<getBufferSize()?null:_buffer.duplicate();
_complete=complete;
} }
public AsyncWrite(ByteBuffer buffer, boolean complete) public AsyncWrite(ByteBuffer buffer, boolean last)
{ {
super(last);
_buffer=buffer; _buffer=buffer;
_len=buffer.remaining(); _len=buffer.remaining();
// Use a slice buffer for large indirect to avoid JVM pooling large direct buffers // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
@ -1063,7 +1077,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_slice=_buffer.duplicate(); _slice=_buffer.duplicate();
_buffer.position(_buffer.limit()); _buffer.position(_buffer.limit());
} }
_complete=complete;
} }
@Override @Override
@ -1073,12 +1086,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_completed=_len==0; _completed=_len==0;
write(_aggregate, _complete && _completed, this); write(_aggregate, _last && _completed, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
// Can we just aggregate the remainder? // Can we just aggregate the remainder?
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize) if (!_last && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
{ {
int position = BufferUtil.flipToFill(_aggregate); int position = BufferUtil.flipToFill(_aggregate);
BufferUtil.put(_buffer,_aggregate); BufferUtil.put(_buffer,_aggregate);
@ -1093,7 +1106,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_slice==null) if (_slice==null)
{ {
_completed=true; _completed=true;
write(_buffer, _complete, this); write(_buffer, _last, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1105,13 +1118,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.position(pl); _buffer.position(pl);
_slice.position(p); _slice.position(p);
_completed=!_buffer.hasRemaining(); _completed=!_buffer.hasRemaining();
write(_slice, _complete && _completed, this); write(_slice, _last && _completed, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
// all content written, but if we have not yet signal completion, we // all content written, but if we have not yet signal completion, we
// need to do so // need to do so
if (_complete && !_completed) if (_last && !_completed)
{ {
_completed=true; _completed=true;
write(BufferUtil.EMPTY_BUFFER, true, this); write(BufferUtil.EMPTY_BUFFER, true, this);
@ -1122,14 +1135,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
LOG.debug("EOF of {}",this); LOG.debug("EOF of {}",this);
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@Override
protected void onCompleteSuccess()
{
super.onCompleteSuccess();
if (_complete)
closed();
}
} }
/** /**