diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java index 11eecba5871..30d443fcf2a 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDY.java @@ -76,11 +76,21 @@ public class HttpTransportOverSPDY implements HttpTransport this.requestHeaders = requestHeaders; } + protected Stream getStream() + { + return stream; + } + + protected Fields getRequestHeaders() + { + return requestHeaders; + } + @Override public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("send {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent); + LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent); if (stream.isClosed() || stream.isReset()) { @@ -88,7 +98,6 @@ public class HttpTransportOverSPDY implements HttpTransport callback.failed(exception); return; } - // new Throwable().printStackTrace(); // info==null content==null lastContent==false should not happen // info==null content==null lastContent==true signals no more content - complete @@ -151,7 +160,7 @@ public class HttpTransportOverSPDY implements HttpTransport { // Is the stream still open? if (stream.isClosed() || stream.isReset()) - // tell the callback about the EOF + // tell the callback about the EOF callback.failed(new EofException("stream closed")); else // send the data and let it call the callback @@ -173,7 +182,6 @@ public class HttpTransportOverSPDY implements HttpTransport else // No data and no close so tell callback we are completed callback.succeeded(); - } @Override @@ -190,11 +198,10 @@ public class HttpTransportOverSPDY implements HttpTransport } } - @Override public void completed() { - LOG.debug("completed"); + LOG.debug("Completed"); } private void reply(Stream stream, ReplyInfo replyInfo) @@ -217,54 +224,61 @@ public class HttpTransportOverSPDY implements HttpTransport } } - private class PushHttpTransportOverSPDY extends HttpTransportOverSPDY + private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY { - private final PushResourceCoordinator pushResourceCoordinator; + private final PushResourceCoordinator coordinator; private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders, - PushResourceCoordinator pushResourceCoordinator) + PushResourceCoordinator coordinator) { super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders); - this.pushResourceCoordinator = pushResourceCoordinator; + this.coordinator = coordinator; } @Override public void completed() { - pushResourceCoordinator.complete(); + Stream stream = getStream(); + LOG.debug("Resource pushed for {} on {}", + getRequestHeaders().get(HTTPSPDYHeader.URI.name(stream.getSession().getVersion())), stream); + coordinator.complete(); } } private class PushResourceCoordinator { private final Queue queue = new ConcurrentArrayQueue<>(); - private final AtomicBoolean channelActive = new AtomicBoolean(false); - private final Set pushResources; + private final Set resources; + private boolean active; - private PushResourceCoordinator(Set pushResources) + private PushResourceCoordinator(Set resources) { - this.pushResources = pushResources; + this.resources = resources; } private void coordinate() { - for (String pushResource : pushResources) + // Must send all push frames to the client at once before we + // return from this method and send the main resource data + for (String pushResource : resources) pushResource(pushResource); } private void sendNextResourceData() { - if (channelActive.compareAndSet(false, true)) + PushResource resource; + synchronized (this) { - PushResource pushResource = queue.poll(); - if (pushResource != null) - { - HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushResource.getPushStream(), - pushResource.getPushRequestHeaders()); - pushChannel.requestStart(pushResource.getPushRequestHeaders(), true); - } + if (active) + return; + resource = queue.poll(); + if (resource == null) + return; + active = true; } + HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders()); + pushChannel.requestStart(resource.getPushRequestHeaders(), true); } private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders) @@ -277,65 +291,66 @@ public class HttpTransportOverSPDY implements HttpTransport private void pushResource(String pushResource) { - short version = stream.getSession().getVersion(); + final short version = stream.getSession().getVersion(); Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version)); Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version)); Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version)); - Fields pushHeaders = createPushHeaders(scheme, host, pushResource); + final Fields pushHeaders = createPushHeaders(scheme, host, pushResource); final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource); - stream.push(new PushInfo(pushHeaders, false), - new Promise.Adapter() + stream.push(new PushInfo(pushHeaders, false), new Promise.Adapter() { @Override public void succeeded(Stream pushStream) { + LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream); queue.offer(new PushResource(pushStream, pushRequestHeaders)); sendNextResourceData(); } }); } - public void complete() + private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath) { - if (channelActive.compareAndSet(true, false)) - sendNextResourceData(); + final Fields newRequestHeaders = new Fields(requestHeaders, false); + short version = stream.getSession().getVersion(); + newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET"); + newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); + newRequestHeaders.put(scheme); + newRequestHeaders.put(host); + newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); + String referrer = scheme.value() + "://" + host.value() + uri.value(); + newRequestHeaders.put("referer", referrer); + newRequestHeaders.put("x-spdy-push", "true"); + return newRequestHeaders; + } + + private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath) + { + final Fields pushHeaders = new Fields(); + short version = stream.getSession().getVersion(); + if (version == SPDY.V2) + pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath); else - throw new IllegalStateException("No channel was active when complete has been called."); + { + pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); + pushHeaders.put(scheme); + pushHeaders.put(host); + } + return pushHeaders; } - } - private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath) - { - final Fields newRequestHeaders = new Fields(requestHeaders, false); - short version = stream.getSession().getVersion(); - newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET"); - newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1"); - newRequestHeaders.put(scheme); - newRequestHeaders.put(host); - newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); - String referrer = scheme.value() + "://" + host.value() + uri.value(); - newRequestHeaders.put("referer", referrer); - newRequestHeaders.put("x-spdy-push", "true"); - return newRequestHeaders; - } - - private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath) - { - final Fields pushHeaders = new Fields(); - short version = stream.getSession().getVersion(); - if (version == SPDY.V2) - pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath); - else + private void complete() { - pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); - pushHeaders.put(scheme); - pushHeaders.put(host); + synchronized (this) + { + active = false; + } + sendNextResourceData(); } - return pushHeaders; } - private class PushResource + private static class PushResource { private final Stream pushStream; private final Fields pushRequestHeaders;