yet another refactor of HttpInput.... must read -1 before isFinished is true

This commit is contained in:
Greg Wilkins 2014-12-24 19:34:06 +01:00
parent ee3481cb3f
commit a89a6927e8
8 changed files with 175 additions and 212 deletions

View File

@ -1226,6 +1226,7 @@ public class HttpParser
}
catch(NumberFormatException|IllegalStateException e)
{
e.printStackTrace();
BufferUtil.clear(buffer);
LOG.warn("parse exception: {} in {} for {}",e.toString(),_state,_handler);
if (DEBUG)

View File

@ -535,7 +535,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
LOG.debug("{} content {}", this, content);
HttpInput input = _request.getHttpInput();
input.content(content);
input.addContent(content);
}
public void onRequestComplete()

View File

@ -699,6 +699,7 @@ public class HttpChannelState
}
if (handle)
// TODO, do we need to execute or just run?
_channel.execute(_channel);
}

View File

@ -64,7 +64,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
private final HttpInput.PoisonPillContent _recycle = new RecycleBufferContent();
private final HttpInput.PoisonPillContent _recycleRequestBuffer = new RecycleBufferContent();
/**
* Get the current connection that this thread is dispatched to.
@ -319,17 +319,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (LOG.isDebugEnabled())
LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer));
boolean had_input=BufferUtil.hasContent(_requestBuffer);
boolean buffer_had_content=BufferUtil.hasContent(_requestBuffer);
boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}",this,handle,_parser);
// recycle buffer ?
if (had_input && BufferUtil.isEmpty(_requestBuffer))
if (buffer_had_content && BufferUtil.isEmpty(_requestBuffer))
{
if (_parser.inContentState())
_input.addPoisonPillContent(_recycle);
_input.addContent(_recycleRequestBuffer);
else
releaseRequestBuffer();
}

View File

@ -86,17 +86,25 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
@Override
public int available()
{
try
synchronized (_inputQ)
{
synchronized (_inputQ)
Content content = _inputQ.peekUnsafe();
if (content==null)
{
Content item = nextContent();
return item == null ? 0 : remaining(item);
try
{
produceContent();
}
catch(IOException e)
{
failed(e);
}
content = _inputQ.peekUnsafe();
}
}
catch (IOException e)
{
throw new RuntimeIOException(e);
if (content!=null)
return remaining(content);
return 0;
}
}
@ -114,20 +122,24 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
Content item = nextContent();
if (item == null && _state.blockForContent(this))
item = nextContent();
if (item == null)
return _state.noContent();
while(true)
{
Content item = nextContent();
if (item!=null)
{
int l = get(item, b, off, len);
return l;
}
int l = get(item, b, off, len);
return l;
if (!_state.blockForContent(this))
return _state.noContent();
}
}
}
/**
* Called when derived implementations should attempt to
* produce more Content and add it via {@link #content(Content)}.
* produce more Content and add it via {@link #addContent(Content)}.
* For protocols that are constantly producing (eg HTTP2) this can
* be left as a noop;
* @throws IOException
@ -233,11 +245,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
protected void onBlockForContent()
{
}
/**
* Blocks until some content or some end-of-file event arrives.
*
@ -247,20 +254,15 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
while (_inputQ.isEmpty() && !isFinished())
try
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content...", this);
onBlockForContent();
_inputQ.wait();
produceContent();
}
catch (InterruptedException e)
{
throw (IOException)new InterruptedIOException().initCause(e);
}
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content...", this);
_inputQ.wait();
}
catch (InterruptedException e)
{
throw (IOException)new InterruptedIOException().initCause(e);
}
}
@ -269,33 +271,27 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*
* @param content the content to add
*/
public void content(Content item)
public void addContent(Content item)
{
boolean call_on_read_possible=false;
synchronized (_inputQ)
{
boolean wasEmpty = _inputQ.isEmpty();
_inputQ.add(item);
if (LOG.isDebugEnabled())
LOG.debug("{} queued {}", this, item);
if (wasEmpty)
{
if (!onAsyncRead())
if (_listener==null)
_inputQ.notify();
else
call_on_read_possible = _unready;
}
}
}
public void addPoisonPillContent(PoisonPillContent pill)
{
synchronized (_inputQ)
{
if (_inputQ.isEmpty())
pill.succeeded();
else
{
_inputQ.add(pill);
}
}
if (call_on_read_possible)
onReadPossible();
}
public void unblock()
@ -306,20 +302,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
}
protected boolean onAsyncRead()
{
boolean needReadCB=false;
synchronized (_inputQ)
{
if (_listener == null)
return false;
needReadCB=_unready;
}
if (needReadCB)
onReadPossible();
return true;
}
public long getContentConsumed()
{
synchronized (_inputQ)
@ -337,31 +319,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
public void earlyEOF()
{
synchronized (_inputQ)
{
if (!isFinished())
{
if (_inputQ.isEmpty())
{
if (LOG.isDebugEnabled())
LOG.debug("{} Early EOF", this);
_state=EARLY_EOF;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} Early EOF pending", this);
_inputQ.add(EARLY_EOF_CONTENT);
}
if (_listener == null)
{
_inputQ.notify();
return;
}
}
}
onReadPossible();
addContent(EARLY_EOF_CONTENT);
}
/**
@ -370,31 +328,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
public void eof()
{
synchronized (_inputQ)
{
if (!isFinished())
{
if (_inputQ.isEmpty())
{
if (LOG.isDebugEnabled())
LOG.debug("{} EOF", this);
_state=EOF;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} EOF pending", this);
_inputQ.add(EOF_CONTENT);
}
if (_listener == null)
{
_inputQ.notify();
return;
}
}
}
onReadPossible();
addContent(EOF_CONTENT);
}
public boolean consumeAll()
@ -450,24 +384,34 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
@Override
public boolean isReady()
{
boolean finished;
boolean finished=false;
synchronized (_inputQ)
{
if (available() > 0)
return true;
if (_state.isEOF())
return true;
if (_listener == null )
return true;
if (_unready)
return false;
if (_state.isEOF())
return true;
if (_inputQ.isEmpty())
{
try
{
produceContent();
}
catch(IOException e)
{
failed(e);
}
}
if (!_inputQ.isEmpty())
return true;
_unready = true;
finished = isFinished();
}
if (finished)
onReadPossible();
else
unready();
unready();
return false;
}
@ -492,14 +436,23 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
public void failed(Throwable x)
{
boolean call_on_read_possible=false;
synchronized (_inputQ)
{
if (_state instanceof ErrorState)
LOG.warn(x);
else
_state = new ErrorState(x);
_inputQ.notify();
if (_listener==null)
_inputQ.notify();
else
call_on_read_possible=true;
}
if (call_on_read_possible)
onReadPossible();
}
@Override
@ -507,7 +460,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
final Throwable error;
final ReadListener listener;
boolean available = false;
final boolean eof;
synchronized (_inputQ)
@ -515,33 +467,26 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
if (!_unready || _listener == null)
return;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
listener = _listener;
try
{
Content item = nextContent();
available = item != null && remaining(item) > 0;
}
catch (Exception e)
{
failed(e);
}
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
eof = isFinished();
eof = !available && isFinished();
_unready = !available && !eof;
_unready=false;
}
try
{
if (error != null)
listener.onError(error);
else if (available)
listener.onDataAvailable();
else if (eof)
listener.onAllDataRead();
else
unready();
{
listener.onDataAvailable();
if (isFinished())
listener.onAllDataRead();
}
}
catch (Throwable e)
{
@ -627,6 +572,8 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
@Override
public int noContent() throws IOException
{
if (_error instanceof IOException)
throw (IOException)_error;
throw new IOException(_error);
}

View File

@ -41,9 +41,10 @@ public class HttpInputOverHTTP extends HttpInput
}
@Override
protected void onBlockForContent()
protected void blockForContent() throws IOException
{
_httpConnection.blockingReadFillInterested();
super.blockForContent();
}
@Override

View File

@ -54,7 +54,7 @@ public class HttpInputTest
return super.add(s);
}
};
Queue<String> _content = new ConcurrentArrayQueue<>();
Queue<String> _fillAndParseSimulate = new ConcurrentArrayQueue<>();
HttpInput _in;
ReadListener _listener = new ReadListener()
@ -116,14 +116,14 @@ public class HttpInputTest
@Override
protected void produceContent() throws IOException
{
_history.add("produceContent "+_content.size());
_history.add("produceContent "+_fillAndParseSimulate.size());
for (String s=_content.poll();s!=null;s=_content.poll())
for (String s=_fillAndParseSimulate.poll();s!=null;s=_fillAndParseSimulate.poll())
{
if ("_EOF_".equals(s))
_in.eof();
else
_in.content(new TContent(s));
_in.addContent(new TContent(s));
}
}
@ -157,17 +157,16 @@ public class HttpInputTest
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
}
@Test
public void testRead() throws Exception
{
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_content.offer("EF");
_content.offer("GH");
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
assertThat(_in.available(),equalTo(2));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
@ -214,7 +213,7 @@ public class HttpInputTest
try
{
Thread.sleep(500);
_in.content(new TContent("AB"));
_in.addContent(new TContent("AB"));
}
catch(Throwable th)
{
@ -227,7 +226,6 @@ public class HttpInputTest
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("blockForContent"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'B'));
@ -239,8 +237,8 @@ public class HttpInputTest
@Test
public void testReadEOF() throws Exception
{
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_in.eof();
assertThat(_in.isFinished(),equalTo(false));
@ -268,8 +266,8 @@ public class HttpInputTest
@Test
public void testReadEarlyEOF() throws Exception
{
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_in.earlyEOF();
assertThat(_in.isFinished(),equalTo(false));
@ -323,7 +321,6 @@ public class HttpInputTest
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("blockForContent"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
}
@ -331,19 +328,19 @@ public class HttpInputTest
public void testAsyncEmpty() throws Exception
{
_in.setReadListener(_listener);
assertThat(_in.available(),equalTo(0));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(new byte[]{0}),equalTo(0));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
}
@ -356,12 +353,16 @@ public class HttpInputTest
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.content(new TContent("AB"));
_content.add("CD");
_in.addContent(new TContent("AB"));
_fillAndParseSimulate.add("CD");
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
@ -400,20 +401,18 @@ public class HttpInputTest
public void testAsyncEOF() throws Exception
{
_in.setReadListener(_listener);
_in.run();
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
_in.eof();
_in.run();
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("onAllDataRead"));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
@ -428,15 +427,20 @@ public class HttpInputTest
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.content(new TContent("AB"));
_content.add("_EOF_");
_in.addContent(new TContent("AB"));
_fillAndParseSimulate.add("_EOF_");
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
@ -450,19 +454,18 @@ public class HttpInputTest
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),nullValue());
}
@ -471,20 +474,27 @@ public class HttpInputTest
public void testAsyncError() throws Exception
{
_in.setReadListener(_listener);
_in.run();
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.failed(new TimeoutException());
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("onError:java.util.concurrent.TimeoutException"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
try
{
_in.read();
@ -513,10 +523,10 @@ public class HttpInputTest
@Test
public void testConsumeAll() throws Exception
{
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_content.offer("EF");
_content.offer("GH");
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
assertThat(_in.read(),equalTo((int)'A'));
assertFalse(_in.consumeAll());
@ -534,11 +544,11 @@ public class HttpInputTest
@Test
public void testConsumeAllEOF() throws Exception
{
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_content.offer("EF");
_content.offer("GH");
_content.offer("_EOF_");
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
_fillAndParseSimulate.offer("_EOF_");
assertThat(_in.read(),equalTo((int)'A'));
assertTrue(_in.consumeAll());

View File

@ -1404,7 +1404,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
{
configureServer(new NoopHandler());
final int REQS = 2;
String content = "This is a coooooooooooooooooooooooooooooooooo" +
final String content = "This is a coooooooooooooooooooooooooooooooooo" +
"ooooooooooooooooooooooooooooooooooooooooooooo" +
"ooooooooooooooooooooooooooooooooooooooooooooo" +
"ooooooooooooooooooooooooooooooooooooooooooooo" +
@ -1414,7 +1414,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
"ooooooooooooooooooooooooooooooooooooooooooooo" +
"ooooooooooooooooooooooooooooooooooooooooooooo" +
"oooooooooooonnnnnnnnnnnnnnnntent";
final byte[] bytes = content.getBytes();
final int cl = content.getBytes().length;
Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort());
final OutputStream out = client.getOutputStream();
@ -1426,12 +1426,15 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
{
try
{
byte[] bytes=(
"GET / HTTP/1.1\r\n"+
"Host: localhost\r\n"
+"Content-Length: " + cl + "\r\n" +
"\r\n"+
content).getBytes(StandardCharsets.ISO_8859_1);
for (int i = 0; i < REQS; i++)
{
out.write("GET / HTTP/1.1\r\nHost: localhost\r\n".getBytes(StandardCharsets.ISO_8859_1));
out.write(("Content-Length: " + bytes.length + "\r\n" + "\r\n").getBytes(StandardCharsets.ISO_8859_1));
out.write(bytes, 0, bytes.length);
}
out.write("GET / HTTP/1.1\r\nHost: last\r\nConnection: close\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
out.flush();
}