jetty-9 cleanups and fixes after refactor

This commit is contained in:
Greg Wilkins 2012-08-23 14:07:10 +10:00
parent 74e632cd34
commit cd719bf979
13 changed files with 517 additions and 364 deletions

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpTokens.EndOfContent;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -62,11 +63,14 @@ public class HttpParser
private final HttpHandler _handler;
private final RequestHandler _requestHandler;
private final ResponseHandler _responseHandler;
private final int _maxHeaderBytes;
private HttpHeader _header;
private String _headerString;
private HttpHeaderValue _value;
private String _valueString;
private int _responseStatus;
private int _headerBytes;
private boolean _host;
/* ------------------------------------------------------------------------------- */
private State _state=State.START;
@ -88,25 +92,33 @@ public class HttpParser
private final Utf8StringBuilder _utf8=new Utf8StringBuilder();
/* ------------------------------------------------------------------------------- */
/**
* Constructor.
*/
public HttpParser(RequestHandler handler)
{
this(handler,-1);
}
/* ------------------------------------------------------------------------------- */
public HttpParser(ResponseHandler handler)
{
this(handler,-1);
}
/* ------------------------------------------------------------------------------- */
public HttpParser(RequestHandler handler,int maxHeaderBytes)
{
_handler=handler;
_requestHandler=handler;
_responseHandler=null;
_maxHeaderBytes=maxHeaderBytes;
}
/* ------------------------------------------------------------------------------- */
/**
* Constructor.
*/
public HttpParser(ResponseHandler handler)
public HttpParser(ResponseHandler handler,int maxHeaderBytes)
{
_handler=handler;
_requestHandler=null;
_responseHandler=handler;
_maxHeaderBytes=maxHeaderBytes;
}
/* ------------------------------------------------------------------------------- */
@ -151,7 +163,7 @@ public class HttpParser
/* ------------------------------------------------------------------------------- */
public boolean isInContent()
{
return _endOfContent!=EndOfContent.NO_CONTENT && _endOfContent!=EndOfContent.UNKNOWN_CONTENT;
return _state.ordinal()>State.END.ordinal() && _state.ordinal()<State.CLOSED.ordinal();
}
/* ------------------------------------------------------------------------------- */
@ -269,6 +281,12 @@ public class HttpParser
// process each character
byte ch=buffer.get();
if (_maxHeaderBytes>0 && ++_headerBytes>_maxHeaderBytes)
{
badMessage(buffer,HttpStatus.REQUEST_URI_TOO_LONG_414,null);
return true;
}
if (_eol == HttpTokens.CARRIAGE_RETURN && ch == HttpTokens.LINE_FEED)
{
_eol=HttpTokens.LINE_FEED;
@ -289,7 +307,7 @@ public class HttpParser
}
else if (ch < HttpTokens.SPACE && ch>=0)
{
badMessage(buffer, "No URI");
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"No URI");
return true;
}
else
@ -303,14 +321,14 @@ public class HttpParser
_version=HttpVersion.CACHE.get(version);
if (_version==null)
{
badMessage(buffer, "Unknown Version");
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"Unknown Version");
return true;
}
_state=State.SPACE1;
}
else if (ch < HttpTokens.SPACE && ch>=0)
{
badMessage(buffer, "No Status");
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"No Status");
return true;
}
else
@ -334,7 +352,7 @@ public class HttpParser
}
else if (ch < HttpTokens.SPACE)
{
badMessage(buffer, _requestHandler!=null?"No URI":"No Status");
badMessage(buffer,HttpStatus.BAD_REQUEST_400,_requestHandler!=null?"No URI":"No Status");
return true;
}
break;
@ -438,7 +456,7 @@ public class HttpParser
_version=HttpVersion.CACHE.get(version);
if (_version==null)
{
badMessage(buffer, "Unknown Version");
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"Unknown Version");
return true;
}
@ -492,6 +510,11 @@ public class HttpParser
{
// process each character
byte ch=buffer.get();
if (_maxHeaderBytes>0 && ++_headerBytes>_maxHeaderBytes)
{
badMessage(buffer,HttpStatus.REQUEST_ENTITY_TOO_LARGE_413,null);
return true;
}
if (_eol == HttpTokens.CARRIAGE_RETURN && ch == HttpTokens.LINE_FEED)
{
@ -536,7 +559,7 @@ public class HttpParser
catch(NumberFormatException e)
{
LOG.ignore(e);
badMessage(buffer, "Bad Content-Length");
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"Bad Content-Length");
return true;
}
if (_contentLength <= 0)
@ -555,11 +578,41 @@ public class HttpParser
_endOfContent=EndOfContent.CHUNKED_CONTENT;
else if (_valueString.indexOf(HttpHeaderValue.CHUNKED.toString()) >= 0)
{
badMessage(buffer, "Bad chunking");
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"Bad chunking");
return true;
}
}
break;
case HOST:
_host=true;
String host=_valueString;
int port=0;
loop: for (int i = host.length(); i-- > 0;)
{
char c2 = (char)(0xff & host.charAt(i));
switch (c2)
{
case ']':
break loop;
case ':':
try
{
port = StringUtil.toInt(host.substring(i+1));
}
catch (NumberFormatException e)
{
LOG.debug(e);
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"Bad Host header");
return true;
}
host = host.substring(0,i);
break loop;
}
}
if (_requestHandler!=null)
_requestHandler.parsedHostHeader(host,port);
}
}
@ -578,6 +631,14 @@ public class HttpParser
_contentPosition=0;
// End of headers!
// Was there a required host header?
if (!_host && _version!=HttpVersion.HTTP_1_0 && _requestHandler!=null)
{
badMessage(buffer,HttpStatus.BAD_REQUEST_400,"No Host");
return true;
}
// so work out the _content demarcation
if (_endOfContent == EndOfContent.UNKNOWN_CONTENT)
{
@ -1053,17 +1114,17 @@ public class HttpParser
LOG.warn("badMessage: "+e.toString()+" for "+_handler);
LOG.debug(e);
badMessage(buffer,e.toString());
badMessage(buffer,HttpStatus.BAD_REQUEST_400,e.toString());
return true;
}
}
/* ------------------------------------------------------------------------------- */
private void badMessage(ByteBuffer buffer, String reason)
private void badMessage(ByteBuffer buffer, int status, String reason)
{
BufferUtil.clear(buffer);
_state=State.END;
_handler.badMessage(400, reason);
_handler.badMessage(status, reason);
}
/* ------------------------------------------------------------------------------- */
@ -1099,6 +1160,7 @@ public class HttpParser
_endOfContent=EndOfContent.UNKNOWN_CONTENT;
_contentPosition=0;
_responseStatus=0;
_headerBytes=0;
_contentChunk=null;
}
@ -1111,6 +1173,8 @@ public class HttpParser
_contentPosition=0;
_responseStatus=0;
_contentChunk=null;
_headerBytes=0;
_host=false;
}
/* ------------------------------------------------------------------------------- */
@ -1165,6 +1229,13 @@ public class HttpParser
* This is the method called by parser when the HTTP request line is parsed
*/
public abstract boolean startRequest(HttpMethod method, String methodString, String uri, HttpVersion version);
/**
* This is the method called by the parser after it has parsed the host header (and checked it's format). This is
* called after the {@link HttpHandler#parsedHeader(HttpHeader, String, String) methods and before
* HttpHandler#headerComplete();
*/
public abstract boolean parsedHostHeader(String host,int port);
}
public interface ResponseHandler extends HttpHandler

