412318 HttpChannel fix multiple calls to _transport.completed() if handle() is called multiple times while the channel is COMPLETED

This commit is contained in:
Thomas Becker 2013-07-04 15:45:56 +02:00
parent bce8eaabe0
commit cb2eb030d1
4 changed files with 33 additions and 19 deletions

View File

@ -235,6 +235,8 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
public boolean handle() public boolean handle()
{ {
LOG.debug("{} handle enter", this); LOG.debug("{} handle enter", this);
if(_state.isCompleted())
return false;
setCurrentHttpChannel(this); setCurrentHttpChannel(this);

View File

@ -563,6 +563,14 @@ public class HttpChannelState
} }
} }
boolean isCompleted()
{
synchronized (this)
{
return _state == State.COMPLETED;
}
}
public boolean isAsyncStarted() public boolean isAsyncStarted()
{ {
synchronized (this) synchronized (this)

View File

@ -214,7 +214,7 @@ public class HttpTransportOverSPDY implements HttpTransport
@Override @Override
public void completed() public void completed()
{ {
LOG.debug("Completed"); LOG.debug("Completed {}", this);
} }
private void reply(Stream stream, ReplyInfo replyInfo) private void reply(Stream stream, ReplyInfo replyInfo)
@ -263,7 +263,7 @@ public class HttpTransportOverSPDY implements HttpTransport
{ {
private final Queue<PushResource> queue = new ConcurrentArrayQueue<>(); private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
private final Set<String> resources; private final Set<String> resources;
private boolean active; private AtomicBoolean active = new AtomicBoolean(false);
private PushResourceCoordinator(Set<String> resources) private PushResourceCoordinator(Set<String> resources)
{ {
@ -272,6 +272,7 @@ public class HttpTransportOverSPDY implements HttpTransport
private void coordinate() private void coordinate()
{ {
LOG.debug("Pushing resources: {}", resources);
// Must send all push frames to the client at once before we // Must send all push frames to the client at once before we
// return from this method and send the main resource data // return from this method and send the main resource data
for (String pushResource : resources) for (String pushResource : resources)
@ -281,18 +282,16 @@ public class HttpTransportOverSPDY implements HttpTransport
private void sendNextResourceData() private void sendNextResourceData()
{ {
PushResource resource; PushResource resource;
synchronized (this) if(active.compareAndSet(false, true))
{ {
if (active)
return;
resource = queue.poll(); resource = queue.poll();
if (resource == null) if (resource == null)
return; return;
active = true; LOG.debug("Opening new push channel for: {}", resource);
}
HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders()); HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
pushChannel.requestStart(resource.getPushRequestHeaders(), true); pushChannel.requestStart(resource.getPushRequestHeaders(), true);
} }
}
private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders) private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
{ {
@ -329,6 +328,13 @@ public class HttpTransportOverSPDY implements HttpTransport
}); });
} }
private void complete()
{
if(!active.compareAndSet(true, false))
LOG.warn("complete() called and active==false? That smells like a concurrency bug!", new IllegalStateException());
sendNextResourceData();
}
private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath) private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
{ {
final Fields newRequestHeaders = new Fields(requestHeaders, false); final Fields newRequestHeaders = new Fields(requestHeaders, false);
@ -358,15 +364,6 @@ public class HttpTransportOverSPDY implements HttpTransport
} }
return pushHeaders; return pushHeaders;
} }
private void complete()
{
synchronized (this)
{
active = false;
}
sendNextResourceData();
}
} }
private static class PushResource private static class PushResource
@ -389,5 +386,14 @@ public class HttpTransportOverSPDY implements HttpTransport
{ {
return pushRequestHeaders; return pushRequestHeaders;
} }
@Override
public String toString()
{
return "PushResource{" +
"pushStream=" + pushStream +
", pushRequestHeaders=" + pushRequestHeaders +
'}';
}
} }
} }

View File

@ -60,7 +60,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog; import org.eclipse.jetty.util.log.StdErrLog;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -338,7 +337,6 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
} }
@Test @Test
@Ignore
public void testPushResourceAreSentNonInterleaved() throws Exception public void testPushResourceAreSentNonInterleaved() throws Exception
{ {
final CountDownLatch allExpectedPushesReceivedLatch = new CountDownLatch(4); final CountDownLatch allExpectedPushesReceivedLatch = new CountDownLatch(4);