Updated HTTP layer to invoke the application asynchronously.

This commit is contained in:
Simone Bordet 2012-03-02 11:45:06 +01:00
parent 0161f780c9
commit 94742d3e94
3 changed files with 169 additions and 167 deletions

View File

@ -18,9 +18,12 @@ package org.eclipse.jetty.spdy.http;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.http.HttpException;
@ -43,6 +46,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.SPDYAsyncConnection; import org.eclipse.jetty.spdy.SPDYAsyncConnection;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.Stream;
@ -52,14 +56,17 @@ import org.slf4j.LoggerFactory;
public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
{ {
private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnection.class); private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnection.class);
private final Queue<Runnable> queue = new LinkedList<>();
private final Queue<Runnable> tasks = new LinkedList<>();
private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
private final SPDYAsyncConnection connection; private final SPDYAsyncConnection connection;
private final Stream stream; private final Stream stream;
private Headers headers; private Headers headers; // No need for volatile, guarded by state
private NIOBuffer buffer; private DataInfo dataInfo; // No need for volatile, guarded by state
private boolean complete; private NIOBuffer buffer; // No need for volatile, guarded by state
private boolean complete; // No need for volatile, guarded by state
private volatile State state = State.INITIAL; private volatile State state = State.INITIAL;
private boolean dispatched; private boolean dispatched; // Guarded by synchronization on tasks
public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, SPDYAsyncConnection connection, Stream stream) public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, SPDYAsyncConnection connection, Stream stream)
{ {
@ -87,24 +94,24 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
return (AsyncEndPoint)super.getEndPoint(); return (AsyncEndPoint)super.getEndPoint();
} }
public void post(Runnable task) private void post(Runnable task)
{ {
synchronized (queue) synchronized (tasks)
{ {
logger.debug("Posting task {}", task); logger.debug("Posting task {}", task);
queue.offer(task); tasks.offer(task);
dispatch(); dispatch();
} }
} }
private void dispatch() private void dispatch()
{ {
synchronized (queue) synchronized (tasks)
{ {
if (dispatched) if (dispatched)
return; return;
final Runnable task = queue.poll(); final Runnable task = tasks.poll();
if (task != null) if (task != null)
{ {
dispatched = true; dispatched = true;
@ -126,7 +133,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
} }
@Override @Override
public Connection handle() throws IOException public Connection handle()
{ {
setCurrentConnection(this); setCurrentConnection(this);
try try
@ -198,13 +205,12 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
case CONTENT: case CONTENT:
{ {
final Buffer buffer = this.buffer; final Buffer buffer = this.buffer;
if (buffer.length() > 0) if (buffer != null && buffer.length() > 0)
content(buffer); content(buffer);
break; break;
} }
case FINAL: case FINAL:
{ {
// TODO: compute content-length parameter
messageComplete(0); messageComplete(0);
break; break;
} }
@ -215,40 +221,79 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
} }
return this; return this;
} }
catch (HttpException x)
{
respond(stream, x.getStatus());
return this;
}
catch (IOException x)
{
close(stream);
return this;
}
finally finally
{ {
setCurrentConnection(null); setCurrentConnection(null);
} }
} }
private void respond(Stream stream, int status)
{
Headers headers = new Headers();
headers.put("status", String.valueOf(status));
headers.put("version", "HTTP/1.1");
stream.reply(new ReplyInfo(headers, true));
}
private void close(Stream stream)
{
stream.getSession().goAway();
}
@Override @Override
public void onInputShutdown() throws IOException public void onInputShutdown() throws IOException
{ {
// TODO
} }
public void beginRequest(Headers headers) throws IOException public void beginRequest(final Headers headers)
{
this.headers = headers.isEmpty() ? null : headers;
post(new Runnable()
{
@Override
public void run()
{ {
if (!headers.isEmpty()) if (!headers.isEmpty())
{
this.headers = headers;
state = State.REQUEST; state = State.REQUEST;
}
handle(); handle();
} }
});
}
public void headers(Headers headers) throws IOException public void headers(Headers headers)
{ {
this.headers = headers; this.headers = headers;
post(new Runnable()
{
@Override
public void run()
{
state = state == State.INITIAL ? State.REQUEST : State.HEADERS; state = state == State.INITIAL ? State.REQUEST : State.HEADERS;
handle(); handle();
} }
});
}
public void content(ByteBuffer byteBuffer, boolean endRequest) throws IOException public void content(final DataInfo dataInfo, boolean endRequest)
{ {
buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false); dataInfos.offer(dataInfo);
complete = endRequest; complete = endRequest;
logger.debug("HTTP > {} bytes of content", buffer.length()); post(new Runnable()
{
@Override
public void run()
{
logger.debug("HTTP > {} bytes of content", dataInfo.length());
if (state == State.HEADERS) if (state == State.HEADERS)
{ {
state = State.HEADERS_COMPLETE; state = State.HEADERS_COMPLETE;
@ -257,8 +302,14 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
state = State.CONTENT; state = State.CONTENT;
handle(); handle();
} }
});
}
public void endRequest() throws IOException public void endRequest()
{
post(new Runnable()
{
public void run()
{ {
if (state == State.HEADERS) if (state == State.HEADERS)
{ {
@ -268,42 +319,58 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
state = State.FINAL; state = State.FINAL;
handle(); handle();
} }
});
}
private Buffer consumeContent(long maxIdleTime) throws IOException private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException
{ {
boolean filled = false;
while (true) while (true)
{ {
// Volatile read to ensure visibility
State state = this.state; State state = this.state;
if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL) if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL)
throw new IllegalStateException(); throw new IllegalStateException();
Buffer buffer = this.buffer; if (buffer != null)
logger.debug("Consuming {} content bytes", buffer.length()); {
if (buffer.length() > 0) if (buffer.length() > 0)
return buffer; {
logger.debug("Consuming content bytes, {} available", buffer.length());
if (complete) return buffer;
return null; }
else
if (filled) {
// The application has consumed the buffer, so consume also the DataInfo
if (dataInfo.consumed() == 0)
dataInfo.consume(dataInfo.length());
dataInfo = null;
buffer = null;
if (complete && dataInfos.isEmpty())
return null;
// Loop to get content bytes from DataInfos
}
}
else
{ {
// Wait for content
logger.debug("Waiting at most {} ms for content bytes", maxIdleTime); logger.debug("Waiting at most {} ms for content bytes", maxIdleTime);
long begin = System.nanoTime(); long begin = System.nanoTime();
boolean expired = !connection.getEndPoint().blockReadable(maxIdleTime); dataInfo = dataInfos.poll(maxIdleTime, TimeUnit.MILLISECONDS);
if (expired) long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
logger.debug("Waited {} ms for content bytes", elapsed);
if (dataInfo != null)
{
// Only consume if it's the last DataInfo
boolean consume = dataInfos.isEmpty() && complete;
ByteBuffer byteBuffer = dataInfo.asByteBuffer(consume);
buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
// Loop to return the buffer
}
else
{ {
stream.getSession().goAway(); stream.getSession().goAway();
throw new EOFException("read timeout"); throw new EOFException("read timeout");
} }
logger.debug("Waited {} ms for content bytes", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin));
} }
// We need to parse more bytes; this may change the state
// therefore we need to re-read the fields
connection.fill();
filled = true;
} }
} }
@ -313,7 +380,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
State state = this.state; State state = this.state;
if (state != State.HEADERS_COMPLETE && state != State.CONTENT) if (state != State.HEADERS_COMPLETE && state != State.CONTENT)
throw new IllegalStateException(); throw new IllegalStateException();
return buffer.length(); return buffer == null ? 0 : buffer.length();
} }
@Override @Override
@ -344,8 +411,6 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
/** /**
* Needed in order to override parser methods that read content. * Needed in order to override parser methods that read content.
* TODO: DESIGN: having the parser to block for content is messy, since the
* TODO: DESIGN: state machine for that should be in the connection/interpreter
*/ */
private class HTTPSPDYParser extends HttpParser private class HTTPSPDYParser extends HttpParser
{ {
@ -356,9 +421,16 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
@Override @Override
public Buffer blockForContent(long maxIdleTime) throws IOException public Buffer blockForContent(long maxIdleTime) throws IOException
{
try
{ {
return consumeContent(maxIdleTime); return consumeContent(maxIdleTime);
} }
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
@Override @Override
public int available() throws IOException public int available() throws IOException

View File

@ -16,12 +16,9 @@
package org.eclipse.jetty.spdy.http; package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.spdy.EmptyAsyncEndPoint; import org.eclipse.jetty.spdy.EmptyAsyncEndPoint;
@ -57,6 +54,8 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
private class HTTPServerFrameListener extends ServerSessionFrameListener.Adapter implements StreamFrameListener private class HTTPServerFrameListener extends ServerSessionFrameListener.Adapter implements StreamFrameListener
{ {
private static final String CONNECTION_ATTRIBUTE = "org.eclipse.jetty.spdy.http.connection";
private final AsyncEndPoint endPoint; private final AsyncEndPoint endPoint;
public HTTPServerFrameListener(AsyncEndPoint endPoint) public HTTPServerFrameListener(AsyncEndPoint endPoint)
@ -72,47 +71,34 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
// and this is very different from HTTP, where only one request/response // and this is very different from HTTP, where only one request/response
// cycle is processed at a time, so we need to fake an http connection // cycle is processed at a time, so we need to fake an http connection
// for each SYN in order to run concurrently. // for each SYN in order to run concurrently.
// Furthermore, in order to avoid that one "slow" SYN blocks all other
// SYNs that may be processed concurrently (for example when the
// application is waiting for a JDBC connection), we dispatch to a new
// thread when invoking the fake connection (that will call the application).
// Dispatching must be ordered to avoid that client's data frames are
// processed out of order.
logger.debug("Received {} on {}", synInfo, stream); logger.debug("Received {} on {}", synInfo, stream);
final ServerHTTPSPDYAsyncConnection connection = new ServerHTTPSPDYAsyncConnection(connector, ServerHTTPSPDYAsyncConnection connection = new ServerHTTPSPDYAsyncConnection(connector,
new EmptyAsyncEndPoint(), connector.getServer(), new EmptyAsyncEndPoint(), connector.getServer(),
(SPDYAsyncConnection)endPoint.getConnection(), stream); (SPDYAsyncConnection)endPoint.getConnection(), stream);
stream.setAttribute("connection", connection); stream.setAttribute(CONNECTION_ATTRIBUTE, connection);
final Headers headers = synInfo.getHeaders();
final boolean isClose = synInfo.isClose();
// If the SYN has no headers, they may come later in a HEADERS frame
StreamFrameListener result = headers.isEmpty() || !isClose ? this : null;
connection.post(new Runnable() Headers headers = synInfo.getHeaders();
{
@Override
public void run()
{
try
{
connection.beginRequest(headers); connection.beginRequest(headers);
if (isClose)
connection.endRequest();
}
catch (HttpException x)
{
respond(stream, x.getStatus());
}
catch (IOException x)
{
close(stream);
}
}
});
return result; if (headers.isEmpty())
{
// If the SYN has no headers, they may come later in a HEADERS frame
return this;
}
else
{
if (synInfo.isClose())
{
connection.endRequest();
return null;
}
else
{
return this;
}
}
} }
@Override @Override
@ -122,79 +108,23 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
} }
@Override @Override
public void onHeaders(final Stream stream, final HeadersInfo headersInfo) public void onHeaders(Stream stream, HeadersInfo headersInfo)
{ {
logger.debug("Received {} on {}", headersInfo, stream); logger.debug("Received {} on {}", headersInfo, stream);
ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE);
final ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute("connection"); connection.headers(headersInfo.getHeaders());
final Headers headers = headersInfo.getHeaders(); if (headersInfo.isClose())
final boolean isClose = headersInfo.isClose();
connection.post(new Runnable()
{
@Override
public void run()
{
try
{
connection.headers(headers);
if (isClose)
connection.endRequest(); connection.endRequest();
} }
catch (HttpException x)
{
respond(stream, x.getStatus());
}
catch (IOException x)
{
close(stream);
}
}
});
}
@Override @Override
public void onData(final Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
logger.debug("Received {} on {}", dataInfo, stream); logger.debug("Received {} on {}", dataInfo, stream);
ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE);
final ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute("connection"); connection.content(dataInfo, dataInfo.isClose());
final ByteBuffer buffer = dataInfo.asByteBuffer(true); if (dataInfo.isClose())
final boolean isClose = dataInfo.isClose();
connection.post(new Runnable()
{
public void run()
{
try
{
connection.content(buffer, isClose);
if (isClose)
connection.endRequest(); connection.endRequest();
} }
catch (HttpException x)
{
respond(stream, x.getStatus());
}
catch (IOException x)
{
close(stream);
}
}
});
}
private void respond(Stream stream, int status)
{
Headers headers = new Headers();
headers.put("status", String.valueOf(status));
headers.put("version", "HTTP/1.1");
stream.reply(new ReplyInfo(headers, true));
}
private void close(Stream stream)
{
stream.getSession().goAway();
}
} }
} }

View File

@ -296,7 +296,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
{ {
Assert.assertTrue(replyInfo.isClose()); Assert.assertTrue(replyInfo.isClose());
Headers replyHeaders = replyInfo.getHeaders(); Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200")); Assert.assertTrue(replyHeaders.toString(), replyHeaders.get("status").value().contains("200"));
replyLatch.countDown(); replyLatch.countDown();
} }
}).get(5, TimeUnit.SECONDS); }).get(5, TimeUnit.SECONDS);
@ -514,7 +514,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
contentBytes.addAndGet(dataInfo.available()); contentBytes.addAndGet(dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available())));
if (dataInfo.isClose()) if (dataInfo.isClose())
{ {
Assert.assertEquals(data.length, contentBytes.get()); Assert.assertEquals(data.length, contentBytes.get());
@ -571,7 +571,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
contentBytes.addAndGet(dataInfo.available()); contentBytes.addAndGet(dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available())));
if (dataInfo.isClose()) if (dataInfo.isClose())
{ {
Assert.assertEquals(2 * data.length, contentBytes.get()); Assert.assertEquals(2 * data.length, contentBytes.get());