View File

@ -275,6 +275,12 @@ public class HttpTester
{
put(name,value);
}
@Override
public boolean parsedHostHeader(String host,int port)
{
return false;
}
}
public static class Response extends Message implements HttpParser.ResponseHandler

View File

@ -739,6 +739,13 @@ public class HttpParserTest
return false;
}
@Override
public boolean parsedHostHeader(String host,int port)
{
// TODO test this
return false;
}
@Override
public boolean headerComplete()
{

View File

@ -256,12 +256,14 @@ abstract public class WriteFlusher
protected void fail(Throwable cause)
{
_callback.failed(_context, cause);
if (_callback!=null)
_callback.failed(_context, cause);
}
protected void complete()
{
_callback.completed(_context);
if (_callback!=null)
_callback.completed(_context);
}
}
@ -290,9 +292,6 @@ abstract public class WriteFlusher
if (LOG.isDebugEnabled())
LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
if (callback == null)
throw new IllegalArgumentException();
if (!updateState(__IDLE,__WRITING))
throw new WritePendingException();
@ -317,12 +316,16 @@ abstract public class WriteFlusher
// If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__WRITING,__IDLE))
ignoreFail();
callback.completed(context);
if (callback!=null)
callback.completed(context);
}
catch (IOException e)
{
if (updateState(__WRITING,__IDLE))
callback.failed(context, e);
{
if (callback!=null)
callback.failed(context, e);
}
else
fail(new PendingState<>(buffers, context, callback));
}

