Merge branch 'jetty-9.3.x' of github.com:eclipse/jetty.project into jetty-9.3.x

This commit is contained in:
Greg Wilkins 2017-08-30 16:38:09 +10:00
commit 165ca36f6b
11 changed files with 213 additions and 48 deletions

View File

@ -66,7 +66,6 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -428,7 +427,6 @@ public class StreamResetTest extends AbstractTest
@Override @Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{ {
Log.getLogger(StreamResetTest.class).info("SIMON: uri={}", request.getRequestURI());
phaser.get().countDown(); phaser.get().countDown();
IO.copy(request.getInputStream(), response.getOutputStream()); IO.copy(request.getInputStream(), response.getOutputStream());
} }
@ -455,7 +453,6 @@ public class StreamResetTest extends AbstractTest
@Override @Override
public void onHeaders(Stream stream, HeadersFrame frame) public void onHeaders(Stream stream, HeadersFrame frame)
{ {
Log.getLogger(StreamResetTest.class).info("SIMON: response={}/{}", stream.getId(), frame.getMetaData());
MetaData.Response response = (MetaData.Response)frame.getMetaData(); MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (response.getStatus() == HttpStatus.OK_200) if (response.getStatus() == HttpStatus.OK_200)
latch.get().countDown(); latch.get().countDown();
@ -464,7 +461,6 @@ public class StreamResetTest extends AbstractTest
@Override @Override
public void onData(Stream stream, DataFrame frame, Callback callback) public void onData(Stream stream, DataFrame frame, Callback callback)
{ {
Log.getLogger(StreamResetTest.class).info("SIMON: data={}/{}", stream.getId(), frame);
callback.succeeded(); callback.succeeded();
if (frame.isEndStream()) if (frame.isEndStream())
latch.get().countDown(); latch.get().countDown();

View File

@ -175,6 +175,7 @@ public class HTTP2Connection extends AbstractConnection
{ {
private final Callback fillCallback = new FillCallback(); private final Callback fillCallback = new FillCallback();
private ByteBuffer buffer; private ByteBuffer buffer;
private boolean shutdown;
@Override @Override
public Runnable produce() public Runnable produce()
@ -185,7 +186,7 @@ public class HTTP2Connection extends AbstractConnection
if (task != null) if (task != null)
return task; return task;
if (isFillInterested()) if (isFillInterested() || shutdown)
return null; return null;
if (buffer == null) if (buffer == null)
@ -221,6 +222,7 @@ public class HTTP2Connection extends AbstractConnection
else if (filled < 0) else if (filled < 0)
{ {
release(); release();
shutdown = true;
session.onShutdown(); session.onShutdown();
return null; return null;
} }

View File

@ -421,8 +421,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{ {
// We received a GO_AWAY, so try to write // We received a GO_AWAY, so try to write
// what's in the queue and then disconnect. // what's in the queue and then disconnect.
notifyClose(this, frame); notifyClose(this, frame, new DisconnectCallback());
control(null, Callback.NOOP, new DisconnectFrame());
return; return;
} }
break; break;
@ -462,8 +461,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public void onConnectionFailure(int error, String reason) public void onConnectionFailure(int error, String reason)
{ {
notifyFailure(this, new IOException(String.format("%d/%s", error, reason))); notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason));
close(error, reason, Callback.NOOP);
} }
@Override @Override
@ -991,8 +989,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void abort(Throwable failure) protected void abort(Throwable failure)
{ {
notifyFailure(this, failure); notifyFailure(this, failure, new TerminateCallback(failure));
terminate(failure);
} }
public boolean isDisconnected() public boolean isDisconnected()
@ -1054,11 +1051,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
protected void notifyClose(Session session, GoAwayFrame frame) protected void notifyClose(Session session, GoAwayFrame frame, Callback callback)
{ {
try try
{ {
listener.onClose(session, frame); listener.onClose(session, frame, callback);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -1079,11 +1076,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
protected void notifyFailure(Session session, Throwable failure) protected void notifyFailure(Session session, Throwable failure, Callback callback)
{ {
try try
{ {
listener.onFailure(session, failure); listener.onFailure(session, failure, callback);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -1322,4 +1319,81 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
promise.failed(x); promise.failed(x);
} }
} }
private class CloseCallback implements Callback.NonBlocking
{
private final int error;
private final String reason;
private CloseCallback(int error, String reason)
{
this.error = error;
this.reason = reason;
}
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
complete();
}
private void complete()
{
close(error, reason, Callback.NOOP);
}
}
private class DisconnectCallback implements Callback.NonBlocking
{
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
complete();
}
private void complete()
{
control(null, Callback.NOOP, new DisconnectFrame());
}
}
private class TerminateCallback implements Callback.NonBlocking
{
private final Throwable failure;
private TerminateCallback(Throwable failure)
{
this.failure = failure;
}
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
failure.addSuppressed(x);
complete();
}
private void complete()
{
terminate(failure);
}
}
} }

View File

@ -195,9 +195,23 @@ public interface Session
/** /**
* <p>Callback method invoked when a GOAWAY frame has been received.</p> * <p>Callback method invoked when a GOAWAY frame has been received.</p>
* *
* @param session the session * @param session the session
* @param frame the GOAWAY frame received * @param frame the GOAWAY frame received
* @param callback the callback to notify of the GOAWAY processing
*/ */
public default void onClose(Session session, GoAwayFrame frame, Callback callback)
{
try
{
onClose(session, frame);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
public void onClose(Session session, GoAwayFrame frame); public void onClose(Session session, GoAwayFrame frame);
/** /**
@ -210,9 +224,23 @@ public interface Session
/** /**
* <p>Callback method invoked when a failure has been detected for this session.</p> * <p>Callback method invoked when a failure has been detected for this session.</p>
* *
* @param session the session * @param session the session
* @param failure the failure * @param failure the failure
* @param callback the callback to notify of failure processing
*/ */
public default void onFailure(Session session, Throwable failure, Callback callback)
{
try
{
onFailure(session, failure);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
public void onFailure(Session session, Throwable failure); public void onFailure(Session session, Throwable failure);
/** /**

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -55,6 +56,7 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ -156,19 +158,27 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
public boolean onStreamTimeout(IStream stream, Throwable failure) public boolean onStreamTimeout(IStream stream, Throwable failure)
{ {
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = channel != null && channel.onStreamTimeout(failure); boolean result = channel != null && channel.onStreamTimeout(failure, task -> offerTask(task, true));
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
return result; return result;
} }
public void onStreamFailure(IStream stream, Throwable failure) public void onStreamFailure(IStream stream, Throwable failure, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure); LOG.debug("Processing failure on {}: {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null) if (channel != null)
channel.onFailure(failure); {
Runnable task = channel.onFailure(failure, callback);
if (task != null)
offerTask(task, true);
}
else
{
callback.succeeded();
}
} }
public boolean onSessionTimeout(Throwable failure) public boolean onSessionTimeout(Throwable failure)
@ -179,20 +189,29 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
{ {
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null) if (channel != null)
result &= !channel.isRequestExecuting(); result &= channel.isRequestIdle();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
return result; return result;
} }
public void onSessionFailure(Throwable failure) public void onSessionFailure(Throwable failure, Callback callback)
{ {
ISession session = getSession(); ISession session = getSession();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure); LOG.debug("Processing failure on {}: {}", session, failure);
for (Stream stream : session.getStreams()) Collection<Stream> streams = session.getStreams();
onStreamFailure((IStream)stream, failure); if (streams.isEmpty())
{
callback.succeeded();
}
else
{
CountingCallback counter = new CountingCallback(callback, streams.size());
for (Stream stream : streams)
onStreamFailure((IStream)stream, failure, counter);
}
} }
public void push(Connector connector, IStream stream, MetaData.Request request) public void push(Connector connector, IStream stream, MetaData.Request request)

View File

@ -123,7 +123,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
} }
@Override @Override
public void onClose(Session session, GoAwayFrame frame) public void onClose(Session session, GoAwayFrame frame, Callback callback)
{ {
ErrorCode error = ErrorCode.from(frame.getError()); ErrorCode error = ErrorCode.from(frame.getError());
if (error == null) if (error == null)
@ -131,13 +131,13 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
String reason = frame.tryConvertPayload(); String reason = frame.tryConvertPayload();
if (reason != null && !reason.isEmpty()) if (reason != null && !reason.isEmpty())
reason = " (" + reason + ")"; reason = " (" + reason + ")";
getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason)); getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason), callback);
} }
@Override @Override
public void onFailure(Session session, Throwable failure) public void onFailure(Session session, Throwable failure, Callback callback)
{ {
getConnection().onSessionFailure(failure); getConnection().onSessionFailure(failure, callback);
} }
@Override @Override
@ -167,7 +167,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
ErrorCode error = ErrorCode.from(frame.getError()); ErrorCode error = ErrorCode.from(frame.getError());
if (error == null) if (error == null)
error = ErrorCode.CANCEL_STREAM_ERROR; error = ErrorCode.CANCEL_STREAM_ERROR;
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error)); getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP);
} }
@Override @Override

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
@ -40,6 +41,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -277,32 +279,33 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return handle || wasDelayed ? this : null; return handle || wasDelayed ? this : null;
} }
public boolean isRequestExecuting() public boolean isRequestIdle()
{ {
return !getState().isIdle(); return getState().isIdle();
} }
public boolean onStreamTimeout(Throwable failure) public boolean onStreamTimeout(Throwable failure, Consumer<Runnable> consumer)
{ {
boolean result = false;
if (isRequestIdle())
{
consumeInput();
result = true;
}
getHttpTransport().onStreamTimeout(failure); getHttpTransport().onStreamTimeout(failure);
if (getRequest().getHttpInput().onIdleTimeout(failure)) if (getRequest().getHttpInput().onIdleTimeout(failure))
handle(); consumer.accept(this::handleWithContext);
if (isRequestExecuting()) return result;
return false;
consumeInput();
return true;
} }
public void onFailure(Throwable failure) public Runnable onFailure(Throwable failure, Callback callback)
{ {
getHttpTransport().onStreamFailure(failure); getHttpTransport().onStreamFailure(failure);
if (getRequest().getHttpInput().failed(failure)) boolean handle = getRequest().getHttpInput().failed(failure);
handle();
else
getState().asyncError(failure);
consumeInput(); consumeInput();
return new FailureTask(failure, callback, handle);
} }
protected void consumeInput() protected void consumeInput()
@ -310,6 +313,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel
getRequest().getHttpInput().consumeAll(); getRequest().getHttpInput().consumeAll();
} }
private void handleWithContext()
{
ContextHandler context = getState().getContextHandler();
if (context != null)
context.handle(getRequest(), this);
else
handle();
}
/** /**
* If the associated response has the Expect header set to 100 Continue, * If the associated response has the Expect header set to 100 Continue,
* then accessing the input stream indicates that the handler/servlet * then accessing the input stream indicates that the handler/servlet
@ -348,4 +360,35 @@ public class HttpChannelOverHTTP2 extends HttpChannel
streamId = stream.getId(); streamId = stream.getId();
return String.format("%s#%d", super.toString(), getStream() == null ? -1 : streamId); return String.format("%s#%d", super.toString(), getStream() == null ? -1 : streamId);
} }
private class FailureTask implements Runnable
{
private final Throwable failure;
private final Callback callback;
private final boolean handle;
public FailureTask(Throwable failure, Callback callback, boolean handle)
{
this.failure = failure;
this.callback = callback;
this.handle = handle;
}
@Override
public void run()
{
try
{
if (handle)
handleWithContext();
else
getState().asyncError(failure);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
}
} }

View File

@ -91,6 +91,7 @@ public class HttpInput extends ServletInputStream implements Runnable
_contentConsumed = 0; _contentConsumed = 0;
_firstByteTimeStamp = -1; _firstByteTimeStamp = -1;
_blockUntil = 0; _blockUntil = 0;
_waitingForContent = false;
} }
} }

View File

@ -30,7 +30,7 @@ public interface Callback
* Instance of Adapter that can be used when the callback methods need an empty * Instance of Adapter that can be used when the callback methods need an empty
* implementation without incurring in the cost of allocating a new Adapter object. * implementation without incurring in the cost of allocating a new Adapter object.
*/ */
Callback NOOP = new Callback() Callback NOOP = new Callback.NonBlocking()
{ {
}; };

View File

@ -45,6 +45,8 @@ public class CountingCallback extends Callback.Nested
public CountingCallback(Callback callback, int count) public CountingCallback(Callback callback, int count)
{ {
super(callback); super(callback);
if (count < 1)
throw new IllegalArgumentException();
this.count = new AtomicInteger(count); this.count = new AtomicInteger(count);
} }

View File

@ -694,7 +694,7 @@ public class ServerTimeoutsTest extends AbstractTest
{ {
try try
{ {
Thread.sleep(2 * idleTimeout); Thread.sleep(idleTimeout + idleTimeout / 2);
IO.copy(request.getInputStream(), response.getOutputStream()); IO.copy(request.getInputStream(), response.getOutputStream());
} }
catch (InterruptedException x) catch (InterruptedException x)
@ -729,7 +729,7 @@ public class ServerTimeoutsTest extends AbstractTest
}); });
// Wait for the server application to block reading. // Wait for the server application to block reading.
Thread.sleep(3 * idleTimeout); Thread.sleep(2 * idleTimeout);
content.offer(ByteBuffer.wrap(data2)); content.offer(ByteBuffer.wrap(data2));
content.close(); content.close();