From 94742d3e94347c5dc6f59b38686a9efd5b17c505 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 2 Mar 2012 11:45:06 +0100 Subject: [PATCH] Updated HTTP layer to invoke the application asynchronously. --- .../http/ServerHTTPSPDYAsyncConnection.java | 198 ++++++++++++------ .../ServerHTTPSPDYAsyncConnectionFactory.java | 132 +++--------- .../jetty/spdy/http/ServerHTTPSPDYTest.java | 6 +- 3 files changed, 169 insertions(+), 167 deletions(-) diff --git a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java index d533c02e9ea..7d0fbbcb989 100644 --- a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java +++ b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java @@ -18,9 +18,12 @@ package org.eclipse.jetty.spdy.http; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.http.HttpException; @@ -43,6 +46,7 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.spdy.SPDYAsyncConnection; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; +import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.Stream; @@ -52,14 +56,17 @@ import org.slf4j.LoggerFactory; public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection { private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnection.class); - private final Queue queue = new LinkedList<>(); + + private final Queue tasks = new LinkedList<>(); + private final BlockingQueue dataInfos = new LinkedBlockingQueue<>(); private final SPDYAsyncConnection connection; private final Stream stream; - private Headers headers; - private NIOBuffer buffer; - private boolean complete; + private Headers headers; // No need for volatile, guarded by state + private DataInfo dataInfo; // No need for volatile, guarded by state + private NIOBuffer buffer; // No need for volatile, guarded by state + private boolean complete; // No need for volatile, guarded by state private volatile State state = State.INITIAL; - private boolean dispatched; + private boolean dispatched; // Guarded by synchronization on tasks public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, SPDYAsyncConnection connection, Stream stream) { @@ -87,24 +94,24 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem return (AsyncEndPoint)super.getEndPoint(); } - public void post(Runnable task) + private void post(Runnable task) { - synchronized (queue) + synchronized (tasks) { logger.debug("Posting task {}", task); - queue.offer(task); + tasks.offer(task); dispatch(); } } private void dispatch() { - synchronized (queue) + synchronized (tasks) { if (dispatched) return; - final Runnable task = queue.poll(); + final Runnable task = tasks.poll(); if (task != null) { dispatched = true; @@ -126,7 +133,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem } @Override - public Connection handle() throws IOException + public Connection handle() { setCurrentConnection(this); try @@ -198,13 +205,12 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem case CONTENT: { final Buffer buffer = this.buffer; - if (buffer.length() > 0) + if (buffer != null && buffer.length() > 0) content(buffer); break; } case FINAL: { - // TODO: compute content-length parameter messageComplete(0); break; } @@ -215,95 +221,156 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem } return this; } + catch (HttpException x) + { + respond(stream, x.getStatus()); + return this; + } + catch (IOException x) + { + close(stream); + return this; + } finally { setCurrentConnection(null); } } + private void respond(Stream stream, int status) + { + Headers headers = new Headers(); + headers.put("status", String.valueOf(status)); + headers.put("version", "HTTP/1.1"); + stream.reply(new ReplyInfo(headers, true)); + } + + private void close(Stream stream) + { + stream.getSession().goAway(); + } + @Override public void onInputShutdown() throws IOException { - // TODO } - public void beginRequest(Headers headers) throws IOException + public void beginRequest(final Headers headers) { - if (!headers.isEmpty()) + this.headers = headers.isEmpty() ? null : headers; + post(new Runnable() { - this.headers = headers; - state = State.REQUEST; - } - handle(); + @Override + public void run() + { + if (!headers.isEmpty()) + state = State.REQUEST; + handle(); + } + }); } - public void headers(Headers headers) throws IOException + public void headers(Headers headers) { this.headers = headers; - state = state == State.INITIAL ? State.REQUEST : State.HEADERS; - handle(); + post(new Runnable() + { + @Override + public void run() + { + state = state == State.INITIAL ? State.REQUEST : State.HEADERS; + handle(); + } + }); } - public void content(ByteBuffer byteBuffer, boolean endRequest) throws IOException + public void content(final DataInfo dataInfo, boolean endRequest) { - buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false); + dataInfos.offer(dataInfo); complete = endRequest; - logger.debug("HTTP > {} bytes of content", buffer.length()); - if (state == State.HEADERS) + post(new Runnable() { - state = State.HEADERS_COMPLETE; - handle(); - } - state = State.CONTENT; - handle(); + @Override + public void run() + { + logger.debug("HTTP > {} bytes of content", dataInfo.length()); + if (state == State.HEADERS) + { + state = State.HEADERS_COMPLETE; + handle(); + } + state = State.CONTENT; + handle(); + } + }); } - public void endRequest() throws IOException + public void endRequest() { - if (state == State.HEADERS) + post(new Runnable() { - state = State.HEADERS_COMPLETE; - handle(); - } - state = State.FINAL; - handle(); + public void run() + { + if (state == State.HEADERS) + { + state = State.HEADERS_COMPLETE; + handle(); + } + state = State.FINAL; + handle(); + } + }); } - private Buffer consumeContent(long maxIdleTime) throws IOException + private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException { - boolean filled = false; while (true) { + // Volatile read to ensure visibility State state = this.state; if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL) throw new IllegalStateException(); - Buffer buffer = this.buffer; - logger.debug("Consuming {} content bytes", buffer.length()); - if (buffer.length() > 0) - return buffer; - - if (complete) - return null; - - if (filled) + if (buffer != null) + { + if (buffer.length() > 0) + { + logger.debug("Consuming content bytes, {} available", buffer.length()); + return buffer; + } + else + { + // The application has consumed the buffer, so consume also the DataInfo + if (dataInfo.consumed() == 0) + dataInfo.consume(dataInfo.length()); + dataInfo = null; + buffer = null; + if (complete && dataInfos.isEmpty()) + return null; + // Loop to get content bytes from DataInfos + } + } + else { - // Wait for content logger.debug("Waiting at most {} ms for content bytes", maxIdleTime); long begin = System.nanoTime(); - boolean expired = !connection.getEndPoint().blockReadable(maxIdleTime); - if (expired) + dataInfo = dataInfos.poll(maxIdleTime, TimeUnit.MILLISECONDS); + long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin); + logger.debug("Waited {} ms for content bytes", elapsed); + if (dataInfo != null) + { + // Only consume if it's the last DataInfo + boolean consume = dataInfos.isEmpty() && complete; + ByteBuffer byteBuffer = dataInfo.asByteBuffer(consume); + buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false); + // Loop to return the buffer + } + else { stream.getSession().goAway(); throw new EOFException("read timeout"); } - logger.debug("Waited {} ms for content bytes", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin)); } - - // We need to parse more bytes; this may change the state - // therefore we need to re-read the fields - connection.fill(); - filled = true; } } @@ -313,7 +380,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem State state = this.state; if (state != State.HEADERS_COMPLETE && state != State.CONTENT) throw new IllegalStateException(); - return buffer.length(); + return buffer == null ? 0 : buffer.length(); } @Override @@ -344,8 +411,6 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem /** * Needed in order to override parser methods that read content. - * TODO: DESIGN: having the parser to block for content is messy, since the - * TODO: DESIGN: state machine for that should be in the connection/interpreter */ private class HTTPSPDYParser extends HttpParser { @@ -357,7 +422,14 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem @Override public Buffer blockForContent(long maxIdleTime) throws IOException { - return consumeContent(maxIdleTime); + try + { + return consumeContent(maxIdleTime); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } } @Override diff --git a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java index a61f77bab35..d2318aac7d7 100644 --- a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java +++ b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java @@ -16,12 +16,9 @@ package org.eclipse.jetty.spdy.http; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.spdy.EmptyAsyncEndPoint; @@ -57,6 +54,8 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect private class HTTPServerFrameListener extends ServerSessionFrameListener.Adapter implements StreamFrameListener { + private static final String CONNECTION_ATTRIBUTE = "org.eclipse.jetty.spdy.http.connection"; + private final AsyncEndPoint endPoint; public HTTPServerFrameListener(AsyncEndPoint endPoint) @@ -72,47 +71,34 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect // and this is very different from HTTP, where only one request/response // cycle is processed at a time, so we need to fake an http connection // for each SYN in order to run concurrently. - // Furthermore, in order to avoid that one "slow" SYN blocks all other - // SYNs that may be processed concurrently (for example when the - // application is waiting for a JDBC connection), we dispatch to a new - // thread when invoking the fake connection (that will call the application). - // Dispatching must be ordered to avoid that client's data frames are - // processed out of order. logger.debug("Received {} on {}", synInfo, stream); - final ServerHTTPSPDYAsyncConnection connection = new ServerHTTPSPDYAsyncConnection(connector, + ServerHTTPSPDYAsyncConnection connection = new ServerHTTPSPDYAsyncConnection(connector, new EmptyAsyncEndPoint(), connector.getServer(), (SPDYAsyncConnection)endPoint.getConnection(), stream); - stream.setAttribute("connection", connection); - final Headers headers = synInfo.getHeaders(); - final boolean isClose = synInfo.isClose(); - // If the SYN has no headers, they may come later in a HEADERS frame - StreamFrameListener result = headers.isEmpty() || !isClose ? this : null; + stream.setAttribute(CONNECTION_ATTRIBUTE, connection); - connection.post(new Runnable() + Headers headers = synInfo.getHeaders(); + connection.beginRequest(headers); + + if (headers.isEmpty()) { - @Override - public void run() + // If the SYN has no headers, they may come later in a HEADERS frame + return this; + } + else + { + if (synInfo.isClose()) { - try - { - connection.beginRequest(headers); - if (isClose) - connection.endRequest(); - } - catch (HttpException x) - { - respond(stream, x.getStatus()); - } - catch (IOException x) - { - close(stream); - } + connection.endRequest(); + return null; } - }); - - return result; + else + { + return this; + } + } } @Override @@ -122,79 +108,23 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect } @Override - public void onHeaders(final Stream stream, final HeadersInfo headersInfo) + public void onHeaders(Stream stream, HeadersInfo headersInfo) { logger.debug("Received {} on {}", headersInfo, stream); - - final ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute("connection"); - final Headers headers = headersInfo.getHeaders(); - final boolean isClose = headersInfo.isClose(); - - connection.post(new Runnable() - { - @Override - public void run() - { - try - { - connection.headers(headers); - if (isClose) - connection.endRequest(); - } - catch (HttpException x) - { - respond(stream, x.getStatus()); - } - catch (IOException x) - { - close(stream); - } - } - }); + ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE); + connection.headers(headersInfo.getHeaders()); + if (headersInfo.isClose()) + connection.endRequest(); } @Override - public void onData(final Stream stream, DataInfo dataInfo) + public void onData(Stream stream, DataInfo dataInfo) { logger.debug("Received {} on {}", dataInfo, stream); - - final ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute("connection"); - final ByteBuffer buffer = dataInfo.asByteBuffer(true); - final boolean isClose = dataInfo.isClose(); - - connection.post(new Runnable() - { - public void run() - { - try - { - connection.content(buffer, isClose); - if (isClose) - connection.endRequest(); - } - catch (HttpException x) - { - respond(stream, x.getStatus()); - } - catch (IOException x) - { - close(stream); - } - } - }); - } - - private void respond(Stream stream, int status) - { - Headers headers = new Headers(); - headers.put("status", String.valueOf(status)); - headers.put("version", "HTTP/1.1"); - stream.reply(new ReplyInfo(headers, true)); - } - - private void close(Stream stream) - { - stream.getSession().goAway(); + ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE); + connection.content(dataInfo, dataInfo.isClose()); + if (dataInfo.isClose()) + connection.endRequest(); } } } diff --git a/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java b/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java index 64b1aacf299..a103ec0fa96 100644 --- a/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java +++ b/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java @@ -296,7 +296,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest { Assert.assertTrue(replyInfo.isClose()); Headers replyHeaders = replyInfo.getHeaders(); - Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + Assert.assertTrue(replyHeaders.toString(), replyHeaders.get("status").value().contains("200")); replyLatch.countDown(); } }).get(5, TimeUnit.SECONDS); @@ -514,7 +514,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest @Override public void onData(Stream stream, DataInfo dataInfo) { - contentBytes.addAndGet(dataInfo.available()); + contentBytes.addAndGet(dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available()))); if (dataInfo.isClose()) { Assert.assertEquals(data.length, contentBytes.get()); @@ -571,7 +571,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest @Override public void onData(Stream stream, DataInfo dataInfo) { - contentBytes.addAndGet(dataInfo.available()); + contentBytes.addAndGet(dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available()))); if (dataInfo.isClose()) { Assert.assertEquals(2 * data.length, contentBytes.get());