From 48fa5ce8552a1ad39920e3ae431db4a1c756d60c Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 8 Feb 2012 18:44:04 +0100 Subject: [PATCH] Improved the HTTP over SPDY layer to support content, and content in multiple chunks. --- .../eclipse/jetty/spdy/StandardStream.java | 13 +- ...nnection.java => SPDYAsyncConnection.java} | 22 +- .../eclipse/jetty/spdy/nio/SPDYClient.java | 2 +- .../nio/ServerSPDYAsyncConnectionFactory.java | 6 +- .../HTTPOverSPDYAsyncConnectionFactory.java | 312 ++----------- .../nio/http/HTTPSPDYAsyncConnection.java | 412 ++++++++++++++++++ .../org/eclipse/jetty/spdy/parser/Parser.java | 1 + .../jetty/spdy/nio/http/HTTPOverSPDYTest.java | 132 ++++++ 8 files changed, 601 insertions(+), 299 deletions(-) rename src/main/java/org/eclipse/jetty/spdy/nio/{AsyncSPDYConnection.java => SPDYAsyncConnection.java} (92%) create mode 100644 src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPSPDYAsyncConnection.java diff --git a/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index 8dce8bf8c43..95fa82653f3 100644 --- a/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -201,11 +201,14 @@ public class StandardStream implements IStream { try { - // TODO: if the read buffer is small, but the default window size is big, - // we will send many window update frames... perhaps we can delay - // window update frames until we have a bigger delta to send - WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(getVersion(), getId(), delta); - session.control(this, windowUpdateFrame); + if (delta > 0) + { + // TODO: if the read buffer is small, but the default window size is big, + // we will send many window update frames... perhaps we can delay + // window update frames until we have a bigger delta to send + WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(getVersion(), getId(), delta); + session.control(this, windowUpdateFrame); + } } catch (StreamException x) { diff --git a/src/main/java/org/eclipse/jetty/spdy/nio/AsyncSPDYConnection.java b/src/main/java/org/eclipse/jetty/spdy/nio/SPDYAsyncConnection.java similarity index 92% rename from src/main/java/org/eclipse/jetty/spdy/nio/AsyncSPDYConnection.java rename to src/main/java/org/eclipse/jetty/spdy/nio/SPDYAsyncConnection.java index b92f01f889d..6e2bda3442a 100644 --- a/src/main/java/org/eclipse/jetty/spdy/nio/AsyncSPDYConnection.java +++ b/src/main/java/org/eclipse/jetty/spdy/nio/SPDYAsyncConnection.java @@ -35,15 +35,15 @@ import org.eclipse.jetty.spdy.parser.Parser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AsyncSPDYConnection extends AbstractConnection implements AsyncConnection, Controller +public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller { - private static final Logger logger = LoggerFactory.getLogger(AsyncSPDYConnection.class); + private static final Logger logger = LoggerFactory.getLogger(SPDYAsyncConnection.class); private final Parser parser; private ByteBuffer buffer; private Handler handler; private volatile boolean flushing; - public AsyncSPDYConnection(EndPoint endp, Parser parser) + public SPDYAsyncConnection(EndPoint endp, Parser parser) { super(endp); this.parser = parser; @@ -58,11 +58,9 @@ public class AsyncSPDYConnection extends AbstractConnection implements AsyncConn { int filled = fill(); progress = filled > 0; - logger.debug("Filled {} from {}", filled, endPoint); int flushed = flush(); progress |= flushed > 0; - logger.debug("Flushed {} to {}", flushed, endPoint); endPoint.flush(); @@ -74,11 +72,12 @@ public class AsyncSPDYConnection extends AbstractConnection implements AsyncConn return this; } - protected int fill() throws IOException + public int fill() throws IOException { NIOBuffer jettyBuffer = new DirectNIOBuffer(1024); AsyncEndPoint endPoint = getEndPoint(); int filled = endPoint.fill(jettyBuffer); + logger.debug("Filled {} from {}", filled, endPoint); if (filled > 0) { ByteBuffer buffer = jettyBuffer.getByteBuffer(); @@ -86,16 +85,17 @@ public class AsyncSPDYConnection extends AbstractConnection implements AsyncConn buffer.position(jettyBuffer.getIndex()); parser.parse(buffer); } - return filled; } - protected int flush() + public int flush() { + int result = 0; // Volatile read to ensure visibility of buffer and handler - if (!flushing) - return 0; - return write(buffer, handler); + if (flushing) + result = write(buffer, handler); + logger.debug("Flushed {} to {}", result, getEndPoint()); + return result; } @Override diff --git a/src/main/java/org/eclipse/jetty/spdy/nio/SPDYClient.java b/src/main/java/org/eclipse/jetty/spdy/nio/SPDYClient.java index c5dc0cff717..cd321f20940 100644 --- a/src/main/java/org/eclipse/jetty/spdy/nio/SPDYClient.java +++ b/src/main/java/org/eclipse/jetty/spdy/nio/SPDYClient.java @@ -422,7 +422,7 @@ public class SPDYClient Parser parser = new Parser(compressionFactory.newDecompressor()); Generator generator = new Generator(compressionFactory.newCompressor()); - AsyncSPDYConnection connection = new AsyncSPDYConnection(endPoint, parser); + SPDYAsyncConnection connection = new SPDYAsyncConnection(endPoint, parser); endPoint.setConnection(connection); StandardSession session = new StandardSession(connection, 1, sessionFuture.listener, generator); diff --git a/src/main/java/org/eclipse/jetty/spdy/nio/ServerSPDYAsyncConnectionFactory.java b/src/main/java/org/eclipse/jetty/spdy/nio/ServerSPDYAsyncConnectionFactory.java index baa4dc59636..9b7e9375a2d 100644 --- a/src/main/java/org/eclipse/jetty/spdy/nio/ServerSPDYAsyncConnectionFactory.java +++ b/src/main/java/org/eclipse/jetty/spdy/nio/ServerSPDYAsyncConnectionFactory.java @@ -56,7 +56,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory if (listener == null) listener = newServerSessionFrameListener(endPoint, attachment); - ServerAsyncSPDYConnection connection = new ServerAsyncSPDYConnection(endPoint, parser, listener); + ServerSPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener); endPoint.setConnection(connection); final StandardSession session = new StandardSession(connection, 2, listener, generator); @@ -71,12 +71,12 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory return listener; } - private static class ServerAsyncSPDYConnection extends AsyncSPDYConnection + private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection { private final ServerSessionFrameListener listener; private volatile Session session; - private ServerAsyncSPDYConnection(EndPoint endPoint, Parser parser, ServerSessionFrameListener listener) + private ServerSPDYAsyncConnection(EndPoint endPoint, Parser parser, ServerSessionFrameListener listener) { super(endPoint, parser); this.listener = listener; diff --git a/src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYAsyncConnectionFactory.java b/src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYAsyncConnectionFactory.java index 92674cca53e..f412bbd9b92 100644 --- a/src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYAsyncConnectionFactory.java +++ b/src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYAsyncConnectionFactory.java @@ -17,26 +17,11 @@ package org.eclipse.jetty.spdy.nio.http; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.ByteBuffer; import org.eclipse.jetty.http.HttpException; -import org.eclipse.jetty.http.HttpFields; -import org.eclipse.jetty.http.HttpGenerator; -import org.eclipse.jetty.http.HttpParser; -import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.AsyncEndPoint; -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.io.Buffers; -import org.eclipse.jetty.io.ByteArrayBuffer; -import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.nio.IndirectNIOBuffer; -import org.eclipse.jetty.server.AbstractHttpConnection; import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.HeadersInfo; @@ -45,13 +30,14 @@ import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.nio.EmptyAsyncEndPoint; +import org.eclipse.jetty.spdy.nio.SPDYAsyncConnection; import org.eclipse.jetty.spdy.nio.ServerSPDYAsyncConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectionFactory { - private final Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HTTPOverSPDYAsyncConnectionFactory.class); private final Connector connector; public HTTPOverSPDYAsyncConnectionFactory(Connector connector) @@ -62,11 +48,18 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio @Override protected ServerSessionFrameListener newServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment) { - return new HTTPServerSessionFrameListener(); + return new HTTPServerSessionFrameListener(endPoint); } private class HTTPServerSessionFrameListener extends ServerSessionFrameListener.Adapter implements Stream.FrameListener { + private final AsyncEndPoint endPoint; + + public HTTPServerSessionFrameListener(AsyncEndPoint endPoint) + { + this.endPoint = endPoint; + } + @Override public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo) { @@ -76,39 +69,35 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio // cycle is processed at a time, so we need to fake an http connection // for each SYN in order to run concurrently. - logger.debug("Received {}", synInfo); + logger.debug("Received {} on {}", synInfo, stream); try { - HTTPSPDYConnection connection = new HTTPSPDYConnection(connector, new HTTPSPDYAsyncEndPoint(stream), connector.getServer(), stream); + HTTPSPDYAsyncConnection connection = new HTTPSPDYAsyncConnection(connector, + new HTTPSPDYAsyncEndPoint(stream), connector.getServer(), + (SPDYAsyncConnection)endPoint.getConnection(), stream); stream.setAttribute("connection", connection); - stream.setAttribute(ParseStatus.class.getName(), ParseStatus.INITIAL); Headers headers = synInfo.getHeaders(); + connection.beginRequest(headers); + if (headers.isEmpty()) { - // SYN with no headers, perhaps they'll come in a HEADER frame + // SYN with no headers, perhaps they'll come later in a HEADER frame return this; } else { - boolean processed = processRequest(stream, headers); - if (!processed) - { - respond(stream, HttpStatus.BAD_REQUEST_400); - return null; - } - if (synInfo.isClose()) { - forwardHeadersComplete(stream); - forwardRequestComplete(stream); + connection.endRequest(); return null; } else { - if (headers.names().contains("expect")) - forwardHeadersComplete(stream); + // TODO +// if (headers.names().contains("expect")) +// forwardHeadersComplete(stream); return this; } } @@ -125,23 +114,6 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio } } - private boolean processRequest(Stream stream, Headers headers) throws IOException - { - if (stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.INITIAL) - { - Headers.Header method = headers.get("method"); - Headers.Header uri = headers.get("url"); - Headers.Header version = headers.get("version"); - - if (method == null || uri == null || version == null) - return false; - - forwardRequest(stream, method.value(), uri.value(), version.value()); - } - forwardHeaders(stream, headers); - return true; - } - @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -151,22 +123,15 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio @Override public void onHeaders(Stream stream, HeadersInfo headersInfo) { - logger.debug("Received {}", headersInfo); - - // TODO: support trailers - Boolean dataSeen = (Boolean)stream.getAttribute("data"); - if (dataSeen != null && dataSeen) - return; + logger.debug("Received {} on {}", headersInfo, stream); try { - processRequest(stream, headersInfo.getHeaders()); + HTTPSPDYAsyncConnection connection = (HTTPSPDYAsyncConnection)stream.getAttribute("connection"); + connection.headers(headersInfo.getHeaders()); if (headersInfo.isClose()) - { - forwardHeadersComplete(stream); - forwardRequestComplete(stream); - } + connection.endRequest(); } catch (HttpException x) { @@ -181,16 +146,19 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio @Override public void onData(Stream stream, DataInfo dataInfo) { + logger.debug("Received {} on {}", dataInfo, stream); + try { - if (stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.REQUEST) - forwardHeadersComplete(stream); - ByteBuffer buffer = ByteBuffer.allocate(dataInfo.getBytesCount()); dataInfo.getBytes(buffer); - forwardContent(stream, buffer); + buffer.flip(); + + HTTPSPDYAsyncConnection connection = (HTTPSPDYAsyncConnection)stream.getAttribute("connection"); + connection.content(buffer, dataInfo.isClose()); + if (dataInfo.isClose()) - forwardRequestComplete(stream); + connection.endRequest(); } catch (HttpException x) { @@ -210,142 +178,10 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio stream.reply(new ReplyInfo(headers, true)); } - private void forwardRequest(Stream stream, String method, String uri, String version) throws IOException - { - assert stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.INITIAL; - - HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection"); - connection.startRequest(new ByteArrayBuffer(method), new ByteArrayBuffer(uri), new ByteArrayBuffer(version)); - - stream.setAttribute(ParseStatus.class.getName(), ParseStatus.REQUEST); - } - - private void forwardHeaders(Stream stream, Headers headers) throws IOException - { - assert stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.REQUEST; - - HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection"); - for (Headers.Header header : headers) - { - String name = header.name(); - switch (name) - { - case "method": - case "version": - // Skip request line headers - continue; - case "url": - // Mangle the URL if the host header is missing - String host = parseHost(header.value()); - // Jetty needs the host header, although HTTP 1.1 does not - // require it if it can be parsed from an absolute URI - if (host != null) - connection.parsedHeader(new ByteArrayBuffer("host"), new ByteArrayBuffer(host)); - break; - case "connection": - case "keep-alive": - case "host": - // Spec says to ignore these headers - continue; - default: - // Spec says headers must be single valued - String value = header.value(); - connection.parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value)); - break; - } - } - } - - private String parseHost(String url) - { - try - { - URI uri = new URI(url); - return uri.getHost() + (uri.getPort() > 0 ? ":" + uri.getPort() : ""); - } - catch (URISyntaxException x) - { - return null; - } - } - - private void forwardHeadersComplete(Stream stream) throws IOException - { - assert stream.getAttribute(ParseStatus.class.getName()) == ParseStatus.REQUEST; - - HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection"); - connection.headerComplete(); - - stream.setAttribute(ParseStatus.class.getName(), ParseStatus.HEADERS); - } - - private void forwardContent(Stream stream, ByteBuffer buffer) throws IOException - { - HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection"); - connection.content(new IndirectNIOBuffer(buffer, false)); - - stream.setAttribute(ParseStatus.class.getName(), ParseStatus.CONTENT); - } - - private void forwardRequestComplete(Stream stream) throws IOException - { - HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection"); - connection.messageComplete(0); // TODO: content length - } - private void close(Stream stream) { stream.getSession().goAway(stream.getVersion()); } - - } - - private enum ParseStatus - { - INITIAL, REQUEST, HEADERS, CONTENT - } - - private class HTTPSPDYConnection extends AbstractHttpConnection - { - private HTTPSPDYConnection(Connector connector, EndPoint endPoint, Server server, Stream stream) - { - super(connector, endPoint, server, - new HttpParser(connector.getRequestBuffers(), endPoint, new HTTPSPDYParserHandler()), - new HTTPSPDYGenerator(connector.getResponseBuffers(), endPoint, stream), new HTTPSPDYRequest()); - ((HTTPSPDYRequest)getRequest()).setConnection(this); - getParser().setPersistent(true); - } - - @Override - public Connection handle() throws IOException - { - return this; - } - - public void startRequest(Buffer method, Buffer uri, Buffer version) throws IOException - { - super.startRequest(method, uri, version); - } - - public void parsedHeader(Buffer name, Buffer value) throws IOException - { - super.parsedHeader(name, value); - } - - public void headerComplete() throws IOException - { - super.headerComplete(); - } - - public void content(Buffer buffer) throws IOException - { - super.content(buffer); - } - - public void messageComplete(long contentLength) throws IOException - { - super.messageComplete(contentLength); - } } private class HTTPSPDYAsyncEndPoint extends EmptyAsyncEndPoint @@ -357,86 +193,4 @@ public class HTTPOverSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectio this.stream = stream; } } - - /** - * Empty implementation, since it won't parse anything - */ - private class HTTPSPDYParserHandler extends HttpParser.EventHandler - { - @Override - public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException - { - } - - @Override - public void content(Buffer ref) throws IOException - { - } - - @Override - public void startResponse(Buffer version, int status, Buffer reason) throws IOException - { - } - } - - private class HTTPSPDYGenerator extends HttpGenerator - { - private final Stream stream; - - private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint, Stream stream) - { - super(buffers, endPoint); - this.stream = stream; - } - - @Override - public void send1xx(int code) throws IOException - { - Headers headers = new Headers(); - headers.put("status", String.valueOf(code)); - headers.put("version", "HTTP/1.1"); - stream.reply(new ReplyInfo(headers, false)); - } - - @Override - public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException - { - Headers headers = new Headers(); - StringBuilder status = new StringBuilder().append(_status); - if (_reason != null) - status.append(" ").append(_reason.toString("UTF-8")); - headers.put("status", status.toString()); - headers.put("version", "HTTP/1.1"); - for (int i = 0; i < fields.size(); ++i) - { - HttpFields.Field field = fields.getField(i); - headers.put(field.getName(), field.getValue()); - } - stream.reply(new ReplyInfo(headers, allContentAdded)); - } - - @Override - public void addContent(Buffer content, boolean last) throws IOException - { - // TODO - System.out.println("SIMON"); - } - - @Override - public void complete() throws IOException - { - // Nothing to do - } - } - - /** - * Needed only to please the compiler - */ - private class HTTPSPDYRequest extends Request - { - private void setConnection(HTTPSPDYConnection connection) - { - super.setConnection(connection); - } - } } diff --git a/src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPSPDYAsyncConnection.java b/src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPSPDYAsyncConnection.java new file mode 100644 index 00000000000..2b3733dc150 --- /dev/null +++ b/src/main/java/org/eclipse/jetty/spdy/nio/http/HTTPSPDYAsyncConnection.java @@ -0,0 +1,412 @@ +/* + * Copyright (c) 2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.eclipse.jetty.spdy.nio.http; + +import java.io.EOFException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpException; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpGenerator; +import org.eclipse.jetty.http.HttpParser; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.Buffers; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.nio.AsyncConnection; +import org.eclipse.jetty.io.nio.DirectNIOBuffer; +import org.eclipse.jetty.io.nio.IndirectNIOBuffer; +import org.eclipse.jetty.io.nio.NIOBuffer; +import org.eclipse.jetty.server.AbstractHttpConnection; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.nio.SPDYAsyncConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection +{ + private static final Logger logger = LoggerFactory.getLogger(HTTPSPDYAsyncConnection.class); + private final SPDYAsyncConnection connection; + private final Stream stream; + private volatile State state = State.INITIAL; + private volatile NIOBuffer buffer; + + public HTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, SPDYAsyncConnection connection, Stream stream) + { + super(connector, endPoint, server); + this.connection = connection; + this.stream = stream; + getParser().setPersistent(true); + } + + @Override + protected HttpParser newHttpParser(Buffers requestBuffers, EndPoint endPoint, HttpParser.EventHandler requestHandler) + { + return new HTTPSPDYParser(requestBuffers, endPoint); + } + + @Override + protected HttpGenerator newHttpGenerator(Buffers responseBuffers, EndPoint endPoint) + { + return new HTTPSPDYGenerator(responseBuffers, endPoint); + } + + @Override + public AsyncEndPoint getEndPoint() + { + return (AsyncEndPoint)super.getEndPoint(); + } + + @Override + public Connection handle() throws IOException + { + return this; + } + + @Override + public void onInputShutdown() throws IOException + { + // TODO + } + + public void beginRequest(Headers headers) throws IOException + { + switch (state) + { + case INITIAL: + { + if (!headers.isEmpty()) + { + Headers.Header method = headers.get("method"); + Headers.Header uri = headers.get("url"); + Headers.Header version = headers.get("version"); + + if (method == null || uri == null || version == null) + throw new HttpException(HttpStatus.BAD_REQUEST_400); + + state = State.REQUEST; + + String m = method.value(); + String u = uri.value(); + String v = version.value(); + logger.debug("HTTP {} {} {}", new Object[]{m, u, v}); + startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v)); + headers(headers); + } + break; + } + default: + { + throw new IllegalStateException(); + } + } + } + + public void headers(Headers headers) throws IOException + { + switch (state) + { + case INITIAL: + { + if (headers.isEmpty()) + throw new HttpException(HttpStatus.BAD_REQUEST_400); + beginRequest(headers); + break; + } + case REQUEST: + { + for (Headers.Header header : headers) + { + String name = header.name(); + switch (name) + { + case "method": + case "version": + // Skip request line headers + continue; + case "url": + // Mangle the URL if the host header is missing + String host = parseHost(header.value()); + // Jetty needs the host header, although HTTP 1.1 does not + // require it if it can be parsed from an absolute URI + if (host != null) + parsedHeader(new ByteArrayBuffer("host"), new ByteArrayBuffer(host)); + break; + case "connection": + case "keep-alive": + case "host": + // Spec says to ignore these headers + continue; + default: + // Spec says headers must be single valued + String value = header.value(); + logger.debug("HTTP {}: {}", name, value); + parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value)); + break; + } + } + break; + } + } + } + + public void content(ByteBuffer byteBuffer, boolean endRequest) throws IOException + { + switch (state) + { + case REQUEST: + { + state = endRequest ? State.FINAL : State.CONTENT; + buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false); + logger.debug("Accumulated first {} content bytes", byteBuffer.remaining()); + headerComplete(); + content(buffer); + break; + } + case CONTENT: + { + if (endRequest) + state = State.FINAL; + buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false); + logger.debug("Accumulated {} content bytes", byteBuffer.remaining()); + content(buffer); + break; + } + default: + { + throw new IllegalStateException(); + } + } + } + + private Buffer consumeContent(long maxIdleTime) throws IOException + { + switch (state) + { + case CONTENT: + { + Buffer buffer = this.buffer; + logger.debug("Consuming {} content bytes", buffer.length()); + if (buffer.length() > 0) + return buffer; + + while (true) + { + // We read and parse more bytes; this may change the state + // (for example to FINAL state) and change the buffer field + connection.fill(); + + if (state != State.CONTENT) + { + return consumeContent(maxIdleTime); + } + + // Read again the buffer field, it may have changed by fill() above + buffer = this.buffer; + logger.debug("Consuming {} content bytes", buffer.length()); + if (buffer.length() > 0) + return buffer; + + // Wait for content + logger.debug("Waiting {} ms for content bytes", maxIdleTime); + long begin = System.nanoTime(); + boolean expired = !connection.getEndPoint().blockReadable(maxIdleTime); + if (expired) + { + stream.getSession().goAway(stream.getVersion()); + throw new EOFException("read timeout"); + } + logger.debug("Waited {} ms for content bytes", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin)); + } + } + case FINAL: + { + Buffer buffer = this.buffer; + logger.debug("Consuming {} content bytes", buffer.length()); + if (buffer.length() > 0) + return buffer; + return null; + } + default: + { + throw new IllegalStateException(); + } + } + } + + public void endRequest() throws IOException + { + switch (state) + { + case REQUEST: + { + state = State.FINAL; + headerComplete(); + endRequest(); + break; + } + case FINAL: + { + messageComplete(0); + break; + } + default: + { + throw new IllegalStateException(); + } + } + } + + private String parseHost(String url) + { + try + { + URI uri = new URI(url); + return uri.getHost() + (uri.getPort() > 0 ? ":" + uri.getPort() : ""); + } + catch (URISyntaxException x) + { + return null; + } + } + + private enum State + { + INITIAL, REQUEST, CONTENT, FINAL + } + + /** + * Needed in order to override parser methods that read content. + * TODO: DESIGN: having the parser to block for content is messy, since the + * TODO: DESIGN: state machine for that should be in the connection/interpreter + */ + private class HTTPSPDYParser extends HttpParser + { + public HTTPSPDYParser(Buffers buffers, EndPoint endPoint) + { + super(buffers, endPoint, new HTTPSPDYParserHandler()); + } + + @Override + public Buffer blockForContent(long maxIdleTime) throws IOException + { + return consumeContent(maxIdleTime); + } + + @Override + public int available() throws IOException + { + return super.available(); + } + } + + /** + * Empty implementation, since it won't parse anything + */ + private static class HTTPSPDYParserHandler extends HttpParser.EventHandler + { + @Override + public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException + { + } + + @Override + public void content(Buffer ref) throws IOException + { + } + + @Override + public void startResponse(Buffer version, int status, Buffer reason) throws IOException + { + } + } + + /** + * Needed in order to override generator methods that would generate HTTP, + * since we must generate SPDY instead. + */ + private class HTTPSPDYGenerator extends HttpGenerator + { + private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint) + { + super(buffers, endPoint); + } + + @Override + public void send1xx(int code) throws IOException + { + Headers headers = new Headers(); + headers.put("status", String.valueOf(code)); + headers.put("version", "HTTP/1.1"); + stream.reply(new ReplyInfo(headers, false)); + } + + @Override + public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException + { + Headers headers = new Headers(); + StringBuilder status = new StringBuilder().append(_status); + if (_reason != null) + status.append(" ").append(_reason.toString("UTF-8")); + headers.put("status", status.toString()); + headers.put("version", "HTTP/1.1"); + for (int i = 0; i < fields.size(); ++i) + { + HttpFields.Field field = fields.getField(i); + headers.put(field.getName(), field.getValue()); + } + stream.reply(new ReplyInfo(headers, allContentAdded)); + } + + @Override + public void addContent(Buffer content, boolean last) throws IOException + { + // TODO + System.out.println("SIMON"); + } + + @Override + public void complete() throws IOException + { + // Nothing to do + } + } + + /** + * Needed only to please the compiler + */ + private static class HTTPSPDYRequest extends Request + { + private void setConnection(HTTPSPDYAsyncConnection connection) + { + super.setConnection(connection); + } + } +} + diff --git a/src/main/java/org/eclipse/jetty/spdy/parser/Parser.java b/src/main/java/org/eclipse/jetty/spdy/parser/Parser.java index 6da585a8cce..0478de884d2 100644 --- a/src/main/java/org/eclipse/jetty/spdy/parser/Parser.java +++ b/src/main/java/org/eclipse/jetty/spdy/parser/Parser.java @@ -130,6 +130,7 @@ public class Parser { try { + logger.debug("Parsing {} bytes", buffer.remaining()); while (buffer.hasRemaining()) { switch (state) diff --git a/src/test/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYTest.java b/src/test/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYTest.java index 2a4be87882f..18e39b1645c 100644 --- a/src/test/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYTest.java +++ b/src/test/java/org/eclipse/jetty/spdy/nio/http/HTTPOverSPDYTest.java @@ -134,6 +134,7 @@ public class HTTPOverSPDYTest throws IOException, ServletException { request.setHandled(true); + Assert.assertEquals("GET", httpRequest.getMethod()); Assert.assertEquals(path, target); Assert.assertEquals(path, httpRequest.getRequestURI()); Assert.assertEquals("localhost:" + connector.getLocalPort(), httpRequest.getHeader("host")); @@ -175,6 +176,7 @@ public class HTTPOverSPDYTest throws IOException, ServletException { request.setHandled(true); + Assert.assertEquals("GET", httpRequest.getMethod()); Assert.assertEquals(path, target); Assert.assertEquals(path, httpRequest.getRequestURI()); Assert.assertEquals(query, httpRequest.getQueryString()); @@ -201,4 +203,134 @@ public class HTTPOverSPDYTest Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testHEAD() throws Exception + { + final String path = "/foo"; + final CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + request.setHandled(true); + Assert.assertEquals("HEAD", httpRequest.getMethod()); + Assert.assertEquals(path, target); + Assert.assertEquals(path, httpRequest.getRequestURI()); + handlerLatch.countDown(); + } + }, null); + + Headers headers = new Headers(); + headers.put("method", "HEAD"); + headers.put("url", "http://localhost:" + connector.getLocalPort() + path); + headers.put("version", "HTTP/1.1"); + final CountDownLatch replyLatch = new CountDownLatch(1); + session.syn(SPDY.V2, new SynInfo(headers, true), new Stream.FrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertTrue(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + replyLatch.countDown(); + } + }); + Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testPOSTWithParameters() throws Exception + { + final String path = "/foo"; + final String data = "a=1&b=2"; + final CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + request.setHandled(true); + Assert.assertEquals("POST", httpRequest.getMethod()); + Assert.assertEquals("1", httpRequest.getParameter("a")); + Assert.assertEquals("2", httpRequest.getParameter("b")); + handlerLatch.countDown(); + } + }, null); + + Headers headers = new Headers(); + headers.put("method", "POST"); + headers.put("url", "http://localhost:" + connector.getLocalPort() + path); + headers.put("version", "HTTP/1.1"); + headers.put("content-type", "application/x-www-form-urlencoded"); + final CountDownLatch replyLatch = new CountDownLatch(1); + Stream stream = session.syn(SPDY.V2, new SynInfo(headers, false), new Stream.FrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertTrue(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + replyLatch.countDown(); + } + }); + stream.data(new StringDataInfo(data, true)); + + Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testPOSTWithParametersInTwoFrames() throws Exception + { + final String path = "/foo"; + final String data1 = "a=1&"; + final String data2 = "b=2"; + final CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + request.setHandled(true); + Assert.assertEquals("POST", httpRequest.getMethod()); + Assert.assertEquals("1", httpRequest.getParameter("a")); + Assert.assertEquals("2", httpRequest.getParameter("b")); + handlerLatch.countDown(); + } + }, null); + + Headers headers = new Headers(); + headers.put("method", "POST"); + headers.put("url", "http://localhost:" + connector.getLocalPort() + path); + headers.put("version", "HTTP/1.1"); + headers.put("content-type", "application/x-www-form-urlencoded"); + final CountDownLatch replyLatch = new CountDownLatch(1); + Stream stream = session.syn(SPDY.V2, new SynInfo(headers, false), new Stream.FrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertTrue(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + replyLatch.countDown(); + } + }); + stream.data(new StringDataInfo(data1, false)); + + Thread.sleep(1000); + + stream.data(new StringDataInfo(data2, true)); + + Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(replyLatch.await(500, TimeUnit.SECONDS)); + } }