View File

@ -49,7 +49,19 @@ import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpChannel implements HttpParser.RequestHandler
/* ------------------------------------------------------------ */
/** HttpChannel.
* Represents a single endpoint for HTTP semantic processing.
* The HttpChannel is both a HttpParser.RequestHandler, where it passively receives events from
* an incoming HTTP request, and a Runnable, where it actively takes control of the request/response
* life cycle and calls the application (perhaps suspending and resuming with multiple calls to run).
* The HttpChannel signals the switch from passive mode to active mode by returning true to one of the
* HttpParser.RequestHandler callbacks. The completion of the active phase is signalled by a call to
* HttpTransport.httpChannelCompleted().
*
*/
public class HttpChannel implements HttpParser.RequestHandler, Runnable
{
private static final Logger LOG = Log.getLogger(HttpChannel.class);
private static final ThreadLocal<HttpChannel> __currentChannel = new ThreadLocal<>();
@ -78,7 +90,6 @@ public class HttpChannel implements HttpParser.RequestHandler
private boolean _expect = false;
private boolean _expect100Continue = false;
private boolean _expect102Processing = false;
private boolean _host = false;
public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput input)
{
@ -186,7 +197,8 @@ public class HttpChannel implements HttpParser.RequestHandler
_uri.clear();
}
protected boolean handle()
@Override
public void run()
{
LOG.debug("{} handle enter", this);
@ -223,7 +235,6 @@ public class HttpChannel implements HttpParser.RequestHandler
else
{
_request.setDispatcherType(DispatcherType.ASYNC);
// TODO: should be call customize() as above ?
getServer().handleAsync(this);
}
}
@ -249,16 +260,54 @@ public class HttpChannel implements HttpParser.RequestHandler
handling = !_state.unhandle();
}
}
return complete();
}
finally
{
if (threadName != null && LOG.isDebugEnabled())
Thread.currentThread().setName(threadName);
setCurrentHttpChannel(null);
if (_state.isCompleting())
{
try
{
_state.completed();
if (_expect100Continue)
{
LOG.debug("100 continues not sent");
// We didn't send 100 continues, but the latest interpretation
// of the spec (see httpbis) is that the client will either
// send the body anyway, or close. So we no longer need to
// do anything special here other than make the connection not persistent
_expect100Continue = false;
if (!_response.isCommitted())
_response.getHttpFields().add(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.toString());
else
LOG.warn("Can't close non-100 response");
}
if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(404);
// Complete generating the response
_response.complete();
}
catch(EofException e)
{
LOG.debug(e);
}
catch(Exception e)
{
LOG.warn(e);
}
finally
{
_request.setHandled(true);
_transport.httpChannelCompleted();
}
}
LOG.debug("{} handle exit", this);
}
}
@ -452,7 +501,6 @@ public class HttpChannel implements HttpParser.RequestHandler
@Override
public boolean startRequest(HttpMethod httpMethod, String method, String uri, HttpVersion version)
{
_host = false;
_expect = false;
_expect100Continue = false;
_expect102Processing = false;
@ -498,11 +546,6 @@ public class HttpChannel implements HttpParser.RequestHandler
{
switch (header)
{
case HOST:
// TODO check if host matched a host in the URI.
_host = true;
break;
case EXPECT:
HttpHeaderValue expect = HttpHeaderValue.CACHE.get(value);
switch (expect == null ? HttpHeaderValue.UNKNOWN : expect)
@ -553,6 +596,14 @@ public class HttpChannel implements HttpParser.RequestHandler
return false;
}
@Override
public boolean parsedHostHeader(String host, int port)
{
_request.setServerName(host);
_request.setServerPort(port);
return false;
}
@Override
public boolean headerComplete()
{
@ -581,12 +632,6 @@ public class HttpChannel implements HttpParser.RequestHandler
if (getServer().getSendDateHeader())
_response.getHttpFields().putDateField(HttpHeader.DATE.toString(), _request.getTimeStamp());
if (!_host)
{
_response.sendError(Response.SC_BAD_REQUEST, "No Host Header", null);
return true;
}
if (_expect)
{
_response.sendError(Response.SC_EXPECTATION_FAILED, null, null);
@ -637,28 +682,21 @@ public class HttpChannel implements HttpParser.RequestHandler
try
{
commitResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true);
if (_state.handling())
{
commitResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true);
_state.unhandle();
}
}
catch (IOException e)
{
LOG.warn(e);
}
}
// TODO: port the logic present in this method
/*
@Override
public ResponseInfo commit()
finally
{
// If we are still expecting a 100, then this response must close
if (_expect100Continue)
{
_expect100Continue = false;
_response.getHttpFields().put(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
}
return _response.commit();
_state.completed();
}
*/
}
protected boolean commitResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{

View File

@ -717,7 +717,7 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
protected void scheduleDispatch()
{
_channel.execute(_handleRequest);
_channel.execute(_channel);
}
/* ------------------------------------------------------------ */
@ -1102,13 +1102,4 @@ public class HttpChannelState implements AsyncContext, Continuation
return _pathInContext;
}
}
private final Runnable _handleRequest = new Runnable()
{
@Override
public void run()
{
_channel.handle();
}
};
}

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -38,7 +39,7 @@ import org.eclipse.jetty.util.log.Logger;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
public class HttpConnection extends AbstractConnection implements Runnable
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
{
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclispe.jetty.server.HttpConnection.UPGRADE";
private static final Logger LOG = Log.getLogger(HttpConnection.class);
@ -49,10 +50,9 @@ public class HttpConnection extends AbstractConnection implements Runnable
private final ByteBufferPool _bufferPool;
private final Server _server;
private final HttpGenerator _generator;
private final HttpChannel _channel;
private final HttpChannelOverHttp _channel;
private final HttpParser _parser;
private ByteBuffer _requestBuffer = null;
private int _headerBytes;
public static HttpConnection getCurrentConnection()
{
@ -74,8 +74,8 @@ public class HttpConnection extends AbstractConnection implements Runnable
_server = connector.getServer();
_generator = new HttpGenerator(); // TODO: consider moving the generator to the transport, where it belongs
_generator.setSendServerVersion(_server.getSendServerVersion());
_channel = new HttpChannel(connector, config, endPoint, new HttpTransportOverHttp(_bufferPool, _configuration, endPoint, _generator), new Input());
_parser = new HttpParser(_channel);
_channel = new HttpChannelOverHttp(connector, config, endPoint, this, new Input());
_parser = new HttpParser(_channel,config.getRequestHeaderSize());
LOG.debug("New HTTP Connection {}", this);
}
@ -159,10 +159,6 @@ public class HttpConnection extends AbstractConnection implements Runnable
releaseRequestBuffer();
return false;
}
else
{
_headerBytes += filled;
}
}
// Parse the buffer
@ -189,94 +185,25 @@ public class HttpConnection extends AbstractConnection implements Runnable
{
if (readAndParse())
{
// reset header count
_headerBytes = 0; // TODO: put this logic into the parser ?
if (!_channel.getRequest().isPersistent())
_generator.setPersistent(false);
// The parser returned true, which indicates the channel is ready to handle a request.
// Call the channel and this will either handle the request/response to completion OR,
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
boolean complete = _channel.handle(); // TODO: should we perform special processing if we are complete ?
_channel.run();
if (complete)
// Return if the channel is still processing the request
if (_channel.getState().isSuspending())
{
// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
LOG.debug("Upgrade from {} to {}", this, connection);
getEndPoint().setConnection(connection);
}
}
reset();
// Is this thread dispatched from a resume ?
if (getCurrentConnection() != HttpConnection.this)
{
if (_parser.isStart())
{
// it wants to eat more
if (_requestBuffer == null)
{
fillInterested();
}
else if (getConnector().isStarted())
{
LOG.debug("{} pipelined", this);
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isStarted())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
{
getEndPoint().close();
}
}
if (_parser.isClosed() && !getEndPoint().isOutputShutdown())
{
// TODO This is a catch all indicating some protocol handling failure
// Currently needed for requests saying they are HTTP/2.0.
// This should be removed once better error handling is in place
LOG.warn("Endpoint output not shutdown when seeking EOF");
getEndPoint().shutdownOutput();
}
}
// make sure that an oshut connection is driven towards close
// TODO this is a little ugly
if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
{
fillInterested();
}
// return if the connection has been changed
if (getEndPoint().getConnection() != this)
return;
// release buffer if no input being held.
// This is needed here to handle the case of no request input. If there
// is request input, then the release is handled by Input@onAllContentConsumed()
if (_channel.getRequest().getHttpInput().available()==0)
releaseRequestBuffer();
return;
}
}
else if (_headerBytes >= _configuration.getRequestHeaderSize())
{
_parser.reset();
_parser.close();
_generator.setPersistent(false);
_channel.getResponse().sendError(Response.SC_REQUEST_ENTITY_TOO_LARGE, null, null);
break;
// return if the connection has been changed
if (getEndPoint().getConnection()!=this)
return;
}
else
{
@ -316,6 +243,191 @@ public class HttpConnection extends AbstractConnection implements Runnable
onFillable();
}
@Override
public void commit(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
// TODO This is always blocking! One of the important use-cases is to be able to write large static content without a thread
ByteBuffer header = null;
out: while (true)
{
HttpGenerator.Result result = _generator.generateResponse(info, header, content, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(content),
lastContent,
_generator.getState());
switch (result)
{
case NEED_HEADER:
{
if (header != null)
_bufferPool.release(header);
header = _bufferPool.acquire(_configuration.getResponseHeaderSize(), false);
continue;
}
case NEED_CHUNK:
{
if (header != null)
_bufferPool.release(header);
header = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
continue;
}
case FLUSH:
{
if (_channel.getRequest().isHead())
{
BufferUtil.clear(content);
if (BufferUtil.hasContent(header))
blockingWrite(header);
}
else if (BufferUtil.hasContent(header))
{
if (BufferUtil.hasContent(content))
blockingWrite(header, content);
else
blockingWrite(header);
}
else if (BufferUtil.hasContent(content))
{
blockingWrite(content);
}
continue;
}
case SHUTDOWN_OUT:
{
getEndPoint().shutdownOutput();
continue;
}
case DONE:
{
break out;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
@Override
public void write(ByteBuffer content, boolean lastContent) throws IOException
{
commit(null, content, lastContent);
}
private void blockingWrite(ByteBuffer... bytes) throws IOException
{
try
{
FutureCallback<Void> callback = new FutureCallback<>();
getEndPoint().write(null, callback, bytes);
callback.get();
}
catch (InterruptedException x)
{
throw (IOException)new InterruptedIOException().initCause(x);
}
catch (ExecutionException x)
{
Throwable cause = x.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
else if (cause instanceof Exception)
throw new IOException(cause);
else
throw (Error)cause;
}
}
@Override
public void httpChannelCompleted()
{
// Finish consuming the request
if (_parser.isInContent() && _generator.isPersistent())
// Complete reading the request
_channel.getRequest().getHttpInput().consumeAll();
// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
LOG.debug("Upgrade from {} to {}", this, connection);
getEndPoint().setConnection(connection);
}
}
reset();
// Is this thread dispatched from a resume ?
if (getCurrentConnection() != HttpConnection.this)
{
if (_parser.isStart())
{
// it wants to eat more
if (_requestBuffer == null)
{
fillInterested();
}
else if (getConnector().isStarted())
{
LOG.debug("{} pipelined", this);
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isStarted())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
{
getEndPoint().close();
}
}
if (_parser.isClosed() && !getEndPoint().isOutputShutdown())
{
// TODO This is a catch all indicating some protocol handling failure
// Currently needed for requests saying they are HTTP/2.0.
// This should be removed once better error handling is in place
LOG.warn("Endpoint output not shutdown when seeking EOF");
getEndPoint().shutdownOutput();
}
}
// make sure that an oshut connection is driven towards close
// TODO this is a little ugly
if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
{
fillInterested();
}
// return if the connection has been changed
if (getEndPoint().getConnection() != this)
return;
}
private class Input extends HttpInput
{
@Override
@ -343,5 +455,63 @@ public class HttpConnection extends AbstractConnection implements Runnable
FutureCallback.rethrow(e);
}
}
@Override
protected void onContentQueued(ByteBuffer ref)
{
/* This callback could be used to tell the connection
* that the request did contain content and thus the request
* buffer needs to be held until a call to #onAllContentConsumed
*
* However it turns out that nothing is needed here because either a
* request will have content, in which case the request buffer will be
* released by a call to onAllContentConsumed; or it will not have content.
* If it does not have content, either it will complete quickly and the
* buffers will be released in completed() or it will be suspended and
* onReadable() contains explicit handling to release if it is suspended.
*
* We extend this method anyway, to turn off the notify done by the
* default implementation as this is not needed by our implementation
* of blockForContent
*/
}
@Override
protected void onAllContentConsumed()
{
/* This callback tells the connection that all content that has
* been parsed has been consumed. Thus the request buffer may be
* released if it is empty.
*/
releaseRequestBuffer();
}
}
private class HttpChannelOverHttp extends HttpChannel
{
public HttpChannelOverHttp(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput input)
{
super(connector,configuration,endPoint,transport,input);
}
@Override
public void badMessage(int status, String reason)
{
_generator.setPersistent(false);
super.badMessage(status,reason);
}
@Override
public boolean headerComplete()
{
boolean result= super.headerComplete();
if (!getRequest().isPersistent())
_generator.setPersistent(false);
return result;
}
}
}

