398467 Servlet 3.1 Non Blocking IO

Asynchronous reads working.
This commit is contained in:
Greg Wilkins 2013-07-05 18:11:09 +10:00
parent 965918d8f2
commit 981102fda5
34 changed files with 1873 additions and 969 deletions

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -117,8 +118,8 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
private void shutdown()
{
// Shutting down the parser may invoke messageComplete() or fail()
parser.shutdownInput();
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
State state = this.state.get();
if (state == State.IDLE || state == State.RECEIVE)
{

View File

@ -48,6 +48,7 @@ import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ssl.SslBytesTest.TLSRecord.Type;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -67,6 +68,7 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
@ -163,7 +165,7 @@ public class SslBytesServerTest extends SslBytesTest
}
};
ServerConnector connector = new ServerConnector(server, sslFactory, httpFactory)
ServerConnector connector = new ServerConnector(server, null,null,null,1,1,sslFactory, httpFactory)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
@ -387,7 +389,14 @@ public class SslBytesServerTest extends SslBytesTest
// Socket close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
if (record!=null)
{
Assert.assertEquals(record.getType(),Type.ALERT);
// Now should be a raw close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
}
}
@Test
@ -675,7 +684,15 @@ public class SslBytesServerTest extends SslBytesTest
// Socket close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
// Raw close or alert
if (record!=null)
{
Assert.assertEquals(record.getType(),Type.ALERT);
// Now should be a raw close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
}
}
@Test
@ -730,7 +747,14 @@ public class SslBytesServerTest extends SslBytesTest
// Socket close
record = proxy.readFromServer();
Assert.assertNull(record);
if (record!=null)
{
Assert.assertEquals(record.getType(),Type.ALERT);
// Now should be a raw close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
}
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
@ -798,7 +822,14 @@ public class SslBytesServerTest extends SslBytesTest
// Socket close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
if (record!=null)
{
Assert.assertEquals(record.getType(),Type.ALERT);
// Now should be a raw close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
}
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
@ -850,9 +881,17 @@ public class SslBytesServerTest extends SslBytesTest
// Close the raw socket, this generates a truncation attack
proxy.flushToServer(null);
// Expect raw close from server
// Expect raw close from server OR ALERT
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
// TODO check that this is OK?
if (record!=null)
{
Assert.assertEquals(record.getType(),Type.ALERT);
// Now should be a raw close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
}
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
@ -902,7 +941,14 @@ public class SslBytesServerTest extends SslBytesTest
// Expect raw close from server
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
if (record!=null)
{
Assert.assertEquals(record.getType(),Type.ALERT);
// Now should be a raw close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
}
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
@ -1096,7 +1142,14 @@ public class SslBytesServerTest extends SslBytesTest
// Socket close
record = proxy.readFromServer();
Assert.assertNull(record);
if (record!=null)
{
Assert.assertEquals(record.getType(),Type.ALERT);
// Now should be a raw close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
}
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
@ -1815,6 +1868,6 @@ public class SslBytesServerTest extends SslBytesTest
// Socket close
record = proxy.readFromServer();
Assert.assertNull(String.valueOf(record), record);
Assert.assertThat(record,Matchers.notNullValue());
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpTokens.EndOfContent;
@ -86,13 +87,13 @@ public class HttpParser
HEADER_IN_NAME,
HEADER_VALUE,
HEADER_IN_VALUE,
END,
EOF_CONTENT,
CONTENT,
EOF_CONTENT,
CHUNKED_CONTENT,
CHUNK_SIZE,
CHUNK_PARAMS,
CHUNK,
END,
CLOSED
};
@ -111,6 +112,8 @@ public class HttpParser
/* ------------------------------------------------------------------------------- */
private volatile State _state=State.START;
private volatile boolean _eof;
private volatile boolean _closed;
private HttpMethod _method;
private String _methodString;
private HttpVersion _version;
@ -188,19 +191,13 @@ public class HttpParser
/* ------------------------------------------------------------------------------- */
public boolean inContentState()
{
return _state.ordinal() > State.END.ordinal();
return _state.ordinal()>=State.CONTENT.ordinal() && _state.ordinal()<State.END.ordinal();
}
/* ------------------------------------------------------------------------------- */
public boolean inHeaderState()
{
return _state.ordinal() < State.END.ordinal();
}
/* ------------------------------------------------------------------------------- */
public boolean isInContent()
{
return _state.ordinal()>State.END.ordinal() && _state.ordinal()<State.CLOSED.ordinal();
return _state.ordinal() < State.CONTENT.ordinal();
}
/* ------------------------------------------------------------------------------- */
@ -239,8 +236,10 @@ public class HttpParser
return _state == state;
}
/* ------------------------------------------------------------------------------- */
private static class BadMessage extends Error
{
private static final long serialVersionUID = 1L;
private final int _code;
private final String _message;
@ -268,65 +267,43 @@ public class HttpParser
}
/* ------------------------------------------------------------------------------- */
private byte next(ByteBuffer buffer)
private byte next(ByteBuffer buffer)
{
byte ch=buffer.get();
// If not a special character
if (ch>=HttpTokens.SPACE || ch<0)
{
if (_cr)
throw new BadMessage("Bad EOL");
/*
if (ch>HttpTokens.SPACE)
System.err.println("Next "+(char)ch);
else
System.err.println("Next ["+ch+"]");*/
return ch;
}
byte ch = buffer.get();
// Only a LF acceptable after CR
if (_cr)
{
if (ch!=HttpTokens.LINE_FEED)
throw new BadMessage("Bad EOL");
_cr=false;
if (ch==HttpTokens.LINE_FEED)
return ch;
throw new BadMessage("Bad EOL");
return ch;
}
// If it is a CR
if (ch==HttpTokens.CARRIAGE_RETURN)
if (ch>=0 && ch<HttpTokens.SPACE)
{
// Skip CR and look for a LF
if (buffer.hasRemaining())
if (ch==HttpTokens.CARRIAGE_RETURN)
{
if(_maxHeaderBytes>0 && _state.ordinal()<State.END.ordinal())
_headerBytes++;
ch=buffer.get();
if (ch==HttpTokens.LINE_FEED)
return ch;
throw new BadMessage();
if (buffer.hasRemaining())
{
if(_maxHeaderBytes>0 && _state.ordinal()<State.END.ordinal())
_headerBytes++;
ch=buffer.get();
if (ch!=HttpTokens.LINE_FEED)
throw new BadMessage("Bad EOL");
}
else
{
_cr=true;
// Can return 0 here to indicate the need for more characters,
// because a real 0 in the buffer would cause a BadMessage below
return 0;
}
}
// Defer lookup of LF
_cr=true;
return 0;
// Only LF or TAB acceptable special characters
else if (!(ch==HttpTokens.LINE_FEED || ch==HttpTokens.TAB))
throw new BadMessage();
}
// Only LF or TAB acceptable special characters
if (ch!=HttpTokens.LINE_FEED && ch!=HttpTokens.TAB)
throw new BadMessage();
/*
if (ch>HttpTokens.SPACE)
System.err.println("Next "+(char)ch);
else
System.err.println("Next ["+ch+"]");
*/
return ch;
}
@ -336,33 +313,33 @@ public class HttpParser
*/
private boolean quickStart(ByteBuffer buffer)
{
if (_requestHandler!=null)
{
_method = HttpMethod.lookAheadGet(buffer);
if (_method!=null)
{
_methodString = _method.asString();
buffer.position(buffer.position()+_methodString.length()+1);
setState(State.SPACE1);
return false;
}
}
else if (_responseHandler!=null)
{
_version = HttpVersion.lookAheadGet(buffer);
if (_version!=null)
{
buffer.position(buffer.position()+_version.asString().length()+1);
setState(State.SPACE1);
return false;
}
}
// Quick start look
while (_state==State.START && buffer.hasRemaining())
{
if (_requestHandler!=null)
{
_method = HttpMethod.lookAheadGet(buffer);
if (_method!=null)
{
_methodString = _method.asString();
buffer.position(buffer.position()+_methodString.length()+1);
setState(State.SPACE1);
return false;
}
}
else if (_responseHandler!=null)
{
_version = HttpVersion.lookAheadGet(buffer);
if (_version!=null)
{
buffer.position(buffer.position()+_version.asString().length()+1);
setState(State.SPACE1);
return false;
}
}
int ch=next(buffer);
byte ch=next(buffer);
if (ch > HttpTokens.SPACE)
{
_string.setLength(0);
@ -370,10 +347,13 @@ public class HttpParser
setState(_requestHandler!=null?State.METHOD:State.RESPONSE_VERSION);
return false;
}
else if (ch==0)
break;
}
return false;
}
/* ------------------------------------------------------------------------------- */
private String takeString()
{
String s =_string.toString();
@ -381,6 +361,7 @@ public class HttpParser
return s;
}
/* ------------------------------------------------------------------------------- */
private String takeLengthString()
{
_string.setLength(_length);
@ -395,17 +376,15 @@ public class HttpParser
*/
private boolean parseLine(ByteBuffer buffer)
{
boolean return_from_parse=false;
boolean handle=false;
// Process headers
while (_state.ordinal()<State.HEADER.ordinal() && buffer.hasRemaining() && !return_from_parse)
while (_state.ordinal()<State.HEADER.ordinal() && buffer.hasRemaining() && !handle)
{
// process each character
byte ch=next(buffer);
if (ch==-1)
return true;
if (ch==0)
continue;
break;
if (_maxHeaderBytes>0 && ++_headerBytes>_maxHeaderBytes)
{
@ -523,7 +502,7 @@ public class HttpParser
}
else if (ch < HttpTokens.SPACE && ch>=0)
{
return_from_parse=_responseHandler.startResponse(_version, _responseStatus, null)||return_from_parse;
handle=_responseHandler.startResponse(_version, _responseStatus, null)||handle;
setState(State.HEADER);
}
else
@ -541,11 +520,11 @@ public class HttpParser
{
// HTTP/0.9
_uri.flip();
return_from_parse=_requestHandler.startRequest(_method,_methodString,_uri,null)||return_from_parse;
handle=_requestHandler.startRequest(_method,_methodString,_uri,null)||handle;
setState(State.END);
BufferUtil.clear(buffer);
return_from_parse=_handler.headerComplete()||return_from_parse;
return_from_parse=_handler.messageComplete()||return_from_parse;
handle=_handler.headerComplete()||handle;
handle=_handler.messageComplete()||handle;
}
else
{
@ -575,28 +554,29 @@ public class HttpParser
setState(State.REQUEST_VERSION);
// try quick look ahead for HTTP Version
HttpVersion version;
if (buffer.position()>0 && buffer.hasArray())
version=HttpVersion.lookAheadGet(buffer.array(),buffer.arrayOffset()+buffer.position()-1,buffer.arrayOffset()+buffer.limit());
else
version=HttpVersion.CACHE.getBest(buffer,0,buffer.remaining());
if (version!=null)
{
HttpVersion version=HttpVersion.lookAheadGet(buffer.array(),buffer.arrayOffset()+buffer.position()-1,buffer.arrayOffset()+buffer.limit());
if (version!=null)
int pos = buffer.position()+version.asString().length()-1;
if (pos<buffer.limit())
{
int pos = buffer.position()+version.asString().length()-1;
if (pos<buffer.limit())
byte n=buffer.get(pos);
if (n==HttpTokens.CARRIAGE_RETURN)
{
byte n=buffer.get(pos);
if (n==HttpTokens.CARRIAGE_RETURN)
{
_cr=true;
_version=version;
_string.setLength(0);
buffer.position(pos+1);
}
else if (n==HttpTokens.LINE_FEED)
{
_version=version;
_string.setLength(0);
buffer.position(pos);
}
_cr=true;
_version=version;
_string.setLength(0);
buffer.position(pos+1);
}
else if (n==HttpTokens.LINE_FEED)
{
_version=version;
_string.setLength(0);
buffer.position(pos);
}
}
}
@ -606,18 +586,18 @@ public class HttpParser
{
if (_responseHandler!=null)
{
return_from_parse=_responseHandler.startResponse(_version, _responseStatus, null)||return_from_parse;
handle=_responseHandler.startResponse(_version, _responseStatus, null)||handle;
setState(State.HEADER);
}
else
{
// HTTP/0.9
_uri.flip();
return_from_parse=_requestHandler.startRequest(_method,_methodString,_uri, null)||return_from_parse;
handle=_requestHandler.startRequest(_method,_methodString,_uri, null)||handle;
setState(State.END);
BufferUtil.clear(buffer);
return_from_parse=_handler.headerComplete()||return_from_parse;
return_from_parse=_handler.messageComplete()||return_from_parse;
handle=_handler.headerComplete()||handle;
handle=_handler.messageComplete()||handle;
}
}
break;
@ -628,9 +608,7 @@ public class HttpParser
if (_version==null)
_version=HttpVersion.CACHE.get(takeString());
if (_version==null)
{
throw new BadMessage(HttpStatus.BAD_REQUEST_400,"Unknown Version");
}
// Should we try to cache header fields?
if (_connectionFields==null && _version.getVersion()>=HttpVersion.HTTP_1_1.getVersion())
@ -642,7 +620,7 @@ public class HttpParser
setState(State.HEADER);
_uri.flip();
return_from_parse=_requestHandler.startRequest(_method,_methodString,_uri, _version)||return_from_parse;
handle=_requestHandler.startRequest(_method,_methodString,_uri, _version)||handle;
continue;
}
else
@ -656,7 +634,7 @@ public class HttpParser
String reason=takeLengthString();
setState(State.HEADER);
return_from_parse=_responseHandler.startResponse(_version, _responseStatus, reason)||return_from_parse;
handle=_responseHandler.startResponse(_version, _responseStatus, reason)||handle;
continue;
}
else
@ -673,7 +651,7 @@ public class HttpParser
}
}
return return_from_parse;
return handle;
}
private boolean handleKnownHeaders(ByteBuffer buffer)
@ -766,7 +744,10 @@ public class HttpParser
case CONNECTION:
// Don't cache if not persistent
if (_valueString!=null && _valueString.indexOf("close")>=0)
{
_closed=true;
_connectionFields=null;
}
break;
case AUTHORIZATION:
@ -799,17 +780,15 @@ public class HttpParser
*/
private boolean parseHeaders(ByteBuffer buffer)
{
boolean return_from_parse=false;
boolean handle=false;
// Process headers
while (_state.ordinal()<State.END.ordinal() && buffer.hasRemaining() && !return_from_parse)
while (_state.ordinal()<State.CONTENT.ordinal() && buffer.hasRemaining() && !handle)
{
// process each character
byte ch=next(buffer);
if (ch==-1)
return true;
if (ch==0)
continue;
break;
if (_maxHeaderBytes>0 && ++_headerBytes>_maxHeaderBytes)
{
@ -853,7 +832,7 @@ public class HttpParser
_field=null;
return true;
}
return_from_parse=_handler.parsedHeader(_field!=null?_field:new HttpField(_header,_headerString,_valueString))||return_from_parse;
handle=_handler.parsedHeader(_field!=null?_field:new HttpField(_header,_headerString,_valueString))||handle;
}
_headerString=_valueString=null;
_header=null;
@ -897,23 +876,23 @@ public class HttpParser
{
case EOF_CONTENT:
setState(State.EOF_CONTENT);
return_from_parse=_handler.headerComplete()||return_from_parse;
handle=_handler.headerComplete()||handle;
break;
case CHUNKED_CONTENT:
setState(State.CHUNKED_CONTENT);
return_from_parse=_handler.headerComplete()||return_from_parse;
handle=_handler.headerComplete()||handle;
break;
case NO_CONTENT:
return_from_parse=_handler.headerComplete()||return_from_parse;
handle=_handler.headerComplete()||handle;
setState(State.END);
return_from_parse=_handler.messageComplete()||return_from_parse;
handle=_handler.messageComplete()||handle;
break;
default:
setState(State.CONTENT);
return_from_parse=_handler.headerComplete()||return_from_parse;
handle=_handler.headerComplete()||handle;
break;
}
}
@ -1151,7 +1130,7 @@ public class HttpParser
}
}
return return_from_parse;
return handle;
}
/* ------------------------------------------------------------------------------- */
@ -1161,208 +1140,100 @@ public class HttpParser
*/
public boolean parseNext(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("parseNext s={} {}",_state,BufferUtil.toDetailString(buffer));
try
{
// handle initial state
switch(_state)
boolean handle=false;
// Start a request/response
if (_state==State.START)
{
case START:
_version=null;
_method=null;
_methodString=null;
_endOfContent=EndOfContent.UNKNOWN_CONTENT;
_header=null;
if(quickStart(buffer))
return true;
break;
case CONTENT:
if (_contentPosition==_contentLength)
{
setState(State.END);
if(_handler.messageComplete())
return true;
}
break;
case END:
// eat white space
while (buffer.remaining()>0 && buffer.get(buffer.position())<=HttpTokens.SPACE)
buffer.get();
return false;
case CLOSED:
if (BufferUtil.hasContent(buffer))
{
// Just ignore data when closed
_headerBytes+=buffer.remaining();
BufferUtil.clear(buffer);
if (_headerBytes>_maxHeaderBytes)
{
// Don't want to waste time reading data of a closed request
throw new IllegalStateException("too much data after closed");
}
}
return false;
default: break;
_version=null;
_method=null;
_methodString=null;
_endOfContent=EndOfContent.UNKNOWN_CONTENT;
_header=null;
handle=quickStart(buffer);
}
// Request/response line
if (_state.ordinal()<State.HEADER.ordinal())
if (parseLine(buffer))
return true;
if (!handle && _state.ordinal()>= State.START.ordinal() && _state.ordinal()<State.HEADER.ordinal())
handle=parseLine(buffer);
if (_state.ordinal()<State.END.ordinal())
if (parseHeaders(buffer))
return true;
// Handle HEAD response
if (_responseStatus>0 && _headResponse)
// parse headers
if (!handle && _state.ordinal()>= State.HEADER.ordinal() && _state.ordinal()<State.CONTENT.ordinal())
handle=parseHeaders(buffer);
// parse content
if (!handle && _state.ordinal()>= State.CONTENT.ordinal() && _state.ordinal()<State.END.ordinal())
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
// Handle _content
byte ch;
while (_state.ordinal() > State.END.ordinal() && buffer.hasRemaining())
{
switch (_state)
// Handle HEAD response
if (_responseStatus>0 && _headResponse)
{
case EOF_CONTENT:
_contentChunk=buffer.asReadOnlyBuffer();
_contentPosition += _contentChunk.remaining();
buffer.position(buffer.position()+_contentChunk.remaining());
if (_handler.content(_contentChunk))
return true;
break;
case CONTENT:
setState(State.END);
handle=_handler.messageComplete();
}
else
handle=parseContent(buffer);
}
// handle end states
if (_state==State.END)
{
// eat white space
while (buffer.remaining()>0 && buffer.get(buffer.position())<=HttpTokens.SPACE)
buffer.get();
}
else if (_state==State.CLOSED)
{
if (BufferUtil.hasContent(buffer))
{
// Just ignore data when closed
_headerBytes+=buffer.remaining();
BufferUtil.clear(buffer);
if (_headerBytes>_maxHeaderBytes)
{
long remaining=_contentLength - _contentPosition;
if (remaining == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
else
{
_contentChunk=buffer.asReadOnlyBuffer();
// limit content by expected size
if (_contentChunk.remaining() > remaining)
{
// We can cast remaining to an int as we know that it is smaller than
// or equal to length which is already an int.
_contentChunk.limit(_contentChunk.position()+(int)remaining);
}
_contentPosition += _contentChunk.remaining();
buffer.position(buffer.position()+_contentChunk.remaining());
if (_handler.content(_contentChunk))
return true;
if(_contentPosition == _contentLength)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
}
break;
}
case CHUNKED_CONTENT:
{
ch=next(buffer);
if (ch>HttpTokens.SPACE)
{
_chunkLength=TypeUtil.convertHexDigit(ch);
_chunkPosition=0;
setState(State.CHUNK_SIZE);
}
break;
}
case CHUNK_SIZE:
{
ch=next(buffer);
if (ch == HttpTokens.LINE_FEED)
{
if (_chunkLength == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
else
setState(State.CHUNK);
}
else if (ch <= HttpTokens.SPACE || ch == HttpTokens.SEMI_COLON)
setState(State.CHUNK_PARAMS);
else
_chunkLength=_chunkLength * 16 + TypeUtil.convertHexDigit(ch);
break;
}
case CHUNK_PARAMS:
{
ch=next(buffer);
if (ch == HttpTokens.LINE_FEED)
{
if (_chunkLength == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
else
setState(State.CHUNK);
}
break;
}
case CHUNK:
{
int remaining=_chunkLength - _chunkPosition;
if (remaining == 0)
{
setState(State.CHUNKED_CONTENT);
}
else
{
_contentChunk=buffer.asReadOnlyBuffer();
if (_contentChunk.remaining() > remaining)
_contentChunk.limit(_contentChunk.position()+remaining);
remaining=_contentChunk.remaining();
_contentPosition += remaining;
_chunkPosition += remaining;
buffer.position(buffer.position()+remaining);
if (_handler.content(_contentChunk))
return true;
}
break;
// Don't want to waste time reading data of a closed request
throw new IllegalStateException("too much data after closed");
}
}
}
// Handle EOF
if (_eof && !buffer.hasRemaining())
{
switch(_state)
{
case CLOSED:
{
BufferUtil.clear(buffer);
return false;
}
default:
break;
case END:
setState(State.CLOSED);
break;
case EOF_CONTENT:
handle=_handler.messageComplete()||handle;
setState(State.CLOSED);
break;
case CONTENT:
case CHUNKED_CONTENT:
case CHUNK_SIZE:
case CHUNK_PARAMS:
case CHUNK:
_handler.earlyEOF();
setState(State.CLOSED);
break;
default:
_handler.badMessage(400,null);
setState(State.CLOSED);
break;
}
}
return false;
return handle;
}
catch(BadMessage e)
{
@ -1396,80 +1267,181 @@ public class HttpParser
}
}
/**
* Notifies this parser that I/O code read a -1 and therefore no more data will arrive to be parsed.
* Calling this method may result in an invocation to {@link HttpHandler#messageComplete()}, for
* example when the content is delimited by the close of the connection.
* If the parser is already in a state that does not need data (for example, it is idle waiting for
* a request/response to be parsed), then calling this method is a no-operation.
*
* @return the result of the invocation to {@link HttpHandler#messageComplete()} if there has been
* one, or false otherwise.
*/
public boolean shutdownInput()
private boolean parseContent(ByteBuffer buffer)
{
LOG.debug("shutdownInput {}", this);
// was this unexpected?
switch(_state)
// Handle _content
byte ch;
while (_state.ordinal() < State.END.ordinal() && buffer.hasRemaining())
{
case START:
case END:
break;
switch (_state)
{
case EOF_CONTENT:
_contentChunk=buffer.asReadOnlyBuffer();
_contentPosition += _contentChunk.remaining();
buffer.position(buffer.position()+_contentChunk.remaining());
if (_handler.content(_contentChunk))
return true;
break;
case EOF_CONTENT:
setState(State.END);
return _handler.messageComplete();
case CONTENT:
{
long remaining=_contentLength - _contentPosition;
if (remaining == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
else
{
_contentChunk=buffer.asReadOnlyBuffer();
case CLOSED:
break;
// limit content by expected size
if (_contentChunk.remaining() > remaining)
{
// We can cast remaining to an int as we know that it is smaller than
// or equal to length which is already an int.
_contentChunk.limit(_contentChunk.position()+(int)remaining);
}
default:
setState(State.END);
if (!_headResponse)
_handler.earlyEOF();
return _handler.messageComplete();
_contentPosition += _contentChunk.remaining();
buffer.position(buffer.position()+_contentChunk.remaining());
boolean handle=_handler.content(_contentChunk);
if(_contentPosition == _contentLength)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
if (handle)
return true;
}
break;
}
case CHUNKED_CONTENT:
{
ch=next(buffer);
if (ch>HttpTokens.SPACE)
{
_chunkLength=TypeUtil.convertHexDigit(ch);
_chunkPosition=0;
setState(State.CHUNK_SIZE);
}
break;
}
case CHUNK_SIZE:
{
ch=next(buffer);
if (ch==0)
break;
if (ch == HttpTokens.LINE_FEED)
{
if (_chunkLength == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
else
setState(State.CHUNK);
}
else if (ch <= HttpTokens.SPACE || ch == HttpTokens.SEMI_COLON)
setState(State.CHUNK_PARAMS);
else
_chunkLength=_chunkLength * 16 + TypeUtil.convertHexDigit(ch);
break;
}
case CHUNK_PARAMS:
{
ch=next(buffer);
if (ch == HttpTokens.LINE_FEED)
{
if (_chunkLength == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
}
else
setState(State.CHUNK);
}
break;
}
case CHUNK:
{
int remaining=_chunkLength - _chunkPosition;
if (remaining == 0)
{
setState(State.CHUNKED_CONTENT);
}
else
{
_contentChunk=buffer.asReadOnlyBuffer();
if (_contentChunk.remaining() > remaining)
_contentChunk.limit(_contentChunk.position()+remaining);
remaining=_contentChunk.remaining();
_contentPosition += remaining;
_chunkPosition += remaining;
buffer.position(buffer.position()+remaining);
if (_handler.content(_contentChunk))
return true;
}
break;
}
case CLOSED:
{
BufferUtil.clear(buffer);
return false;
}
default:
break;
}
}
return false;
}
/* ------------------------------------------------------------------------------- */
public boolean isAtEOF()
{
return _eof;
}
/* ------------------------------------------------------------------------------- */
public void atEOF()
{
LOG.debug("atEOF {}", this);
_eof=true;
}
/* ------------------------------------------------------------------------------- */
public void close()
{
switch(_state)
{
case START:
case CLOSED:
case END:
break;
case EOF_CONTENT:
_handler.messageComplete();
break;
default:
if (_state.ordinal()>State.END.ordinal())
{
_handler.earlyEOF();
_handler.messageComplete();
}
else
LOG.warn("Closing {}",this);
}
LOG.debug("close {}", this);
setState(State.CLOSED);
_endOfContent=EndOfContent.UNKNOWN_CONTENT;
_contentLength=-1;
_contentPosition=0;
_responseStatus=0;
_headerBytes=0;
_contentChunk=null;
}
/* ------------------------------------------------------------------------------- */
public void reset()
{
// reset state
if (_state==State.CLOSED)
return;
if (_closed)
{
setState(State.CLOSED);
return;
}
setState(State.START);
_endOfContent=EndOfContent.UNKNOWN_CONTENT;
_contentLength=-1;

View File

@ -74,7 +74,7 @@ public class HttpParserTest
assertEquals("POST", _methodOrVersion);
assertEquals("/foo", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(-1, _h);
assertEquals(-1, _headers);
}
@Test
@ -89,7 +89,7 @@ public class HttpParserTest
assertEquals("GET", _methodOrVersion);
assertEquals("/999", _uriOrStatus);
assertEquals(null, _versionOrReason);
assertEquals(-1, _h);
assertEquals(-1, _headers);
}
@Test
@ -104,7 +104,7 @@ public class HttpParserTest
assertEquals("POST", _methodOrVersion);
assertEquals("/222", _uriOrStatus);
assertEquals(null, _versionOrReason);
assertEquals(-1, _h);
assertEquals(-1, _headers);
}
@Test
@ -118,7 +118,7 @@ public class HttpParserTest
assertEquals("POST", _methodOrVersion);
assertEquals("/fo\u0690", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(-1, _h);
assertEquals(-1, _headers);
}
@Test
@ -132,7 +132,7 @@ public class HttpParserTest
assertEquals("POST", _methodOrVersion);
assertEquals("/foo?param=\u0690", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(-1, _h);
assertEquals(-1, _headers);
}
@Test
@ -146,7 +146,7 @@ public class HttpParserTest
assertEquals("POST", _methodOrVersion);
assertEquals("/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(-1, _h);
assertEquals(-1, _headers);
}
@Test
@ -159,7 +159,32 @@ public class HttpParserTest
assertEquals("CONNECT", _methodOrVersion);
assertEquals("192.168.1.2:80", _uriOrStatus);
assertEquals("HTTP/1.1", _versionOrReason);
assertEquals(-1, _h);
assertEquals(-1, _headers);
}
@Test
public void testSimple() throws Exception
{
ByteBuffer buffer= BufferUtil.toBuffer(
"GET / HTTP/1.0\015\012" +
"Host: localhost\015\012" +
"Connection: close\015\012" +
"\015\012");
HttpParser.RequestHandler<ByteBuffer> handler = new Handler();
HttpParser parser= new HttpParser(handler);
parseAll(parser,buffer);
assertTrue(_headerCompleted);
assertTrue(_messageCompleted);
assertEquals("GET", _methodOrVersion);
assertEquals("/", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals("Host", _hdr[0]);
assertEquals("localhost", _val[0]);
assertEquals("Connection", _hdr[1]);
assertEquals("close", _val[1]);
assertEquals(1, _headers);
}
@Test
@ -212,7 +237,7 @@ public class HttpParserTest
assertEquals("gzip, deflated", _val[8]);
assertEquals("Accept", _hdr[9]);
assertEquals("unknown", _val[9]);
assertEquals(9, _h);
assertEquals(9, _headers);
}
@Test
@ -260,7 +285,7 @@ public class HttpParserTest
assertEquals("gzip, deflated", _val[8]);
assertEquals("Accept", _hdr[9]);
assertEquals("unknown", _val[9]);
assertEquals(9, _h);
assertEquals(9, _headers);
}
@ -310,7 +335,7 @@ public class HttpParserTest
assertEquals("gzip, deflated", _val[8]);
assertEquals("Accept", _hdr[9]);
assertEquals("unknown", _val[9]);
assertEquals(9, _h);
assertEquals(9, _headers);
}
@Test
@ -365,7 +390,7 @@ public class HttpParserTest
assertEquals("value4", _val[4]);
assertEquals("Server5", _hdr[5]);
assertEquals("notServer", _val[5]);
assertEquals(5, _h);
assertEquals(5, _headers);
}
}
@ -390,12 +415,72 @@ public class HttpParserTest
assertEquals("GET", _methodOrVersion);
assertEquals("/chunk", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(1, _h);
assertEquals(1, _headers);
assertEquals("Header1", _hdr[0]);
assertEquals("value1", _val[0]);
assertEquals("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", _content);
}
@Test
public void testStartEOF() throws Exception
{
HttpParser.RequestHandler<ByteBuffer> handler = new Handler();
HttpParser parser= new HttpParser(handler);
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
assertEquals("400",_bad);
}
@Test
public void testEarlyEOF() throws Exception
{
ByteBuffer buffer= BufferUtil.toBuffer(
"GET /uri HTTP/1.0\015\012"
+ "Content-Length: 20\015\012"
+ "\015\012"
+ "0123456789");
HttpParser.RequestHandler<ByteBuffer> handler = new Handler();
HttpParser parser= new HttpParser(handler);
parser.atEOF();
parseAll(parser,buffer);
assertEquals("GET", _methodOrVersion);
assertEquals("/uri", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals("0123456789", _content);
assertTrue(_early);
}
@Test
public void testChunkEarlyEOF() throws Exception
{
ByteBuffer buffer= BufferUtil.toBuffer(
"GET /chunk HTTP/1.0\015\012"
+ "Header1: value1\015\012"
+ "Transfer-Encoding: chunked\015\012"
+ "\015\012"
+ "a;\015\012"
+ "0123456789\015\012");
HttpParser.RequestHandler<ByteBuffer> handler = new Handler();
HttpParser parser= new HttpParser(handler);
parser.atEOF();
parseAll(parser,buffer);
assertEquals("GET", _methodOrVersion);
assertEquals("/chunk", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(1, _headers);
assertEquals("Header1", _hdr[0]);
assertEquals("value1", _val[0]);
assertEquals("0123456789", _content);
assertTrue(_early);
}
@Test
public void testMultiParse() throws Exception
{
@ -433,7 +518,7 @@ public class HttpParserTest
assertEquals("GET", _methodOrVersion);
assertEquals("/mp", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(2, _h);
assertEquals(2, _headers);
assertEquals("Header1", _hdr[1]);
assertEquals("value1", _val[1]);
assertEquals("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", _content);
@ -444,7 +529,7 @@ public class HttpParserTest
assertEquals("POST", _methodOrVersion);
assertEquals("/foo", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(2, _h);
assertEquals(2, _headers);
assertEquals("Header2", _hdr[1]);
assertEquals("value2", _val[1]);
assertEquals(null, _content);
@ -452,16 +537,85 @@ public class HttpParserTest
parser.reset();
init();
parser.parseNext(buffer);
parser.shutdownInput();
parser.atEOF();
assertEquals("PUT", _methodOrVersion);
assertEquals("/doodle", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(2, _h);
assertEquals(2, _headers);
assertEquals("Header3", _hdr[1]);
assertEquals("value3", _val[1]);
assertEquals("0123456789", _content);
}
@Test
public void testMultiParseEarlyEOF() throws Exception
{
ByteBuffer buffer0= BufferUtil.toBuffer(
"GET /mp HTTP/1.0\015\012"
+ "Connection: Keep-Alive\015\012");
ByteBuffer buffer1= BufferUtil.toBuffer("Header1: value1\015\012"
+ "Transfer-Encoding: chunked\015\012"
+ "\015\012"
+ "a;\015\012"
+ "0123456789\015\012"
+ "1a\015\012"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ\015\012"
+ "0\015\012"
+ "\015\012"
+ "POST /foo HTTP/1.0\015\012"
+ "Connection: Keep-Alive\015\012"
+ "Header2: value2\015\012"
+ "Content-Length: 0\015\012"
+ "\015\012"
+ "PUT /doodle HTTP/1.0\015\012"
+ "Connection: close\015\012"
+ "Header3: value3\015\012"
+ "Content-Length: 10\015\012"
+ "\015\012"
+ "0123456789\015\012");
HttpParser.RequestHandler<ByteBuffer> handler = new Handler();
HttpParser parser= new HttpParser(handler);
parser.parseNext(buffer0);
parser.atEOF();
parser.parseNext(buffer1);
assertEquals("GET", _methodOrVersion);
assertEquals("/mp", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(2, _headers);
assertEquals("Header1", _hdr[1]);
assertEquals("value1", _val[1]);
assertEquals("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", _content);
parser.reset();
init();
parser.parseNext(buffer1);
assertEquals("POST", _methodOrVersion);
assertEquals("/foo", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(2, _headers);
assertEquals("Header2", _hdr[1]);
assertEquals("value2", _val[1]);
assertEquals(null, _content);
parser.reset();
init();
parser.parseNext(buffer1);
assertEquals("PUT", _methodOrVersion);
assertEquals("/doodle", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(2, _headers);
assertEquals("Header3", _hdr[1]);
assertEquals("value3", _val[1]);
assertEquals("0123456789", _content);
}
@Test
public void testResponseParse0() throws Exception
{
@ -528,7 +682,7 @@ public class HttpParserTest
init();
parser.parseNext(buffer);
parser.shutdownInput();
parser.atEOF();
assertEquals("HTTP/1.1", _methodOrVersion);
assertEquals("200", _uriOrStatus);
assertEquals("Correct", _versionOrReason);
@ -580,6 +734,29 @@ public class HttpParserTest
assertTrue(_messageCompleted);
}
@Test
public void testResponseEOFContent() throws Exception
{
ByteBuffer buffer= BufferUtil.toBuffer(
"HTTP/1.1 200 \015\012"
+ "Content-Type: text/plain\015\012"
+ "\015\012"
+ "0123456789\015\012");
HttpParser.ResponseHandler<ByteBuffer> handler = new Handler();
HttpParser parser= new HttpParser(handler);
parser.atEOF();
parser.parseNext(buffer);
assertEquals("HTTP/1.1", _methodOrVersion);
assertEquals("200", _uriOrStatus);
assertEquals(null, _versionOrReason);
assertEquals(12,_content.length());
assertEquals("0123456789\015\012",_content);
assertTrue(_headerCompleted);
assertTrue(_messageCompleted);
}
@Test
public void testResponse304WithContentLength() throws Exception
{
@ -627,7 +804,7 @@ public class HttpParserTest
+ "Connection: close\015\012"
+ "\015\012"
+ "\015\012" // extra CRLF ignored
+ "HTTP/1.1 400 OK\015\012"); // extra data causes close
+ "HTTP/1.1 400 OK\015\012"); // extra data causes close ??
HttpParser.ResponseHandler<ByteBuffer> handler = new Handler();
@ -641,8 +818,13 @@ public class HttpParserTest
assertTrue(_headerCompleted);
assertTrue(_messageCompleted);
parser.reset();
parser.parseNext(buffer);
assertFalse(buffer.hasRemaining());
assertTrue(parser.isClosed());
}
@Test
public void testNoURI() throws Exception
@ -1025,7 +1207,7 @@ public class HttpParserTest
_versionOrReason=null;
_hdr=null;
_val=null;
_h=0;
_headers=0;
_headerCompleted=false;
_messageCompleted=false;
}
@ -1040,15 +1222,15 @@ public class HttpParserTest
private List<HttpField> _fields=new ArrayList<>();
private String[] _hdr;
private String[] _val;
private int _h;
private int _headers;
private boolean _early;
private boolean _headerCompleted;
private boolean _messageCompleted;
private class Handler implements HttpParser.RequestHandler<ByteBuffer>, HttpParser.ResponseHandler<ByteBuffer>
{
private HttpFields fields;
private boolean request;
@Override
public boolean content(ByteBuffer ref)
@ -1066,8 +1248,7 @@ public class HttpParserTest
public boolean startRequest(HttpMethod httpMethod, String method, ByteBuffer uri, HttpVersion version)
{
_fields.clear();
request=true;
_h= -1;
_headers= -1;
_hdr= new String[10];
_val= new String[10];
_methodOrVersion= method;
@ -1077,6 +1258,7 @@ public class HttpParserTest
fields=new HttpFields();
_messageCompleted = false;
_headerCompleted = false;
_early=false;
return false;
}
@ -1085,8 +1267,8 @@ public class HttpParserTest
{
_fields.add(field);
//System.err.println("header "+name+": "+value);
_hdr[++_h]= field.getName();
_val[_h]= field.getValue();
_hdr[++_headers]= field.getName();
_val[_headers]= field.getValue();
return false;
}
@ -1125,14 +1307,13 @@ public class HttpParserTest
@Override
public void badMessage(int status, String reason)
{
_bad=reason;
_bad=reason==null?(""+status):reason;
}
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
_fields.clear();
request=false;
_methodOrVersion = version.asString();
_uriOrStatus = Integer.toString(status);
_versionOrReason = reason==null?null:reason.toString();
@ -1149,6 +1330,7 @@ public class HttpParserTest
@Override
public void earlyEOF()
{
_early=true;
}
@Override

View File

@ -96,7 +96,6 @@ public abstract class AbstractConnection implements Connection
public void fillInterested()
{
LOG.debug("fillInterested {}",this);
while(true)
{
State state=_state.get();
@ -112,6 +111,9 @@ public abstract class AbstractConnection implements Connection
while(true)
{
State state=_state.get();
// TODO yuck
if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
break;
State next=new FillingInterestedCallback(callback,state);
if (next(state,next))
break;
@ -144,6 +146,9 @@ public abstract class AbstractConnection implements Connection
_endPoint.shutdownOutput();
}
}
if (_endPoint.isOpen())
fillInterested();
}
/**
@ -267,6 +272,11 @@ public abstract class AbstractConnection implements Connection
{
throw new IllegalStateException(this.toString());
}
State onFailed()
{
throw new IllegalStateException(this.toString());
}
}
@ -298,6 +308,12 @@ public abstract class AbstractConnection implements Connection
{
return FILLING;
}
@Override
State onFailed()
{
return IDLE;
}
};
public static final State FILLING=new State("FILLING")
@ -379,7 +395,7 @@ public abstract class AbstractConnection implements Connection
{
return new NestedState(_nested.onFillable());
}
@Override
State onFilled()
{
@ -436,9 +452,6 @@ public abstract class AbstractConnection implements Connection
connection.getEndPoint().fillInterested(callback);
}
}
private final Runnable _runOnFillable = new Runnable()
@ -479,6 +492,12 @@ public abstract class AbstractConnection implements Connection
@Override
public void failed(Throwable x)
{
while(true)
{
State state=_state.get();
if (next(state,state.onFailed()))
break;
}
onFillInterestedFailed(x);
}

View File

@ -152,17 +152,17 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
@Override
public String toString()
{
return String.format("%s@%x{%s<r-l>%s,o=%b,is=%b,os=%b,fi=%s,wf=%s,it=%d}{%s}",
return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d,%s}",
getClass().getSimpleName(),
hashCode(),
getRemoteAddress(),
getLocalAddress(),
isOpen(),
isInputShutdown(),
isOutputShutdown(),
_fillInterest,
_writeFlusher,
getLocalAddress().getPort(),
isOpen()?"Open":"CLOSED",
isInputShutdown()?"ISHUT":"in",
isOutputShutdown()?"OSHUT":"out",
_fillInterest.isInterested()?"R":"-",
_writeFlusher.isInProgress()?"W":"-",
getIdleTimeout(),
getConnection());
getConnection()==null?null:getConnection().getClass().getSimpleName());
}
}

View File

@ -432,9 +432,6 @@ abstract public class WriteFlusher
public void onFail(Throwable cause)
{
if (DEBUG)
LOG.debug("failed: {} {}", this, cause);
// Keep trying to handle the failure until we get to IDLE or FAILED state
while(true)
{
@ -443,9 +440,14 @@ abstract public class WriteFlusher
{
case IDLE:
case FAILED:
if (DEBUG)
LOG.debug("ignored: {} {}", this, cause);
return;
case PENDING:
if (DEBUG)
LOG.debug("failed: {} {}", this, cause);
PendingState pending = (PendingState)current;
if (updateState(pending,__IDLE))
{
@ -455,6 +457,9 @@ abstract public class WriteFlusher
break;
default:
if (DEBUG)
LOG.debug("failed: {} {}", this, cause);
if (updateState(current,new FailedState(cause)))
return;
break;

View File

@ -188,7 +188,7 @@ public class SslConnection extends AbstractConnection
// filling.
if (DEBUG)
LOG.debug("onFillable enter {}", getEndPoint());
LOG.debug("onFillable enter {}", _decryptedEndPoint);
// We have received a close handshake, close the end point to send FIN.
if (_decryptedEndPoint.isInputShutdown())
@ -209,7 +209,7 @@ public class SslConnection extends AbstractConnection
}
if (DEBUG)
LOG.debug("onFillable exit {}", getEndPoint());
LOG.debug("onFillable exit {}", _decryptedEndPoint);
}
@Override

View File

@ -464,7 +464,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
if (isAccepting())
LOG.warn(e);
else
LOG.debug(e);
LOG.ignore(e);
}
}
}

View File

@ -25,7 +25,7 @@ import javax.servlet.ReadListener;
/**
* <p>An implementation of HttpInput using {@link ByteBuffer} as items.</p>
*/
public class ByteBufferHttpInput extends HttpInput<ByteBuffer>
public class ByteBufferHttpInput extends QueuedHttpInput<ByteBuffer>
{
@Override
protected int remaining(ByteBuffer item)
@ -45,4 +45,5 @@ public class ByteBufferHttpInput extends HttpInput<ByteBuffer>
protected void onContentConsumed(ByteBuffer item)
{
}
}

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
@ -105,6 +106,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
_uri = new HttpURI(URIUtil.__CHARSET);
_state = new HttpChannelState(this);
input.init(_state);
_request = new Request(this, input);
_response = new Response(this, new HttpOutput(this));
}
@ -255,6 +257,8 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{
try
{
LOG.debug("{} action {}",this,action);
switch(action)
{
case REQUEST_DISPATCH:
@ -286,11 +290,28 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
getServer().handleAsync(this);
break;
case IO_CALLBACK:
_response.getHttpOutput().handle();
case READ_CALLBACK:
{
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_request.getHttpInput());
else
_request.getHttpInput().run();
break;
}
case WRITE_CALLBACK:
{
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_response.getHttpOutput());
else
_response.getHttpOutput().run();
break;
}
default:
break loop;
@ -595,7 +616,8 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
@Override
public boolean messageComplete()
{
_request.getHttpInput().shutdown();
LOG.debug("{} messageComplete", this);
_request.getHttpInput().messageComplete();
return true;
}

View File

@ -74,7 +74,8 @@ public class HttpChannelState
REQUEST_DISPATCH, // handle a normal request dispatch
ASYNC_DISPATCH, // handle an async request dispatch
ASYNC_EXPIRED, // handle an async timeout
IO_CALLBACK, // handle an IO callback
WRITE_CALLBACK, // handle an IO write callback
READ_CALLBACK, // handle an IO read callback
WAIT, // Wait for further events
COMPLETE, // Complete the channel
RECYCLE, // Channel is completed
@ -89,6 +90,7 @@ public class HttpChannelState
EXPIRED
}
private final boolean DEBUG=LOG.isDebugEnabled();
private final HttpChannel<?> _channel;
private List<AsyncListener> _lastAsyncListeners;
private List<AsyncListener> _asyncListeners;
@ -96,7 +98,8 @@ public class HttpChannelState
private State _state;
private Async _async;
private boolean _initial;
private boolean _asyncIO;
private boolean _asyncRead;
private boolean _asyncWrite;
private long _timeoutMs=DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
@ -155,7 +158,7 @@ public class HttpChannelState
{
synchronized (this)
{
return super.toString()+"@"+getStatusString();
return String.format("%s@%x{s=%s i=%b a=%s}",getClass().getSimpleName(),hashCode(),_state,_initial,_async);
}
}
@ -174,6 +177,8 @@ public class HttpChannelState
{
synchronized (this)
{
if(DEBUG)
LOG.debug("{} handling {}",this,_state);
switch(_state)
{
case IDLE:
@ -197,11 +202,17 @@ public class HttpChannelState
return Action.RECYCLE;
case ASYNCWAIT:
if (_asyncIO)
if (_asyncRead)
{
_state=State.ASYNCIO;
_asyncIO=false;
return Action.IO_CALLBACK;
_asyncRead=false;
return Action.READ_CALLBACK;
}
if (_asyncWrite)
{
_state=State.ASYNCIO;
_asyncWrite=false;
return Action.WRITE_CALLBACK;
}
if (_async!=null)
@ -289,6 +300,9 @@ public class HttpChannelState
{
synchronized (this)
{
if(DEBUG)
LOG.debug("{} unhandle {}",this,_state);
switch(_state)
{
case DISPATCHED:
@ -298,11 +312,18 @@ public class HttpChannelState
throw new IllegalStateException(this.getStatusString());
}
if (_asyncIO)
if (_asyncRead)
{
_asyncIO=false;
_state=State.ASYNCIO;
return Action.IO_CALLBACK;
_asyncRead=false;
return Action.READ_CALLBACK;
}
if (_asyncWrite)
{
_asyncWrite=false;
_state=State.ASYNCIO;
return Action.WRITE_CALLBACK;
}
if (_async!=null)
@ -491,7 +512,7 @@ public class HttpChannelState
cancelTimeout();
_timeoutMs=DEFAULT_TIMEOUT;
_event=null;
_asyncIO=false;
_asyncWrite=false;
}
}
@ -608,6 +629,16 @@ public class HttpChannelState
public void onReadPossible()
{
boolean handle;
synchronized (this)
{
_asyncRead=true;
handle=_state==State.ASYNCWAIT;
}
if (handle)
_channel.handle();
}
public void onWritePossible()
@ -616,19 +647,12 @@ public class HttpChannelState
synchronized (this)
{
_asyncIO=true;
_asyncWrite=true;
handle=_state==State.ASYNCWAIT;
}
if (handle)
{
ContextHandler handler=getContextHandler();
if (handler!=null)
handler.handle(_channel);
else
_channel.handle();
}
_channel.handle();
}
public class AsyncTimeout implements Runnable

View File

@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.RejectedExecutionException;
import javax.servlet.ReadListener;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.http.HttpHeader;
@ -63,7 +65,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final HttpParser _parser;
private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null;
private BlockingCallback _readBlocker = new BlockingCallback();
private BlockingCallback _writeBlocker = new BlockingCallback();
@ -92,9 +93,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_generator = new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
_channel = new HttpChannelOverHttp(connector, config, endPoint, this, new Input());
_channel = new HttpChannelOverHttp(connector, config, endPoint, this, new HttpInputOverHTTP(this));
_parser = newHttpParser();
LOG.debug("New HTTP Connection {}", this);
}
@ -123,27 +123,29 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return _channel;
}
public HttpParser getParser()
{
return _parser;
}
public void reset()
{
// If we are still expecting
if (_channel.isExpecting100Continue())
{
// reset to avoid seeking remaining content
_parser.reset();
// close to seek EOF
_parser.close();
}
// else if we are persistent
else if (_generator.isPersistent())
_channel.reset();
if (_generator.isPersistent() && !_parser.isClosed())
// reset to seek next request
_parser.reset();
else
// else seek EOF
_parser.close();
_generator.reset();
_channel.reset();
releaseRequestBuffer();
if (_chunk!=null)
{
@ -165,16 +167,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return getHttpChannel().getRequests();
}
@Override
public String toString()
{
return String.format("%s,g=%s,p=%s",
super.toString(),
_generator,
_parser);
}
private void releaseRequestBuffer()
void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())
{
@ -183,6 +176,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_bufferPool.release(buffer);
}
}
public ByteBuffer getRequestBuffer()
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
return _requestBuffer;
}
/**
* <p>Parses and handles HTTP messages.</p>
@ -198,77 +198,53 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
LOG.debug("{} onFillable {}", this, _channel.getState());
setCurrentConnection(this);
int filled=Integer.MAX_VALUE;
boolean suspended=false;
try
{
while (true)
// while not suspended and not upgraded
while (!suspended && getEndPoint().getConnection()==this)
{
// Can the parser progress (even with an empty buffer)
boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
// Parse the buffer
if (call_channel)
// Do we need some data to parse
if (BufferUtil.isEmpty(_requestBuffer))
{
// Parse as much content as there is available before calling the channel
// this is both efficient (may queue many chunks), will correctly set available for 100 continues
// and will drive the parser to completion if all content is available.
while (_parser.inContentState())
// If the previous iteration filled 0 bytes or saw a close, then break here
if (filled<=0)
break;
// Can we fill?
if(getEndPoint().isInputShutdown())
{
if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
break;
// No pretend we read -1
filled=-1;
_parser.atEOF();
}
else
{
// Get a buffer
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
// fill
filled = getEndPoint().fill(_requestBuffer);
if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(_requestBuffer);
// tell parser
if (filled < 0)
_parser.atEOF();
}
}
// Parse the buffer
if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
{
// The parser returned true, which indicates the channel is ready to handle a request.
// Call the channel and this will either handle the request/response to completion OR,
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
boolean handle=_channel.handle();
// Return if suspended or upgraded
if (!handle || getEndPoint().getConnection()!=this)
return;
}
else if (BufferUtil.isEmpty(_requestBuffer))
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
int filled = getEndPoint().fill(_requestBuffer);
if (filled==0) // Do a retry on fill 0 (optimisation for SSL connections)
filled = getEndPoint().fill(_requestBuffer);
LOG.debug("{} filled {}", this, filled);
// If we failed to fill
if (filled == 0)
{
// Somebody wanted to read, we didn't so schedule another attempt
releaseRequestBuffer();
fillInterested();
return;
}
else if (filled < 0)
{
_parser.shutdownInput();
// We were only filling if fully consumed, so if we have
// read -1 then we have nothing to parse and thus nothing that
// will generate a response. If we had a suspended request pending
// a response or a request waiting in the buffer, we would not be here.
if (getEndPoint().isOutputShutdown())
getEndPoint().close();
else
getEndPoint().shutdownOutput();
// buffer must be empty and the channel must be idle, so we can release.
releaseRequestBuffer();
return;
}
}
else
{
// TODO work out how we can get here and a better way to handle it
LOG.warn("Unexpected state: "+this+ " "+_channel+" "+_channel.getRequest());
if (!_channel.getState().isSuspended())
getEndPoint().close();
return;
suspended = !_channel.handle();
}
}
}
catch (EofException e)
@ -284,10 +260,22 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
close();
}
finally
{
{
setCurrentConnection(null);
if (!suspended && getEndPoint().isOpen())
{
fillInterested();
}
}
}
@Override
protected void onFillInterestedFailed(Throwable cause)
{
_parser.close();
super.onFillInterestedFailed(cause);
}
@Override
public void onOpen()
@ -354,7 +342,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void completed()
{
// Finish consuming the request
if (_parser.isInContent() && _generator.isPersistent() && !_channel.isExpecting100Continue())
if (_parser.inContentState() && _generator.isPersistent() && !_channel.isExpecting100Continue())
// Complete reading the request
_channel.getRequest().getHttpInput().consumeAll();
@ -380,6 +368,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
if (_parser.isStart())
{
// TODO ???
// it wants to eat more
if (_requestBuffer == null)
{
@ -410,137 +399,19 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
public ByteBuffer getRequestBuffer()
{
return _requestBuffer;
}
private class Input extends ByteBufferHttpInput
{
@Override
protected void blockForContent() throws IOException
{
/* We extend the blockForContent method to replace the
default implementation of a blocking queue with an implementation
that uses the calling thread to block on a readable callback and
then to do the parsing before before attempting the read.
*/
while (!_parser.isComplete())
{
// Can the parser progress (even with an empty buffer)
boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
// If there is more content to parse, loop so we can queue all content from this buffer now without the
// need to call blockForContent again
while (!event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
event=_parser.parseNext(_requestBuffer);
// If we have content, return
if (_parser.isComplete() || available()>0)
return;
// Do we have content ready to parse?
if (BufferUtil.isEmpty(_requestBuffer))
{
// If no more input
if (getEndPoint().isInputShutdown())
{
_parser.shutdownInput();
shutdown();
return;
}
// Wait until we can read
fillInterested(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();
// We will need a buffer to read into
if (_requestBuffer==null)
{
long content_length=_channel.getRequest().getContentLength();
int size=getInputBufferSize();
if (size<content_length)
size=size*4; // TODO tune this
_requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
}
// read some data
int filled=getEndPoint().fill(_requestBuffer);
LOG.debug("{} block filled {}",this,filled);
if (filled<0)
{
_parser.shutdownInput();
return;
}
}
}
}
@Override
protected void onContentQueued(ByteBuffer ref)
{
/* This callback could be used to tell the connection
* that the request did contain content and thus the request
* buffer needs to be held until a call to #onAllContentConsumed
*
* However it turns out that nothing is needed here because either a
* request will have content, in which case the request buffer will be
* released by a call to onAllContentConsumed; or it will not have content.
* If it does not have content, either it will complete quickly and the
* buffers will be released in completed() or it will be suspended and
* onReadable() contains explicit handling to release if it is suspended.
*
* We extend this method anyway, to turn off the notify done by the
* default implementation as this is not needed by our implementation
* of blockForContent
*/
}
@Override
public void earlyEOF()
{
synchronized (lock())
{
_inputEOF=true;
_earlyEOF = true;
LOG.debug("{} early EOF", this);
}
}
@Override
public void shutdown()
{
synchronized (lock())
{
_inputEOF=true;
LOG.debug("{} shutdown", this);
}
}
@Override
protected void onAllContentConsumed()
{
/* This callback tells the connection that all content that has
* been parsed has been consumed. Thus the request buffer may be
* released if it is empty.
*/
releaseRequestBuffer();
}
@Override
public String toString()
{
return super.toString()+"{"+_channel+","+HttpConnection.this+"}";
}
}
private class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
{
public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
{
super(connector,config,endPoint,transport,input);
}
@Override
public boolean content(ByteBuffer item)
{
super.content(item);
return true;
}
@Override
public void badMessage(int status, String reason)
@ -808,4 +679,5 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import javax.servlet.ServletInputStream;
import javax.servlet.ReadListener;
@ -27,6 +28,7 @@ import javax.servlet.ReadListener;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -40,37 +42,60 @@ import org.eclipse.jetty.util.log.Logger;
* {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
* caller will know when buffers are queued and consumed.</p>
*/
public abstract class HttpInput<T> extends ServletInputStream
public abstract class HttpInput<T> extends ServletInputStream implements Runnable
{
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
protected boolean _earlyEOF;
protected boolean _inputEOF;
public Object lock()
private HttpChannelState _channelState;
private Throwable _onError;
private ReadListener _listener;
private boolean _notReady;
protected State _state = BLOCKING;
private State _eof=null;
protected HttpInput()
{
return _inputQ.lock();
}
public abstract Object lock();
public void recycle()
{
synchronized (lock())
{
T item = _inputQ.peekUnsafe();
while (item != null)
{
_inputQ.pollUnsafe();
onContentConsumed(item);
item = _inputQ.peekUnsafe();
if (item == null)
onAllContentConsumed();
}
_inputEOF = false;
_earlyEOF = false;
_state = BLOCKING;
_eof=null;
_onError=null;
}
}
protected abstract T nextContent() throws IOException;
protected void checkEOF()
{
if (_eof!=null)
{
LOG.debug("{} eof {}",this,_eof);
_state=_eof;
_eof=null;
if (_listener!=null)
{
try
{
_listener.onAllDataRead();
}
catch(Exception e)
{
LOG.warn(e.toString());
LOG.debug(e);
_listener.onError(e);
}
}
}
}
@Override
public int read() throws IOException
{
@ -82,12 +107,22 @@ public abstract class HttpInput<T> extends ServletInputStream
@Override
public int available()
{
synchronized (lock())
try
{
T item = _inputQ.peekUnsafe();
if (item == null)
return 0;
return remaining(item);
synchronized (lock())
{
T item = nextContent();
if (item==null)
{
checkEOF();
return 0;
}
return remaining(item);
}
}
catch (IOException e)
{
throw new RuntimeIOException(e);
}
}
@ -97,122 +132,47 @@ public abstract class HttpInput<T> extends ServletInputStream
T item = null;
synchronized (lock())
{
// Get the current head of the input Q
item = _inputQ.peekUnsafe();
// Skip empty items at the head of the queue
while (item != null && remaining(item) == 0)
{
_inputQ.pollUnsafe();
onContentConsumed(item);
LOG.debug("{} consumed {}", this, item);
item = _inputQ.peekUnsafe();
// If that was the last item then notify
if (item==null)
onAllContentConsumed();
}
// System.err.printf("read s=%s q=%d e=%s%n",_state,_inputQ.size(),_eof);
// Get the current head of the input Q
item = nextContent();
// If we have no item
if (item == null)
{
// Was it unexpectedly EOF'd?
if (isEarlyEOF())
throw new EofException();
// check for EOF
if (isShutdown())
{
onEOF();
return -1;
}
// OK then block for some more content
blockForContent();
// If still not content, we must be closed
item = _inputQ.peekUnsafe();
checkEOF();
_state.waitForContent(this);
item=nextContent();
if (item==null)
{
if (isEarlyEOF())
throw new EofException();
// blockForContent will only return with no
// content if it is closed.
if (!isShutdown())
LOG.warn("Unexpected !EOF: "+this);
onEOF();
return -1;
checkEOF();
return _state.noContent();
}
}
}
return get(item, b, off, len);
}
protected abstract int remaining(T item);
protected abstract int get(T item, byte[] buffer, int offset, int length);
protected abstract void onContentConsumed(T item);
protected void blockForContent() throws IOException
{
synchronized (lock())
{
while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF())
{
try
{
LOG.debug("{} waiting for content", this);
lock().wait();
}
catch (InterruptedException e)
{
throw (IOException)new InterruptedIOException().initCause(e);
}
}
}
}
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal new content has been queued
* @param item
*/
protected void onContentQueued(T item)
{
lock().notify();
}
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal all available content has been consumed
*/
protected void onAllContentConsumed()
{
}
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal it has reached EOF
*/
protected void onEOF()
protected abstract void blockForContent() throws IOException;
protected boolean onAsyncRead()
{
if (_listener==null)
return false;
_channelState.onReadPossible();
return true;
}
/* ------------------------------------------------------------ */
/** Add some content to the input stream
* @param item
*/
public void content(T item)
{
synchronized (lock())
{
// The buffer is not copied here. This relies on the caller not recycling the buffer
// until the it is consumed. The onAllContentConsumed() callback is the signal to the
// caller that the buffers can be recycled.
_inputQ.add(item);
onContentQueued(item);
LOG.debug("{} queued {}", this, item);
}
}
public abstract void content(T item);
/* ------------------------------------------------------------ */
/** This method should be called to signal to the HttpInput
@ -224,93 +184,221 @@ public abstract class HttpInput<T> extends ServletInputStream
{
synchronized (lock())
{
_earlyEOF = true;
_inputEOF = true;
lock().notify();
LOG.debug("{} early EOF", this);
}
}
/* ------------------------------------------------------------ */
public boolean isEarlyEOF()
{
synchronized (lock())
{
return _earlyEOF;
}
}
/* ------------------------------------------------------------ */
public void shutdown()
{
synchronized (lock())
{
_inputEOF = true;
lock().notify();
LOG.debug("{} shutdown", this);
}
}
/* ------------------------------------------------------------ */
public boolean isShutdown()
{
synchronized (lock())
{
return _inputEOF;
}
}
/* ------------------------------------------------------------ */
public void consumeAll()
{
synchronized (lock())
{
T item = _inputQ.peekUnsafe();
while (!isShutdown() && !isEarlyEOF())
if (_eof==null || !_eof.isEOF())
{
while (item != null)
{
_inputQ.pollUnsafe();
onContentConsumed(item);
item = _inputQ.peekUnsafe();
if (item == null)
onAllContentConsumed();
}
try
{
blockForContent();
item = _inputQ.peekUnsafe();
if (item==null)
break;
}
catch (IOException e)
{
throw new RuntimeIOException(e);
}
LOG.debug("{} early EOF", this);
_eof=EARLY_EOF;
if (_listener!=null)
_channelState.onReadPossible();
}
}
}
@Override
/* ------------------------------------------------------------ */
public void messageComplete()
{
synchronized (lock())
{
if (_eof==null || !_eof.isEOF())
{
LOG.debug("{} EOF", this);
_eof=EOF;
if (_listener!=null)
_channelState.onReadPossible();
}
}
}
public abstract void consumeAll();
@Override
public boolean isFinished()
{
// TODO Auto-generated method stub
return false;
synchronized (lock())
{
return _state.isEOF();
}
}
@Override
public boolean isReady()
{
// TODO Auto-generated method stub
return false;
synchronized (lock())
{
if (_listener==null)
return true;
int available = available();
if (available>0)
return true;
if (!_notReady)
{
_notReady=true;
if (_state.isEOF())
_channelState.onReadPossible();
else
unready();
}
return false;
}
}
protected void unready()
{
}
@Override
public void setReadListener(ReadListener readListener)
{
// TODO Auto-generated method stub
synchronized (lock())
{
if (_state!=BLOCKING)
throw new IllegalStateException("state="+_state);
_state=ASYNC;
_listener=readListener;
_notReady=true;
_channelState.onReadPossible();
}
}
public void failed(Throwable x)
{
synchronized (lock())
{
if (_onError==null)
LOG.warn(x);
else
_onError=x;
}
}
@Override
public void run()
{
final boolean available;
final boolean eof;
final Throwable x;
synchronized (lock())
{
if (!_notReady || _listener==null)
return;
x=_onError;
T item;
try
{
item = nextContent();
}
catch(Exception e)
{
item=null;
failed(e);
}
available= item!=null && remaining(item)>0;
eof = !available && _state.isEOF();
_notReady=!available&&!eof;
}
try
{
if (x!=null)
_listener.onError(_onError);
else if (available)
_listener.onDataAvailable();
else if (!eof)
unready();
}
catch(Exception e)
{
LOG.warn(e.toString());
LOG.debug(e);
_listener.onError(e);
}
}
protected static class State
{
public void waitForContent(HttpInput<?> in) throws IOException
{
}
public int noContent() throws IOException
{
return -1;
}
public boolean isEOF()
{
return false;
}
}
protected static final State BLOCKING= new State()
{
@Override
public void waitForContent(HttpInput<?> in) throws IOException
{
in.blockForContent();
}
public String toString()
{
return "OPEN";
}
};
protected static final State ASYNC= new State()
{
@Override
public int noContent() throws IOException
{
return 0;
}
@Override
public String toString()
{
return "ASYNC";
}
};
protected static final State EARLY_EOF= new State()
{
@Override
public int noContent() throws IOException
{
throw new EofException();
}
@Override
public boolean isEOF()
{
return true;
}
public String toString()
{
return "EARLY_EOF";
}
};
protected static final State EOF= new State()
{
@Override
public boolean isEOF()
{
return true;
}
public String toString()
{
return "EOF";
}
};
public void init(HttpChannelState state)
{
_channelState=state;
}
}

View File

@ -0,0 +1,216 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
{
private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class);
private final BlockingCallback _readBlocker = new BlockingCallback();
private final HttpConnection _httpConnection;
private ByteBuffer _content;
/**
* @param httpConnection
*/
HttpInputOverHTTP(HttpConnection httpConnection)
{
_httpConnection = httpConnection;
}
@Override
public void recycle()
{
synchronized (lock())
{
super.recycle();
_content=null;
}
}
@Override
protected void blockForContent() throws IOException
{
while(true)
{
_httpConnection.fillInterested(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();
if (nextContent()!=null || isFinished())
break;
}
}
@Override
public String toString()
{
return String.format("%s@%x",getClass().getSimpleName(),hashCode());
}
@Override
public Object lock()
{
return this;
}
@Override
protected ByteBuffer nextContent() throws IOException
{
// If we have some content available, return it
if (BufferUtil.hasContent(_content))
return _content;
// No - then we are going to need to parse some more content
_content=null;
ByteBuffer requestBuffer = _httpConnection.getRequestBuffer();
while (!_httpConnection.getParser().isComplete())
{
// Can the parser progress (even with an empty buffer)
_httpConnection.getParser().parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
// If we got some content, that will do for now!
if (BufferUtil.hasContent(_content))
return _content;
// No, we can we try reading some content?
if (BufferUtil.isEmpty(requestBuffer) && _httpConnection.getEndPoint().isInputShutdown())
{
_httpConnection.getParser().atEOF();
continue;
}
// OK lets read some data
int filled=_httpConnection.getEndPoint().fill(requestBuffer);
LOG.debug("{} filled {}",this,filled);
if (filled<=0)
{
if (filled<0)
{
_httpConnection.getParser().atEOF();
continue;
}
return null;
}
}
return null;
}
@Override
protected int remaining(ByteBuffer item)
{
return item.remaining();
}
@Override
protected int get(ByteBuffer item, byte[] buffer, int offset, int length)
{
int l = Math.min(item.remaining(), length);
item.get(buffer, offset, l);
return l;
}
@Override
public void content(ByteBuffer item)
{
if (BufferUtil.hasContent(_content))
throw new IllegalStateException();
_content=item;
}
@Override
public void consumeAll()
{
final HttpParser parser = _httpConnection.getParser();
try
{
ByteBuffer requestBuffer = null;
while (!parser.isComplete())
{
_content=null;
_httpConnection.getParser().parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
if (parser.isComplete())
break;
if (BufferUtil.isEmpty(requestBuffer))
{
if ( _httpConnection.getEndPoint().isInputShutdown())
parser.atEOF();
else
{
requestBuffer=_httpConnection.getRequestBuffer();
if (BufferUtil.isEmpty(requestBuffer))
{
int filled=_httpConnection.getEndPoint().fill(requestBuffer);
if (filled==0)
filled=_httpConnection.getEndPoint().fill(requestBuffer);
if (filled<0)
_httpConnection.getParser().atEOF();
else if (filled==0)
{
blockForContent();
}
}
}
}
}
}
catch(IOException e)
{
LOG.ignore(e);
_httpConnection.getParser().atEOF();
}
}
@Override
protected void unready()
{
_httpConnection.fillInterested(this);
}
@Override
public void succeeded()
{
_httpConnection.getHttpChannel().getState().onReadPossible();
}
@Override
public void failed(Throwable x)
{
super.failed(x);
_httpConnection.getHttpChannel().getState().onReadPossible();
}
}

View File

@ -51,7 +51,7 @@ import org.eclipse.jetty.util.log.Logger;
* via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
* close the stream, to be reopened after the inclusion ends.</p>
*/
public class HttpOutput extends ServletOutputStream
public class HttpOutput extends ServletOutputStream implements Runnable
{
private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel<?> _channel;
@ -555,6 +555,9 @@ write completed - - - ASYNC READY->owp
@Override
public void setWriteListener(WriteListener writeListener)
{
if (!_channel.getState().isAsync())
throw new IllegalStateException("!ASYNC");
if (_state.compareAndSet(State.OPEN, State.READY))
{
_writeListener = writeListener;
@ -594,7 +597,8 @@ write completed - - - ASYNC READY->owp
}
}
public void handle()
@Override
public void run()
{
if(_onError!=null)
{
@ -738,7 +742,6 @@ write completed - - - ASYNC READY->owp
@Override
public void failed(Throwable e)
{
e.printStackTrace();
_onError=e;
_channel.getState().onWritePossible();
}

View File

@ -172,7 +172,6 @@ public class LocalConnector extends AbstractConnector
Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
endPoint.setConnection(connection);
// connectionOpened(connection);
connection.onOpen();
}

View File

@ -0,0 +1,204 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link QueuedHttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
* <p>{@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(T)}.</p>
* <p>{@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
* but simply holds references to the item, thus the caller must organize for those buffers to valid while
* held by this class.</p>
* <p>To assist the caller, subclasses may override methods {@link #onContentQueued(T)},
* {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
* caller will know when buffers are queued and consumed.</p>
*/
public abstract class QueuedHttpInput<T> extends HttpInput<T>
{
private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
public QueuedHttpInput()
{
}
public Object lock()
{
return _inputQ.lock();
}
public void recycle()
{
synchronized (lock())
{
T item = _inputQ.peekUnsafe();
while (item != null)
{
_inputQ.pollUnsafe();
onContentConsumed(item);
item = _inputQ.peekUnsafe();
if (item == null)
onAllContentConsumed();
}
super.recycle();
}
}
@Override
protected T nextContent()
{
T item = _inputQ.peekUnsafe();
// Skip empty items at the head of the queue
while (item != null && remaining(item) == 0)
{
_inputQ.pollUnsafe();
onContentConsumed(item);
LOG.debug("{} consumed {}", this, item);
item = _inputQ.peekUnsafe();
// If that was the last item then notify
if (item==null)
onAllContentConsumed();
}
return item;
}
protected abstract void onContentConsumed(T item);
protected void blockForContent() throws IOException
{
synchronized (lock())
{
while (_inputQ.isEmpty() && !_state.isEOF())
{
try
{
LOG.debug("{} waiting for content", this);
lock().wait();
}
catch (InterruptedException e)
{
throw (IOException)new InterruptedIOException().initCause(e);
}
}
}
}
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal all available content has been consumed
*/
protected void onAllContentConsumed()
{
}
/* ------------------------------------------------------------ */
/** Add some content to the input stream
* @param item
*/
public void content(T item)
{
// The buffer is not copied here. This relies on the caller not recycling the buffer
// until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
// the signals to the caller that the buffers can be recycled.
synchronized (lock())
{
boolean empty=_inputQ.isEmpty();
_inputQ.add(item);
if (empty)
{
if (!onAsyncRead())
lock().notify();
}
LOG.debug("{} queued {}", this, item);
}
}
public void earlyEOF()
{
synchronized (lock())
{
super.earlyEOF();
lock().notify();
}
}
public void messageComplete()
{
synchronized (lock())
{
super.messageComplete();
lock().notify();
}
}
/* ------------------------------------------------------------ */
public void consumeAll()
{
synchronized (lock())
{
T item = _inputQ.peekUnsafe();
while (!_state.isEOF())
{
while (item != null)
{
_inputQ.pollUnsafe();
onContentConsumed(item);
item = _inputQ.peekUnsafe();
if (item == null)
onAllContentConsumed();
}
try
{
blockForContent();
item = _inputQ.peekUnsafe();
if (item==null)
checkEOF();
}
catch (IOException e)
{
throw new RuntimeIOException(e);
}
}
}
}
}

View File

@ -98,6 +98,24 @@ public class ServerConnector extends AbstractNetworkConnector
{
this(server,null,null,null,0,0,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param acceptors
* the number of acceptor threads to use, or 0 for a default value. Acceptors accept new TCP/IP connections.
* @param selectors
* the number of selector threads, or 0 for a default value. Selectors notice and schedule established connection that can make IO progress.
*/
public ServerConnector(
@Name("server") Server server,
@Name("acceptors") int acceptors,
@Name("selectors") int selectors)
{
this(server,null,null,null,acceptors,selectors,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** Generic Server Connection with default configuration.

View File

@ -57,8 +57,9 @@ public abstract class AbstractHttpTest
public void setUp() throws Exception
{
server = new Server();
connector = new ServerConnector(server);
connector = new ServerConnector(server,1,1);
connector.setIdleTimeout(10000);
server.addConnector(connector);
httpParser = new SimpleHttpParser();
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(true);

View File

@ -38,8 +38,10 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
@ -49,10 +51,60 @@ public class ConnectionOpenCloseTest extends AbstractHttpTest
{
super(HttpVersion.HTTP_1_1.asString());
}
@Slow
@Test
public void testOpenClose() throws Exception
{
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
throw new IllegalStateException();
}
});
server.start();
final AtomicInteger callbacks = new AtomicInteger();
final CountDownLatch openLatch = new CountDownLatch(1);
final CountDownLatch closeLatch = new CountDownLatch(1);
connector.addBean(new Connection.Listener.Empty()
{
@Override
public void onOpened(Connection connection)
{
callbacks.incrementAndGet();
openLatch.countDown();
}
@Override
public void onClosed(Connection connection)
{
callbacks.incrementAndGet();
closeLatch.countDown();
}
});
try (Socket socket = new Socket("localhost", connector.getLocalPort());)
{
socket.setSoTimeout((int)connector.getIdleTimeout());
Assert.assertTrue(openLatch.await(5, TimeUnit.SECONDS));
socket.shutdownOutput();
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
String response=IO.toString(socket.getInputStream());
Assert.assertThat(response,Matchers.containsString("400 Bad"));
// Wait some time to see if the callbacks are called too many times
TimeUnit.MILLISECONDS.sleep(200);
Assert.assertEquals(2, callbacks.get());
}
}
@Slow
@Test
public void testOpenRequestClose() throws Exception
{
server.setHandler(new AbstractHandler()
{
@ -87,7 +139,7 @@ public class ConnectionOpenCloseTest extends AbstractHttpTest
Socket socket = new Socket("localhost", connector.getLocalPort());
socket.setSoTimeout((int)connector.getIdleTimeout());
OutputStream output = socket.getOutputStream();
output.write(("" +
output.write((
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Connection: close\r\n" +
@ -112,7 +164,7 @@ public class ConnectionOpenCloseTest extends AbstractHttpTest
@Slow
@Test
public void testSSLOpenClose() throws Exception
public void testSSLOpenRequestClose() throws Exception
{
SslContextFactory sslContextFactory = new SslContextFactory();
File keystore = MavenTestingUtils.getTestResourceFile("keystore");

View File

@ -175,7 +175,7 @@ public class DumpHandler extends AbstractHandler
}
writer.write("</pre>\n<h3>Attributes:</h3>\n<pre>");
Enumeration attributes=request.getAttributeNames();
Enumeration<String> attributes=request.getAttributeNames();
if (attributes!=null && attributes.hasMoreElements())
{
while(attributes.hasMoreElements())

View File

@ -38,7 +38,7 @@ public class HalfCloseRaceTest
public void testHalfCloseRace() throws Exception
{
Server server = new Server();
ServerConnector connector = new ServerConnector(server);
ServerConnector connector = new ServerConnector(server,1,1);
connector.setPort(0);
connector.setIdleTimeout(500);
server.addConnector(connector);
@ -47,15 +47,16 @@ public class HalfCloseRaceTest
server.start();
Socket client = new Socket("localhost",connector.getLocalPort());
int in = client.getInputStream().read();
assertEquals(-1,in);
client.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes());
Thread.sleep(200);
assertEquals(0,handler.getHandled());
try(Socket client = new Socket("localhost",connector.getLocalPort());)
{
int in = client.getInputStream().read();
assertEquals(-1,in);
client.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes());
Thread.sleep(200);
assertEquals(0,handler.getHandled());
}
}

View File

@ -200,7 +200,20 @@ public class HttpConnectionTest
}
@Test
public void testEmpty() throws Exception
public void testSimple() throws Exception
{
String response=connector.getResponses("GET /R1 HTTP/1.1\n"+
"Host: localhost\n"+
"Connection: close\n"+
"\n");
int offset=0;
offset = checkContains(response,offset,"HTTP/1.1 200");
checkContains(response,offset,"/R1");
}
@Test
public void testEmptyChunk() throws Exception
{
String response=connector.getResponses("GET /R1 HTTP/1.1\n"+
"Host: localhost\n"+
@ -372,7 +385,7 @@ public class HttpConnectionTest
}
@Test
public void testUnconsumedError() throws Exception
public void testUnconsumedErrorRead() throws Exception
{
String response=null;
String requests=null;
@ -380,7 +393,7 @@ public class HttpConnectionTest
offset=0;
requests=
"GET /R1?read=1&error=500 HTTP/1.1\n"+
"GET /R1?read=1&error=599 HTTP/1.1\n"+
"Host: localhost\n"+
"Transfer-Encoding: chunked\n"+
"Content-Type: text/plain; charset=utf-8\n"+
@ -400,7 +413,43 @@ public class HttpConnectionTest
response=connector.getResponses(requests);
offset = checkContains(response,offset,"HTTP/1.1 500");
offset = checkContains(response,offset,"HTTP/1.1 599");
offset = checkContains(response,offset,"HTTP/1.1 200");
offset = checkContains(response,offset,"/R2");
offset = checkContains(response,offset,"encoding=UTF-8");
offset = checkContains(response,offset,"abcdefghij");
}
@Test
public void testUnconsumedErrorStream() throws Exception
{
String response=null;
String requests=null;
int offset=0;
offset=0;
requests=
"GET /R1?error=599 HTTP/1.1\n"+
"Host: localhost\n"+
"Transfer-Encoding: chunked\n"+
"Content-Type: application/data; charset=utf-8\n"+
"\015\012"+
"5;\015\012"+
"12345\015\012"+
"5;\015\012"+
"67890\015\012"+
"0;\015\012\015\012"+
"GET /R2 HTTP/1.1\n"+
"Host: localhost\n"+
"Content-Type: text/plain; charset=utf-8\n"+
"Content-Length: 10\n"+
"Connection: close\n"+
"\n"+
"abcdefghij\n";
response=connector.getResponses(requests);
offset = checkContains(response,offset,"HTTP/1.1 599");
offset = checkContains(response,offset,"HTTP/1.1 200");
offset = checkContains(response,offset,"/R2");
offset = checkContains(response,offset,"encoding=UTF-8");

View File

@ -18,8 +18,10 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@ -48,12 +50,14 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.matchers.JUnitMatchers;
/**
@ -348,17 +352,19 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
"Connection: close\015\012" +
"\015\012").getBytes());
os.flush();
Thread.sleep(PAUSE);
os.write(("5\015\012").getBytes());
Thread.sleep(1000);
os.write(("5").getBytes());
Thread.sleep(1000);
os.write(("\015\012").getBytes());
os.flush();
Thread.sleep(PAUSE);
Thread.sleep(1000);
os.write(("ABCDE\015\012" +
"0;\015\012\015\012").getBytes());
os.flush();
// Read the response.
String response = readResponse(client);
assertTrue(response.indexOf("200") > 0);
assertThat(response,containsString("200"));
}
}
@ -448,59 +454,53 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
}
@Test
public void testRequest2Fragments() throws Exception
@Slow
public void testRequest2Sliced2() throws Exception
{
configureServer(new EchoHandler());
byte[] bytes = REQUEST2.getBytes();
final int pointCount = 2;
// TODO random unit tests suck!
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < LOOPS; i++)
{
int[] points = new int[pointCount];
StringBuilder message = new StringBuilder();
message.append("iteration #").append(i + 1);
// Pick fragment points at random
for (int j = 0; j < points.length; ++j)
points[j] = random.nextInt(bytes.length);
// Sort the list
Arrays.sort(points);
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
writeFragments(bytes, points, message, os);
// Read the response
String response = readResponse(client);
// Check the response
assertEquals("response for " + i + " " + message.toString(), RESPONSE2, response);
}
}
}
@Test
public void testRequest2Iterate() throws Exception
{
configureServer(new EchoHandler());
byte[] bytes = REQUEST2.getBytes();
for (int i = 0; i < bytes.length; i += 3)
int splits = bytes.length-REQUEST2_CONTENT.length()+5;
for (int i = 0; i < splits; i += 1)
{
int[] points = new int[]{i};
StringBuilder message = new StringBuilder();
message.append("iteration #").append(i + 1);
// Sort the list
Arrays.sort(points);
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
writeFragments(bytes, points, message, os);
// Read the response
String response = readResponse(client);
// Check the response
assertEquals("response for " + i + " " + message.toString(), RESPONSE2, response);
Thread.sleep(100);
}
}
}
@Test
@Slow
public void testRequest2Sliced3() throws Exception
{
configureServer(new EchoHandler());
byte[] bytes = REQUEST2.getBytes();
int splits = bytes.length-REQUEST2_CONTENT.length()+5;
for (int i = 0; i < splits; i += 1)
{
int[] points = new int[]{i,i+1};
StringBuilder message = new StringBuilder();
message.append("iteration #").append(i + 1);
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
@ -513,9 +513,14 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
// Check the response
assertEquals("response for " + i + " " + message.toString(), RESPONSE2, response);
Thread.sleep(100);
}
}
}
@Test
public void testFlush() throws Exception
@ -938,33 +943,28 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
"\015\012" +
"123456789\n" +
"HEAD /R1 HTTP/1.1\015\012" +
"HEAD /R2 HTTP/1.1\015\012" +
"Host: " + _serverURI.getHost() + ":" + _serverURI.getPort() + "\015\012" +
"content-type: text/plain; charset=utf-8\r\n" +
"content-length: 10\r\n" +
"\015\012" +
"123456789\n" +
"ABCDEFGHI\n" +
"POST /R1 HTTP/1.1\015\012" +
"POST /R3 HTTP/1.1\015\012" +
"Host: " + _serverURI.getHost() + ":" + _serverURI.getPort() + "\015\012" +
"content-type: text/plain; charset=utf-8\r\n" +
"content-length: 10\r\n" +
"Connection: close\015\012" +
"\015\012" +
"123456789\n"
"abcdefghi\n"
).getBytes("iso-8859-1"));
String in = IO.toString(is);
// System.err.println(in);
int index = in.indexOf("123456789");
assertTrue(index > 0);
index = in.indexOf("123456789", index + 1);
assertTrue(index > 0);
index = in.indexOf("123456789", index + 1);
assertTrue(index == -1);
Assert.assertThat(in,containsString("123456789"));
Assert.assertThat(in,not(containsString("ABCDEFGHI")));
Assert.assertThat(in,containsString("abcdefghi"));
}
}
@ -1304,7 +1304,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
}
private void writeFragments(byte[] bytes, int[] points, StringBuilder message, OutputStream os) throws IOException, InterruptedException
protected void writeFragments(byte[] bytes, int[] points, StringBuilder message, OutputStream os) throws IOException, InterruptedException
{
int last = 0;

View File

@ -93,14 +93,14 @@ public class HttpServerTestFixture
protected static class EchoHandler extends AbstractHandler
{
boolean musthavecontent=true;
boolean _musthavecontent=true;
public EchoHandler()
{}
public EchoHandler(boolean content)
{
musthavecontent=false;
_musthavecontent=false;
}
@Override
@ -134,7 +134,7 @@ public class HttpServerTestFixture
if (count==0)
{
if (musthavecontent)
if (_musthavecontent)
throw new IllegalStateException("no input recieved");
writer.println("No content");

View File

@ -192,6 +192,6 @@ public class LowResourcesMonitorTest
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Assert.assertEquals(-1,socket1.getInputStream().read());
}
}

View File

@ -37,9 +37,11 @@ import java.util.Arrays;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletRequestEvent;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
@ -470,15 +472,25 @@ public class RequestTest
@Test
public void testContent() throws Exception
{
final int[] length=new int[1];
final AtomicInteger length=new AtomicInteger();
_handler._checker = new RequestTester()
{
@Override
public boolean check(HttpServletRequest request,HttpServletResponse response)
public boolean check(HttpServletRequest request,HttpServletResponse response) throws IOException
{
//assertEquals(request.getContentLength(), ((Request)request).getContentRead());
length[0]=request.getContentLength();
int len=request.getContentLength();
ServletInputStream in = request.getInputStream();
for (int i=0;i<len;i++)
{
int b=in.read();
if (b<0)
return false;
}
if (in.read()>0)
return false;
length.set(len);
return true;
}
};
@ -486,11 +498,11 @@ public class RequestTest
String content="";
for (int l=0;l<1025;l++)
for (int l=0;l<1024;l++)
{
String request="POST / HTTP/1.1\r\n"+
"Host: whatever\r\n"+
"Content-Type: text/test\r\n"+
"Content-Type: multipart/form-data-test\r\n"+
"Content-Length: "+l+"\r\n"+
"Connection: close\r\n"+
"\r\n"+
@ -498,9 +510,8 @@ public class RequestTest
Log.getRootLogger().debug("test l={}",l);
String response = _connector.getResponses(request);
Log.getRootLogger().debug(response);
assertEquals(l,length[0]);
if (l>0)
assertEquals(l,_handler._content.length());
assertThat(response,Matchers.containsString(" 200 OK"));
assertEquals(l,length.get());
content+="x";
}
}

View File

@ -82,7 +82,7 @@ public class ResponseTest
AbstractEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000);
ByteBufferHttpInput input = new ByteBufferHttpInput();
_channel = new HttpChannel<>(connector, new HttpConfiguration(), endp, new HttpTransport()
_channel = new HttpChannel<ByteBuffer>(connector, new HttpConfiguration(), endp, new HttpTransport()
{
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
@ -30,7 +31,9 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
@ -48,7 +51,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
// TODO need these on SPDY as well!
public class AsyncServletIOTest
{
protected AsyncIOServlet _servlet=new AsyncIOServlet();
@ -75,6 +78,10 @@ public class AsyncServletIOTest
_servletHandler.addServletWithMapping(holder,"/path/*");
_server.start();
_port=_connector.getLocalPort();
_owp.set(0);
_oda.set(0);
_read.set(0);
}
@After
@ -116,11 +123,30 @@ public class AsyncServletIOTest
@Test
public void testBigWrites() throws Exception
{
_count.set(0);
process(102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400);
Assert.assertThat(_count.get(),Matchers.greaterThan(1));
Assert.assertThat(_owp.get(),Matchers.greaterThan(1));
}
@Test
public void testRead() throws Exception
{
process("Hello!!!\r\n");
}
@Test
public void testBigRead() throws Exception
{
process("Now is the time for all good men to come to the aid of the party. How now Brown Cow. The quick brown fox jumped over the lazy dog. The moon is blue to a fish in love.\r\n");
}
@Test
public void testReadWrite() throws Exception
{
process("Hello!!!\r\n",10);
}
protected void assertContains(String content,String response)
{
@ -131,9 +157,18 @@ public class AsyncServletIOTest
{
Assert.assertThat(response,Matchers.not(Matchers.containsString(content)));
}
public synchronized List<String> process(String content,int... writes) throws Exception
{
return process(content.getBytes("ISO-8859-1"),writes);
}
public synchronized List<String> process(int... writes) throws Exception
{
return process((byte[])null,writes);
}
public synchronized List<String> process(byte[] content, int... writes) throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("GET /ctx/path/info");
@ -146,17 +181,34 @@ public class AsyncServletIOTest
request.append(" HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Connection: close\r\n")
.append("\r\n");
.append("Connection: close\r\n");
if (content!=null)
request.append("Content-Length: "+content.length+"\r\n")
.append("Content-Type: text/plain\r\n");
request.append("\r\n");
int port=_port;
List<String> list = new ArrayList<>();
try (Socket socket = new Socket("localhost",port);)
{
socket.setSoTimeout(1000000);
socket.getOutputStream().write(request.toString().getBytes("ISO-8859-1"));
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes("ISO-8859-1"));
if (content!=null && content.length>0)
{
Thread.sleep(100);
out.write(content[0]);
Thread.sleep(100);
int half=(content.length-1)/2;
out.write(content,1,half);
Thread.sleep(100);
out.write(content,1+half,content.length-half-1);
}
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
@ -198,12 +250,17 @@ public class AsyncServletIOTest
w++;
}
if (content!=null)
Assert.assertEquals(content.length,_read.get());
return list;
}
static AtomicInteger _count = new AtomicInteger();
static AtomicInteger _owp = new AtomicInteger();
static AtomicInteger _oda = new AtomicInteger();
static AtomicInteger _read = new AtomicInteger();
private static class AsyncIOServlet extends HttpServlet
{
@ -216,8 +273,53 @@ public class AsyncServletIOTest
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final String[] writes = request.getParameterValues("w");
final AsyncContext async = request.startAsync();
final AtomicInteger complete = new AtomicInteger(2);
// Asynchronous Read
if (request.getContentLength()>0)
{
// System.err.println("reading "+request.getContentLength());
final ServletInputStream in=request.getInputStream();
in.setReadListener(new ReadListener()
{
byte[] _buf=new byte[32];
@Override
public void onError(Throwable t)
{
if (complete.decrementAndGet()==0)
async.complete();
}
@Override
public void onDataAvailable() throws IOException
{
// System.err.println("ODA");
while (in.isReady())
{
_oda.incrementAndGet();
int len=in.read(_buf);
// System.err.println("read "+len);
if (len>0)
_read.addAndGet(len);
}
}
@Override
public void onAllDataRead() throws IOException
{
// System.err.println("OADR");
if (complete.decrementAndGet()==0)
async.complete();
}
});
}
else
complete.decrementAndGet();
// Asynchronous Write
final String[] writes = request.getParameterValues("w");
final ServletOutputStream out = response.getOutputStream();
out.setWriteListener(new WriteListener()
{
@ -227,7 +329,7 @@ public class AsyncServletIOTest
public void onWritePossible() throws IOException
{
//System.err.println("OWP");
_count.incrementAndGet();
_owp.incrementAndGet();
while (writes!=null && _w< writes.length)
{
@ -247,8 +349,8 @@ public class AsyncServletIOTest
return;
}
//System.err.println("COMPLETE!!!");
async.complete();
if (complete.decrementAndGet()==0)
async.complete();
}
@Override

View File

@ -18,10 +18,13 @@
package org.eclipse.jetty.spdy.server.http;
import org.eclipse.jetty.server.HttpInput;
import java.nio.ByteBuffer;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.QueuedHttpInput;
import org.eclipse.jetty.spdy.api.DataInfo;
public class HttpInputOverSPDY extends HttpInput<DataInfo>
public class HttpInputOverSPDY extends QueuedHttpInput<DataInfo>
{
@Override
protected int remaining(DataInfo item)

View File

@ -215,8 +215,8 @@ public class ProxySPDYToHTTPLoadTest
}
catch (InterruptedException | ExecutionException | TimeoutException | IOException e)
{
fail();
e.printStackTrace();
fail();
}
}
};

View File

@ -53,11 +53,15 @@ public abstract class IteratingNestedCallback extends IteratingCallback
_callback.succeeded();
}
/* ------------------------------------------------------------ */
@Override
public void failed(Throwable x)
{
_callback.failed(x);
}
@Override
public String toString()
{
return String.format("%s@%x",getClass().getSimpleName(),hashCode());
}
}

View File

@ -20,12 +20,14 @@ package org.eclipse.jetty.websocket.server.mux;
import java.nio.ByteBuffer;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.QueuedHttpInput;
/**
* HttpInput for Empty Http body sections.
*/
public class EmptyHttpInput extends HttpInput<ByteBuffer>
public class EmptyHttpInput extends QueuedHttpInput<ByteBuffer>
{
@Override
protected int get(ByteBuffer item, byte[] buffer, int offset, int length)