465857 - Support HTTP/2 clear-text server-side upgrade.

Fixed and test both types of upgrade, from HTTP/1.1 via its
upgrade mechanism, and direct HTTP/2.
This commit is contained in:
Simone Bordet 2015-04-29 22:26:17 +02:00
parent aaaf65bf3c
commit a6cc4ff2f5
10 changed files with 269 additions and 143 deletions

View File

@ -60,17 +60,17 @@ public class HTTP2Connection extends AbstractConnection
return session;
}
protected Parser getParser()
{
return parser;
}
protected void prefill(ByteBuffer buffer)
protected void setInputBuffer(ByteBuffer buffer)
{
producer.buffer=buffer;
producer.buffer = buffer;
}
@Override
public void onOpen()
{

View File

@ -18,14 +18,28 @@
package org.eclipse.jetty.http2.frames;
import java.nio.charset.StandardCharsets;
public class PrefaceFrame extends Frame
{
public static final byte[] PREFACE_BYTES = new byte[]
{
0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54,
0x54, 0x50, 0x2f, 0x32, 0x2e, 0x30, 0x0d, 0x0a,
0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a
};
/**
* The bytes of the HTTP/2 preface that form a legal HTTP/1.1
* request, used in the direct upgrade.
*/
public static final byte[] PREFACE_PREAMBLE_BYTES = (
"PRI * HTTP/2.0\r\n" +
"\r\n"
).getBytes(StandardCharsets.US_ASCII);
/**
* The HTTP/2 preface bytes.
*/
public static final byte[] PREFACE_BYTES = (
"PRI * HTTP/2.0\r\n" +
"\r\n" +
"SM\r\n" +
"\r\n"
).getBytes(StandardCharsets.US_ASCII);
public PrefaceFrame()
{

View File

@ -38,20 +38,19 @@ public class PrefaceParser
this.listener = listener;
}
/* ------------------------------------------------------------ */
/** Unsafe upgrade is an unofficial upgrade from HTTP/1.0 to HTTP/2.0
* initiated when a the <code>org.eclipse.jetty.server.HttpConnection</code> sees a PRI * HTTP/2.0 prefix
* that indicates a HTTP/2.0 client is attempting a h2c direct connection.
* This is not a standard HTTP/1.1 Upgrade path.
/**
* <p>Advances this parser after the {@link PrefaceFrame#PREFACE_PREAMBLE_BYTES}.</p>
* <p>This allows the HTTP/1.1 parser to parse the preamble of the preface,
* which is a legal HTTP/1.1 request, and this parser will parse the remaining
* bytes, that are not parseable by a HTTP/1.1 parser.</p>
*/
public void directUpgrade()
protected void directUpgrade()
{
if (cursor!=0)
if (cursor != 0)
throw new IllegalStateException();
cursor=18;
cursor = PrefaceFrame.PREFACE_PREAMBLE_BYTES.length;
}
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())

View File

@ -41,20 +41,25 @@ public class ServerParser extends Parser
this.prefaceParser = new PrefaceParser(listener);
}
/* ------------------------------------------------------------ */
/** Unsafe upgrade is an unofficial upgrade from HTTP/1.0 to HTTP/2.0
* initiated when a the <code>org.eclipse.jetty.server.HttpConnection</code> sees a PRI * HTTP/2.0 prefix
* that indicates a HTTP/2.0 client is attempting a h2c direct connection.
* This is not a standard HTTP/1.1 Upgrade path.
/**
* <p>A direct upgrade is an unofficial upgrade from HTTP/1.1 to HTTP/2.0.</p>
* <p>A direct upgrade is initiated when {@code org.eclipse.jetty.server.HttpConnection}
* sees a request with these bytes:</p>
* <pre>
* PRI * HTTP/2.0\r\n
* \r\n
* </pre>
* <p>This request is part of the HTTP/2.0 preface, indicating that a
* HTTP/2.0 client is attempting a h2c direct connection.</p>
* <p>This is not a standard HTTP/1.1 Upgrade path.</p>
*/
public void directUpgrade()
{
if (state!=State.PREFACE)
if (state != State.PREFACE)
throw new IllegalStateException();
prefaceParser.directUpgrade();
}
@Override
public void parse(ByteBuffer buffer)
{

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -40,12 +39,12 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int initialStreamSendWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1;
private final HttpConfiguration httpConfiguration;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
this(httpConfiguration,"h2-17","h2-16","h2-15","h2-14","h2");
this(httpConfiguration,"h2","h2-17","h2-16","h2-15","h2-14");
}
protected AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration, String... protocols)
{
super(protocols);
@ -86,7 +85,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
{
return httpConfiguration;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
@ -102,8 +101,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
// the typical case is that the connection will be busier and the
// stream idle timeout will expire earlier that the connection's.
session.setStreamIdleTimeout(endPoint.getIdleTimeout());
Parser parser = newServerParser(connector, session);
ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);

View File

@ -34,33 +34,33 @@ import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
/** HTTP2 Clear Text Connection factory.
* <p>This extension of HTTP2ServerConnection Factory sets the
* <p>This extension of HTTP2ServerConnection Factory sets the
* protocol name to "h2c" as used by the clear text upgrade mechanism
* for HTTP2 and marks all TLS ciphers as unacceptable.
* </p>
* <p>If used in combination with a {@link HttpConnectionFactory} as the
* <p>If used in combination with a {@link HttpConnectionFactory} as the
* default protocol, this factory can support the non-standard direct
* update mechanism, where a HTTP1 request of the form "PRI * HTTP/2.0"
* update mechanism, where a HTTP1 request of the form "PRI * HTTP/2.0"
* is used to trigger a switch to a HTTP2 connection. This approach
* allows a single port to accept either HTTP/1 or HTTP/2 direct
* allows a single port to accept either HTTP/1 or HTTP/2 direct
* connections.
*/
public class HTTP2CServerConnectionFactory extends HTTP2ServerConnectionFactory implements ConnectionFactory.Upgrading
public class HTTP2CServerConnectionFactory extends HTTP2ServerConnectionFactory implements ConnectionFactory.Upgrading
{
private static final Logger LOG = Log.getLogger(HTTP2CServerConnectionFactory.class);
public HTTP2CServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
super(httpConfiguration,"h2c","h2c-14");
super(httpConfiguration,"h2c","h2c-17","h2c-16","h2c-15","h2c-14");
}
@Override
public boolean isAcceptable(String protocol, String tlsProtocol, String tlsCipher)
{
// Never use TLS with h2c
return false;
}
@Override
public Connection upgradeConnection(Connector connector, EndPoint endPoint, Request request, HttpFields response101) throws BadMessageException
{
@ -69,9 +69,9 @@ public class HTTP2CServerConnectionFactory extends HTTP2ServerConnectionFactory
if (request.getContentLength() > 0)
return null;
HTTP2ServerConnection connection = (HTTP2ServerConnection)newConnection(connector, endPoint);
if (connection.upgrade(request, response101))
if (connection.upgrade(request))
return connection;
return null;
}

View File

@ -23,7 +23,7 @@ import java.util.Queue;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.MetaData;
@ -35,7 +35,6 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.http2.parser.SettingsBodyParser;
import org.eclipse.jetty.io.ByteBufferPool;
@ -56,7 +55,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final HttpConfiguration httpConfig;
private HeadersFrame upgradeRequest;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, Parser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
@ -64,11 +63,17 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
}
@Override
public void onUpgradeTo(ByteBuffer prefilled)
protected ServerParser getParser()
{
return (ServerParser)super.getParser();
}
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 onUpgradeTo {} {}", this, BufferUtil.toDetailString(prefilled));
prefill(prefilled);
LOG.debug("HTTP2 onUpgradeTo {} {}", this, BufferUtil.toDetailString(buffer));
setInputBuffer(buffer);
}
@Override
@ -143,16 +148,19 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return channel;
}
public boolean upgrade(Request request, HttpFields response101)
public boolean upgrade(Request request)
{
if (HttpMethod.PRI.is(request.getMethod()))
{
((ServerParser)getParser()).directUpgrade();
getParser().directUpgrade();
}
else
{
String value = request.getFields().getField(HttpHeader.HTTP2_SETTINGS).getValue();
final byte[] settings = B64Code.decodeRFC4648URL(value);
HttpField settingsField = request.getFields().getField(HttpHeader.HTTP2_SETTINGS);
if (settingsField == null)
throw new BadMessageException("Missing " + HttpHeader.HTTP2_SETTINGS + " header");
String value = settingsField.getValue();
final byte[] settings = B64Code.decodeRFC4648URL(value == null ? "" : value);
if (LOG.isDebugEnabled())
LOG.debug("{} settings {}",this,TypeUtil.toHexString(settings));

View File

@ -31,35 +31,29 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
public class HTTP2CServer extends Server
{
public HTTP2CServer(int port)
{
// HTTP connector
HttpConfiguration config = new HttpConfiguration();
// HTTP + HTTP/2 connector
ServerConnector http = new ServerConnector(this,new HttpConnectionFactory(config), new HTTP2CServerConnectionFactory(config));
http.setHost("localhost");
http.setPort(port);
http.setIdleTimeout(30000);
// Set the connector
addConnector(http);
// Set a handler
((QueuedThreadPool)getThreadPool()).setName("server");
setHandler(new SimpleHandler());
}
public static void main(String... args ) throws Exception
{
// The Server
HTTP2CServer server = new HTTP2CServer(8080);
// Start the server
server.start();
server.join();
}
private static class SimpleHandler extends AbstractHandler
{
@Override
@ -69,15 +63,14 @@ public class HTTP2CServer extends Server
String code=request.getParameter("code");
if (code!=null)
response.setStatus(Integer.parseInt(code));
response.setHeader("Custom","Value");
response.setContentType("text/plain");
String content = "Hello from Jetty using "+request.getProtocol() +"\n";
content+="uri="+request.getRequestURI()+"\n";
content+="date="+new Date()+"\n";
response.setContentLength(content.length());
response.getOutputStream().print(content);
response.getOutputStream().print(content);
}
}
}

View File

@ -18,9 +18,7 @@
package org.eclipse.jetty.http2.server;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
@ -45,30 +43,35 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class HTTP2CServerTest extends AbstractServerTest
{
HTTP2CServer _server;
int _port;
private HTTP2CServer _server;
private int _port;
@Before
public void before() throws Exception
{
_server=new HTTP2CServer(0);
_server = new HTTP2CServer(0);
_server.start();
_port=((NetworkConnector)_server.getConnectors()[0]).getLocalPort();
_port = ((NetworkConnector)_server.getConnectors()[0]).getLocalPort();
}
@After
public void after() throws Exception
{
_server.stop();
}
@Test
public void testHTTP_1_0_Simple() throws Exception
{
@ -76,14 +79,14 @@ public class HTTP2CServerTest extends AbstractServerTest
{
client.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
client.getOutputStream().flush();
String response = IO.toString(client.getInputStream());
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("Hello from Jetty using HTTP/1.0"));
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(response, containsString("Hello from Jetty using HTTP/1.0"));
}
}
@Test
public void testHTTP_1_1_Simple() throws Exception
{
@ -92,27 +95,131 @@ public class HTTP2CServerTest extends AbstractServerTest
client.getOutputStream().write("GET /one HTTP/1.1\r\nHost: localhost\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
client.getOutputStream().write("GET /two HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
client.getOutputStream().flush();
String response = IO.toString(client.getInputStream());
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("Hello from Jetty using HTTP/1.1"));
assertThat(response,containsString("uri=/one"));
assertThat(response,containsString("uri=/two"));
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(response, containsString("Hello from Jetty using HTTP/1.1"));
assertThat(response, containsString("uri=/one"));
assertThat(response, containsString("uri=/two"));
}
}
@Test
public void testHTTP_2_0_Simple() throws Exception
public void testHTTP_1_1_Upgrade() throws Exception
{
try (Socket client = new Socket("localhost", _port))
{
OutputStream output = client.getOutputStream();
output.write(("" +
"GET /one HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Connection: Upgrade, HTTP2-Settings\r\n" +
"Upgrade: h2c\r\n" +
"HTTP2-Settings: \r\n" +
"\r\n").getBytes(StandardCharsets.ISO_8859_1));
output.flush();
InputStream input = client.getInputStream();
Utf8StringBuilder upgrade = new Utf8StringBuilder();
int crlfs = 0;
while (true)
{
int read = input.read();
if (read == '\r' || read == '\n')
++crlfs;
else
crlfs = 0;
upgrade.append((byte)read);
if (crlfs == 4)
break;
}
assertTrue(upgrade.toString().startsWith("HTTP/1.1 101 "));
byteBufferPool = new MappedByteBufferPool();
generator = new Generator(byteBufferPool);
final AtomicReference<HeadersFrame> headersRef = new AtomicReference<>();
final AtomicReference<DataFrame> dataRef = new AtomicReference<>();
final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(new CountDownLatch(2));
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public void onHeaders(HeadersFrame frame)
{
headersRef.set(frame);
latchRef.get().countDown();
}
@Override
public void onData(DataFrame frame)
{
dataRef.set(frame);
latchRef.get().countDown();
}
}, 4096, 8192);
parseResponse(client, parser);
Assert.assertTrue(latchRef.get().await(5, TimeUnit.SECONDS));
HeadersFrame response = headersRef.get();
Assert.assertNotNull(response);
MetaData.Response responseMetaData = (MetaData.Response)response.getMetaData();
Assert.assertEquals(200, responseMetaData.getStatus());
DataFrame responseData = dataRef.get();
Assert.assertNotNull(responseData);
String content = BufferUtil.toString(responseData.getData());
// The upgrade request is seen as HTTP/1.1.
assertThat(content, containsString("Hello from Jetty using HTTP/1.1"));
assertThat(content, containsString("uri=/one"));
// Send a HTTP/2 request.
headersRef.set(null);
dataRef.set(null);
latchRef.set(new CountDownLatch(2));
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = new MetaData.Request("GET", HttpScheme.HTTP, new HostPortHttpField("localhost:" + _port), "/two", HttpVersion.HTTP_2, new HttpFields());
generator.control(lease, new HeadersFrame(3, metaData, null, true));
for (ByteBuffer buffer : lease.getByteBuffers())
output.write(BufferUtil.toArray(buffer));
output.flush();
parseResponse(client, parser);
Assert.assertTrue(latchRef.get().await(5, TimeUnit.SECONDS));
response = headersRef.get();
Assert.assertNotNull(response);
responseMetaData = (MetaData.Response)response.getMetaData();
Assert.assertEquals(200, responseMetaData.getStatus());
responseData = dataRef.get();
Assert.assertNotNull(responseData);
content = BufferUtil.toString(responseData.getData());
assertThat(content, containsString("Hello from Jetty using HTTP/2.0"));
assertThat(content, containsString("uri=/two"));
}
}
@Test
public void testHTTP_2_0_Direct() throws Exception
{
final CountDownLatch latch = new CountDownLatch(3);
byteBufferPool= new MappedByteBufferPool();
byteBufferPool = new MappedByteBufferPool();
generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = new MetaData.Request("GET", HttpScheme.HTTP, new HostPortHttpField("localhost:"+_port), "/test", HttpVersion.HTTP_2, new HttpFields());
MetaData.Request metaData = new MetaData.Request("GET", HttpScheme.HTTP, new HostPortHttpField("localhost:" + _port), "/test", HttpVersion.HTTP_2, new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
@ -160,11 +267,11 @@ public class HTTP2CServerTest extends AbstractServerTest
DataFrame responseData = dataRef.get();
Assert.assertNotNull(responseData);
String s = BufferUtil.toString(responseData.getData());
assertThat(s,containsString("Hello from Jetty using HTTP/2.0"));
assertThat(s,containsString("uri=/test"));
assertThat(s, containsString("Hello from Jetty using HTTP/2.0"));
assertThat(s, containsString("uri=/test"));
}
}
}

