459081 - http2 push failures.

Introduced ExecutionStrategy.dispatch() to handle the case where
resources that are being pushed block.
This commit is contained in:
Simone Bordet 2015-03-11 00:12:57 +01:00
parent a88d52b4e0
commit 48887377c9
9 changed files with 193 additions and 42 deletions

View File

@ -19,10 +19,10 @@
package org.eclipse.jetty.http2.client; package org.eclipse.jetty.http2.client;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
@ -242,4 +242,86 @@ public class PushCacheFilterTest extends AbstractTest
}); });
Assert.assertTrue(secondaryResponseLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(secondaryResponseLatch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testPushWithoutPrimaryResponseContent() throws Exception
{
final String primaryResource = "/primary.html";
final String secondaryResource = "/secondary.png";
start(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
String requestURI = request.getRequestURI();
final ServletOutputStream output = response.getOutputStream();
if (requestURI.endsWith(secondaryResource))
output.write("SECONDARY".getBytes(StandardCharsets.UTF_8));
}
});
final Session session = newClient(new Session.Listener.Adapter());
// Request for the primary and secondary resource to build the cache.
final String primaryURI = "http://localhost:" + connector.getLocalPort() + servletPath + primaryResource;
HttpFields primaryFields = new HttpFields();
MetaData.Request primaryRequest = newRequest("GET", primaryResource, primaryFields);
final CountDownLatch warmupLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
{
// Request for the secondary resource.
HttpFields secondaryFields = new HttpFields();
secondaryFields.put(HttpHeader.REFERER, primaryURI);
MetaData.Request secondaryRequest = newRequest("GET", secondaryResource, secondaryFields);
session.newStream(new HeadersFrame(0, secondaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
warmupLatch.countDown();
}
});
}
}
});
Assert.assertTrue(warmupLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(1000);
// Request again the primary resource, we should get the secondary resource pushed.
primaryRequest = newRequest("GET", primaryResource, primaryFields);
final CountDownLatch primaryResponseLatch = new CountDownLatch(1);
final CountDownLatch pushLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
primaryResponseLatch.countDown();
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
return new Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
pushLatch.countDown();
}
};
}
});
Assert.assertTrue(pushLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(primaryResponseLatch.await(5, TimeUnit.SECONDS));
}
} }

View File

@ -65,7 +65,7 @@ public class HTTP2Connection extends AbstractConnection implements Connection.Up
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("HTTP2 onUpgradeTo {} {}", this, BufferUtil.toDetailString(prefilled)); LOG.debug("HTTP2 onUpgradeTo {} {}", this, BufferUtil.toDetailString(prefilled));
producer.prefill(prefilled); producer.buffer = prefilled;
} }
@Override @Override
@ -117,9 +117,11 @@ public class HTTP2Connection extends AbstractConnection implements Connection.Up
return false; return false;
} }
protected void offerTask(Runnable task) protected void offerTask(Runnable task, boolean dispatch)
{ {
tasks.offer(task); tasks.offer(task);
if (dispatch)
executionStrategy.dispatch();
} }
private class HTTP2Producer implements ExecutionStrategy.Producer private class HTTP2Producer implements ExecutionStrategy.Producer
@ -135,12 +137,14 @@ public class HTTP2Connection extends AbstractConnection implements Connection.Up
if (task != null) if (task != null)
return task; return task;
if (isFillInterested())
return null;
if (buffer == null)
buffer = byteBufferPool.acquire(bufferSize, false); // TODO: make directness customizable
boolean looping = BufferUtil.hasContent(buffer); boolean looping = BufferUtil.hasContent(buffer);
while (true) while (true)
{ {
if (buffer == null)
buffer = byteBufferPool.acquire(bufferSize, false);
if (looping) if (looping)
{ {
while (buffer.hasRemaining()) while (buffer.hasRemaining())
@ -177,14 +181,9 @@ public class HTTP2Connection extends AbstractConnection implements Connection.Up
} }
} }
public void prefill(ByteBuffer prefilledBuffer)
{
buffer=prefilledBuffer;
}
private void release() private void release()
{ {
if (BufferUtil.isEmpty(buffer)) if (buffer != null && !buffer.hasRemaining())
{ {
byteBufferPool.release(buffer); byteBufferPool.release(buffer);
buffer = null; buffer = null;

View File

@ -72,16 +72,16 @@ public class HTTP2ServerConnection extends HTTP2Connection
LOG.debug("Processing {} on {}", frame, stream); LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream); HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onRequest(frame); Runnable task = channel.onRequest(frame);
offerTask(task); offerTask(task, false);
} }
public void onPush(Connector connector, IStream stream, MetaData.Request request) public void push(Connector connector, IStream stream, MetaData.Request request)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Processing push {} on {}", request, stream); LOG.debug("Processing push {} on {}", request, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream); HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onPushRequest(request); Runnable task = channel.onPushRequest(request);
offerTask(task); offerTask(task, true);
} }
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream) private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
@ -97,20 +97,27 @@ public class HTTP2ServerConnection extends HTTP2Connection
{ {
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, this); HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, this);
transport.setStream(stream); transport.setStream(stream);
channel = new HttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), transport) channel = new ServerHttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), transport);
{
@Override
public void onCompleted()
{
super.onCompleted();
recycle();
channels.offer(this);
}
};
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Creating channel {} for {}", channel, this); LOG.debug("Creating channel {} for {}", channel, this);
} }
stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel); stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
return channel; return channel;
} }
private class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2
{
public ServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
super(connector, configuration, endPoint, transport);
}
@Override
public void onCompleted()
{
super.onCompleted();
recycle();
channels.offer(this);
}
}
} }

