different callbacks for HTTP blocking and async reads

This commit is contained in:
Greg Wilkins 2014-12-24 10:31:33 +01:00
parent fecc03a2f5
commit 3e8969be6f
4 changed files with 78 additions and 13 deletions

View File

@ -61,6 +61,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final HttpParser _parser;
private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null;
private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
private final HttpInput.PoisonPillContent _recycle = new RecycleBufferContent();
@ -199,11 +201,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
final HttpConnection last=setCurrentConnection(this);
// If the channel state is not idle, then a request is in progress and
// has previously been dispatched. Thus if this call to onFillable produces
// a parsed event, it will be handled by the channel mechanism and this call
// does not need to call fillInterested
final boolean handling = !_channel.getState().isIdle();
try
{
@ -228,7 +225,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
boolean suspended = !_channel.handle();
// We should break iteration if we have suspended or changed connection or this is not the handling thread.
if (suspended || getEndPoint().getConnection() != this || handling )
if (suspended || getEndPoint().getConnection() != this )
break;
}
@ -254,18 +251,20 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
/** Fill and parse data looking for content
* @throws IOException
*/
protected void parseContent()
protected boolean parseContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} parseContent",this);
boolean handled=false;
while (_parser.inContentState())
{
int filled = fillRequestBuffer();
boolean handle = parseRequestBuffer();
handled|=handle;
if (handle || filled<=0)
break;
}
return handled;
}
@ -302,6 +301,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (LOG.isDebugEnabled())
LOG.debug("{} filled {}",this,filled);
return filled;
}
catch (IOException e)
@ -508,6 +508,42 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
private class BlockingReadCallback implements Callback
{
@Override
public void succeeded()
{
_input.unblock();
}
@Override
public void failed(Throwable x)
{
_input.failed(x);
}
}
private class AsyncReadCallback implements Callback
{
@Override
public void succeeded()
{
if (parseContent())
_channel.handle();
else
asyncReadFillInterested();
}
@Override
public void failed(Throwable x)
{
_input.failed(x);
_channel.handle();
}
}
private class SendCallback extends IteratingCallback
{
private MetaData.Response _info;
@ -700,7 +736,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return false;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.server.HttpTransport#push(org.eclipse.jetty.http.MetaData.Request)
*/
@ -709,4 +744,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
LOG.debug("ignore push in {}",this);
}
public void asyncReadFillInterested()
{
getEndPoint().fillInterested(_asyncReadCallback);
}
public void blockingReadFillInterested()
{
getEndPoint().fillInterested(_blockingReadCallback);
}
}

View File

@ -233,6 +233,11 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
protected void onBlockForContent()
{
}
/**
* Blocks until some content or some end-of-file event arrives.
*
@ -248,7 +253,9 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content...", this);
onBlockForContent();
_inputQ.wait();
produceContent();
}
catch (InterruptedException e)
{
@ -285,7 +292,17 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
if (_inputQ.isEmpty())
pill.succeeded();
else
{
_inputQ.add(pill);
}
}
}
public void unblock()
{
synchronized (_inputQ)
{
_inputQ.notify();
}
}
@ -481,6 +498,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
LOG.warn(x);
else
_state = new ErrorState(x);
_inputQ.notify();
}
}

View File

@ -41,16 +41,15 @@ public class HttpInputOverHTTP extends HttpInput
}
@Override
protected void blockForContent() throws IOException
protected void onBlockForContent()
{
_httpConnection.fillInterested();
super.blockForContent();
_httpConnection.blockingReadFillInterested();
}
@Override
protected void unready()
{
_httpConnection.fillInterested();
_httpConnection.asyncReadFillInterested();
}
@Override

View File

@ -227,6 +227,7 @@ public class HttpInputTest
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("blockForContent"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'B'));
@ -322,6 +323,7 @@ public class HttpInputTest
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("blockForContent"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
}