Code cleanup.

This commit is contained in:
Simone Bordet 2016-08-26 10:45:04 +02:00
parent 33ca8cf695
commit 2389b65578
1 changed files with 103 additions and 94 deletions

View File

@ -59,16 +59,16 @@ public class HttpInput extends ServletInputStream implements Runnable
private final HttpChannelState _channelState; private final HttpChannelState _channelState;
private ReadListener _listener; private ReadListener _listener;
private State _state = STREAM; private State _state = STREAM;
private long _firstByteTimeStamp=-1; private long _firstByteTimeStamp = -1;
private long _contentArrived; private long _contentArrived;
private long _contentConsumed; private long _contentConsumed;
private long _blockingTimeoutAt = -1; private long _blockingTimeoutAt = -1;
public HttpInput(HttpChannelState state) public HttpInput(HttpChannelState state)
{ {
_channelState=state; _channelState = state;
if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0) if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout() > 0)
_blockingTimeoutAt=0; _blockingTimeoutAt = 0;
} }
protected HttpChannelState getHttpChannelState() protected HttpChannelState getHttpChannelState()
@ -98,26 +98,26 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public int available() public int available()
{ {
int available=0; int available = 0;
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
Content content = _inputQ.peek(); Content content = _inputQ.peek();
if (content==null) if (content == null)
{ {
try try
{ {
produceContent(); produceContent();
} }
catch(IOException e) catch (IOException e)
{ {
woken=failed(e); woken = failed(e);
} }
content = _inputQ.peek(); content = _inputQ.peek();
} }
if (content!=null) if (content != null)
available= remaining(content); available = remaining(content);
} }
if (woken) if (woken)
@ -137,7 +137,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public int read() throws IOException public int read() throws IOException
{ {
int read = read(_oneByteBuffer, 0, 1); int read = read(_oneByteBuffer, 0, 1);
if (read==0) if (read == 0)
throw new IllegalStateException("unready read=0"); throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
} }
@ -147,29 +147,29 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_blockingTimeoutAt>=0 && !isAsync()) if (_blockingTimeoutAt >= 0 && !isAsync())
_blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); _blockingTimeoutAt = System.currentTimeMillis() + getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
int minRequestDataRate=_channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); int minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate();
if (minRequestDataRate>0 && _firstByteTimeStamp!=-1) if (minRequestDataRate > 0 && _firstByteTimeStamp != -1)
{ {
long period=System.nanoTime()-_firstByteTimeStamp; long period = System.nanoTime() - _firstByteTimeStamp;
if (period>0) if (period > 0)
{ {
long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period)/TimeUnit.SECONDS.toMillis(1); long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentArrived<minimum_data) if (_contentArrived < minimum_data)
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate)); throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, String.format("Request data rate < %d B/s", minRequestDataRate));
} }
} }
while(true) while (true)
{ {
Content item = nextContent(); Content item = nextContent();
if (item!=null) if (item != null)
{ {
int l = get(item, b, off, len); int l = get(item, b, off, len);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} read {} from {}",this,l,item); LOG.debug("{} read {} from {}", this, l, item);
consumeNonContent(); consumeNonContent();
@ -187,6 +187,7 @@ public class HttpInput extends ServletInputStream implements Runnable
* produce more Content and add it via {@link #addContent(Content)}. * produce more Content and add it via {@link #addContent(Content)}.
* For protocols that are constantly producing (eg HTTP2) this can * For protocols that are constantly producing (eg HTTP2) this can
* be left as a noop; * be left as a noop;
*
* @throws IOException if unable to produce content * @throws IOException if unable to produce content
*/ */
protected void produceContent() throws IOException protected void produceContent() throws IOException
@ -203,7 +204,7 @@ public class HttpInput extends ServletInputStream implements Runnable
protected Content nextContent() throws IOException protected Content nextContent() throws IOException
{ {
Content content = pollContent(); Content content = pollContent();
if (content==null && !isFinished()) if (content == null && !isFinished())
{ {
produceContent(); produceContent();
content = pollContent(); content = pollContent();
@ -211,9 +212,11 @@ public class HttpInput extends ServletInputStream implements Runnable
return content; return content;
} }
/** Poll the inputQ for Content. /**
* Poll the inputQ for Content.
* Consumed buffers and {@link PoisonPillContent}s are removed and * Consumed buffers and {@link PoisonPillContent}s are removed and
* EOF state updated if need be. * EOF state updated if need be.
*
* @return Content or null * @return Content or null
*/ */
protected Content pollContent() protected Content pollContent()
@ -228,20 +231,20 @@ public class HttpInput extends ServletInputStream implements Runnable
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content); LOG.debug("{} consumed {}", this, content);
if (content==EOF_CONTENT) if (content == EOF_CONTENT)
{ {
if (_listener==null) if (_listener == null)
_state=EOF; _state = EOF;
else else
{ {
_state=AEOF; _state = AEOF;
boolean woken = _channelState.onReadReady(); // force callback? boolean woken = _channelState.onReadReady(); // force callback?
if (woken) if (woken)
wake(); wake();
} }
} }
else if (content==EARLY_EOF_CONTENT) else if (content == EARLY_EOF_CONTENT)
_state=EARLY_EOF; _state = EARLY_EOF;
content = _inputQ.peek(); content = _inputQ.peek();
} }
@ -281,7 +284,7 @@ public class HttpInput extends ServletInputStream implements Runnable
protected Content nextReadable() throws IOException protected Content nextReadable() throws IOException
{ {
Content content = pollReadable(); Content content = pollReadable();
if (content==null && !isFinished()) if (content == null && !isFinished())
{ {
produceContent(); produceContent();
content = pollReadable(); content = pollReadable();
@ -289,9 +292,11 @@ public class HttpInput extends ServletInputStream implements Runnable
return content; return content;
} }
/** Poll the inputQ for Content or EOF. /**
* Poll the inputQ for Content or EOF.
* Consumed buffers and non EOF {@link PoisonPillContent}s are removed. * Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
* EOF state is not updated. * EOF state is not updated.
*
* @return Content, EOF or null * @return Content, EOF or null
*/ */
protected Content pollReadable() protected Content pollReadable()
@ -302,7 +307,7 @@ public class HttpInput extends ServletInputStream implements Runnable
// Skip consumed items at the head of the queue except EOF // Skip consumed items at the head of the queue except EOF
while (content != null) while (content != null)
{ {
if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0) if (content == EOF_CONTENT || content == EARLY_EOF_CONTENT || remaining(content) > 0)
return content; return content;
_inputQ.poll(); _inputQ.poll();
@ -337,7 +342,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
int l = Math.min(content.remaining(), length); int l = Math.min(content.remaining(), length);
content.getContent().get(buffer, offset, l); content.getContent().get(buffer, offset, l);
_contentConsumed+=l; _contentConsumed += l;
return l; return l;
} }
@ -352,9 +357,9 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
int l = Math.min(content.remaining(), length); int l = Math.min(content.remaining(), length);
ByteBuffer buffer = content.getContent(); ByteBuffer buffer = content.getContent();
buffer.position(buffer.position()+l); buffer.position(buffer.position() + l);
_contentConsumed+=l; _contentConsumed += l;
if (l>0 && !content.hasContent()) if (l > 0 && !content.hasContent())
pollContent(); // hungry succeed pollContent(); // hungry succeed
} }
@ -368,22 +373,22 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
try try
{ {
long timeout=0; long timeout = 0;
if (_blockingTimeoutAt>=0) if (_blockingTimeoutAt >= 0)
{ {
timeout=_blockingTimeoutAt-System.currentTimeMillis(); timeout = _blockingTimeoutAt - System.currentTimeMillis();
if (timeout<=0) if (timeout <= 0)
throw new TimeoutException(); throw new TimeoutException();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content timeout={}", this,timeout); LOG.debug("{} blocking for content timeout={}", this, timeout);
if (timeout>0) if (timeout > 0)
_inputQ.wait(timeout); _inputQ.wait(timeout);
else else
_inputQ.wait(); _inputQ.wait();
if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt) if (_blockingTimeoutAt > 0 && System.currentTimeMillis() >= _blockingTimeoutAt)
throw new TimeoutException(); throw new TimeoutException();
} }
catch (Throwable e) catch (Throwable e)
@ -397,23 +402,24 @@ public class HttpInput extends ServletInputStream implements Runnable
* <p>Typically used to push back content that has * <p>Typically used to push back content that has
* been read, perhaps mutated. The bytes prepended are * been read, perhaps mutated. The bytes prepended are
* deducted for the contentConsumed total</p> * deducted for the contentConsumed total</p>
*
* @param item the content to add * @param item the content to add
* @return true if content channel woken for read * @return true if content channel woken for read
*/ */
public boolean prependContent(Content item) public boolean prependContent(Content item)
{ {
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
_inputQ.push(item); _inputQ.push(item);
_contentConsumed-=item.remaining(); _contentConsumed -= item.remaining();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} prependContent {}", this, item); LOG.debug("{} prependContent {}", this, item);
if (_listener==null) if (_listener == null)
_inputQ.notify(); _inputQ.notify();
else else
woken=_channelState.onReadPossible(); woken = _channelState.onReadPossible();
} }
return woken; return woken;
@ -427,20 +433,20 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public boolean addContent(Content item) public boolean addContent(Content item)
{ {
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_firstByteTimeStamp==-1) if (_firstByteTimeStamp == -1)
_firstByteTimeStamp=System.nanoTime(); _firstByteTimeStamp = System.nanoTime();
_contentArrived+=item.remaining(); _contentArrived += item.remaining();
_inputQ.offer(item); _inputQ.offer(item);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, item); LOG.debug("{} addContent {}", this, item);
if (_listener==null) if (_listener == null)
_inputQ.notify(); _inputQ.notify();
else else
woken=_channelState.onReadPossible(); woken = _channelState.onReadPossible();
} }
return woken; return woken;
@ -450,7 +456,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
return _inputQ.size()>0; return _inputQ.size() > 0;
} }
} }
@ -476,6 +482,7 @@ public class HttpInput extends ServletInputStream implements Runnable
* <p> * <p>
* Typically this will result in an EOFException being thrown * Typically this will result in an EOFException being thrown
* from a subsequent read rather than a -1 return. * from a subsequent read rather than a -1 return.
*
* @return true if content channel woken for read * @return true if content channel woken for read
*/ */
public boolean earlyEOF() public boolean earlyEOF()
@ -486,6 +493,7 @@ public class HttpInput extends ServletInputStream implements Runnable
/** /**
* This method should be called to signal that all the expected * This method should be called to signal that all the expected
* content arrived. * content arrived.
*
* @return true if content channel woken for read * @return true if content channel woken for read
*/ */
public boolean eof() public boolean eof()
@ -529,7 +537,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
return _state==ASYNC; return _state == ASYNC;
} }
} }
@ -550,18 +558,18 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_listener == null ) if (_listener == null)
return true; return true;
if (_state instanceof EOFState) if (_state instanceof EOFState)
return true; return true;
if (nextReadable()!=null) if (nextReadable() != null)
return true; return true;
_channelState.onReadUnready(); _channelState.onReadUnready();
} }
return false; return false;
} }
catch(IOException e) catch (IOException e)
{ {
LOG.ignore(e); LOG.ignore(e);
return true; return true;
@ -572,7 +580,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public void setReadListener(ReadListener readListener) public void setReadListener(ReadListener readListener)
{ {
readListener = Objects.requireNonNull(readListener); readListener = Objects.requireNonNull(readListener);
boolean woken=false; boolean woken = false;
try try
{ {
synchronized (_inputQ) synchronized (_inputQ)
@ -580,11 +588,11 @@ public class HttpInput extends ServletInputStream implements Runnable
if (_listener != null) if (_listener != null)
throw new IllegalStateException("ReadListener already set"); throw new IllegalStateException("ReadListener already set");
if (_state != STREAM) if (_state != STREAM)
throw new IllegalStateException("State "+STREAM+" != " + _state); throw new IllegalStateException("State " + STREAM + " != " + _state);
_state = ASYNC; _state = ASYNC;
_listener = readListener; _listener = readListener;
boolean content=nextContent()!=null; boolean content = nextContent() != null;
if (content) if (content)
woken = _channelState.onReadReady(); woken = _channelState.onReadReady();
@ -592,7 +600,7 @@ public class HttpInput extends ServletInputStream implements Runnable
_channelState.onReadUnready(); _channelState.onReadUnready();
} }
} }
catch(IOException e) catch (IOException e)
{ {
throw new RuntimeIOException(e); throw new RuntimeIOException(e);
} }
@ -603,7 +611,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean failed(Throwable x) public boolean failed(Throwable x)
{ {
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_state instanceof ErrorState) if (_state instanceof ErrorState)
@ -611,16 +619,15 @@ public class HttpInput extends ServletInputStream implements Runnable
else else
_state = new ErrorState(x); _state = new ErrorState(x);
if (_listener==null) if (_listener == null)
_inputQ.notify(); _inputQ.notify();
else else
woken=_channelState.onReadPossible(); woken = _channelState.onReadPossible();
} }
return woken; return woken;
} }
/* ------------------------------------------------------------ */
/* /*
* <p> * <p>
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
@ -633,26 +640,26 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
final Throwable error; final Throwable error;
final ReadListener listener; final ReadListener listener;
boolean aeof=false; boolean aeof = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_state==EOF) if (_state == EOF)
return; return;
if (_state==AEOF) if (_state == AEOF)
{ {
_state=EOF; _state = EOF;
aeof=true; aeof = true;
} }
listener = _listener; listener = _listener;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null; error = _state instanceof ErrorState ? ((ErrorState)_state).getError() : null;
} }
try try
{ {
if (error!=null) if (error != null)
{ {
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
listener.onError(error); listener.onError(error);
@ -672,7 +679,7 @@ public class HttpInput extends ServletInputStream implements Runnable
LOG.debug(e); LOG.debug(e);
try try
{ {
if (aeof || error==null) if (aeof || error == null)
{ {
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
listener.onError(e); listener.onError(e);
@ -696,10 +703,10 @@ public class HttpInput extends ServletInputStream implements Runnable
Content content; Content content;
synchronized (_inputQ) synchronized (_inputQ)
{ {
state=_state; state = _state;
consumed=_contentConsumed; consumed = _contentConsumed;
q=_inputQ.size(); q = _inputQ.size();
content=_inputQ.peekFirst(); content = _inputQ.peekFirst();
} }
return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]", return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]",
getClass().getSimpleName(), getClass().getSimpleName(),
@ -713,10 +720,11 @@ public class HttpInput extends ServletInputStream implements Runnable
public static class PoisonPillContent extends Content public static class PoisonPillContent extends Content
{ {
private final String _name; private final String _name;
public PoisonPillContent(String name) public PoisonPillContent(String name)
{ {
super(BufferUtil.EMPTY_BUFFER); super(BufferUtil.EMPTY_BUFFER);
_name=name; _name = name;
} }
@Override @Override
@ -740,7 +748,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public Content(ByteBuffer content) public Content(ByteBuffer content)
{ {
_content=content; _content = content;
} }
@Override @Override
@ -768,7 +776,7 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public String toString() public String toString()
{ {
return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content)); return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content));
} }
} }
@ -793,9 +801,10 @@ public class HttpInput extends ServletInputStream implements Runnable
protected class ErrorState extends EOFState protected class ErrorState extends EOFState
{ {
final Throwable _error; final Throwable _error;
ErrorState(Throwable error) ErrorState(Throwable error)
{ {
_error=error; _error = error;
} }
public Throwable getError() public Throwable getError()
@ -814,7 +823,7 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public String toString() public String toString()
{ {
return "ERROR:"+_error; return "ERROR:" + _error;
} }
} }