413155 refactor HttpTransportOverSPDY to fix some bugs and reduce cyclomatic complexity

This commit is contained in:
Thomas Becker 2013-07-17 13:33:40 +02:00
parent 762e4ba4c3
commit 205ef85ead
4 changed files with 147 additions and 93 deletions

View File

@ -105,7 +105,8 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
if (!(headers.get("accept-encoding") != null && headers.get("accept-encoding").value().contains
("gzip")))
headers.add("accept-encoding", "gzip");
HttpTransportOverSPDY transport = new HttpTransportOverSPDY(connector, httpConfiguration, endPoint, pushStrategy, stream, headers);
HttpTransportOverSPDY transport = new HttpTransportOverSPDY(connector, httpConfiguration, endPoint,
pushStrategy, stream, headers, getVersion());
HttpInputOverSPDY input = new HttpInputOverSPDY();
HttpChannelOverSPDY channel = new HttpChannelOverSPDY(connector, httpConfiguration, endPoint, transport, input, stream);
stream.setAttribute(CHANNEL_ATTRIBUTE, channel);

View File

@ -63,11 +63,12 @@ public class HttpTransportOverSPDY implements HttpTransport
private final EndPoint endPoint;
private final PushStrategy pushStrategy;
private final Stream stream;
private final short version;
private final Fields requestHeaders;
private final BlockingCallback streamBlocker = new BlockingCallback();
private final AtomicBoolean committed = new AtomicBoolean();
public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders, short version)
{
this.connector = connector;
this.configuration = configuration;
@ -75,6 +76,7 @@ public class HttpTransportOverSPDY implements HttpTransport
this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
this.stream = stream;
this.requestHeaders = requestHeaders;
this.version = version;
}
protected Stream getStream()
@ -117,9 +119,9 @@ public class HttpTransportOverSPDY implements HttpTransport
// info!=null content!=null lastContent==false reply, commit with content
// info!=null content!=null lastContent==true reply, commit with content and complete
short version = stream.getSession().getVersion();
boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).value());
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
boolean close = !hasContent && lastContent;
if (info != null)
{
@ -131,6 +133,33 @@ public class HttpTransportOverSPDY implements HttpTransport
LOG.warn("Committed response twice.", exception);
return;
}
sendReply(info, lastContent && !hasContent ? callback : new Callback.Adapter(), close);
}
// Do we have some content to send as well
if (hasContent)
{
LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream,
lastContent);
// send the data and let it call the callback
stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
), callback);
}
// else do we need to close
else if (lastContent && info == null)
{
LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream);
// send empty data to close and let the send call the callback
stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
BufferUtil.EMPTY_BUFFER, lastContent), callback);
}
else if(!lastContent)
callback.succeeded();
}
private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close)
{
Fields headers = new Fields();
HttpVersion httpVersion = HttpVersion.HTTP_1_1;
@ -165,38 +194,9 @@ public class HttpTransportOverSPDY implements HttpTransport
if (configuration.getSendXPoweredBy())
headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION);
boolean close = !hasContent && lastContent;
ReplyInfo reply = new ReplyInfo(headers, close);
reply(stream, reply);
}
// Do we have some content to send as well
if (hasContent)
{
// Is the stream still open?
if (stream.isClosed() || stream.isReset())
// tell the callback about the EOF
callback.failed(new EofException("stream closed"));
else
// send the data and let it call the callback
stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
), callback);
}
// else do we need to close
else if (lastContent)
{
// Are we closed ?
if (stream.isClosed() || stream.isReset())
// already closed by reply, so just tell callback we are complete
callback.succeeded();
else
// send empty data to close and let the send call the callback
stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
BufferUtil.EMPTY_BUFFER, lastContent), callback);
}
else
// No data and no close so tell callback we are completed
callback.succeeded();
LOG.debug("Sending reply: {} on stream: {}", reply, stream);
reply(stream, reply, callback);
}
@Override
@ -219,15 +219,14 @@ public class HttpTransportOverSPDY implements HttpTransport
LOG.debug("Completed {}", this);
}
private void reply(Stream stream, ReplyInfo replyInfo)
private void reply(Stream stream, ReplyInfo replyInfo, Callback callback)
{
if (!stream.isUnidirectional())
stream.reply(replyInfo, new Callback.Adapter());
stream.reply(replyInfo, callback);
else
stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), new Callback.Adapter());
stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback);
Fields responseHeaders = replyInfo.getHeaders();
short version = stream.getSession().getVersion();
if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") && !stream.isClosed())
{
Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
@ -242,13 +241,15 @@ public class HttpTransportOverSPDY implements HttpTransport
private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
{
private final PushResourceCoordinator coordinator;
private final short version;
private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
PushResourceCoordinator coordinator)
PushResourceCoordinator coordinator, short version)
{
super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders, version);
this.coordinator = coordinator;
this.version = version;
}
@Override
@ -256,7 +257,7 @@ public class HttpTransportOverSPDY implements HttpTransport
{
Stream stream = getStream();
LOG.debug("Resource pushed for {} on {}",
getRequestHeaders().get(HTTPSPDYHeader.URI.name(stream.getSession().getVersion())), stream);
getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream);
coordinator.complete();
}
}
@ -298,14 +299,13 @@ public class HttpTransportOverSPDY implements HttpTransport
private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
{
HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy,
pushStream, pushRequestHeaders, this);
pushStream, pushRequestHeaders, this, version);
HttpInputOverSPDY input = new HttpInputOverSPDY();
return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
}
private void pushResource(String pushResource)
{
final short version = stream.getSession().getVersion();
Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
@ -340,7 +340,6 @@ public class HttpTransportOverSPDY implements HttpTransport
private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
{
final Fields newRequestHeaders = new Fields(requestHeaders, false);
short version = stream.getSession().getVersion();
newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
newRequestHeaders.put(scheme);
@ -355,7 +354,6 @@ public class HttpTransportOverSPDY implements HttpTransport
private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
{
final Fields pushHeaders = new Fields();
short version = stream.getSession().getVersion();
if (version == SPDY.V2)
pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
else

View File

@ -75,14 +75,15 @@ public class HttpTransportOverSPDYTest
private Random random = new Random();
HttpTransportOverSPDY httpTransportOverSPDY;
private short version = SPDY.V3;
@Before
public void setUp() throws Exception
{
Fields requestHeaders = new Fields();
requestHeaders.add(HTTPSPDYHeader.METHOD.name(SPDY.V3),"GET");
requestHeaders.add(HTTPSPDYHeader.METHOD.name(version), "GET");
httpTransportOverSPDY = new HttpTransportOverSPDY(connector, httpConfiguration, endPoint, pushStrategy,
stream, requestHeaders);
stream, requestHeaders, version);
when(responseInfo.getStatus()).thenReturn(HttpStatus.OK_200);
when(stream.getSession()).thenReturn(session);
when(session.getVersion()).thenReturn(SPDY.V3);
@ -97,7 +98,10 @@ public class HttpTransportOverSPDYTest
httpTransportOverSPDY.send(null, content, lastContent, callback);
ArgumentCaptor<ByteBufferDataInfo> dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
verify(callback, times(1)).succeeded();
assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true));
assertThat("ByteBuffer is empty", dataInfoCaptor.getValue().length(), is(0));
}
@ -111,7 +115,10 @@ public class HttpTransportOverSPDYTest
httpTransportOverSPDY.send(null, content, lastContent, callback);
ArgumentCaptor<ByteBufferDataInfo> dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
verify(callback, times(1)).succeeded();
assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true));
assertThat("ByteBuffer length is 4096", dataInfoCaptor.getValue().length(), is(4096));
}
@ -122,10 +129,12 @@ public class HttpTransportOverSPDYTest
ByteBuffer content = BufferUtil.EMPTY_BUFFER;
boolean lastContent = true;
httpTransportOverSPDY.send(null, content, lastContent, callback);
ArgumentCaptor<ByteBufferDataInfo> dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
verify(callback, times(1)).succeeded();
assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true));
assertThat("ByteBuffer is empty", dataInfoCaptor.getValue().length(), is(0));
}
@ -136,8 +145,8 @@ public class HttpTransportOverSPDYTest
ByteBuffer content = null;
boolean lastContent = false;
httpTransportOverSPDY.send(null, content, lastContent, callback);
verify(callback, times(1)).succeeded();
verify(stream, times(0)).data(any(ByteBufferDataInfo.class), any(Callback.class));
}
@ -147,10 +156,12 @@ public class HttpTransportOverSPDYTest
ByteBuffer content = createRandomByteBuffer();
boolean lastContent = false;
httpTransportOverSPDY.send(null, content, lastContent, callback);
ArgumentCaptor<ByteBufferDataInfo> dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
verify(callback, times(1)).succeeded();
assertThat("lastContent is false", dataInfoCaptor.getValue().isClose(), is(false));
assertThat("ByteBuffer is empty", dataInfoCaptor.getValue().length(), is(4096));
}
@ -161,7 +172,6 @@ public class HttpTransportOverSPDYTest
ByteBuffer content = BufferUtil.EMPTY_BUFFER;
boolean lastContent = false;
httpTransportOverSPDY.send(null, content, lastContent, callback);
verify(stream, times(0)).data(any(ByteBufferDataInfo.class), any(Callback.class));
verify(callback, times(1)).succeeded();
@ -178,7 +188,9 @@ public class HttpTransportOverSPDYTest
httpTransportOverSPDY.send(responseInfo, content, lastContent, callback);
ArgumentCaptor<ReplyInfo> replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
assertThat("ReplyInfo close is true", replyInfoCaptor.getValue().isClose(), is(true));
verify(callback, times(1)).succeeded();
@ -188,20 +200,20 @@ public class HttpTransportOverSPDYTest
public void testSendWithResponseInfoAndContentAndLastContentTrue() throws Exception
{
ByteBuffer content = createRandomByteBuffer();
boolean lastContent = true;
httpTransportOverSPDY.send(responseInfo, content, lastContent, callback);
ArgumentCaptor<ReplyInfo> replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
assertThat("ReplyInfo close is false", replyInfoCaptor.getValue().isClose(), is(false));
ArgumentCaptor<ByteBufferDataInfo> dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class));
verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
assertThat("lastContent is true", dataInfoCaptor.getValue().isClose(), is(true));
assertThat("ByteBuffer length is 4096", dataInfoCaptor.getValue().length(), is(4096));
verify(callback, times(1)).succeeded();
}
@Test
@ -210,32 +222,37 @@ public class HttpTransportOverSPDYTest
ByteBuffer content = null;
boolean lastContent = false;
httpTransportOverSPDY.send(responseInfo, content, lastContent, callback);
ArgumentCaptor<ReplyInfo> replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
assertThat("ReplyInfo close is true", replyInfoCaptor.getValue().isClose(), is(false));
verify(stream, times(0)).data(any(ByteBufferDataInfo.class), any(Callback.class));
verify(callback, times(1)).succeeded();
}
@Test
public void testSendWithResponseInfoAndContentAndLastContentFalse() throws Exception
{
ByteBuffer content = createRandomByteBuffer();
boolean lastContent = false;
httpTransportOverSPDY.send(responseInfo, content, lastContent, callback);
ArgumentCaptor<ReplyInfo> replyInfoCaptor = ArgumentCaptor.forClass(ReplyInfo.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class));
ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
verify(stream, times(1)).reply(replyInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
assertThat("ReplyInfo close is false", replyInfoCaptor.getValue().isClose(), is(false));
ArgumentCaptor<ByteBufferDataInfo> dataInfoCaptor = ArgumentCaptor.forClass(ByteBufferDataInfo.class);
verify(stream, times(1)).data(dataInfoCaptor.capture(), any(Callback.class));
verify(stream, times(1)).data(dataInfoCaptor.capture(), callbackCaptor.capture());
callbackCaptor.getValue().succeeded();
assertThat("lastContent is false", dataInfoCaptor.getValue().isClose(), is(false));
assertThat("ByteBuffer length is 4096", dataInfoCaptor.getValue().length(), is(4096));
verify(callback, times(1)).succeeded();
}
@Test

View File

@ -242,6 +242,44 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPOSTWithDelayedContentBody() throws Exception
{
final String path = "/foo";
final CountDownLatch handlerLatch = new CountDownLatch(1);
Session session = startClient(version, startHTTPServer(version, new AbstractHandler()
{
@Override
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
// don't read the request body, reply immediately
request.setHandled(true);
handlerLatch.countDown();
}
}), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
final CountDownLatch replyLatch = new CountDownLatch(1);
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
assertTrue(replyInfo.isClose());
Fields replyHeaders = replyInfo.getHeaders();
assertTrue(replyHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().contains("200"));
replyLatch.countDown();
}
});
stream.data(new StringDataInfo("a", false));
assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
stream.data(new StringDataInfo("b", true));
}
@Test
public void testPOSTWithParameters() throws Exception
{