Merge remote-tracking branch 'origin/master' into jetty-9.1

Conflicts:
	jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
	jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
This commit is contained in:
Greg Wilkins 2013-07-08 13:58:35 +10:00
commit 83b55418f5
5 changed files with 147 additions and 31 deletions

View File

@ -366,16 +366,11 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
}
finally
{
action=Action.RECYCLE;
_request.setHandled(true);
_transport.completed();
}
}
if (action==Action.RECYCLE)
{
_request.setHandled(true);
_transport.completed();
}
LOG.debug("{} handle exit, result {}", this, action);
return action!=Action.WAIT;

View File

@ -77,8 +77,7 @@ public class HttpChannelState
WRITE_CALLBACK, // handle an IO write callback
READ_CALLBACK, // handle an IO read callback
WAIT, // Wait for further events
COMPLETE, // Complete the channel
RECYCLE, // Channel is completed
COMPLETE // Complete the channel
}
public enum Async
@ -199,7 +198,7 @@ public class HttpChannelState
return Action.COMPLETE;
case COMPLETED:
return Action.RECYCLE;
return Action.WAIT;
case ASYNCWAIT:
if (_asyncRead)
@ -567,6 +566,14 @@ public class HttpChannelState
}
}
boolean isCompleted()
{
synchronized (this)
{
return _state == State.COMPLETED;
}
}
public boolean isAsyncStarted()
{
synchronized (this)

View File

@ -0,0 +1,110 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import java.util.Random;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class)
public class DataFrameGeneratorTest
{
private int increment = 1024;
private int streamId = 1;
private ArrayByteBufferPool bufferPool;
private DataFrameGenerator dataFrameGenerator;
private ByteBuffer headerBuffer = ByteBuffer.allocate(DataFrame.HEADER_LENGTH);
@Before
public void setUp()
{
bufferPool = new ArrayByteBufferPool(64, increment, 8192);
dataFrameGenerator = new DataFrameGenerator(bufferPool);
headerBuffer.putInt(0, streamId & 0x7F_FF_FF_FF);
}
@Test
public void testGenerateSmallFrame()
{
int bufferSize = 256;
generateFrame(bufferSize);
}
@Test
public void testGenerateFrameWithBufferThatEqualsBucketSize()
{
int bufferSize = increment;
generateFrame(bufferSize);
}
@Test
public void testGenerateFrameWithBufferThatEqualsBucketSizeMinusHeaderLength()
{
int bufferSize = increment - DataFrame.HEADER_LENGTH;
generateFrame(bufferSize);
}
private void generateFrame(int bufferSize)
{
ByteBuffer byteBuffer = createByteBuffer(bufferSize);
ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(byteBuffer, true);
fillHeaderBuffer(bufferSize);
ByteBuffer dataFrameBuffer = dataFrameGenerator.generate(streamId, bufferSize, dataInfo);
assertThat("The content size in dataFrameBuffer matches the buffersize + header length",
dataFrameBuffer.limit(),
is(bufferSize + DataFrame.HEADER_LENGTH));
byte[] headerBytes = new byte[DataFrame.HEADER_LENGTH];
dataFrameBuffer.get(headerBytes, 0, DataFrame.HEADER_LENGTH);
assertThat("Header bytes are prepended", headerBytes, is(headerBuffer.array()));
}
private ByteBuffer createByteBuffer(int bufferSize)
{
byte[] bytes = new byte[bufferSize];
new Random().nextBytes(bytes);
ByteBuffer byteBuffer = bufferPool.acquire(bufferSize, false);
BufferUtil.flipToFill(byteBuffer);
byteBuffer.put(bytes);
BufferUtil.flipToFlush(byteBuffer, 0);
return byteBuffer;
}
private void fillHeaderBuffer(int bufferSize)
{
headerBuffer.putInt(4, bufferSize & 0x00_FF_FF_FF);
headerBuffer.put(4, DataInfo.FLAG_CLOSE);
}
}

View File

@ -214,7 +214,7 @@ public class HttpTransportOverSPDY implements HttpTransport
@Override
public void completed()
{
LOG.debug("Completed");
LOG.debug("Completed {}", this);
}
private void reply(Stream stream, ReplyInfo replyInfo)
@ -263,7 +263,7 @@ public class HttpTransportOverSPDY implements HttpTransport
{
private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
private final Set<String> resources;
private boolean active;
private AtomicBoolean active = new AtomicBoolean(false);
private PushResourceCoordinator(Set<String> resources)
{
@ -272,6 +272,7 @@ public class HttpTransportOverSPDY implements HttpTransport
private void coordinate()
{
LOG.debug("Pushing resources: {}", resources);
// Must send all push frames to the client at once before we
// return from this method and send the main resource data
for (String pushResource : resources)
@ -281,17 +282,15 @@ public class HttpTransportOverSPDY implements HttpTransport
private void sendNextResourceData()
{
PushResource resource;
synchronized (this)
if(active.compareAndSet(false, true))
{
if (active)
return;
resource = queue.poll();
if (resource == null)
return;
active = true;
LOG.debug("Opening new push channel for: {}", resource);
HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
pushChannel.requestStart(resource.getPushRequestHeaders(), true);
}
HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
pushChannel.requestStart(resource.getPushRequestHeaders(), true);
}
private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
@ -329,6 +328,13 @@ public class HttpTransportOverSPDY implements HttpTransport
});
}
private void complete()
{
if(!active.compareAndSet(true, false))
LOG.warn("complete() called and active==false? That smells like a concurrency bug!", new IllegalStateException());
sendNextResourceData();
}
private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
{
final Fields newRequestHeaders = new Fields(requestHeaders, false);
@ -358,15 +364,6 @@ public class HttpTransportOverSPDY implements HttpTransport
}
return pushHeaders;
}
private void complete()
{
synchronized (this)
{
active = false;
}
sendNextResourceData();
}
}
private static class PushResource
@ -389,5 +386,14 @@ public class HttpTransportOverSPDY implements HttpTransport
{
return pushRequestHeaders;
}
@Override
public String toString()
{
return "PushResource{" +
"pushStream=" + pushStream +
", pushRequestHeaders=" + pushRequestHeaders +
'}';
}
}
}

View File

@ -60,7 +60,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@ -338,7 +337,6 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
@Test
@Ignore
public void testPushResourceAreSentNonInterleaved() throws Exception
{
final CountDownLatch allExpectedPushesReceivedLatch = new CountDownLatch(4);