View File

@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
@ -42,8 +43,6 @@ public class HttpInput extends ServletInputStream
{
private final byte[] _oneByte=new byte[1];
private final ArrayQueue<ByteBuffer> _inputQ=new ArrayQueue<>();
// TODO: what is this field for ?
private ByteBuffer _content;
private boolean _inputEOF;
/* ------------------------------------------------------------ */
@ -62,15 +61,20 @@ public class HttpInput extends ServletInputStream
{
synchronized (lock())
{
ByteBuffer content=_inputQ.peekUnsafe();;
while(content!=null)
{
content.clear();
_inputQ.pollUnsafe();
onContentConsumed(content);
content=_inputQ.peekUnsafe();
if (content==null)
onAllContentConsumed();
}
_inputEOF=false;
if (_content!=null)
onContentConsumed(_content);
while ((_content=_inputQ.poll())!=null)
onContentConsumed(_content);
if (_content!=null)
onAllContentConsumed();
_content=null;
}
}
@ -115,11 +119,14 @@ public class HttpInput extends ServletInputStream
while (content!=null && !content.hasRemaining())
{
_inputQ.pollUnsafe();
onContentConsumed(content);
content=_inputQ.peekUnsafe();
}
if (content==null)
{
onAllContentConsumed();
// check for EOF
if (_inputEOF)
{
@ -176,7 +183,9 @@ public class HttpInput extends ServletInputStream
{
synchronized (lock())
{
// TODO: no copy of the buffer; is this safe ?
// The buffer is not copied here. This relies on the caller not recycling the buffer
// until the it is consumed. The onAllContentConsumed() callback is the signal to the
// caller that the buffers can be recycled.
_inputQ.add(ref);
onContentQueued(ref);
}
@ -199,27 +208,33 @@ public class HttpInput extends ServletInputStream
}
}
// TODO: is this method needed at all ?
public void consumeAll()
{
/*
while (true)
synchronized (lock())
{
synchronized (lock())
while(!_inputEOF)
{
_inputQ.clear();
}
if (_inputEOF)
return;
try
{
blockForContent();
}
catch(IOException e)
{
LOG.warn(e);
ByteBuffer content=_inputQ.peekUnsafe();
while(content!=null)
{
content.clear();
_inputQ.pollUnsafe();
onContentConsumed(content);
content=_inputQ.peekUnsafe();
if (content==null)
onAllContentConsumed();
}
try
{
blockForContent();
}
catch(IOException e)
{
throw new RuntimeIOException(e);
}
}
}
*/
}
}

View File

@ -7,7 +7,9 @@ import org.eclipse.jetty.http.HttpGenerator;
public interface HttpTransport
{
public void commit(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean complete) throws IOException;
void commit(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException;
public void write(ByteBuffer content, boolean complete) throws IOException;
void write(ByteBuffer content, boolean lastContent) throws IOException;
void httpChannelCompleted();
}

View File

@ -1,163 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpTransportOverHttp implements HttpTransport
{
private static final Logger logger = Log.getLogger(HttpTransportOverHttp.class);
private final ByteBufferPool _bufferPool;
private final HttpConfiguration _configuration;
private final EndPoint _endPoint;
private final HttpGenerator _generator;
private HttpGenerator.ResponseInfo _info;
public HttpTransportOverHttp(ByteBufferPool _bufferPool, HttpConfiguration _configuration, EndPoint _endPoint, HttpGenerator generator)
{
this._bufferPool = _bufferPool;
this._configuration = _configuration;
this._endPoint = _endPoint;
this._generator = generator;
}
@Override
public void commit(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{
// TODO This is blocking! One of the important use-cases is to be able to write large static content without a thread
generate(info, content, complete);
// TODO: Trick only needed by the current HttpGenerator, that always require a ResponseInfo object
if (!complete)
_info = info;
}
@Override
public void write(ByteBuffer content, boolean complete) throws IOException
{
generate(_info, content, complete);
// TODO: Trick only needed by the current HttpGenerator, that always require a ResponseInfo object
if (complete)
_info = null;
}
private void generate(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{
ByteBuffer header = null;
out: while (true)
{
HttpGenerator.Result result = _generator.generateResponse(info, header, content, complete);
if (logger.isDebugEnabled())
logger.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(content),
complete,
_generator.getState());
switch (result)
{
case NEED_HEADER:
{
if (header != null)
_bufferPool.release(header);
header = _bufferPool.acquire(_configuration.getResponseHeaderSize(), false);
continue;
}
case NEED_CHUNK:
{
if (header != null)
_bufferPool.release(header);
header = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
continue;
}
case FLUSH:
{
if (info.isHead())
{
BufferUtil.clear(content);
if (BufferUtil.hasContent(header))
blockingWrite(header);
}
else if (BufferUtil.hasContent(header))
{
blockingWrite(header, content);
}
else
{
blockingWrite(content);
}
continue;
}
case SHUTDOWN_OUT:
{
_endPoint.shutdownOutput();
continue;
}
case DONE:
{
break out;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
private void blockingWrite(ByteBuffer... bytes) throws IOException
{
try
{
FutureCallback<Void> callback = new FutureCallback<>();
_endPoint.write(null, callback, bytes);
callback.get();
}
catch (InterruptedException x)
{
throw (IOException)new InterruptedIOException().initCause(x);
}
catch (ExecutionException x)
{
Throwable cause = x.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
else if (cause instanceof Exception)
throw new IOException(cause);
else
throw (Error)cause;
}
}
}

View File

@ -1070,9 +1070,7 @@ public class Request implements HttpServletRequest
}
catch (NumberFormatException e)
{
// TODO: this should be moved to the parser !
if (_channel != null)
getResponse().sendError(Response.SC_BAD_REQUEST, "Bad Host header", null);
LOG.warn(e);
}
return _serverName;
}

View File

@ -159,12 +159,12 @@ public class HttpConnectionTest
@Test
public void testHead() throws Exception
{
String responsePOST=connector.getResponses("POST /R1 HTTP/1.1\015\012"+
String responseHEAD=connector.getResponses("HEAD /R1 HTTP/1.1\015\012"+
"Host: localhost\015\012"+
"Connection: close\015\012"+
"\015\012");
String responseHEAD=connector.getResponses("HEAD /R1 HTTP/1.1\015\012"+
String responsePOST=connector.getResponses("POST /R1 HTTP/1.1\015\012"+
"Host: localhost\015\012"+
"Connection: close\015\012"+
"\015\012");
@ -182,9 +182,9 @@ public class HttpConnectionTest
}
@Test
public void testBad() throws Exception
public void testBadHostPort() throws Exception
{
Log.getLogger(HttpParser.class).info("badMessage: 3 bad messages expected ...");
Log.getLogger(HttpParser.class).info("badMessage: Number formate exception expected ...");
String response;
response=connector.getResponses("GET http://localhost:EXPECTED_NUMBER_FORMAT_EXCEPTION/ HTTP/1.1\n"+
@ -192,12 +192,26 @@ public class HttpConnectionTest
"Connection: close\015\012"+
"\015\012");
checkContains(response,0,"HTTP/1.1 400");
}
@Test
public void testBadURIencoding() throws Exception
{
Log.getLogger(HttpParser.class).info("badMessage: bad encoding expected ...");
String response;
response=connector.getResponses("GET /bad/encoding%1 HTTP/1.1\n"+
"Host: localhost\n"+
"Connection: close\n"+
"\015\012");
checkContains(response,0,"HTTP/1.1 400");
}
@Test
public void testBadUTF8FallsbackTo8859() throws Exception
{
Log.getLogger(HttpParser.class).info("badMessage: bad encoding expected ...");
String response;
response=connector.getResponses("GET /foo/bar%c0%00 HTTP/1.1\n"+
"Host: localhost\n"+

View File

@ -210,15 +210,16 @@ public class RequestTest
public void testInvalidHostHeader() throws Exception
{
// Use a contextHandler with vhosts to force call to Request.getServerName()
ContextHandler handler = new ContextHandler();
handler.addVirtualHosts(new String[1]);
ContextHandler context = new ContextHandler();
context.addVirtualHosts(new String[]{"something"});
_server.stop();
_server.setHandler(handler);
_server.setHandler(context);
_server.start();
// Request with illegal Host header
String request="GET / HTTP/1.1\n"+
"Host: whatever.com:\n"+
"Host: whatever.com:xxxx\n"+
"Content-Type: text/html;charset=utf8\n"+
"Connection: close\n"+
"\n";