From 270083f6a16f4c0d7a8c860be4f8bd9a15ec5117 Mon Sep 17 00:00:00 2001 From: Thomas Becker Date: Tue, 16 Apr 2013 10:20:45 +0200 Subject: [PATCH] 405570 spdy push - send push resources in the order they have been requested. Do not interleave sending the bytes of the push streams. --- .../java/org/eclipse/jetty/spdy/IStream.java | 4 +- .../server/http/HttpTransportOverSPDY.java | 129 ++++++++++-- .../jetty/spdy/server/http/PushStrategy.java | 2 + .../server/http/ReferrerPushStrategy.java | 3 +- .../server/http/ReferrerPushStrategyTest.java | 187 +++++++++++++++++- .../http/ReferrerPushStrategyUnitTest.java | 8 +- .../test/resources/jetty-logging.properties | 1 + 7 files changed, 299 insertions(+), 35 deletions(-) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java index fc6c4d96bfc..a8455d1d2fa 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java @@ -59,7 +59,9 @@ public interface IStream extends Stream, Callback */ public void setStreamFrameListener(StreamFrameListener listener); - //TODO: javadoc thomas + /** + * @return the stream frame listener associated to this stream + */ public StreamFrameListener getStreamFrameListener(); /** diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java index 1a9d686eda3..c98633d580e 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.spdy.server.http; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -46,6 +47,7 @@ import org.eclipse.jetty.spdy.api.StreamStatus; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; @@ -206,28 +208,100 @@ public class HttpTransportOverSPDY implements HttpTransport short version = stream.getSession().getVersion(); if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") && !stream.isClosed()) { - // We have a 200 OK with some content to send, check the push strategy + Set pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders); + if (pushResources.size() > 0) + { + PushResourceCoordinator pushResourceCoordinator = new PushResourceCoordinator(pushResources); + pushResourceCoordinator.coordinate(); + } + } + } + + private class PushHttpTransportOverSPDY extends HttpTransportOverSPDY + { + private final PushResourceCoordinator pushResourceCoordinator; + + private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, + PushStrategy pushStrategy, Stream stream, Fields requestHeaders, + PushResourceCoordinator pushResourceCoordinator) + { + super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders); + this.pushResourceCoordinator = pushResourceCoordinator; + } + + @Override + public void completed() + { + pushResourceCoordinator.complete(); + } + } + + private class PushResourceCoordinator + { + private final Queue queue = new ConcurrentArrayQueue<>(); + private final AtomicBoolean channelActive = new AtomicBoolean(false); + private final Set pushResources; + + private PushResourceCoordinator(Set pushResources) + { + this.pushResources = pushResources; + } + + private void coordinate() + { + for (String pushResource : pushResources) + pushResource(pushResource); + } + + private void sendNextResourceData() + { + if (channelActive.compareAndSet(false, true)) + { + PushResource pushResource = queue.poll(); + if (pushResource != null) + { + HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushResource.getPushStream(), + pushResource.getPushRequestHeaders()); + pushChannel.requestStart(pushResource.getPushRequestHeaders(), true); + } + } + } + + private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders) + { + HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy, + pushStream, pushRequestHeaders, this); + HttpInputOverSPDY input = new HttpInputOverSPDY(); + return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream); + } + + private void pushResource(String pushResource) + { + short version = stream.getSession().getVersion(); Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version)); Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version)); Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version)); - Set pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders); + Fields pushHeaders = createPushHeaders(scheme, host, pushResource); + final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource); - for (String pushResource : pushResources) + // TODO: handle the timeout better + stream.push(new PushInfo(0, TimeUnit.MILLISECONDS, pushHeaders, false), new Promise.Adapter() { - Fields pushHeaders = createPushHeaders(scheme, host, pushResource); - final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource); - - // TODO: handle the timeout better - stream.push(new PushInfo(0, TimeUnit.MILLISECONDS, pushHeaders, false), new Promise.Adapter() + @Override + public void succeeded(Stream pushStream) { - @Override - public void succeeded(Stream pushStream) - { - HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushStream, pushRequestHeaders); - pushChannel.requestStart(pushRequestHeaders, true); - } - }); - } + queue.offer(new PushResource(pushStream, pushRequestHeaders)); + sendNextResourceData(); + } + }); + } + + public void complete() + { + if (channelActive.compareAndSet(true, false)) + sendNextResourceData(); + else + throw new IllegalStateException("No channel was active when complete has been called."); } } @@ -261,10 +335,25 @@ public class HttpTransportOverSPDY implements HttpTransport return pushHeaders; } - private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders) + private class PushResource { - HttpTransport transport = new HttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy, pushStream, pushRequestHeaders); - HttpInputOverSPDY input = new HttpInputOverSPDY(); - return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream); + private final Stream pushStream; + private final Fields pushRequestHeaders; + + public PushResource(Stream pushStream, Fields pushRequestHeaders) + { + this.pushStream = pushStream; + this.pushRequestHeaders = pushRequestHeaders; + } + + public Stream getPushStream() + { + return pushStream; + } + + public Fields getPushRequestHeaders() + { + return pushRequestHeaders; + } } } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/PushStrategy.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/PushStrategy.java index 0e905c710c6..43bb3cc3bbd 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/PushStrategy.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/PushStrategy.java @@ -34,6 +34,8 @@ public interface PushStrategy /** *

