Merged branch 'jetty-9.4.x' into 'master'.
This commit is contained in:
commit
3cbcb380a6
|
@ -67,7 +67,6 @@ import org.eclipse.jetty.util.Callback;
|
|||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
|
@ -439,7 +438,6 @@ public class StreamResetTest extends AbstractTest
|
|||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
Log.getLogger(StreamResetTest.class).info("SIMON: uri={}", request.getRequestURI());
|
||||
phaser.get().countDown();
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
|
@ -466,7 +464,6 @@ public class StreamResetTest extends AbstractTest
|
|||
@Override
|
||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
Log.getLogger(StreamResetTest.class).info("SIMON: response={}/{}", stream.getId(), frame.getMetaData());
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
if (response.getStatus() == HttpStatus.OK_200)
|
||||
latch.get().countDown();
|
||||
|
@ -475,7 +472,6 @@ public class StreamResetTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
Log.getLogger(StreamResetTest.class).info("SIMON: data={}/{}", stream.getId(), frame);
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
latch.get().countDown();
|
||||
|
@ -544,7 +540,6 @@ public class StreamResetTest extends AbstractTest
|
|||
|
||||
Session client = newClient(new Session.Listener.Adapter());
|
||||
|
||||
Log.getLogger(HttpChannel.class).info("Expecting java.lang.IllegalStateException: explictly_thrown_by_test");
|
||||
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||
HeadersFrame frame = new HeadersFrame(request, null, false);
|
||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||
|
|
|
@ -199,7 +199,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
public boolean onStreamTimeout(IStream stream, Throwable failure)
|
||||
{
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
boolean result = channel != null && channel.onStreamTimeout(failure);
|
||||
boolean result = channel != null && channel.onStreamTimeout(failure, task -> offerTask(task, true));
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
|
||||
return result;
|
||||
|
@ -211,7 +211,11 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
LOG.debug("Processing failure on {}: {}", stream, failure);
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
if (channel != null)
|
||||
channel.onFailure(failure);
|
||||
{
|
||||
Runnable task = channel.onFailure(failure);
|
||||
if (task != null)
|
||||
offerTask(task, true);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean onSessionTimeout(Throwable failure)
|
||||
|
@ -222,7 +226,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
{
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
if (channel != null)
|
||||
result &= !channel.isRequestExecuting();
|
||||
result &= channel.isRequestIdle();
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.http2.server;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.http.BadMessageException;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
|
@ -300,40 +301,39 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
|
|||
return handle || wasDelayed ? this : null;
|
||||
}
|
||||
|
||||
public boolean isRequestExecuting()
|
||||
public boolean isRequestIdle()
|
||||
{
|
||||
return !getState().isIdle();
|
||||
return getState().isIdle();
|
||||
}
|
||||
|
||||
public boolean onStreamTimeout(Throwable failure)
|
||||
public boolean onStreamTimeout(Throwable failure, Consumer<Runnable> consumer)
|
||||
{
|
||||
boolean result = false;
|
||||
if (isRequestIdle())
|
||||
{
|
||||
consumeInput();
|
||||
result = true;
|
||||
}
|
||||
|
||||
getHttpTransport().onStreamTimeout(failure);
|
||||
if (getRequest().getHttpInput().onIdleTimeout(failure))
|
||||
handle();
|
||||
consumer.accept(this::handleWithContext);
|
||||
|
||||
if (isRequestExecuting())
|
||||
return false;
|
||||
|
||||
consumeInput();
|
||||
return true;
|
||||
return result;
|
||||
}
|
||||
|
||||
public void onFailure(Throwable failure)
|
||||
public Runnable onFailure(Throwable failure)
|
||||
{
|
||||
getHttpTransport().onStreamFailure(failure);
|
||||
if (getRequest().getHttpInput().failed(failure))
|
||||
{
|
||||
ContextHandler handler = getState().getContextHandler();
|
||||
if (handler != null)
|
||||
handler.handle(getRequest(), this);
|
||||
else
|
||||
handle();
|
||||
}
|
||||
else
|
||||
{
|
||||
getState().asyncError(failure);
|
||||
}
|
||||
boolean handle = getRequest().getHttpInput().failed(failure);
|
||||
consumeInput();
|
||||
return () ->
|
||||
{
|
||||
if (handle)
|
||||
handleWithContext();
|
||||
else
|
||||
getState().asyncError(failure);
|
||||
};
|
||||
}
|
||||
|
||||
protected void consumeInput()
|
||||
|
@ -341,6 +341,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
|
|||
getRequest().getHttpInput().consumeAll();
|
||||
}
|
||||
|
||||
private void handleWithContext()
|
||||
{
|
||||
ContextHandler context = getState().getContextHandler();
|
||||
if (context != null)
|
||||
context.handle(getRequest(), this);
|
||||
else
|
||||
handle();
|
||||
}
|
||||
|
||||
/**
|
||||
* If the associated response has the Expect header set to 100 Continue,
|
||||
* then accessing the input stream indicates that the handler/servlet
|
||||
|
|
|
@ -325,14 +325,16 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
synchronized (this)
|
||||
{
|
||||
commit = this.commit;
|
||||
// Only fail pending writes, as we
|
||||
// may need to write an error page.
|
||||
State state = this.state;
|
||||
this.state = State.FAILED;
|
||||
if (this.failure == null)
|
||||
this.failure = failure;
|
||||
else
|
||||
this.failure.addSuppressed(failure);
|
||||
if (state == State.WRITING)
|
||||
{
|
||||
this.state = State.FAILED;
|
||||
callback = this.callback;
|
||||
this.callback = null;
|
||||
this.failure = failure;
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
|
@ -165,6 +165,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
_contentConsumed = 0;
|
||||
_firstByteTimeStamp = -1;
|
||||
_blockUntil = 0;
|
||||
_waitingForContent = false;
|
||||
if (_interceptor instanceof Destroyable)
|
||||
((Destroyable)_interceptor).destroy();
|
||||
_interceptor = null;
|
||||
|
|
|
@ -694,7 +694,7 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(2 * idleTimeout);
|
||||
Thread.sleep(idleTimeout + idleTimeout / 2);
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
|
@ -729,7 +729,7 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
});
|
||||
|
||||
// Wait for the server application to block reading.
|
||||
Thread.sleep(3 * idleTimeout);
|
||||
Thread.sleep(2 * idleTimeout);
|
||||
content.offer(ByteBuffer.wrap(data2));
|
||||
content.close();
|
||||
|
||||
|
|
Loading…
Reference in New Issue