Implemented support for async HTTP requests over SPDY.

This commit is contained in:
Simone Bordet 2012-03-02 16:15:01 +01:00
parent 94742d3e94
commit 22f0f58062
3 changed files with 242 additions and 5 deletions

View File

@ -214,6 +214,11 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
messageComplete(0);
break;
}
case ASYNC:
{
handleRequest();
break;
}
default:
{
throw new IllegalStateException();
@ -322,6 +327,21 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
});
}
public void async()
{
post(new Runnable()
{
@Override
public void run()
{
State currentState = state;
state = State.ASYNC;
handle();
state = currentState;
}
});
}
private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException
{
while (true)
@ -360,7 +380,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
if (dataInfo != null)
{
// Only consume if it's the last DataInfo
boolean consume = dataInfos.isEmpty() && complete;
boolean consume = complete && dataInfos.isEmpty();
ByteBuffer byteBuffer = dataInfo.asByteBuffer(consume);
buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
// Loop to return the buffer
@ -406,7 +426,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
private enum State
{
INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL
INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
}
/**

View File

@ -37,7 +37,9 @@ import org.slf4j.LoggerFactory;
public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectionFactory
{
private static final String CONNECTION_ATTRIBUTE = "org.eclipse.jetty.spdy.http.connection";
private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnectionFactory.class);
private final Connector connector;
public ServerHTTPSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler, Connector connector)
@ -54,8 +56,6 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
private class HTTPServerFrameListener extends ServerSessionFrameListener.Adapter implements StreamFrameListener
{
private static final String CONNECTION_ATTRIBUTE = "org.eclipse.jetty.spdy.http.connection";
private final AsyncEndPoint endPoint;
public HTTPServerFrameListener(AsyncEndPoint endPoint)
@ -74,9 +74,11 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
logger.debug("Received {} on {}", synInfo, stream);
HTTPSPDYAsyncEndPoint asyncEndPoint = new HTTPSPDYAsyncEndPoint(stream);
ServerHTTPSPDYAsyncConnection connection = new ServerHTTPSPDYAsyncConnection(connector,
new EmptyAsyncEndPoint(), connector.getServer(),
asyncEndPoint, connector.getServer(),
(SPDYAsyncConnection)endPoint.getConnection(), stream);
asyncEndPoint.setConnection(connection);
stream.setAttribute(CONNECTION_ATTRIBUTE, connection);
Headers headers = synInfo.getHeaders();
@ -127,4 +129,21 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
connection.endRequest();
}
}
private class HTTPSPDYAsyncEndPoint extends EmptyAsyncEndPoint
{
private final Stream stream;
public HTTPSPDYAsyncEndPoint(Stream stream)
{
this.stream = stream;
}
@Override
public void asyncDispatch()
{
ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE);
connection.async();
}
}
}

View File

@ -18,6 +18,8 @@ package org.eclipse.jetty.spdy.http;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
@ -30,8 +32,10 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.server.AsyncContext;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -957,4 +961,198 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPOSTThenSuspendRequestThenReadOneChunkThenComplete() throws Exception
{
final byte[] data = new byte[2000];
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(startHTTPServer(new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
request.setHandled(true);
final AsyncContext async = request.startAsync();
new Thread()
{
@Override
public void run()
{
try
{
InputStream input = request.getInputStream();
byte[] buffer = new byte[512];
int read = 0;
while (read < data.length)
read += input.read(buffer);
async.complete();
latch.countDown();
}
catch (IOException x)
{
x.printStackTrace();
}
}
}.start();
}
}), null);
Headers headers = new Headers();
headers.put("method", "POST");
headers.put("url", "/foo");
headers.put("version", "HTTP/1.1");
headers.put("host", "localhost:" + connector.getLocalPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
Stream stream = session.syn(new SynInfo(headers, false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
stream.data(new BytesDataInfo(data, true));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPOSTThenSuspendRequestThenReadTwoChunksThenComplete() throws Exception
{
final byte[] data = new byte[2000];
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(startHTTPServer(new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
request.setHandled(true);
final AsyncContext async = request.startAsync();
new Thread()
{
@Override
public void run()
{
try
{
InputStream input = request.getInputStream();
byte[] buffer = new byte[512];
int read = 0;
while (read < 2 * data.length)
read += input.read(buffer);
async.complete();
latch.countDown();
}
catch (IOException x)
{
x.printStackTrace();
}
}
}.start();
}
}), null);
Headers headers = new Headers();
headers.put("method", "POST");
headers.put("url", "/foo");
headers.put("version", "HTTP/1.1");
headers.put("host", "localhost:" + connector.getLocalPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
Stream stream = session.syn(new SynInfo(headers, false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
stream.data(new BytesDataInfo(data, false));
stream.data(new BytesDataInfo(data, true));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPOSTThenSuspendRequestThenResumeThenRespond() throws Exception
{
final byte[] data = new byte[1000];
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(startHTTPServer(new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
request.setHandled(true);
if (request.getAsyncContinuation().isInitial())
{
InputStream input = request.getInputStream();
byte[] buffer = new byte[256];
int read = 0;
while (read < data.length)
read += input.read(buffer);
final AsyncContext async = request.startAsync();
new Thread()
{
@Override
public void run()
{
try
{
TimeUnit.SECONDS.sleep(1);
async.dispatch();
latch.countDown();
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
}.start();
}
else
{
OutputStream output = httpResponse.getOutputStream();
output.write(data);
}
}
}), null);
Headers headers = new Headers();
headers.put("method", "POST");
headers.put("url", "/foo");
headers.put("version", "HTTP/1.1");
headers.put("host", "localhost:" + connector.getLocalPort());
final CountDownLatch responseLatch = new CountDownLatch(2);
Stream stream = session.syn(new SynInfo(headers, false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
responseLatch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
if (dataInfo.isClose())
responseLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
stream.data(new BytesDataInfo(data, true));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}