Applies the SPDY push logic for the primary resource.

* + * + * * @param stream the primary resource stream * @param requestHeaders the primary resource request headers * @param responseHeaders the primary resource response headers diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategy.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategy.java index be893aa8f67..21788699fd3 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategy.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategy.java @@ -27,6 +27,7 @@ import java.util.Locale; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -262,7 +263,7 @@ public class ReferrerPushStrategy implements PushStrategy private class MainResource { private final String name; - private final Set resources = Collections.newSetFromMap(new ConcurrentHashMap()); + private final CopyOnWriteArraySet resources = new CopyOnWriteArraySet<>(); private final AtomicLong firstResourceAdded = new AtomicLong(-1); private MainResource(String name) diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java index 099f736c1f7..c9380c8b7e0 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java @@ -22,9 +22,13 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -41,6 +45,8 @@ import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; +import org.eclipse.jetty.spdy.api.Settings; +import org.eclipse.jetty.spdy.api.SettingsInfo; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.StreamStatus; @@ -56,6 +62,7 @@ import org.junit.Before; import org.junit.Test; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThat; public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest @@ -159,25 +166,187 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest public void testMaxConcurrentStreamsToDisablePush() throws Exception { final CountDownLatch pushReceivedLatch = new CountDownLatch(1); - Session session = sendMainRequestAndCSSRequest(new SessionFrameListener.Adapter() + + Session pushCacheBuildSession = startClient(version, serverAddress, null); + + pushCacheBuildSession.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()); + pushCacheBuildSession.syn(new SynInfo(associatedCSSRequestHeaders, true), new StreamFrameListener.Adapter()); + + Session session = startClient(version, serverAddress, null); + + Settings settings = new Settings(); + settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 0)); + SettingsInfo settingsInfo = new SettingsInfo(settings); + session.settings(settingsInfo); + + session.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() { @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) { pushReceivedLatch.countDown(); + return super.onPush(stream, pushInfo); + } + }); + + assertThat("No push stream is received", pushReceivedLatch.await(1, TimeUnit.SECONDS), is(false)); + } + + @Test + public void testPushResourceOrder() throws Exception + { + final CountDownLatch allExpectedPushesReceivedLatch = new CountDownLatch(4); + + Session pushCacheBuildSession = startClient(version, serverAddress, null); + + sendRequest(pushCacheBuildSession, mainRequestHeaders, null, null); + sendRequest(pushCacheBuildSession, associatedCSSRequestHeaders, null, null); + sendRequest(pushCacheBuildSession, associatedJSRequestHeaders, null, null); + sendRequest(pushCacheBuildSession, createHeaders("/image1.jpg", mainResource), null, null); + sendRequest(pushCacheBuildSession, createHeaders("/image2.jpg", mainResource), null, null); + + Session session = startClient(version, serverAddress, null); + + session.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() + { + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + LOG.info("onPush: stream: {}, pushInfo: {}", stream, pushInfo); + String uriHeader = pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value(); + switch ((int)allExpectedPushesReceivedLatch.getCount()) + { + case 4: + assertThat("1st pushed resource is the css", uriHeader.endsWith("css"), is(true)); + break; + case 3: + assertThat("2nd pushed resource is the js", uriHeader.endsWith("js"), is(true)); + break; + case 2: + assertThat("3rd pushed resource is image1", uriHeader.endsWith("image1.jpg"), + is(true)); + break; + case 1: + assertThat("4th pushed resource is image2", uriHeader.endsWith("image2.jpg"), + is(true)); + break; + } + allExpectedPushesReceivedLatch.countDown(); + return super.onPush(stream, pushInfo); + } + }); + + assertThat("All expected push resources have been received", allExpectedPushesReceivedLatch.await(5, + TimeUnit.SECONDS), is(true)); + } + + @Test + public void testThatPushResourcesAreUnique() throws Exception + { + final CountDownLatch pushReceivedLatch = new CountDownLatch(2); + sendMainRequestAndCSSRequest(null); + sendMainRequestAndCSSRequest(null); + + Session session = startClient(version, serverAddress, null); + + session.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter() + { + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + pushReceivedLatch.countDown(); + LOG.info("Push received: {}", pushInfo); return null; } }); -// Settings settings = new Settings(); -// settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 0)); -// SettingsInfo settingsInfo = new SettingsInfo(settings); -// -// session.settings(settingsInfo); + assertThat("style.css has been pushed only once", pushReceivedLatch.await(1, TimeUnit.SECONDS), is(false)); + } - sendRequest(session, mainRequestHeaders, null, null); + @Test + public void testPushResourceAreSentNonInterleaved() throws Exception + { + final CountDownLatch allExpectedPushesReceivedLatch = new CountDownLatch(4); + final CountDownLatch allPushDataReceivedLatch = new CountDownLatch(4); + final CopyOnWriteArrayList dataReceivedOrder = new CopyOnWriteArrayList<>(); - assertThat(pushReceivedLatch.await(1, TimeUnit.SECONDS), is(false)); + InetSocketAddress bigResponseServerAddress = startHTTPServer(version, new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + byte[] bytes = new byte[32768]; + new Random().nextBytes(bytes); + ServletOutputStream outputStream = response.getOutputStream(); + outputStream.write(bytes); + baseRequest.setHandled(true); + } + }); + Session pushCacheBuildSession = startClient(version, bigResponseServerAddress, null); + + Fields mainResourceHeaders = createHeadersWithoutReferrer(mainResource); + sendRequest(pushCacheBuildSession, mainResourceHeaders, null, null); + sendRequest(pushCacheBuildSession, createHeaders("/style.css", mainResource), null, null); + sendRequest(pushCacheBuildSession, createHeaders("/javascript.js", mainResource), null, null); + sendRequest(pushCacheBuildSession, createHeaders("/image1.jpg", mainResource), null, null); + sendRequest(pushCacheBuildSession, createHeaders("/image2.jpg", mainResource), null, null); + + Session session = startClient(version, bigResponseServerAddress, null); + + session.syn(new SynInfo(mainResourceHeaders, true), new StreamFrameListener.Adapter() + { + AtomicInteger currentStreamId = new AtomicInteger(2); + + @Override + public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) + { + LOG.info("Received push for stream: {} {}", stream.getId(), pushInfo); + String uriHeader = pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value(); + switch ((int)allExpectedPushesReceivedLatch.getCount()) + { + case 4: + assertThat("1st pushed resource is the css", uriHeader.endsWith("css"), is(true)); + break; + case 3: + assertThat("2nd pushed resource is the js", uriHeader.endsWith("js"), is(true)); + break; + case 2: + assertThat("3rd pushed resource is image1", uriHeader.endsWith("image1.jpg"), + is(true)); + break; + case 1: + assertThat("4th pushed resource is image2", uriHeader.endsWith("image2.jpg"), + is(true)); + break; + } + allExpectedPushesReceivedLatch.countDown(); + return new Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + if (stream.getId() != currentStreamId.get()) + throw new IllegalStateException("Streams interleaved. Expected StreamId: " + + currentStreamId + " but was: " + stream.getId()); + dataInfo.consume(dataInfo.available()); + if (dataInfo.isClose()) + { + currentStreamId.compareAndSet(currentStreamId.get(), currentStreamId.get() + 2); + allPushDataReceivedLatch.countDown(); + dataReceivedOrder.add(stream.getId()); + } + + LOG.info(stream.getId() + ":" + dataInfo); + } + }; + } + }); + + assertThat("All push resources received", allExpectedPushesReceivedLatch.await(5, TimeUnit.SECONDS), is(true)); + assertThat("All pushData received", allPushDataReceivedLatch.await(5, TimeUnit.SECONDS), is(true)); + assertThat("The data for different push streams has not been interleaved", + dataReceivedOrder.toString(), equalTo("[2, 4, 6, 8]")); + LOG.info(dataReceivedOrder.toString()); } private InetSocketAddress createServer() throws Exception diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyUnitTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyUnitTest.java index d11c84a515e..7d038bc7b06 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyUnitTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyUnitTest.java @@ -18,10 +18,6 @@ package org.eclipse.jetty.spdy.server.http; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - import java.util.Arrays; import java.util.Set; @@ -35,6 +31,10 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class ReferrerPushStrategyUnitTest { diff --git a/jetty-spdy/spdy-http-server/src/test/resources/jetty-logging.properties b/jetty-spdy/spdy-http-server/src/test/resources/jetty-logging.properties index 4286a47e37d..30da0a84744 100644 --- a/jetty-spdy/spdy-http-server/src/test/resources/jetty-logging.properties +++ b/jetty-spdy/spdy-http-server/src/test/resources/jetty-logging.properties @@ -4,4 +4,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog #org.eclipse.jetty.io.ssl.LEVEL=DEBUG #org.eclipse.jetty.spdy.LEVEL=DEBUG #org.eclipse.jetty.client.LEVEL=DEBUG +#org.eclipse.jetty.spdy.server.http.ReferrerPushStrategy.LEVEL=DEBUG #org.mortbay.LEVEL=DEBUG