Code cleanup.

This commit is contained in:
Simone Bordet 2016-08-26 12:49:31 +02:00
parent 2389b65578
commit 0322a1640d
1 changed files with 171 additions and 163 deletions

View File

@ -54,66 +54,67 @@ import org.eclipse.jetty.util.log.Logger;
* close the stream, to be reopened after the inclusion ends.</p> * close the stream, to be reopened after the inclusion ends.</p>
*/ */
public class HttpOutput extends ServletOutputStream implements Runnable public class HttpOutput extends ServletOutputStream implements Runnable
{ {
/** /**
* The HttpOutput.Inteceptor is a single intercept point for all * The HttpOutput.Interceptor is a single intercept point for all
* output written to the HttpOutput: via writer; via output stream; * output written to the HttpOutput: via writer; via output stream;
* asynchronously; or blocking. * asynchronously; or blocking.
* <p> * <p>
* The Interceptor can be used to implement translations (eg Gzip) or * The Interceptor can be used to implement translations (eg Gzip) or
* additional buffering that acts on all output. Interceptors are * additional buffering that acts on all output. Interceptors are
* created in a chain, so that multiple concerns may intercept. * created in a chain, so that multiple concerns may intercept.
* <p> * <p>
* The {@link HttpChannel} is an {@link Interceptor} and is always the * The {@link HttpChannel} is an {@link Interceptor} and is always the
* last link in any Interceptor chain. * last link in any Interceptor chain.
* <p> * <p>
* Responses are committed by the first call to * Responses are committed by the first call to
* {@link #write(ByteBuffer, boolean, Callback)} * {@link #write(ByteBuffer, boolean, Callback)}
* and closed by a call to {@link #write(ByteBuffer, boolean, Callback)} * and closed by a call to {@link #write(ByteBuffer, boolean, Callback)}
* with the last boolean set true. If no content is available to commit * with the last boolean set true. If no content is available to commit
* or close, then a null buffer is passed. * or close, then a null buffer is passed.
*/ */
public interface Interceptor public interface Interceptor
{ {
/** /**
* Write content. * Write content.
* The response is committed by the first call to write and is closed by * The response is committed by the first call to write and is closed by
* a call with last == true. Empty content buffers may be passed to * a call with last == true. Empty content buffers may be passed to
* force a commit or close. * force a commit or close.
* @param content The content to be written or an empty buffer. *
* @param last True if this is the last call to write * @param content The content to be written or an empty buffer.
* @param callback The callback to use to indicate {@link Callback#succeeded()} * @param last True if this is the last call to write
* or {@link Callback#failed(Throwable)}. * @param callback The callback to use to indicate {@link Callback#succeeded()}
* or {@link Callback#failed(Throwable)}.
*/ */
void write(ByteBuffer content, boolean last, Callback callback); void write(ByteBuffer content, boolean last, Callback callback);
/** /**
* @return The next Interceptor in the chain or null if this is the * @return The next Interceptor in the chain or null if this is the
* last Interceptor in the chain. * last Interceptor in the chain.
*/ */
Interceptor getNextInterceptor(); Interceptor getNextInterceptor();
/** /**
* @return True if the Interceptor is optimized to receive direct * @return True if the Interceptor is optimized to receive direct
* {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)} * {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)}
* method. If false is returned, then passing direct buffers may cause * method. If false is returned, then passing direct buffers may cause
* inefficiencies. * inefficiencies.
*/ */
boolean isOptimizedForDirectBuffers(); boolean isOptimizedForDirectBuffers();
/** /**
* Reset the buffers. * Reset the buffers.
* <p>If the Interceptor contains buffers then reset them. * <p>If the Interceptor contains buffers then reset them.
* @throws IllegalStateException Thrown if the response has been *
* committed and buffers and/or headers cannot be reset. * @throws IllegalStateException Thrown if the response has been
* committed and buffers and/or headers cannot be reset.
*/ */
default void resetBuffer() throws IllegalStateException default void resetBuffer() throws IllegalStateException
{ {
Interceptor next = getNextInterceptor(); Interceptor next = getNextInterceptor();
if (next!=null) if (next != null)
next.resetBuffer(); next.resetBuffer();
}; }
} }
private static Logger LOG = Log.getLogger(HttpOutput.class); private static Logger LOG = Log.getLogger(HttpOutput.class);
@ -122,7 +123,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private final SharedBlockingCallback _writeBlock; private final SharedBlockingCallback _writeBlock;
private Interceptor _interceptor; private Interceptor _interceptor;
/** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */ /**
* Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written.
*/
private long _written; private long _written;
private ByteBuffer _aggregate; private ByteBuffer _aggregate;
@ -130,6 +133,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private int _commitSize; private int _commitSize;
private WriteListener _writeListener; private WriteListener _writeListener;
private volatile Throwable _onError; private volatile Throwable _onError;
/* /*
ACTION OPEN ASYNC READY PENDING UNREADY CLOSED ACTION OPEN ASYNC READY PENDING UNREADY CLOSED
------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------
@ -140,8 +144,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
write completed - - - ASYNC READY->owp - write completed - - - ASYNC READY->owp -
*/ */
private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED } private enum OutputState
private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN); {
OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED
}
private final AtomicReference<OutputState> _state = new AtomicReference<>(OutputState.OPEN);
public HttpOutput(HttpChannel channel) public HttpOutput(HttpChannel channel)
{ {
@ -153,9 +161,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
protected long getIdleTimeout() protected long getIdleTimeout()
{ {
long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout(); long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout();
if (bto>0) if (bto > 0)
return bto; return bto;
if (bto<0) if (bto < 0)
return -1; return -1;
return _channel.getIdleTimeout(); return _channel.getIdleTimeout();
} }
@ -163,10 +171,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
HttpConfiguration config = channel.getHttpConfiguration(); HttpConfiguration config = channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize(); _bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize(); _commitSize = config.getOutputAggregationSize();
if (_commitSize>_bufferSize) if (_commitSize > _bufferSize)
{ {
LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize); LOG.warn("OutputAggregationSize {} exceeds bufferSize {}", _commitSize, _bufferSize);
_commitSize=_bufferSize; _commitSize = _bufferSize;
} }
} }
@ -182,7 +190,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void setInterceptor(Interceptor filter) public void setInterceptor(Interceptor filter)
{ {
_interceptor=filter; _interceptor = filter;
} }
public boolean isWritten() public boolean isWritten()
@ -202,7 +210,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private boolean isLastContentToWrite(int len) private boolean isLastContentToWrite(int len)
{ {
_written+=len; _written += len;
return _channel.getResponse().isAllContentWritten(_written); return _channel.getResponse().isAllContentWritten(_written);
} }
@ -248,9 +256,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void close() public void close()
{ {
while(true) while (true)
{ {
OutputState state=_state.get(); OutputState state = _state.get();
switch (state) switch (state)
{ {
case CLOSED: case CLOSED:
@ -259,18 +267,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
case UNREADY: case UNREADY:
{ {
if (_state.compareAndSet(state,OutputState.ERROR)) if (_state.compareAndSet(state, OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async close"):_onError); _writeListener.onError(_onError == null ? new EofException("Async close") : _onError);
break; break;
} }
default: default:
{ {
if (!_state.compareAndSet(state,OutputState.CLOSED)) if (!_state.compareAndSet(state, OutputState.CLOSED))
break; break;
try try
{ {
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
} }
catch (IOException x) catch (IOException x)
{ {
@ -293,9 +301,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
void closed() void closed()
{ {
while(true) while (true)
{ {
OutputState state=_state.get(); OutputState state = _state.get();
switch (state) switch (state)
{ {
case CLOSED: case CLOSED:
@ -304,8 +312,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
case UNREADY: case UNREADY:
{ {
if (_state.compareAndSet(state,OutputState.ERROR)) if (_state.compareAndSet(state, OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async closed"):_onError); _writeListener.onError(_onError == null ? new EofException("Async closed") : _onError);
break; break;
} }
default: default:
@ -345,18 +353,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public boolean isClosed() public boolean isClosed()
{ {
return _state.get()==OutputState.CLOSED; return _state.get() == OutputState.CLOSED;
} }
@Override @Override
public void flush() throws IOException public void flush() throws IOException
{ {
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false); write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false);
return; return;
case ASYNC: case ASYNC:
@ -388,9 +396,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void write(byte[] b, int off, int len) throws IOException public void write(byte[] b, int off, int len) throws IOException
{ {
// Async or Blocking ? // Async or Blocking ?
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
// process blocking below // process blocking below
@ -405,7 +413,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Should we aggregate? // Should we aggregate?
boolean last = isLastContentToWrite(len); boolean last = isLastContentToWrite(len);
if (!last && len<=_commitSize) if (!last && len <= _commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
@ -414,7 +422,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
int filled = BufferUtil.fill(_aggregate, b, off, len); int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content // return if we are not complete, not full and filled all the content
if (filled==len && !BufferUtil.isFull(_aggregate)) if (filled == len && !BufferUtil.isFull(_aggregate))
{ {
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException(); throw new IllegalStateException();
@ -422,12 +430,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
// adjust offset/length // adjust offset/length
off+=filled; off += filled;
len-=filled; len -= filled;
} }
// Do the asynchronous writing from the callback // Do the asynchronous writing from the callback
new AsyncWrite(b,off,len,last).iterate(); new AsyncWrite(b, off, len, last).iterate();
return; return;
case PENDING: case PENDING:
@ -451,7 +459,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Should we aggregate? // Should we aggregate?
int capacity = getBufferSize(); int capacity = getBufferSize();
boolean last = isLastContentToWrite(len); boolean last = isLastContentToWrite(len);
if (!last && len<=_commitSize) if (!last && len <= _commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers()); _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
@ -460,21 +468,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
int filled = BufferUtil.fill(_aggregate, b, off, len); int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content // return if we are not complete, not full and filled all the content
if (filled==len && !BufferUtil.isFull(_aggregate)) if (filled == len && !BufferUtil.isFull(_aggregate))
return; return;
// adjust offset/length // adjust offset/length
off+=filled; off += filled;
len-=filled; len -= filled;
} }
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
write(_aggregate, last && len==0); write(_aggregate, last && len == 0);
// should we fill aggregate again from the buffer? // should we fill aggregate again from the buffer?
if (len>0 && !last && len<=_commitSize && len<=BufferUtil.space(_aggregate)) if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate))
{ {
BufferUtil.append(_aggregate, b, off, len); BufferUtil.append(_aggregate, b, off, len);
return; return;
@ -482,26 +490,26 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
// write any remaining content in the buffer directly // write any remaining content in the buffer directly
if (len>0) if (len > 0)
{ {
// write a buffer capacity at a time to avoid JVM pooling large direct buffers // write a buffer capacity at a time to avoid JVM pooling large direct buffers
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
ByteBuffer view = ByteBuffer.wrap(b, off, len); ByteBuffer view = ByteBuffer.wrap(b, off, len);
while (len>getBufferSize()) while (len > getBufferSize())
{ {
int p=view.position(); int p = view.position();
int l=p+getBufferSize(); int l = p + getBufferSize();
view.limit(p+getBufferSize()); view.limit(p + getBufferSize());
write(view,false); write(view, false);
len-=getBufferSize(); len -= getBufferSize();
view.limit(l+Math.min(len,getBufferSize())); view.limit(l + Math.min(len, getBufferSize()));
view.position(l); view.position(l);
} }
write(view,last); write(view, last);
} }
else if (last) else if (last)
{ {
write(BufferUtil.EMPTY_BUFFER,true); write(BufferUtil.EMPTY_BUFFER, true);
} }
if (last) if (last)
@ -511,9 +519,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void write(ByteBuffer buffer) throws IOException public void write(ByteBuffer buffer) throws IOException
{ {
// Async or Blocking ? // Async or Blocking ?
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
// process blocking below // process blocking below
@ -528,7 +536,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Do the asynchronous writing from the callback // Do the asynchronous writing from the callback
boolean last = isLastContentToWrite(buffer.remaining()); boolean last = isLastContentToWrite(buffer.remaining());
new AsyncWrite(buffer,last).iterate(); new AsyncWrite(buffer, last).iterate();
return; return;
case PENDING: case PENDING:
@ -547,17 +555,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break; break;
} }
// handle blocking write // handle blocking write
int len=BufferUtil.length(buffer); int len = BufferUtil.length(buffer);
boolean last = isLastContentToWrite(len); boolean last = isLastContentToWrite(len);
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
write(_aggregate, last && len==0); write(_aggregate, last && len == 0);
// write any remaining content in the buffer directly // write any remaining content in the buffer directly
if (len>0) if (len > 0)
write(buffer, last); write(buffer, last);
else if (last) else if (last)
write(BufferUtil.EMPTY_BUFFER, true); write(BufferUtil.EMPTY_BUFFER, true);
@ -569,13 +576,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void write(int b) throws IOException public void write(int b) throws IOException
{ {
_written+=1; _written += 1;
boolean complete=_channel.getResponse().isAllContentWritten(_written); boolean complete = _channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ? // Async or Blocking ?
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
if (_aggregate == null) if (_aggregate == null)
@ -649,7 +656,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void sendContent(ByteBuffer content) throws IOException public void sendContent(ByteBuffer content) throws IOException
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent({})",BufferUtil.toDetailString(content)); LOG.debug("sendContent({})", BufferUtil.toDetailString(content));
write(content, true); write(content, true);
closed(); closed();
@ -663,7 +670,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(InputStream in) throws IOException public void sendContent(InputStream in) throws IOException
{ {
try(Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlock.acquire())
{ {
new InputStreamWritingCB(in, blocker).iterate(); new InputStreamWritingCB(in, blocker).iterate();
blocker.block(); blocker.block();
@ -685,7 +692,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(ReadableByteChannel in) throws IOException public void sendContent(ReadableByteChannel in) throws IOException
{ {
try(Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlock.acquire())
{ {
new ReadableByteChannelWritingCB(in, blocker).iterate(); new ReadableByteChannelWritingCB(in, blocker).iterate();
blocker.block(); blocker.block();
@ -707,7 +714,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(HttpContent content) throws IOException public void sendContent(HttpContent content) throws IOException
{ {
try(Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlock.acquire())
{ {
sendContent(content, blocker); sendContent(content, blocker);
blocker.block(); blocker.block();
@ -723,13 +730,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
/** /**
* Asynchronous send of whole content. * Asynchronous send of whole content.
* @param content The whole content to send *
* @param content The whole content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(ByteBuffer content, final Callback callback) public void sendContent(ByteBuffer content, final Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback); LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback);
write(content, true, new Callback.Nested(callback) write(content, true, new Callback.Nested(callback)
{ {
@ -753,13 +761,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
* Asynchronous send of stream content. * Asynchronous send of stream content.
* The stream will be closed after reading all content. * The stream will be closed after reading all content.
* *
* @param in The stream content to send * @param in The stream content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(InputStream in, Callback callback) public void sendContent(InputStream in, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(stream={},{})",in,callback); LOG.debug("sendContent(stream={},{})", in, callback);
new InputStreamWritingCB(in, callback).iterate(); new InputStreamWritingCB(in, callback).iterate();
} }
@ -768,13 +776,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
* Asynchronous send of channel content. * Asynchronous send of channel content.
* The channel will be closed after reading all content. * The channel will be closed after reading all content.
* *
* @param in The channel content to send * @param in The channel content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(ReadableByteChannel in, Callback callback) public void sendContent(ReadableByteChannel in, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(channel={},{})",in,callback); LOG.debug("sendContent(channel={},{})", in, callback);
new ReadableByteChannelWritingCB(in, callback).iterate(); new ReadableByteChannelWritingCB(in, callback).iterate();
} }
@ -783,12 +791,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
* Asynchronous send of HTTP content. * Asynchronous send of HTTP content.
* *
* @param httpContent The HTTP content to send * @param httpContent The HTTP content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(HttpContent httpContent, Callback callback) public void sendContent(HttpContent httpContent, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(http={},{})",httpContent,callback); LOG.debug("sendContent(http={},{})", httpContent, callback);
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
@ -803,7 +811,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
while (true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING)) if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
@ -824,37 +832,36 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break; break;
} }
ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
if (buffer == null) if (buffer == null)
buffer = httpContent.getIndirectBuffer(); buffer = httpContent.getIndirectBuffer();
if (buffer!=null) if (buffer != null)
{ {
sendContent(buffer,callback); sendContent(buffer, callback);
return; return;
} }
try try
{ {
ReadableByteChannel rbc=httpContent.getReadableByteChannel(); ReadableByteChannel rbc = httpContent.getReadableByteChannel();
if (rbc!=null) if (rbc != null)
{ {
// Close of the rbc is done by the async sendContent // Close of the rbc is done by the async sendContent
sendContent(rbc,callback); sendContent(rbc, callback);
return; return;
} }
InputStream in = httpContent.getInputStream(); InputStream in = httpContent.getInputStream();
if (in!=null) if (in != null)
{ {
sendContent(in,callback); sendContent(in, callback);
return; return;
} }
throw new IllegalArgumentException("unknown content for "+httpContent); throw new IllegalArgumentException("unknown content for " + httpContent);
} }
catch(Throwable th) catch (Throwable th)
{ {
abort(th); abort(th);
callback.failed(th); callback.failed(th);
@ -874,7 +881,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void recycle() public void recycle()
{ {
_interceptor=_channel; _interceptor = _channel;
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
BufferUtil.clear(_aggregate); BufferUtil.clear(_aggregate);
_written = 0; _written = 0;
@ -914,7 +921,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
while (true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
return true; return true;
@ -950,28 +957,29 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void run() public void run()
{ {
loop: while (true) loop:
while (true)
{ {
OutputState state = _state.get(); OutputState state = _state.get();
if(_onError!=null) if (_onError != null)
{ {
switch(state) switch (state)
{ {
case CLOSED: case CLOSED:
case ERROR: case ERROR:
{ {
_onError=null; _onError = null;
break loop; break loop;
} }
default: default:
{ {
if (_state.compareAndSet(state, OutputState.ERROR)) if (_state.compareAndSet(state, OutputState.ERROR))
{ {
Throwable th=_onError; Throwable th = _onError;
_onError=null; _onError = null;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onError",th); LOG.debug("onError", th);
_writeListener.onError(th); _writeListener.onError(th);
close(); close();
break loop; break loop;
@ -981,7 +989,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
continue; continue;
} }
switch(_state.get()) switch (_state.get())
{ {
case ASYNC: case ASYNC:
case READY: case READY:
@ -1003,7 +1011,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break; break;
default: default:
_onError=new IllegalStateException("state="+_state.get()); _onError = new IllegalStateException("state=" + _state.get());
} }
} }
} }
@ -1023,25 +1031,25 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get()); return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get());
} }
private abstract class AsyncICB extends IteratingCallback private abstract class AsyncICB extends IteratingCallback
{ {
final boolean _last; final boolean _last;
AsyncICB(boolean last) AsyncICB(boolean last)
{ {
_last=last; _last = last;
} }
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {
while(true) while (true)
{ {
OutputState last=_state.get(); OutputState last = _state.get();
switch(last) switch (last)
{ {
case PENDING: case PENDING:
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
@ -1070,7 +1078,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void onCompleteFailure(Throwable e) public void onCompleteFailure(Throwable e)
{ {
_onError=e==null?new IOException():e; _onError = e == null ? new IOException() : e;
if (_channel.getState().onWritePossible()) if (_channel.getState().onWritePossible())
_channel.execute(_channel); _channel.execute(_channel);
} }
@ -1090,15 +1098,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_flushed=true; _flushed = true;
write(_aggregate, false, this); write(_aggregate, false, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
if (!_flushed) if (!_flushed)
{ {
_flushed=true; _flushed = true;
write(BufferUtil.EMPTY_BUFFER,false,this); write(BufferUtil.EMPTY_BUFFER, false, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1116,23 +1124,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public AsyncWrite(byte[] b, int off, int len, boolean last) public AsyncWrite(byte[] b, int off, int len, boolean last)
{ {
super(last); super(last);
_buffer=ByteBuffer.wrap(b, off, len); _buffer = ByteBuffer.wrap(b, off, len);
_len=len; _len = len;
// always use a view for large byte arrays to avoid JVM pooling large direct buffers // always use a view for large byte arrays to avoid JVM pooling large direct buffers
_slice=_len<getBufferSize()?null:_buffer.duplicate(); _slice = _len < getBufferSize() ? null : _buffer.duplicate();
} }
public AsyncWrite(ByteBuffer buffer, boolean last) public AsyncWrite(ByteBuffer buffer, boolean last)
{ {
super(last); super(last);
_buffer=buffer; _buffer = buffer;
_len=buffer.remaining(); _len = buffer.remaining();
// Use a slice buffer for large indirect to avoid JVM pooling large direct buffers // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
if (_buffer.isDirect()||_len<getBufferSize()) if (_buffer.isDirect() || _len < getBufferSize())
_slice=null; _slice = null;
else else
{ {
_slice=_buffer.duplicate(); _slice = _buffer.duplicate();
} }
} }
@ -1142,16 +1150,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_completed=_len==0; _completed = _len == 0;
write(_aggregate, _last && _completed, this); write(_aggregate, _last && _completed, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
// Can we just aggregate the remainder? // Can we just aggregate the remainder?
if (!_last && _len<BufferUtil.space(_aggregate) && _len<_commitSize) if (!_last && _len < BufferUtil.space(_aggregate) && _len < _commitSize)
{ {
int position = BufferUtil.flipToFill(_aggregate); int position = BufferUtil.flipToFill(_aggregate);
BufferUtil.put(_buffer,_aggregate); BufferUtil.put(_buffer, _aggregate);
BufferUtil.flipToFlush(_aggregate, position); BufferUtil.flipToFlush(_aggregate, position);
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -1160,21 +1168,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_buffer.hasRemaining()) if (_buffer.hasRemaining())
{ {
// if there is no slice, just write it // if there is no slice, just write it
if (_slice==null) if (_slice == null)
{ {
_completed=true; _completed = true;
write(_buffer, _last, this); write(_buffer, _last, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
// otherwise take a slice // otherwise take a slice
int p=_buffer.position(); int p = _buffer.position();
int l=Math.min(getBufferSize(),_buffer.remaining()); int l = Math.min(getBufferSize(), _buffer.remaining());
int pl=p+l; int pl = p + l;
_slice.limit(pl); _slice.limit(pl);
_buffer.position(pl); _buffer.position(pl);
_slice.position(p); _slice.position(p);
_completed=!_buffer.hasRemaining(); _completed = !_buffer.hasRemaining();
write(_slice, _last && _completed, this); write(_slice, _last && _completed, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1183,13 +1191,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// need to do so // need to do so
if (_last && !_completed) if (_last && !_completed)
{ {
_completed=true; _completed = true;
write(BufferUtil.EMPTY_BUFFER, true, this); write(BufferUtil.EMPTY_BUFFER, true, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
if (LOG.isDebugEnabled() && _completed) if (LOG.isDebugEnabled() && _completed)
LOG.debug("EOF of {}",this); LOG.debug("EOF of {}", this);
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
} }
@ -1211,7 +1219,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public InputStreamWritingCB(InputStream in, Callback callback) public InputStreamWritingCB(InputStream in, Callback callback)
{ {
super(callback); super(callback);
_in=in; _in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false); _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
} }
@ -1223,7 +1231,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_eof) if (_eof)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("EOF of {}",this); LOG.debug("EOF of {}", this);
// Handle EOF // Handle EOF
_in.close(); _in.close();
closed(); closed();
@ -1232,20 +1240,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
// Read until buffer full or EOF // Read until buffer full or EOF
int len=0; int len = 0;
while (len<_buffer.capacity() && !_eof) while (len < _buffer.capacity() && !_eof)
{ {
int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len); int r = _in.read(_buffer.array(), _buffer.arrayOffset() + len, _buffer.capacity() - len);
if (r<0) if (r < 0)
_eof=true; _eof = true;
else else
len+=r; len += r;
} }
// write what we have // write what we have
_buffer.position(0); _buffer.position(0);
_buffer.limit(len); _buffer.limit(len);
write(_buffer,_eof,this); write(_buffer, _eof, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1259,8 +1267,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
} }
/* ------------------------------------------------------------ */ /**
/** An iterating callback that will take content from a * An iterating callback that will take content from a
* ReadableByteChannel and write it to the {@link HttpChannel}. * ReadableByteChannel and write it to the {@link HttpChannel}.
* A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
* {@link HttpChannel#useDirectBuffers()} is true. * {@link HttpChannel#useDirectBuffers()} is true.
@ -1277,7 +1285,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{ {
super(callback); super(callback);
_in=in; _in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers()); _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
} }
@ -1289,7 +1297,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_eof) if (_eof)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("EOF of {}",this); LOG.debug("EOF of {}", this);
_in.close(); _in.close();
closed(); closed();
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
@ -1299,11 +1307,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Read from stream until buffer full or EOF // Read from stream until buffer full or EOF
BufferUtil.clearToFill(_buffer); BufferUtil.clearToFill(_buffer);
while (_buffer.hasRemaining() && !_eof) while (_buffer.hasRemaining() && !_eof)
_eof = (_in.read(_buffer)) < 0; _eof = (_in.read(_buffer)) < 0;
// write what we have // write what we have
BufferUtil.flipToFlush(_buffer, 0); BufferUtil.flipToFlush(_buffer, 0);
write(_buffer,_eof,this); write(_buffer, _eof, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }