Improved the HTTP over SPDY layer to support content, and content in multiple chunks.

This commit is contained in:
Simone Bordet 2012-02-08 18:44:04 +01:00
parent 9316a9601b
commit 48fa5ce855
8 changed files with 601 additions and 299 deletions

View File

@ -201,11 +201,14 @@ public class StandardStream implements IStream
{
try
{
// TODO: if the read buffer is small, but the default window size is big,
// we will send many window update frames... perhaps we can delay
// window update frames until we have a bigger delta to send
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(getVersion(), getId(), delta);
session.control(this, windowUpdateFrame);
if (delta > 0)
{
// TODO: if the read buffer is small, but the default window size is big,
// we will send many window update frames... perhaps we can delay
// window update frames until we have a bigger delta to send
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(getVersion(), getId(), delta);
session.control(this, windowUpdateFrame);
}
}
catch (StreamException x)
{

View File

@ -35,15 +35,15 @@ import org.eclipse.jetty.spdy.parser.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncSPDYConnection extends AbstractConnection implements AsyncConnection, Controller
public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller
{
private static final Logger logger = LoggerFactory.getLogger(AsyncSPDYConnection.class);
private static final Logger logger = LoggerFactory.getLogger(SPDYAsyncConnection.class);
private final Parser parser;
private ByteBuffer buffer;
private Handler handler;
private volatile boolean flushing;
public AsyncSPDYConnection(EndPoint endp, Parser parser)
public SPDYAsyncConnection(EndPoint endp, Parser parser)
{
super(endp);
this.parser = parser;
@ -58,11 +58,9 @@ public class AsyncSPDYConnection extends AbstractConnection implements AsyncConn
{
int filled = fill();
progress = filled > 0;
logger.debug("Filled {} from {}", filled, endPoint);
int flushed = flush();
progress |= flushed > 0;
logger.debug("Flushed {} to {}", flushed, endPoint);
endPoint.flush();
@ -74,11 +72,12 @@ public class AsyncSPDYConnection extends AbstractConnection implements AsyncConn
return this;
}
protected int fill() throws IOException
public int fill() throws IOException
{
NIOBuffer jettyBuffer = new DirectNIOBuffer(1024);
AsyncEndPoint endPoint = getEndPoint();
int filled = endPoint.fill(jettyBuffer);
logger.debug("Filled {} from {}", filled, endPoint);
if (filled > 0)
{
ByteBuffer buffer = jettyBuffer.getByteBuffer();
@ -86,16 +85,17 @@ public class AsyncSPDYConnection extends AbstractConnection implements AsyncConn
buffer.position(jettyBuffer.getIndex());
parser.parse(buffer);
}
return filled;
}
protected int flush()
public int flush()
{
int result = 0;
// Volatile read to ensure visibility of buffer and handler
if (!flushing)
return 0;
return write(buffer, handler);
if (flushing)
result = write(buffer, handler);
logger.debug("Flushed {} to {}", result, getEndPoint());
return result;
}
@Override

View File

@ -422,7 +422,7 @@ public class SPDYClient
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(compressionFactory.newCompressor());
AsyncSPDYConnection connection = new AsyncSPDYConnection(endPoint, parser);
SPDYAsyncConnection connection = new SPDYAsyncConnection(endPoint, parser);
endPoint.setConnection(connection);
StandardSession session = new StandardSession(connection, 1, sessionFuture.listener, generator);

View File

@ -56,7 +56,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
if (listener == null)
listener = newServerSessionFrameListener(endPoint, attachment);
ServerAsyncSPDYConnection connection = new ServerAsyncSPDYConnection(endPoint, parser, listener);
ServerSPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(connection, 2, listener, generator);
@ -71,12 +71,12 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
return listener;
}
private static class ServerAsyncSPDYConnection extends AsyncSPDYConnection
private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection
{
private final ServerSessionFrameListener listener;
private volatile Session session;
private ServerAsyncSPDYConnection(EndPoint endPoint, Parser parser, ServerSessionFrameListener listener)
private ServerSPDYAsyncConnection(EndPoint endPoint, Parser parser, ServerSessionFrameListener listener)
{
super(endPoint, parser);
this.listener = listener;

View File

@ -17,26 +17,11 @@
package org.eclipse.jetty.spdy.nio.http;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.server.AbstractHttpConnection;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
@ -45,13 +30,14 @@ import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.nio.EmptyAsyncEndPoint;
import org.eclipse.jetty.spdy.nio.SPDYAsyncConnection;
import org.eclipse.jetty.spdy.nio.ServerSPDYAsyncConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectionFactory
{
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(HTTPOverSPDYAsyncConnectionFactory.class);
private final Connector connector;
public HTTPOverSPDYAsyncConnectionFactory(Connector connector)
@ -62,11 +48,18 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio
@Override
protected ServerSessionFrameListener newServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
{
return new HTTPServerSessionFrameListener();
return new HTTPServerSessionFrameListener(endPoint);
}
private class HTTPServerSessionFrameListener extends ServerSessionFrameListener.Adapter implements Stream.FrameListener
{
private final AsyncEndPoint endPoint;
public HTTPServerSessionFrameListener(AsyncEndPoint endPoint)
{
this.endPoint = endPoint;
}
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
@ -76,39 +69,35 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio
// cycle is processed at a time, so we need to fake an http connection
// for each SYN in order to run concurrently.
logger.debug("Received {}", synInfo);
logger.debug("Received {} on {}", synInfo, stream);
try
{
HTTPSPDYConnection connection = new HTTPSPDYConnection(connector, new HTTPSPDYAsyncEndPoint(stream), connector.getServer(), stream);
HTTPSPDYAsyncConnection connection = new HTTPSPDYAsyncConnection(connector,
new HTTPSPDYAsyncEndPoint(stream), connector.getServer(),
(SPDYAsyncConnection)endPoint.getConnection(), stream);
stream.setAttribute("connection", connection);
stream.setAttribute(ParseStatus.class.getName(), ParseStatus.INITIAL);
Headers headers = synInfo.getHeaders();
connection.beginRequest(headers);
if (headers.isEmpty())
{
// SYN with no headers, perhaps they'll come in a HEADER frame
// SYN with no headers, perhaps they'll come later in a HEADER frame
return this;
}
else
{
boolean processed = processRequest(stream, headers);
if (!processed)
{
respond(stream, HttpStatus.BAD_REQUEST_400);
return null;
}
if (synInfo.isClose())
{
forwardHeadersComplete(stream);
forwardRequestComplete(stream);
connection.endRequest();
return null;
}
else
{
if (headers.names().contains("expect"))
forwardHeadersComplete(stream);
// TODO
// if (headers.names().contains("expect"))
// forwardHeadersComplete(stream);
return this;
}
}
@ -125,23 +114,6 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio
}
}
private boolean processRequest(Stream stream, Headers headers) throws IOException
{
if (stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.INITIAL)
{
Headers.Header method = headers.get("method");
Headers.Header uri = headers.get("url");
Headers.Header version = headers.get("version");
if (method == null || uri == null || version == null)
return false;
forwardRequest(stream, method.value(), uri.value(), version.value());
}
forwardHeaders(stream, headers);
return true;
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -151,22 +123,15 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
logger.debug("Received {}", headersInfo);
// TODO: support trailers
Boolean dataSeen = (Boolean)stream.getAttribute("data");
if (dataSeen != null && dataSeen)
return;
logger.debug("Received {} on {}", headersInfo, stream);
try
{
processRequest(stream, headersInfo.getHeaders());
HTTPSPDYAsyncConnection connection = (HTTPSPDYAsyncConnection)stream.getAttribute("connection");
connection.headers(headersInfo.getHeaders());
if (headersInfo.isClose())
{
forwardHeadersComplete(stream);
forwardRequestComplete(stream);
}
connection.endRequest();
}
catch (HttpException x)
{
@ -181,16 +146,19 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
logger.debug("Received {} on {}", dataInfo, stream);
try
{
if (stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.REQUEST)
forwardHeadersComplete(stream);
ByteBuffer buffer = ByteBuffer.allocate(dataInfo.getBytesCount());
dataInfo.getBytes(buffer);
forwardContent(stream, buffer);
buffer.flip();
HTTPSPDYAsyncConnection connection = (HTTPSPDYAsyncConnection)stream.getAttribute("connection");
connection.content(buffer, dataInfo.isClose());
if (dataInfo.isClose())
forwardRequestComplete(stream);
connection.endRequest();
}
catch (HttpException x)
{
@ -210,142 +178,10 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio
stream.reply(new ReplyInfo(headers, true));
}
private void forwardRequest(Stream stream, String method, String uri, String version) throws IOException
{
assert stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.INITIAL;
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.startRequest(new ByteArrayBuffer(method), new ByteArrayBuffer(uri), new ByteArrayBuffer(version));
stream.setAttribute(ParseStatus.class.getName(), ParseStatus.REQUEST);
}
private void forwardHeaders(Stream stream, Headers headers) throws IOException
{
assert stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.REQUEST;
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
for (Headers.Header header : headers)
{
String name = header.name();
switch (name)
{
case "method":
case "version":
// Skip request line headers
continue;
case "url":
// Mangle the URL if the host header is missing
String host = parseHost(header.value());
// Jetty needs the host header, although HTTP 1.1 does not
// require it if it can be parsed from an absolute URI
if (host != null)
connection.parsedHeader(new ByteArrayBuffer("host"), new ByteArrayBuffer(host));
break;
case "connection":
case "keep-alive":
case "host":
// Spec says to ignore these headers
continue;
default:
// Spec says headers must be single valued
String value = header.value();
connection.parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
break;
}
}
}
private String parseHost(String url)
{
try
{
URI uri = new URI(url);
return uri.getHost() + (uri.getPort() > 0 ? ":" + uri.getPort() : "");
}
catch (URISyntaxException x)
{
return null;
}
}
private void forwardHeadersComplete(Stream stream) throws IOException
{
assert stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.REQUEST;
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.headerComplete();
stream.setAttribute(ParseStatus.class.getName(), ParseStatus.HEADERS);
}
private void forwardContent(Stream stream, ByteBuffer buffer) throws IOException
{
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.content(new IndirectNIOBuffer(buffer, false));
stream.setAttribute(ParseStatus.class.getName(), ParseStatus.CONTENT);
}
private void forwardRequestComplete(Stream stream) throws IOException
{
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.messageComplete(0); // TODO: content length
}
private void close(Stream stream)
{
stream.getSession().goAway(stream.getVersion());
}
}
private enum ParseStatus
{
INITIAL, REQUEST, HEADERS, CONTENT
}
private class HTTPSPDYConnection extends AbstractHttpConnection
{
private HTTPSPDYConnection(Connector connector, EndPoint endPoint, Server server, Stream stream)
{
super(connector, endPoint, server,
new HttpParser(connector.getRequestBuffers(), endPoint, new HTTPSPDYParserHandler()),
new HTTPSPDYGenerator(connector.getResponseBuffers(), endPoint, stream), new HTTPSPDYRequest());
((HTTPSPDYRequest)getRequest()).setConnection(this);
getParser().setPersistent(true);
}
@Override
public Connection handle() throws IOException
{
return this;
}
public void startRequest(Buffer method, Buffer uri, Buffer version) throws IOException
{
super.startRequest(method, uri, version);
}
public void parsedHeader(Buffer name, Buffer value) throws IOException
{
super.parsedHeader(name, value);
}
public void headerComplete() throws IOException
{
super.headerComplete();
}
public void content(Buffer buffer) throws IOException
{
super.content(buffer);
}
public void messageComplete(long contentLength) throws IOException
{
super.messageComplete(contentLength);
}
}
private class HTTPSPDYAsyncEndPoint extends EmptyAsyncEndPoint
@ -357,86 +193,4 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio
this.stream = stream;
}
}
/**
* Empty implementation, since it won't parse anything
*/
private class HTTPSPDYParserHandler extends HttpParser.EventHandler
{
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
}
@Override
public void content(Buffer ref) throws IOException
{
}
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
}
}
private class HTTPSPDYGenerator extends HttpGenerator
{
private final Stream stream;
private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint, Stream stream)
{
super(buffers, endPoint);
this.stream = stream;
}
@Override
public void send1xx(int code) throws IOException
{
Headers headers = new Headers();
headers.put("status", String.valueOf(code));
headers.put("version", "HTTP/1.1");
stream.reply(new ReplyInfo(headers, false));
}
@Override
public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
{
Headers headers = new Headers();
StringBuilder status = new StringBuilder().append(_status);
if (_reason != null)
status.append(" ").append(_reason.toString("UTF-8"));
headers.put("status", status.toString());
headers.put("version", "HTTP/1.1");
for (int i = 0; i < fields.size(); ++i)
{
HttpFields.Field field = fields.getField(i);
headers.put(field.getName(), field.getValue());
}
stream.reply(new ReplyInfo(headers, allContentAdded));
}
@Override
public void addContent(Buffer content, boolean last) throws IOException
{
// TODO
System.out.println("SIMON");
}
@Override
public void complete() throws IOException
{
// Nothing to do
}
}
/**
* Needed only to please the compiler
*/
private class HTTPSPDYRequest extends Request
{
private void setConnection(HTTPSPDYConnection connection)
{
super.setConnection(connection);
}
}
}

View File

@ -0,0 +1,412 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.nio.http;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.server.AbstractHttpConnection;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.nio.SPDYAsyncConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
{
private static final Logger logger = LoggerFactory.getLogger(HTTPSPDYAsyncConnection.class);
private final SPDYAsyncConnection connection;
private final Stream stream;
private volatile State state = State.INITIAL;
private volatile NIOBuffer buffer;
public HTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, SPDYAsyncConnection connection, Stream stream)
{
super(connector, endPoint, server);
this.connection = connection;
this.stream = stream;
getParser().setPersistent(true);
}
@Override
protected HttpParser newHttpParser(Buffers requestBuffers, EndPoint endPoint, HttpParser.EventHandler requestHandler)
{
return new HTTPSPDYParser(requestBuffers, endPoint);
}
@Override
protected HttpGenerator newHttpGenerator(Buffers responseBuffers, EndPoint endPoint)
{
return new HTTPSPDYGenerator(responseBuffers, endPoint);
}
@Override
public AsyncEndPoint getEndPoint()
{
return (AsyncEndPoint)super.getEndPoint();
}
@Override
public Connection handle() throws IOException
{
return this;
}
@Override
public void onInputShutdown() throws IOException
{
// TODO
}
public void beginRequest(Headers headers) throws IOException
{
switch (state)
{
case INITIAL:
{
if (!headers.isEmpty())
{
Headers.Header method = headers.get("method");
Headers.Header uri = headers.get("url");
Headers.Header version = headers.get("version");
if (method == null || uri == null || version == null)
throw new HttpException(HttpStatus.BAD_REQUEST_400);
state = State.REQUEST;
String m = method.value();
String u = uri.value();
String v = version.value();
logger.debug("HTTP {} {} {}", new Object[]{m, u, v});
startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v));
headers(headers);
}
break;
}
default:
{
throw new IllegalStateException();
}
}
}
public void headers(Headers headers) throws IOException
{
switch (state)
{
case INITIAL:
{
if (headers.isEmpty())
throw new HttpException(HttpStatus.BAD_REQUEST_400);
beginRequest(headers);
break;
}
case REQUEST:
{
for (Headers.Header header : headers)
{
String name = header.name();
switch (name)
{
case "method":
case "version":
// Skip request line headers
continue;
case "url":
// Mangle the URL if the host header is missing
String host = parseHost(header.value());
// Jetty needs the host header, although HTTP 1.1 does not
// require it if it can be parsed from an absolute URI
if (host != null)
parsedHeader(new ByteArrayBuffer("host"), new ByteArrayBuffer(host));
break;
case "connection":
case "keep-alive":
case "host":
// Spec says to ignore these headers
continue;
default:
// Spec says headers must be single valued
String value = header.value();
logger.debug("HTTP {}: {}", name, value);
parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
break;
}
}
break;
}
}
}
public void content(ByteBuffer byteBuffer, boolean endRequest) throws IOException
{
switch (state)
{
case REQUEST:
{
state = endRequest ? State.FINAL : State.CONTENT;
buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
logger.debug("Accumulated first {} content bytes", byteBuffer.remaining());
headerComplete();
content(buffer);
break;
}
case CONTENT:
{
if (endRequest)
state = State.FINAL;
buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
logger.debug("Accumulated {} content bytes", byteBuffer.remaining());
content(buffer);
break;
}
default:
{
throw new IllegalStateException();
}
}
}
private Buffer consumeContent(long maxIdleTime) throws IOException
{
switch (state)
{
case CONTENT:
{
Buffer buffer = this.buffer;
logger.debug("Consuming {} content bytes", buffer.length());
if (buffer.length() > 0)
return buffer;
while (true)
{
// We read and parse more bytes; this may change the state
// (for example to FINAL state) and change the buffer field
connection.fill();
if (state != State.CONTENT)
{
return consumeContent(maxIdleTime);
}
// Read again the buffer field, it may have changed by fill() above
buffer = this.buffer;
logger.debug("Consuming {} content bytes", buffer.length());
if (buffer.length() > 0)
return buffer;
// Wait for content
logger.debug("Waiting {} ms for content bytes", maxIdleTime);
long begin = System.nanoTime();
boolean expired = !connection.getEndPoint().blockReadable(maxIdleTime);
if (expired)
{
stream.getSession().goAway(stream.getVersion());
throw new EOFException("read timeout");
}
logger.debug("Waited {} ms for content bytes", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin));
}
}
case FINAL:
{
Buffer buffer = this.buffer;
logger.debug("Consuming {} content bytes", buffer.length());
if (buffer.length() > 0)
return buffer;
return null;
}
default:
{
throw new IllegalStateException();
}
}
}
public void endRequest() throws IOException
{
switch (state)
{
case REQUEST:
{
state = State.FINAL;
headerComplete();
endRequest();
break;
}
case FINAL:
{
messageComplete(0);
break;
}
default:
{
throw new IllegalStateException();
}
}
}
private String parseHost(String url)
{
try
{
URI uri = new URI(url);
return uri.getHost() + (uri.getPort() > 0 ? ":" + uri.getPort() : "");
}
catch (URISyntaxException x)
{
return null;
}
}
private enum State
{
INITIAL, REQUEST, CONTENT, FINAL
}
/**
* Needed in order to override parser methods that read content.
* TODO: DESIGN: having the parser to block for content is messy, since the
* TODO: DESIGN: state machine for that should be in the connection/interpreter
*/
private class HTTPSPDYParser extends HttpParser
{
public HTTPSPDYParser(Buffers buffers, EndPoint endPoint)
{
super(buffers, endPoint, new HTTPSPDYParserHandler());
}
@Override
public Buffer blockForContent(long maxIdleTime) throws IOException
{
return consumeContent(maxIdleTime);
}
@Override
public int available() throws IOException
{
return super.available();
}
}
/**
* Empty implementation, since it won't parse anything
*/
private static class HTTPSPDYParserHandler extends HttpParser.EventHandler
{
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
}
@Override
public void content(Buffer ref) throws IOException
{
}
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
}
}
/**
* Needed in order to override generator methods that would generate HTTP,
* since we must generate SPDY instead.
*/
private class HTTPSPDYGenerator extends HttpGenerator
{
private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint)
{
super(buffers, endPoint);
}
@Override
public void send1xx(int code) throws IOException
{
Headers headers = new Headers();
headers.put("status", String.valueOf(code));
headers.put("version", "HTTP/1.1");
stream.reply(new ReplyInfo(headers, false));
}
@Override
public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
{
Headers headers = new Headers();
StringBuilder status = new StringBuilder().append(_status);
if (_reason != null)
status.append(" ").append(_reason.toString("UTF-8"));
headers.put("status", status.toString());
headers.put("version", "HTTP/1.1");
for (int i = 0; i < fields.size(); ++i)
{
HttpFields.Field field = fields.getField(i);
headers.put(field.getName(), field.getValue());
}
stream.reply(new ReplyInfo(headers, allContentAdded));
}
@Override
public void addContent(Buffer content, boolean last) throws IOException
{
// TODO
System.out.println("SIMON");
}
@Override
public void complete() throws IOException
{
// Nothing to do
}
}
/**
* Needed only to please the compiler
*/
private static class HTTPSPDYRequest extends Request
{
private void setConnection(HTTPSPDYAsyncConnection connection)
{
super.setConnection(connection);
}
}
}

View File

@ -130,6 +130,7 @@ public class Parser
{
try
{
logger.debug("Parsing {} bytes", buffer.remaining());
while (buffer.hasRemaining())
{
switch (state)

View File

@ -134,6 +134,7 @@ public class HTTPOverSPDYTest
throws IOException, ServletException
{
request.setHandled(true);
Assert.assertEquals("GET", httpRequest.getMethod());
Assert.assertEquals(path, target);
Assert.assertEquals(path, httpRequest.getRequestURI());
Assert.assertEquals("localhost:" + connector.getLocalPort(), httpRequest.getHeader("host"));
@ -175,6 +176,7 @@ public class HTTPOverSPDYTest
throws IOException, ServletException
{
request.setHandled(true);
Assert.assertEquals("GET", httpRequest.getMethod());
Assert.assertEquals(path, target);
Assert.assertEquals(path, httpRequest.getRequestURI());
Assert.assertEquals(query, httpRequest.getQueryString());
@ -201,4 +203,134 @@ public class HTTPOverSPDYTest
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testHEAD() throws Exception
{
final String path = "/foo";
final CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
request.setHandled(true);
Assert.assertEquals("HEAD", httpRequest.getMethod());
Assert.assertEquals(path, target);
Assert.assertEquals(path, httpRequest.getRequestURI());
handlerLatch.countDown();
}
}, null);
Headers headers = new Headers();
headers.put("method", "HEAD");
headers.put("url", "http://localhost:" + connector.getLocalPort() + path);
headers.put("version", "HTTP/1.1");
final CountDownLatch replyLatch = new CountDownLatch(1);
session.syn(SPDY.V2, new SynInfo(headers, true), new Stream.FrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Assert.assertTrue(replyInfo.isClose());
Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
});
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPOSTWithParameters() throws Exception
{
final String path = "/foo";
final String data = "a=1&b=2";
final CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
request.setHandled(true);
Assert.assertEquals("POST", httpRequest.getMethod());
Assert.assertEquals("1", httpRequest.getParameter("a"));
Assert.assertEquals("2", httpRequest.getParameter("b"));
handlerLatch.countDown();
}
}, null);
Headers headers = new Headers();
headers.put("method", "POST");
headers.put("url", "http://localhost:" + connector.getLocalPort() + path);
headers.put("version", "HTTP/1.1");
headers.put("content-type", "application/x-www-form-urlencoded");
final CountDownLatch replyLatch = new CountDownLatch(1);
Stream stream = session.syn(SPDY.V2, new SynInfo(headers, false), new Stream.FrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Assert.assertTrue(replyInfo.isClose());
Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
});
stream.data(new StringDataInfo(data, true));
Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS));
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPOSTWithParametersInTwoFrames() throws Exception
{
final String path = "/foo";
final String data1 = "a=1&";
final String data2 = "b=2";
final CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
request.setHandled(true);
Assert.assertEquals("POST", httpRequest.getMethod());
Assert.assertEquals("1", httpRequest.getParameter("a"));
Assert.assertEquals("2", httpRequest.getParameter("b"));
handlerLatch.countDown();
}
}, null);
Headers headers = new Headers();
headers.put("method", "POST");
headers.put("url", "http://localhost:" + connector.getLocalPort() + path);
headers.put("version", "HTTP/1.1");
headers.put("content-type", "application/x-www-form-urlencoded");
final CountDownLatch replyLatch = new CountDownLatch(1);
Stream stream = session.syn(SPDY.V2, new SynInfo(headers, false), new Stream.FrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Assert.assertTrue(replyInfo.isClose());
Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
});
stream.data(new StringDataInfo(data1, false));
Thread.sleep(1000);
stream.data(new StringDataInfo(data2, true));
Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS));
Assert.assertTrue(replyLatch.await(500, TimeUnit.SECONDS));
}
}