405570 - spdy push: resource ordering and sequential push.

Fixed race condition.
The race was happening when the headers of a push resource
were pushed, then the push resource was completed, which
triggered the send of the next resource, which was polling the
queue but find it empty, because the next resource was not pushed yet.
In this case the activity flag remained true, causing all subsequent
pushes to be skipped.
This commit is contained in:
Simone Bordet 2013-05-03 17:24:29 +02:00
parent cb952931d5
commit 5b2aab505f
1 changed files with 75 additions and 60 deletions

View File

@ -76,11 +76,21 @@ public class HttpTransportOverSPDY implements HttpTransport
this.requestHeaders = requestHeaders; this.requestHeaders = requestHeaders;
} }
protected Stream getStream()
{
return stream;
}
protected Fields getRequestHeaders()
{
return requestHeaders;
}
@Override @Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("send {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent); LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
if (stream.isClosed() || stream.isReset()) if (stream.isClosed() || stream.isReset())
{ {
@ -88,7 +98,6 @@ public class HttpTransportOverSPDY implements HttpTransport
callback.failed(exception); callback.failed(exception);
return; return;
} }
// new Throwable().printStackTrace();
// info==null content==null lastContent==false should not happen // info==null content==null lastContent==false should not happen
// info==null content==null lastContent==true signals no more content - complete // info==null content==null lastContent==true signals no more content - complete
@ -173,7 +182,6 @@ public class HttpTransportOverSPDY implements HttpTransport
else else
// No data and no close so tell callback we are completed // No data and no close so tell callback we are completed
callback.succeeded(); callback.succeeded();
} }
@Override @Override
@ -190,11 +198,10 @@ public class HttpTransportOverSPDY implements HttpTransport
} }
} }
@Override @Override
public void completed() public void completed()
{ {
LOG.debug("completed"); LOG.debug("Completed");
} }
private void reply(Stream stream, ReplyInfo replyInfo) private void reply(Stream stream, ReplyInfo replyInfo)
@ -217,54 +224,61 @@ public class HttpTransportOverSPDY implements HttpTransport
} }
} }
private class PushHttpTransportOverSPDY extends HttpTransportOverSPDY private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
{ {
private final PushResourceCoordinator pushResourceCoordinator; private final PushResourceCoordinator coordinator;
private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
PushStrategy pushStrategy, Stream stream, Fields requestHeaders, PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
PushResourceCoordinator pushResourceCoordinator) PushResourceCoordinator coordinator)
{ {
super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders); super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
this.pushResourceCoordinator = pushResourceCoordinator; this.coordinator = coordinator;
} }
@Override @Override
public void completed() public void completed()
{ {
pushResourceCoordinator.complete(); Stream stream = getStream();
LOG.debug("Resource pushed for {} on {}",
getRequestHeaders().get(HTTPSPDYHeader.URI.name(stream.getSession().getVersion())), stream);
coordinator.complete();
} }
} }
private class PushResourceCoordinator private class PushResourceCoordinator
{ {
private final Queue<PushResource> queue = new ConcurrentArrayQueue<>(); private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
private final AtomicBoolean channelActive = new AtomicBoolean(false); private final Set<String> resources;
private final Set<String> pushResources; private boolean active;
private PushResourceCoordinator(Set<String> pushResources) private PushResourceCoordinator(Set<String> resources)
{ {
this.pushResources = pushResources; this.resources = resources;
} }
private void coordinate() private void coordinate()
{ {
for (String pushResource : pushResources) // Must send all push frames to the client at once before we
// return from this method and send the main resource data
for (String pushResource : resources)
pushResource(pushResource); pushResource(pushResource);
} }
private void sendNextResourceData() private void sendNextResourceData()
{ {
if (channelActive.compareAndSet(false, true)) PushResource resource;
synchronized (this)
{ {
PushResource pushResource = queue.poll(); if (active)
if (pushResource != null) return;
{ resource = queue.poll();
HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushResource.getPushStream(), if (resource == null)
pushResource.getPushRequestHeaders()); return;
pushChannel.requestStart(pushResource.getPushRequestHeaders(), true); active = true;
}
} }
HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
pushChannel.requestStart(resource.getPushRequestHeaders(), true);
} }
private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders) private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
@ -277,65 +291,66 @@ public class HttpTransportOverSPDY implements HttpTransport
private void pushResource(String pushResource) private void pushResource(String pushResource)
{ {
short version = stream.getSession().getVersion(); final short version = stream.getSession().getVersion();
Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version)); Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version)); Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version)); Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
Fields pushHeaders = createPushHeaders(scheme, host, pushResource); final Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource); final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
stream.push(new PushInfo(pushHeaders, false), stream.push(new PushInfo(pushHeaders, false), new Promise.Adapter<Stream>()
new Promise.Adapter<Stream>()
{ {
@Override @Override
public void succeeded(Stream pushStream) public void succeeded(Stream pushStream)
{ {
LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream);
queue.offer(new PushResource(pushStream, pushRequestHeaders)); queue.offer(new PushResource(pushStream, pushRequestHeaders));
sendNextResourceData(); sendNextResourceData();
} }
}); });
} }
public void complete() private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
{ {
if (channelActive.compareAndSet(true, false)) final Fields newRequestHeaders = new Fields(requestHeaders, false);
sendNextResourceData(); short version = stream.getSession().getVersion();
newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
newRequestHeaders.put(scheme);
newRequestHeaders.put(host);
newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
String referrer = scheme.value() + "://" + host.value() + uri.value();
newRequestHeaders.put("referer", referrer);
newRequestHeaders.put("x-spdy-push", "true");
return newRequestHeaders;
}
private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
{
final Fields pushHeaders = new Fields();
short version = stream.getSession().getVersion();
if (version == SPDY.V2)
pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
else else
throw new IllegalStateException("No channel was active when complete has been called."); {
pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
pushHeaders.put(scheme);
pushHeaders.put(host);
}
return pushHeaders;
} }
}
private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath) private void complete()
{
final Fields newRequestHeaders = new Fields(requestHeaders, false);
short version = stream.getSession().getVersion();
newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
newRequestHeaders.put(scheme);
newRequestHeaders.put(host);
newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
String referrer = scheme.value() + "://" + host.value() + uri.value();
newRequestHeaders.put("referer", referrer);
newRequestHeaders.put("x-spdy-push", "true");
return newRequestHeaders;
}
private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
{
final Fields pushHeaders = new Fields();
short version = stream.getSession().getVersion();
if (version == SPDY.V2)
pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
else
{ {
pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath); synchronized (this)
pushHeaders.put(scheme); {
pushHeaders.put(host); active = false;
}
sendNextResourceData();
} }
return pushHeaders;
} }
private class PushResource private static class PushResource
{ {
private final Stream pushStream; private final Stream pushStream;
private final Fields pushRequestHeaders; private final Fields pushRequestHeaders;