View File

@ -29,10 +29,10 @@ import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -70,11 +70,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
/**
* Get the current connection that this thread is dispatched to.
* Note that a thread may be processing a request asynchronously and
* thus not be dispatched to the connection.
* thus not be dispatched to the connection.
* @return the current HttpConnection or null
* @see Request#getAttribute(String) for a more general way to access the HttpConnection
*/
@ -97,7 +97,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_generator = newHttpGenerator();
_channel = newHttpChannel();
_channel = newHttpChannel();
_input = _channel.getRequest().getHttpInput();
_parser = newHttpParser();
if (LOG.isDebugEnabled())
@ -113,12 +113,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
}
protected HttpChannelOverHttp newHttpChannel()
{
return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
}
protected HttpParser newHttpParser()
{
return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
@ -195,12 +195,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_bufferPool.release(buffer);
}
}
public ByteBuffer getRequestBuffer()
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
return _requestBuffer;
return _requestBuffer;
}
public boolean isRequestBufferEmpty()
@ -231,39 +231,40 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
close();
break;
}
// Handle channel event
if (handle)
{
boolean suspended = !_channel.handle();
// We should break iteration if we have suspended or changed connection or this is not the handling thread.
if (suspended || getEndPoint().getConnection() != this)
break;
}
// Continue or break?
else if (filled<=0)
{
if (filled==0)
// Be fill interested only if there was no connection upgrade.
if (filled==0 && getEndPoint().getConnection()==this)
fillInterested();
break;
}
}
}
finally
{
{
setCurrentConnection(last);
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable exit {}", this, _channel.getState());
}
}
/* ------------------------------------------------------------ */
/** Fill and parse data looking for content
* @return true if an {@link RequestHandler} method was called and it returned true;
* @return true if an {@link RequestHandler} method was called and it returned true;
*/
protected boolean fillAndParseForContent()
protected boolean fillAndParseForContent()
{
boolean handled=false;
while (_parser.inContentState())
@ -280,14 +281,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
/* ------------------------------------------------------------ */
private int fillRequestBuffer()
private int fillRequestBuffer()
{
if (_contentBufferReferences.get()>0)
{
LOG.warn("{} fill with unconsumed content!",this);
return 0;
}
if (BufferUtil.isEmpty(_requestBuffer))
{
// Can we fill?
@ -299,7 +300,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
LOG.debug("{} filled -1",this);
return -1;
}
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed.
@ -315,10 +316,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// tell parser
if (filled < 0)
_parser.atEOF();
if (LOG.isDebugEnabled())
LOG.debug("{} filled {}",this,filled);
return filled;
}
catch (IOException e)
@ -331,20 +332,20 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
/* ------------------------------------------------------------ */
private boolean parseRequestBuffer()
private boolean parseRequestBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer));
boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}",this,handle,_parser);
// recycle buffer ?
if (_contentBufferReferences.get()==0)
releaseRequestBuffer();
return handle;
}
@ -376,7 +377,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return;
}
}
// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
@ -409,7 +410,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_parser.reset();
else
_parser.close();
// Not in a race here with onFillable, because it has given up control before calling handle.
// in a slight race with #completed, but not sure what to do with that anyway.
if (_chunk!=null)
@ -451,7 +452,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
getEndPoint().close();
}
}
// else the parser must be closed, so seek the EOF if we are still open
// else the parser must be closed, so seek the EOF if we are still open
else if (getEndPoint().isOpen())
fillInterested();
}
@ -502,17 +503,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// then we can't be persistent
_generator.setPersistent(false);
}
if(_sendCallback.reset(info,head,content,lastContent,callback))
_sendCallback.iterate();
}
HttpInput.Content newContent(ByteBuffer c)
{
return new Content(c);
}
private class Content extends HttpInput.Content
{
public Content(ByteBuffer content)
@ -568,7 +569,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_channel.handle();
}
}
private class SendCallback extends IteratingCallback
{
private MetaData.Response _info;
@ -597,7 +598,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_shutdownOut = false;
return true;
}
if (isClosed())
callback.failed(new EofException());
else
@ -610,7 +611,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
if (_callback==null)
throw new IllegalStateException();
ByteBuffer chunk = _chunk;
while (true)
{
@ -646,7 +647,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
else
*/
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}
case NEED_CHUNK:
@ -721,7 +722,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (h!=null)
_bufferPool.release(h);
}
@Override
protected void onCompleteSuccess()
{
@ -739,7 +740,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (_shutdownOut)
getEndPoint().shutdownOutput();
}
@Override
public String toString()
{
@ -750,7 +751,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public void abort(Throwable failure)
{
// Do a direct close of the output, as this may indicate to a client that the
// Do a direct close of the output, as this may indicate to a client that the
// response is bad either with RST or by abnormal completion of chunked response.
getEndPoint().close();
}
@ -760,13 +761,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
return false;
}
/**
* @see org.eclipse.jetty.server.HttpTransport#push(org.eclipse.jetty.http.MetaData.Request)
*/
@Override
public void push(org.eclipse.jetty.http.MetaData.Request request)
{
{
LOG.debug("ignore push in {}",this);
}
@ -777,9 +778,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void blockingReadFillInterested()
{
getEndPoint().fillInterested(_blockingReadCallback);
getEndPoint().fillInterested(_blockingReadCallback);
}
@Override
public String toString()
{