diff --git a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYAsyncConnection.java b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYAsyncConnection.java index 283b55a6dc0..ffb15be4d14 100644 --- a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYAsyncConnection.java +++ b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYAsyncConnection.java @@ -287,6 +287,26 @@ public class HTTPSPDYAsyncConnection extends AbstractHttpConnection implements A return buffer.length(); } + @Override + public void commitResponse(boolean last) throws IOException + { + // Keep the original behavior since it just delegates to the generator + super.commitResponse(last); + } + + @Override + public void flushResponse() throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void completeResponse() throws IOException + { + // Keep the original behavior since it just delegates to the generator + super.completeResponse(); + } + private String parseHost(String url) { try @@ -365,10 +385,19 @@ public class HTTPSPDYAsyncConnection extends AbstractHttpConnection implements A @Override public void send1xx(int code) throws IOException { - Headers headers = new Headers(); - headers.put("status", String.valueOf(code)); - headers.put("version", "HTTP/1.1"); - stream.reply(new ReplyInfo(headers, false)); + throw new UnsupportedOperationException(); + } + + @Override + public void sendResponse(Buffer response) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void sendError(int code, String reason, String content, boolean close) throws IOException + { + throw new UnsupportedOperationException(); } @Override @@ -388,20 +417,58 @@ public class HTTPSPDYAsyncConnection extends AbstractHttpConnection implements A headers.put(field.getName(), field.getValue()); } } - stream.reply(new ReplyInfo(headers, allContentAdded)); + + // We have to query the HttpGenerator and its _buffer to know + // whether there is content buffered; if so, send the data frame + boolean close = _buffer == null || _buffer.length() == 0; + stream.reply(new ReplyInfo(headers, close)); + if (!close) + { + ByteBuffer buffer = ((NIOBuffer)_buffer).getByteBuffer(); + buffer.limit(_buffer.putIndex()); + buffer.position(_buffer.getIndex()); + // Update HttpGenerator fields so that they remain consistent + _buffer.clear(); + _state = HttpGenerator.STATE_CONTENT; + // Send the data frame + stream.data(new ByteBufferDataInfo(buffer, allContentAdded)); + } + } + + @Override + public boolean addContent(byte b) throws IOException + { + throw new UnsupportedOperationException(); } @Override public void addContent(Buffer content, boolean last) throws IOException { - ByteBuffer buffer = ByteBuffer.wrap(content.asArray()); - stream.data(new ByteBufferDataInfo(buffer, last)); + + // TODO: we need to avoid that the HttpParser chunks the content + // otherwise we're sending bad data... so perhaps we need to do our own buffering here + + // Keep the original behavior since adding content will + // just accumulate bytes until the response is committed. + super.addContent(content, last); + } + + @Override + public int flushBuffer() throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void blockForOutput(long maxIdleTime) throws IOException + { + throw new UnsupportedOperationException(); } @Override public void complete() throws IOException { - // Nothing to do + throw new UnsupportedOperationException(); } } } diff --git a/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/HTTPSPDYTest.java b/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/HTTPSPDYTest.java index bfba81c26a1..c0338c39b11 100644 --- a/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/HTTPSPDYTest.java +++ b/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/HTTPSPDYTest.java @@ -19,9 +19,13 @@ package org.eclipse.jetty.spdy.http; import java.io.BufferedReader; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -31,6 +35,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.spdy.SPDYClient; import org.eclipse.jetty.spdy.SPDYServerConnector; +import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.SPDY; @@ -418,4 +423,225 @@ public class HTTPSPDYTest Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testGETWithSmallResponseContent() throws Exception + { + final String data = "0123456789ABCDEF"; + final CountDownLatch handlerLatch = new CountDownLatch(1); + Session session = startClient(startHTTPServer(new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + request.setHandled(true); + httpResponse.setStatus(HttpServletResponse.SC_OK); + ServletOutputStream output = httpResponse.getOutputStream(); + output.write(data.getBytes("UTF-8")); + handlerLatch.countDown(); + } + }), null); + + Headers headers = new Headers(); + headers.put("method", "GET"); + headers.put("url", "http://localhost:" + connector.getLocalPort() + "/foo"); + headers.put("version", "HTTP/1.1"); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + session.syn(SPDY.V2, new SynInfo(headers, true), new Stream.FrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertFalse(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + Assert.assertTrue(dataInfo.isClose()); + ByteBuffer buffer = ByteBuffer.allocate(dataInfo.getBytesCount()); + dataInfo.getBytes(buffer); + buffer.flip(); + Assert.assertEquals(data, Charset.forName("UTF-8").decode(buffer).toString()); + dataLatch.countDown(); + } + }); + Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(replyLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testGETWithBigResponseContentInOneWrite() throws Exception + { + final byte[] data = new byte[128 * 1024]; + Arrays.fill(data, (byte)'x'); + final CountDownLatch handlerLatch = new CountDownLatch(1); + Session session = startClient(startHTTPServer(new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + request.setHandled(true); + httpResponse.setStatus(HttpServletResponse.SC_OK); + ServletOutputStream output = httpResponse.getOutputStream(); + output.write(data); + handlerLatch.countDown(); + } + }), null); + + Headers headers = new Headers(); + headers.put("method", "GET"); + headers.put("url", "http://localhost:" + connector.getLocalPort() + "/foo"); + headers.put("version", "HTTP/1.1"); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + session.syn(SPDY.V2, new SynInfo(headers, true), new Stream.FrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertFalse(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + Assert.assertTrue(dataInfo.isClose()); + ByteBuffer buffer = ByteBuffer.allocate(dataInfo.getBytesCount()); + dataInfo.getBytes(buffer); + buffer.flip(); + Assert.assertEquals(data, Charset.forName("UTF-8").decode(buffer).toString()); + dataLatch.countDown(); + } + }); + Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(replyLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testGETWithBigResponseContentInTwoWrites() throws Exception + { + // TODO + Assert.fail(); + + final byte[] data = new byte[128 * 1024]; + Arrays.fill(data, (byte)'x'); + final CountDownLatch handlerLatch = new CountDownLatch(1); + Session session = startClient(startHTTPServer(new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + request.setHandled(true); + httpResponse.setStatus(HttpServletResponse.SC_OK); + ServletOutputStream output = httpResponse.getOutputStream(); + output.write(data); + handlerLatch.countDown(); + } + }), null); + + Headers headers = new Headers(); + headers.put("method", "GET"); + headers.put("url", "http://localhost:" + connector.getLocalPort() + "/foo"); + headers.put("version", "HTTP/1.1"); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + session.syn(SPDY.V2, new SynInfo(headers, true), new Stream.FrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertFalse(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + Assert.assertTrue(dataInfo.isClose()); + ByteBuffer buffer = ByteBuffer.allocate(dataInfo.getBytesCount()); + dataInfo.getBytes(buffer); + buffer.flip(); + Assert.assertEquals(data, Charset.forName("UTF-8").decode(buffer).toString()); + dataLatch.countDown(); + } + }); + Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(replyLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testGETWithSmallResponseContentInTwoChunks() throws Exception + { + final String data1 = "0123456789ABCDEF"; + final String data2 = "FEDCBA9876543210"; + final CountDownLatch handlerLatch = new CountDownLatch(1); + Session session = startClient(startHTTPServer(new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + request.setHandled(true); + httpResponse.setStatus(HttpServletResponse.SC_OK); + ServletOutputStream output = httpResponse.getOutputStream(); + output.write(data1.getBytes("UTF-8")); + output.flush(); + output.write(data2.getBytes("UTF-8")); + handlerLatch.countDown(); + } + }), null); + + Headers headers = new Headers(); + headers.put("method", "GET"); + headers.put("url", "http://localhost:" + connector.getLocalPort() + "/foo"); + headers.put("version", "HTTP/1.1"); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + session.syn(SPDY.V2, new SynInfo(headers, true), new Stream.FrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertFalse(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get("status").value().contains("200")); + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + Assert.assertTrue(dataInfo.isClose()); + ByteBuffer buffer = ByteBuffer.allocate(dataInfo.getBytesCount()); + dataInfo.getBytes(buffer); + buffer.flip(); + Assert.assertEquals(data1, Charset.forName("UTF-8").decode(buffer).toString()); + dataLatch.countDown(); + } + }); + Assert.assertTrue(handlerLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(replyLatch.await(500, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } + + // TODO: add tests for chunked content + + // Note that I do not care much about the state of the generator, as long as I can avoid + // that the generator writes, that SPDY writes chunked bytes, and - if possible - data copying }