From 5b2aab505f4e2b516887498f4137e0ead369a7d6 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 3 May 2013 17:24:29 +0200 Subject: [PATCH] 405570 - spdy push: resource ordering and sequential push. Fixed race condition. The race was happening when the headers of a push resource were pushed, then the push resource was completed, which triggered the send of the next resource, which was polling the queue but find it empty, because the next resource was not pushed yet. In this case the activity flag remained true, causing all subsequent pushes to be skipped. --- .../server/http/HttpTransportOverSPDY.java | 135 ++++++++++-------- 1 file changed, 75 insertions(+), 60 deletions(-) diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java index 11eecba5871..30d443fcf2a 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java @@ -76,11 +76,21 @@ public class HttpTransportOverSPDY implements HttpTransport this.requestHeaders = requestHeaders; } + protected Stream getStream() + { + return stream; + } + + protected Fields getRequestHeaders() + { + return requestHeaders; + } + @Override public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("send {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent); + LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent); if (stream.isClosed() || stream.isReset()) { @@ -88,7 +98,6 @@ public class HttpTransportOverSPDY implements HttpTransport callback.failed(exception); return; } - // new Throwable().printStackTrace(); // info==null content==null lastContent==false should not happen // info==null content==null lastContent==true signals no more content - complete @@ -151,7 +160,7 @@ public class HttpTransportOverSPDY implements HttpTransport { // Is the stream still open? if (stream.isClosed() || stream.isReset()) - // tell the callback about the EOF + // tell the callback about the EOF callback.failed(new EofException("stream closed")); else // send the data and let it call the callback @@ -173,7 +182,6 @@ public class HttpTransportOverSPDY implements HttpTransport else // No data and no close so tell callback we are completed callback.succeeded(); - } @Override @@ -190,11 +198,10 @@ public class HttpTransportOverSPDY implements HttpTransport } } - @Override public void completed() { - LOG.debug("completed"); + LOG.debug("Completed"); } private void reply(Stream stream, ReplyInfo replyInfo) @@ -217,54 +224,61 @@ public class HttpTransportOverSPDY implements HttpTransport } } - private class PushHttpTransportOverSPDY extends HttpTransportOverSPDY + private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY { - private final PushResourceCoordinator pushResourceCoordinator; + private final PushResourceCoordinator coordinator; private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders, - PushResourceCoordinator pushResourceCoordinator) + PushResourceCoordinator coordinator) { super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders); - this.pushResourceCoordinator = pushResourceCoordinator; + this.coordinator = coordinator; } @Override public void completed() { - pushResourceCoordinator.complete(); + Stream stream = getStream(); + LOG.debug("Resource pushed for {} on {}", + getRequestHeaders().get(HTTPSPDYHeader.URI.name(stream.getSession().getVersion())), stream); + coordinator.complete(); } } private class PushResourceCoordinator { private final Queue queue = new ConcurrentArrayQueue<>(); - private final AtomicBoolean channelActive = new AtomicBoolean(false); - private final Set pushResources; + private final Set resources; + private boolean active; - private PushResourceCoordinator(Set pushResources) + private PushResourceCoordinator(Set resources) { - this.pushResources = pushResources; + this.resources = resources; } private void coordinate() { - for (String pushResource : pushResources) + // Must send all push frames to the client at once before we + // return from this method and send the main resource data + for (String pushResource : resources) pushResource(pushResource); } private void sendNextResourceData() { - if (channelActive.compareAndSet(false, true)) + PushResource resource; + synchronized (this) { - PushResource pushResource = queue.poll(); - if (pushResource != null) - { - HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushResource.getPushStream(), - pushResource.getPushRequestHeaders()); - pushChannel.requestStart(pushResource.getPushRequestHeaders(), true); - } + if (active) + return; + resource = queue.poll(); + if (resource == null) + return; + active = true; } + HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders()); + pushChannel.requestStart(resource.getPushRequestHeaders(), true); } private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders) @@ -277,65 +291,66 @@ public class HttpTransportOverSPDY implements HttpTransport private void pushResource(String pushResource) { - short version = stream.getSession().getVersion(); + final short version = stream.getSession().getVersion(); Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version)); Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version)); Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version)); - Fields pushHeaders = createPushHeaders(scheme, host, pushResource); + final Fields pushHeaders = createPushHeaders(scheme, host, pushResource); final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource); - stream.push(new PushInfo(pushHeaders, false), - new Promise.Adapter() + stream.push(new PushInfo(pushHeaders, false), new Promise.Adapter() { @Override public void succeeded(Stream pushStream) { + LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream); queue.offer(new PushResource(pushStream, pushRequestHeaders)); sendNextResourceData(); } }); } - public void complete() + private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath) { - if (channelActive.compareAndSet(true, false)) - sendNextResourceData(); + final Fields newRequestHeaders = new Fields(requestHeaders, false); + short version = stream.getSession().getVersion(); + newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET"); + newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); + newRequestHeaders.put(scheme); + newRequestHeaders.put(host); + newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); + String referrer = scheme.value() + "://" + host.value() + uri.value(); + newRequestHeaders.put("referer", referrer); + newRequestHeaders.put("x-spdy-push", "true"); + return newRequestHeaders; + } + + private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath) + { + final Fields pushHeaders = new Fields(); + short version = stream.getSession().getVersion(); + if (version == SPDY.V2) + pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath); else - throw new IllegalStateException("No channel was active when complete has been called."); + { + pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); + pushHeaders.put(scheme); + pushHeaders.put(host); + } + return pushHeaders; } - } - private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath) - { - final Fields newRequestHeaders = new Fields(requestHeaders, false); - short version = stream.getSession().getVersion(); - newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET"); - newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); - newRequestHeaders.put(scheme); - newRequestHeaders.put(host); - newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); - String referrer = scheme.value() + "://" + host.value() + uri.value(); - newRequestHeaders.put("referer", referrer); - newRequestHeaders.put("x-spdy-push", "true"); - return newRequestHeaders; - } - - private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath) - { - final Fields pushHeaders = new Fields(); - short version = stream.getSession().getVersion(); - if (version == SPDY.V2) - pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath); - else + private void complete() { - pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); - pushHeaders.put(scheme); - pushHeaders.put(host); + synchronized (this) + { + active = false; + } + sendNextResourceData(); } - return pushHeaders; } - private class PushResource + private static class PushResource { private final Stream pushStream; private final Fields pushRequestHeaders;