From 205ef85ead2c7ea169e4aebdabcf9f24b66de15d Mon Sep 17 00:00:00 2001 From: Thomas Becker Date: Wed, 17 Jul 2013 13:33:40 +0200 Subject: [PATCH] 413155 refactor HttpTransportOverSPDY to fix some bugs and reduce cyclomatic complexity --- .../http/HTTPSPDYServerConnectionFactory.java | 3 +- .../server/http/HttpTransportOverSPDY.java | 136 +++++++++--------- .../http/HttpTransportOverSPDYTest.java | 63 +++++--- .../spdy/server/http/ServerHTTPSPDYTest.java | 38 +++++ 4 files changed, 147 insertions(+), 93 deletions(-) diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java index ca71cb3fa4e..776ccaf96df 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java @@ -105,7 +105,8 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory if (!(headers.get("accept-encoding") != null && headers.get("accept-encoding").value().contains ("gzip"))) headers.add("accept-encoding", "gzip"); - HttpTransportOverSPDY transport = new HttpTransportOverSPDY(connector, httpConfiguration, endPoint, pushStrategy, stream, headers); + HttpTransportOverSPDY transport = new HttpTransportOverSPDY(connector, httpConfiguration, endPoint, + pushStrategy, stream, headers, getVersion()); HttpInputOverSPDY input = new HttpInputOverSPDY(); HttpChannelOverSPDY channel = new HttpChannelOverSPDY(connector, httpConfiguration, endPoint, transport, input, stream); stream.setAttribute(CHANNEL_ATTRIBUTE, channel); diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java index ea8e35837b3..4efecb79ffa 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java @@ -63,11 +63,12 @@ public class HttpTransportOverSPDY implements HttpTransport private final EndPoint endPoint; private final PushStrategy pushStrategy; private final Stream stream; + private final short version; private final Fields requestHeaders; private final BlockingCallback streamBlocker = new BlockingCallback(); private final AtomicBoolean committed = new AtomicBoolean(); - public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders) + public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders, short version) { this.connector = connector; this.configuration = configuration; @@ -75,6 +76,7 @@ public class HttpTransportOverSPDY implements HttpTransport this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy; this.stream = stream; this.requestHeaders = requestHeaders; + this.version = version; } protected Stream getStream() @@ -117,9 +119,9 @@ public class HttpTransportOverSPDY implements HttpTransport // info!=null content!=null lastContent==false reply, commit with content // info!=null content!=null lastContent==true reply, commit with content and complete - short version = stream.getSession().getVersion(); boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).value()); boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; + boolean close = !hasContent && lastContent; if (info != null) { @@ -131,74 +133,72 @@ public class HttpTransportOverSPDY implements HttpTransport LOG.warn("Committed response twice.", exception); return; } - Fields headers = new Fields(); - - HttpVersion httpVersion = HttpVersion.HTTP_1_1; - headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString()); - - int status = info.getStatus(); - StringBuilder httpStatus = new StringBuilder().append(status); - String reason = info.getReason(); - if (reason == null) - reason = HttpStatus.getMessage(status); - if (reason != null) - httpStatus.append(" ").append(reason); - headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString()); - LOG.debug("HTTP < {} {}", httpVersion, httpStatus); - - // TODO merge the two Field classes into one - HttpFields fields = info.getHttpFields(); - if (fields != null) - { - for (int i = 0; i < fields.size(); ++i) - { - HttpField field = fields.getField(i); - String name = field.getName(); - String value = field.getValue(); - headers.add(name, value); - LOG.debug("HTTP < {}: {}", name, value); - } - } - - if (configuration.getSendServerVersion()) - headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION); - if(configuration.getSendXPoweredBy()) - headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION); - - boolean close = !hasContent && lastContent; - ReplyInfo reply = new ReplyInfo(headers, close); - reply(stream, reply); + sendReply(info, lastContent && !hasContent ? callback : new Callback.Adapter(), close); } // Do we have some content to send as well if (hasContent) { - // Is the stream still open? - if (stream.isClosed() || stream.isReset()) - // tell the callback about the EOF - callback.failed(new EofException("stream closed")); - else - // send the data and let it call the callback - stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent - ), callback); + LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream, + lastContent); + + // send the data and let it call the callback + stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent + ), callback); } // else do we need to close - else if (lastContent) + else if (lastContent && info == null) { - // Are we closed ? - if (stream.isClosed() || stream.isReset()) - // already closed by reply, so just tell callback we are complete - callback.succeeded(); - else - // send empty data to close and let the send call the callback - stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, - BufferUtil.EMPTY_BUFFER, lastContent), callback); + LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream); + // send empty data to close and let the send call the callback + stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, + BufferUtil.EMPTY_BUFFER, lastContent), callback); } - else - // No data and no close so tell callback we are completed + else if(!lastContent) callback.succeeded(); } + private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close) + { + Fields headers = new Fields(); + + HttpVersion httpVersion = HttpVersion.HTTP_1_1; + headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString()); + + int status = info.getStatus(); + StringBuilder httpStatus = new StringBuilder().append(status); + String reason = info.getReason(); + if (reason == null) + reason = HttpStatus.getMessage(status); + if (reason != null) + httpStatus.append(" ").append(reason); + headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString()); + LOG.debug("HTTP < {} {}", httpVersion, httpStatus); + + // TODO merge the two Field classes into one + HttpFields fields = info.getHttpFields(); + if (fields != null) + { + for (int i = 0; i < fields.size(); ++i) + { + HttpField field = fields.getField(i); + String name = field.getName(); + String value = field.getValue(); + headers.add(name, value); + LOG.debug("HTTP < {}: {}", name, value); + } + } + + if (configuration.getSendServerVersion()) + headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION); + if (configuration.getSendXPoweredBy()) + headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION); + + ReplyInfo reply = new ReplyInfo(headers, close); + LOG.debug("Sending reply: {} on stream: {}", reply, stream); + reply(stream, reply, callback); + } + @Override public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException { @@ -219,15 +219,14 @@ public class HttpTransportOverSPDY implements HttpTransport LOG.debug("Completed {}", this); } - private void reply(Stream stream, ReplyInfo replyInfo) + private void reply(Stream stream, ReplyInfo replyInfo, Callback callback) { if (!stream.isUnidirectional()) - stream.reply(replyInfo, new Callback.Adapter()); + stream.reply(replyInfo, callback); else - stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), new Callback.Adapter()); + stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback); Fields responseHeaders = replyInfo.getHeaders(); - short version = stream.getSession().getVersion(); if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") && !stream.isClosed()) { Set pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders); @@ -242,13 +241,15 @@ public class HttpTransportOverSPDY implements HttpTransport private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY { private final PushResourceCoordinator coordinator; + private final short version; private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders, - PushResourceCoordinator coordinator) + PushResourceCoordinator coordinator, short version) { - super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders); + super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders, version); this.coordinator = coordinator; + this.version = version; } @Override @@ -256,7 +257,7 @@ public class HttpTransportOverSPDY implements HttpTransport { Stream stream = getStream(); LOG.debug("Resource pushed for {} on {}", - getRequestHeaders().get(HTTPSPDYHeader.URI.name(stream.getSession().getVersion())), stream); + getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream); coordinator.complete(); } } @@ -298,14 +299,13 @@ public class HttpTransportOverSPDY implements HttpTransport private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders) { HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy, - pushStream, pushRequestHeaders, this); + pushStream, pushRequestHeaders, this, version); HttpInputOverSPDY input = new HttpInputOverSPDY(); return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream); } private void pushResource(String pushResource) { - final short version = stream.getSession().getVersion(); Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version)); Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version)); Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version)); @@ -340,7 +340,6 @@ public class HttpTransportOverSPDY implements HttpTransport private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath) { final Fields newRequestHeaders = new Fields(requestHeaders, false); - short version = stream.getSession().getVersion(); newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET"); newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); newRequestHeaders.put(scheme); @@ -355,7 +354,6 @@ public class HttpTransportOverSPDY implements HttpTransport private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath) { final Fields pushHeaders = new Fields(); - short version = stream.getSession().getVersion(); if (version == SPDY.V2) pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath); else diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java index 32137576ba1..4d710253a6a 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java @@ -75,14 +75,15 @@ public class HttpTransportOverSPDYTest private Random random = new Random(); HttpTransportOverSPDY httpTransportOverSPDY; + private short version = SPDY.V3; @Before public void setUp() throws Exception { Fields requestHeaders = new Fields(); - requestHeaders.add(HTTPSPDYHeader.METHOD.name(SPDY.V3),"GET"); + requestHeaders.add(HTTPSPDYHeader.METHOD.name(version), "GET"); httpTransportOverSPDY = new HttpTransportOverSPDY(connector, httpConfiguration, endPoint, pushStrategy, - stream, requestHeaders); + stream, requestHeaders, version); when(responseInfo.getStatus()).thenReturn(HttpStatus.OK_200); when(stream.getSession()).thenReturn(session); when(session.getVersion()).thenReturn(SPDY.V3); @@ -97,7 +98,10 @@ public class HttpTransportOverSPDYTest httpTransportOverSPDY.send(null, content, lastContent, callback); ArgumentCaptor dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class); - verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); + verify(callback, times(1)).succeeded(); assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true)); assertThat("ByteBuffer is empty", dataInfoCaptor.getValue().length(), is(0)); } @@ -111,7 +115,10 @@ public class HttpTransportOverSPDYTest httpTransportOverSPDY.send(null, content, lastContent, callback); ArgumentCaptor dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class); - verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); + verify(callback, times(1)).succeeded(); assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true)); assertThat("ByteBuffer length is 4096", dataInfoCaptor.getValue().length(), is(4096)); } @@ -121,11 +128,13 @@ public class HttpTransportOverSPDYTest { ByteBuffer content = BufferUtil.EMPTY_BUFFER; boolean lastContent = true; - httpTransportOverSPDY.send(null, content, lastContent, callback); ArgumentCaptor dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class); - verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); + verify(callback, times(1)).succeeded(); assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true)); assertThat("ByteBuffer is empty", dataInfoCaptor.getValue().length(), is(0)); } @@ -135,9 +144,9 @@ public class HttpTransportOverSPDYTest { ByteBuffer content = null; boolean lastContent = false; - httpTransportOverSPDY.send(null, content, lastContent, callback); + verify(callback, times(1)).succeeded(); verify(stream, times(0)).data(any(ByteBufferDataInfo.class), any(Callback.class)); } @@ -147,10 +156,12 @@ public class HttpTransportOverSPDYTest ByteBuffer content = createRandomByteBuffer(); boolean lastContent = false; - httpTransportOverSPDY.send(null, content, lastContent, callback); ArgumentCaptor dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class); - verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); + verify(callback, times(1)).succeeded(); assertThat("lastContent is false", dataInfoCaptor.getValue().isClose(), is(false)); assertThat("ByteBuffer is empty", dataInfoCaptor.getValue().length(), is(4096)); } @@ -160,7 +171,6 @@ public class HttpTransportOverSPDYTest { ByteBuffer content = BufferUtil.EMPTY_BUFFER; boolean lastContent = false; - httpTransportOverSPDY.send(null, content, lastContent, callback); verify(stream, times(0)).data(any(ByteBufferDataInfo.class), any(Callback.class)); @@ -178,7 +188,9 @@ public class HttpTransportOverSPDYTest httpTransportOverSPDY.send(responseInfo, content, lastContent, callback); ArgumentCaptor replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class); - verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); assertThat("ReplyInfo close is true", replyInfoCaptor.getValue().isClose(), is(true)); verify(callback, times(1)).succeeded(); @@ -188,20 +200,20 @@ public class HttpTransportOverSPDYTest public void testSendWithResponseInfoAndContentAndLastContentTrue() throws Exception { ByteBuffer content = createRandomByteBuffer(); - boolean lastContent = true; - httpTransportOverSPDY.send(responseInfo, content, lastContent, callback); - ArgumentCaptor replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class); - verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); assertThat("ReplyInfo close is false", replyInfoCaptor.getValue().isClose(), is(false)); - ArgumentCaptor dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class); - verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class)); + verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true)); assertThat("ByteBuffer length is 4096", dataInfoCaptor.getValue().length(), is(4096)); + verify(callback, times(1)).succeeded(); } @Test @@ -209,33 +221,38 @@ public class HttpTransportOverSPDYTest { ByteBuffer content = null; boolean lastContent = false; - httpTransportOverSPDY.send(responseInfo, content, lastContent, callback); ArgumentCaptor replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class); - verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); assertThat("ReplyInfo close is true", replyInfoCaptor.getValue().isClose(), is(false)); verify(stream, times(0)).data(any(ByteBufferDataInfo.class), any(Callback.class)); + verify(callback, times(1)).succeeded(); } @Test public void testSendWithResponseInfoAndContentAndLastContentFalse() throws Exception { ByteBuffer content = createRandomByteBuffer(); - boolean lastContent = false; - httpTransportOverSPDY.send(responseInfo, content, lastContent, callback); ArgumentCaptor replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class); - verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class)); + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); assertThat("ReplyInfo close is false", replyInfoCaptor.getValue().isClose(), is(false)); ArgumentCaptor dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class); - verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class)); + verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture()); + callbackCaptor.getValue().succeeded(); assertThat("lastContent is false", dataInfoCaptor.getValue().isClose(), is(false)); assertThat("ByteBuffer length is 4096", dataInfoCaptor.getValue().length(), is(4096)); + + verify(callback, times(1)).succeeded(); } @Test diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java index 22c0f5a1206..de6c4d72eb0 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java @@ -242,6 +242,44 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); } + @Test + public void testPOSTWithDelayedContentBody() throws Exception + { + final String path = "/foo"; + final CountDownLatch handlerLatch = new CountDownLatch(1); + Session session = startClient(version, startHTTPServer(version, new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + // don't read the request body, reply immediately + request.setHandled(true); + handlerLatch.countDown(); + } + }), null); + + Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path); + headers.put("content-type", "application/x-www-form-urlencoded"); + final CountDownLatch replyLatch = new CountDownLatch(1); + Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0), + new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + assertTrue(replyInfo.isClose()); + Fields replyHeaders = replyInfo.getHeaders(); + assertTrue(replyHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().contains("200")); + replyLatch.countDown(); + } + }); + stream.data(new StringDataInfo("a", false)); + assertTrue(handlerLatch.await(5, TimeUnit.SECONDS)); + assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); + stream.data(new StringDataInfo("b", true)); + } + @Test public void testPOSTWithParameters() throws Exception {