Jetty9 - More work on parser reentrancy, but more work needed.

This commit is contained in:
Simone Bordet 2012-08-22 16:46:52 +02:00
parent c6094c2398
commit ab5c8ee508
7 changed files with 186 additions and 149 deletions

View File

@ -80,7 +80,7 @@ public class HttpChannel implements HttpParser.RequestHandler
private boolean _expect102Processing = false;
private boolean _host = false;
public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport)
public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput input)
{
_connector = connector;
_configuration = configuration;
@ -89,7 +89,7 @@ public class HttpChannel implements HttpParser.RequestHandler
_uri = new HttpURI(URIUtil.__CHARSET);
_state = new HttpChannelState(this);
_request = new Request(this, new HttpInput());
_request = new Request(this, input);
_response = new Response(this, new HttpOutput(this));
}
@ -177,6 +177,7 @@ public class HttpChannel implements HttpParser.RequestHandler
public void reset()
{
_committed.set(false);
_expect = false;
_expect100Continue = false;
_expect102Processing = false;
@ -265,42 +266,40 @@ public class HttpChannel implements HttpParser.RequestHandler
protected boolean complete()
{
LOG.debug("{} complete", this);
if (!_state.isCompleting())
return false;
_state.completed();
if (isExpecting100Continue())
if (_state.isCompleting())
{
LOG.debug("100-Continue response not sent");
// We didn't send 100 continues, but the latest interpretation
// of the spec (see httpbis) is that the client will either
// send the body anyway, or close. So we no longer need to
// do anything special here other than make the connection not persistent
_expect100Continue = false;
if (!isCommitted())
_response.addHeader(HttpHeader.CONNECTION.toString(), HttpHeaderValue.CLOSE.toString());
else
LOG.warn("Cannot send 'Connection: close' for 100-Continue, response is already committed");
}
_state.completed();
if (isExpecting100Continue())
{
LOG.debug("100-Continue response not sent");
// We didn't send 100 continues, but the latest interpretation
// of the spec (see httpbis) is that the client will either
// send the body anyway, or close. So we no longer need to
// do anything special here other than make the connection not persistent
_expect100Continue = false;
if (!isCommitted())
_response.addHeader(HttpHeader.CONNECTION.toString(), HttpHeaderValue.CLOSE.toString());
else
LOG.warn("Cannot send 'Connection: close' for 100-Continue, response is already committed");
}
if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(Response.SC_NOT_FOUND, null, null);
if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(Response.SC_NOT_FOUND, null, null);
_request.setHandled(true);
_request.getHttpInput().consumeAll();
try
{
_response.getHttpOutput().close();
}
catch (IOException x)
{
// We cannot write the response, so there is no point in calling
// response.sendError() since that writes, and we already know we cannot write.
LOG.debug("Could not write response", x);
}
_request.setHandled(true);
return true;
try
{
_response.getHttpOutput().close();
}
catch (IOException x)
{
// We cannot write the response, so there is no point in calling
// response.sendError() since that writes, and we already know we cannot write.
LOG.debug("Could not write response", x);
}
}
return _request.getHttpInput().isShutdown();
}
/**

View File

@ -23,7 +23,6 @@ import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@ -326,8 +325,9 @@ public class HttpChannelState implements AsyncContext, Continuation
_state=State.COMPLETING;
return false;
case COMPLETING:
case ASYNCWAIT:
case COMPLETING:
case COMPLETED:
return false;
case REDISPATCH:

View File

@ -19,7 +19,9 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.http.HttpGenerator;
@ -29,7 +31,7 @@ import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -42,18 +44,16 @@ public class HttpConnection extends AbstractConnection implements Runnable
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
private final Server _server;
private final HttpConfiguration _configuration;
private final Connector _connector;
private final HttpParser _parser;
private final ByteBufferPool _bufferPool;
private final Server _server;
private final HttpGenerator _generator;
private final HttpChannel _channel;
private final ByteBufferPool _bufferPool;
private final HttpParser _parser;
private ByteBuffer _requestBuffer = null;
private int _headerBytes;
public static HttpConnection getCurrentConnection()
{
return __currentConnection.get();
@ -72,10 +72,10 @@ public class HttpConnection extends AbstractConnection implements Runnable
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_server = connector.getServer();
_channel = new HttpChannel(connector, config, endPoint, new HttpTransportOverHttp(_bufferPool, _configuration, endPoint));
_parser = new HttpParser(_channel);
_generator = new HttpGenerator();
_generator = new HttpGenerator(); // TODO: consider moving the generator to the transport, where it belongs
_generator.setSendServerVersion(_server.getSendServerVersion());
_channel = new HttpChannel(connector, config, endPoint, new HttpTransportOverHttp(_bufferPool, _configuration, endPoint, _generator), new Input());
_parser = new HttpParser(_channel);
LOG.debug("New HTTP Connection {}", this);
}
@ -125,6 +125,50 @@ public class HttpConnection extends AbstractConnection implements Runnable
}
}
protected boolean readAndParse() throws IOException
{
// If there is a request buffer, we are re-entering here
if (_requestBuffer == null)
{
_requestBuffer = _bufferPool.acquire(_configuration.getRequestHeaderSize(), false);
int filled = getEndPoint().fill(_requestBuffer);
LOG.debug("{} filled {}", this, filled);
// If we failed to fill
if (filled == 0)
{
// Somebody wanted to read, we didn't so schedule another attempt
releaseRequestBuffer();
fillInterested();
return false;
}
else if (filled < 0)
{
_parser.inputShutdown();
// We were only filling if fully consumed, so if we have
// read -1 then we have nothing to parse and thus nothing that
// will generate a response. If we had a suspended request pending
// a response or a request waiting in the buffer, we would not be here.
if (getEndPoint().isOutputShutdown())
getEndPoint().close();
else
getEndPoint().shutdownOutput();
// buffer must be empty and the channel must be idle, so we can release.
releaseRequestBuffer();
return false;
}
else
{
_headerBytes += filled;
}
}
// Parse the buffer
return _parser.parseNext(_requestBuffer);
}
/**
* <p>Parses and handles HTTP messages.</p>
* <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
@ -136,60 +180,17 @@ public class HttpConnection extends AbstractConnection implements Runnable
@Override
public void onFillable()
{
LOG.debug("{} onReadable {}", this, _channel.getState());
LOG.debug("{} onFillable {}", this, _channel.getState());
setCurrentConnection(this);
try
{
while (true)
{
// Fill the request buffer with data only if it is totally empty.
if (BufferUtil.isEmpty(_requestBuffer))
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(_configuration.getRequestHeaderSize(), false);
int filled = getEndPoint().fill(_requestBuffer);
LOG.debug("{} filled {}", this, filled);
// If we failed to fill
if (filled == 0)
{
// Somebody wanted to read, we didn't so schedule another attempt
releaseRequestBuffer();
fillInterested();
return;
}
else if (filled < 0)
{
_parser.inputShutdown();
// We were only filling if fully consumed, so if we have
// read -1 then we have nothing to parse and thus nothing that
// will generate a response. If we had a suspended request pending
// a response or a request waiting in the buffer, we would not be here.
if (getEndPoint().isOutputShutdown())
getEndPoint().close();
else
getEndPoint().shutdownOutput();
// buffer must be empty and the channel must be idle, so we can release.
releaseRequestBuffer();
return;
}
else
{
_headerBytes += filled;
}
}
// Parse the buffer
if (_parser.parseNext(_requestBuffer))
if (readAndParse())
{
// reset header count
_headerBytes = 0;
// For most requests, there will not be a body, so we can try to recycle the buffer now
releaseRequestBuffer();
_headerBytes = 0; // TODO: put this logic into the parser ?
if (!_channel.getRequest().isPersistent())
_generator.setPersistent(false);
@ -199,72 +200,75 @@ public class HttpConnection extends AbstractConnection implements Runnable
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
boolean complete = _channel.handle(); // TODO: should we perform special processing if we are complete ?
// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
if (complete)
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
LOG.debug("Upgrade from {} to {}", this, connection);
getEndPoint().setConnection(connection);
}
}
HttpConnection.this.reset();
// Is this thread dispatched from a resume ?
if (getCurrentConnection() != HttpConnection.this)
{
if (_parser.isStart())
{
// it wants to eat more
if (_requestBuffer == null)
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
fillInterested();
LOG.debug("Upgrade from {} to {}", this, connection);
getEndPoint().setConnection(connection);
}
else if (getConnector().isStarted())
{
LOG.debug("{} pipelined", this);
}
try
reset();
// Is this thread dispatched from a resume ?
if (getCurrentConnection() != HttpConnection.this)
{
if (_parser.isStart())
{
// it wants to eat more
if (_requestBuffer == null)
{
getExecutor().execute(this);
fillInterested();
}
catch (RejectedExecutionException e)
else if (getConnector().isStarted())
{
LOG.debug("{} pipelined", this);
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isStarted())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
{
if (getConnector().isStarted())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
if (_parser.isClosed() && !getEndPoint().isOutputShutdown())
{
getEndPoint().close();
// TODO This is a catch all indicating some protocol handling failure
// Currently needed for requests saying they are HTTP/2.0.
// This should be removed once better error handling is in place
LOG.warn("Endpoint output not shutdown when seeking EOF");
getEndPoint().shutdownOutput();
}
}
if (_parser.isClosed() && !getEndPoint().isOutputShutdown())
// make sure that an oshut connection is driven towards close
// TODO this is a little ugly
if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
{
// TODO This is a catch all indicating some protocol handling failure
// Currently needed for requests saying they are HTTP/2.0.
// This should be removed once better error handling is in place
LOG.warn("Endpoint output not shutdown when seeking EOF");
getEndPoint().shutdownOutput();
fillInterested();
}
}
// make sure that an oshut connection is driven towards close
// TODO this is a little ugly
if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
{
fillInterested();
// return if the connection has been changed
if (getEndPoint().getConnection() != this)
return;
}
// return if the connection has been changed
if (getEndPoint().getConnection() != this)
return;
}
else if (_headerBytes >= _configuration.getRequestHeaderSize())
{
@ -272,6 +276,11 @@ public class HttpConnection extends AbstractConnection implements Runnable
_parser.close();
_channel.getResponse().sendError(Response.SC_REQUEST_ENTITY_TOO_LARGE, null, null);
// TODO: close the connection !
break;
}
else
{
releaseRequestBuffer();
}
}
}
@ -306,4 +315,33 @@ public class HttpConnection extends AbstractConnection implements Runnable
{
onFillable();
}
private class Input extends HttpInput
{
@Override
protected void blockForContent() throws IOException
{
try
{
while (true)
{
FutureCallback<Void> callback = new FutureCallback<>();
getEndPoint().fillInterested(null, callback);
callback.get();
if (readAndParse())
break;
else
releaseRequestBuffer();
}
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
}
}

View File

@ -24,8 +24,6 @@ import java.nio.ByteBuffer;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -42,10 +40,9 @@ import org.eclipse.jetty.util.log.Logger;
*/
public class HttpInput extends ServletInputStream
{
private static final Logger LOG = Log.getLogger(HttpInput.class);
protected final byte[] _oneByte=new byte[1];
protected final ArrayQueue<ByteBuffer> _inputQ=new ArrayQueue<>();
private final byte[] _oneByte=new byte[1];
private final ArrayQueue<ByteBuffer> _inputQ=new ArrayQueue<>();
// TODO: what is this field for ?
private ByteBuffer _content;
private boolean _inputEOF;
@ -144,7 +141,7 @@ public class HttpInput extends ServletInputStream
{
synchronized (lock())
{
while(_inputQ.isEmpty())
while (_inputQ.isEmpty())
{
try
{
@ -179,6 +176,7 @@ public class HttpInput extends ServletInputStream
{
synchronized (lock())
{
// TODO: no copy of the buffer; is this safe ?
_inputQ.add(ref);
onContentQueued(ref);
}
@ -201,6 +199,7 @@ public class HttpInput extends ServletInputStream
}
}
// TODO: is this method needed at all ?
public void consumeAll()
{
/*

View File

@ -34,17 +34,18 @@ import org.eclipse.jetty.util.log.Logger;
public class HttpTransportOverHttp implements HttpTransport
{
private static final Logger logger = Log.getLogger(HttpTransportOverHttp.class);
private final HttpGenerator _generator = new HttpGenerator();
private final ByteBufferPool _bufferPool;
private final HttpConfiguration _configuration;
private final EndPoint _endPoint;
private final HttpGenerator _generator;
private HttpGenerator.ResponseInfo _info;
public HttpTransportOverHttp(ByteBufferPool _bufferPool, HttpConfiguration _configuration, EndPoint _endPoint)
public HttpTransportOverHttp(ByteBufferPool _bufferPool, HttpConfiguration _configuration, EndPoint _endPoint, HttpGenerator generator)
{
this._bufferPool = _bufferPool;
this._configuration = _configuration;
this._endPoint = _endPoint;
this._generator = generator;
}
@Override

View File

@ -43,7 +43,7 @@ public class HttpWriterTest
{
_bytes = BufferUtil.allocate(2048);
HttpChannel channel = new HttpChannel(null,null,null,null)
HttpChannel channel = new HttpChannel(null,null,null,null,null)
{
@Override
public HttpConfiguration getHttpConfiguration()

View File

@ -103,7 +103,7 @@ public class ResponseTest
}
};
_channel = new HttpChannel(_connector,null,endp,null)
_channel = new HttpChannel(_connector,null,endp,null,null)
{
// @Override
protected void flush(ByteBuffer content, boolean last) throws IOException