View File

@ -100,6 +100,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
{ {
onRequest(request); onRequest(request);
getRequest().setAttribute("org.eclipse.jetty.pushed", Boolean.TRUE); getRequest().setAttribute("org.eclipse.jetty.pushed", Boolean.TRUE);
onRequestComplete();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {

View File

@ -151,7 +151,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
@Override @Override
public void succeeded(Stream pushStream) public void succeeded(Stream pushStream)
{ {
connection.onPush(connector, (IStream)pushStream, request); connection.push(connector, (IStream)pushStream, request);
} }
@Override @Override

View File

@ -36,7 +36,20 @@ import org.eclipse.jetty.util.thread.strategy.ExecuteProduceRun;
public interface ExecutionStrategy public interface ExecutionStrategy
{ {
/** /**
* Initiates (or resumes) the task production and execution. * <p>Initiates (or resumes) the task production and execution.</p>
* <p>This method guarantees that the task is never run by the
* thread that called this method.</p>
*
* @see #execute()
*/
public void dispatch();
/**
* <p>Initiates (or resumes) the task production and execution.</p>
* <p>The produced task may be run by the same thread that called
* this method.</p>
*
* @see #dispatch()
*/ */
public void execute(); public void execute();

View File

@ -19,8 +19,6 @@
package org.eclipse.jetty.util.thread.strategy; package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -47,6 +45,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
{ {
private static final Logger LOG = Log.getLogger(ExecuteProduceRun.class); private static final Logger LOG = Log.getLogger(ExecuteProduceRun.class);
private final SpinLock _lock = new SpinLock(); private final SpinLock _lock = new SpinLock();
private final Runnable _resumer = new Resumer();
private final Producer _producer; private final Producer _producer;
private final Executor _executor; private final Executor _executor;
private boolean _idle=true; private boolean _idle=true;
@ -90,9 +89,31 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
if (produce) if (produce)
produceAndRun(); produceAndRun();
} }
@Override
public void dispatch()
{
if (LOG.isDebugEnabled())
LOG.debug("{} spawning",this);
boolean dispatch=false;
try (Lock locked = _lock.lock())
{
if (_idle)
dispatch=true;
else
_execute=true;
}
if (dispatch)
_executor.execute(this);
}
@Override @Override
public void run() public void run()
{
execute();
}
private void resume()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} run",this); LOG.debug("{} run",this);
@ -105,7 +126,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
produce=_producing=true; produce=_producing=true;
} }
} }
if (produce) if (produce)
produceAndRun(); produceAndRun();
} }
@ -115,7 +136,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} produce enter",this); LOG.debug("{} produce enter",this);
loop: while (true) while (true)
{ {
// If we got here, then we are the thread that is producing. // If we got here, then we are the thread that is producing.
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -141,12 +162,12 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
_idle=false; _idle=false;
_producing=true; _producing=true;
_execute=false; _execute=false;
continue loop; continue;
} }
// ... and no additional calls to execute, so we are idle // ... and no additional calls to execute, so we are idle
_idle=true; _idle=true;
break loop; break;
} }
// We have a task, which we will run ourselves, // We have a task, which we will run ourselves,
@ -166,7 +187,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
// Spawn a new thread to continue production by running the produce loop. // Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} dispatch",this); LOG.debug("{} dispatch",this);
_executor.execute(this); _executor.execute(_resumer);
} }
// Run the task. // Run the task.
@ -181,7 +202,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
{ {
// Is another thread already producing or we are now idle? // Is another thread already producing or we are now idle?
if (_producing || _idle) if (_producing || _idle)
break loop; break;
_producing=true; _producing=true;
} }
} }
@ -202,7 +223,6 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
{ {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("EPR "); builder.append("EPR ");
builder.append(" ");
try (Lock locked = _lock.lock()) try (Lock locked = _lock.lock())
{ {
builder.append(_idle?"Idle/":""); builder.append(_idle?"Idle/":"");
@ -213,4 +233,13 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
builder.append(_producer); builder.append(_producer);
return builder.toString(); return builder.toString();
} }
}
private class Resumer implements Runnable
{
@Override
public void run()
{
resume();
}
}
}

View File

@ -58,4 +58,10 @@ public class ProduceExecuteRun implements ExecutionStrategy
_executor.execute(task); _executor.execute(task);
} }
} }
}
@Override
public void dispatch()
{
execute();
}
}

View File

@ -26,13 +26,15 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
* <p>A strategy where the caller thread iterates over task production, submitting each * <p>A strategy where the caller thread iterates over task production, submitting each
* task to an {@link Executor} for execution.</p> * task to an {@link Executor} for execution.</p>
*/ */
public class ProduceRun implements ExecutionStrategy public class ProduceRun implements ExecutionStrategy, Runnable
{ {
private final Producer _producer; private final Producer _producer;
private final Executor _executor;
public ProduceRun(Producer producer) public ProduceRun(Producer producer, Executor executor)
{ {
this._producer = producer; this._producer = producer;
this._executor = executor;
} }
@Override @Override
@ -51,4 +53,16 @@ public class ProduceRun implements ExecutionStrategy
task.run(); task.run();
} }
} }
}
@Override
public void dispatch()
{
_executor.execute(this);
}
@Override
public void run()
{
execute();
}
}