Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2016-08-12 14:57:47 +02:00
commit 6f20feddcb
15 changed files with 539 additions and 79 deletions

View File

@ -87,7 +87,7 @@ public class AbstractTest
server.addConnector(connector);
}
private void prepareClient()
protected void prepareClient()
{
client = new HTTP2Client();
QueuedThreadPool clientExecutor = new QueuedThreadPool();

View File

@ -0,0 +1,349 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class AsyncServletTest extends AbstractTest
{
@Test
public void testStartAsyncThenDispatch() throws Exception
{
byte[] content = new byte[1024];
new Random().nextBytes(content);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = (AsyncContext)request.getAttribute(AsyncContext.class.getName());
if (asyncContext == null)
{
AsyncContext context = request.startAsync();
context.setTimeout(0);
request.setAttribute(AsyncContext.class.getName(), context);
context.start(() ->
{
sleep(1000);
context.dispatch();
});
}
else
{
response.getOutputStream().write(content);
}
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
try
{
buffer.write(BufferUtil.toArray(frame.getData()));
callback.succeeded();
if (frame.isEndStream())
latch.countDown();
}
catch (IOException x)
{
callback.failed(x);
}
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertArrayEquals(content, buffer.toByteArray());
}
@Test
public void testStartAsyncThenClientSessionIdleTimeout() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AsyncOnErrorServlet(serverLatch));
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch clientLatch = new CountDownLatch(1);
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500 && frame.isEndStream())
clientLatch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
stream.setIdleTimeout(10 * idleTimeout);
// When the client closes, the server receives the
// corresponding frame and acts by notifying the failure,
// which sends back to the client the error response.
Assert.assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(clientLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testStartAsyncThenClientStreamIdleTimeout() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AsyncOnErrorServlet(serverLatch));
long idleTimeout = 1000;
client.setIdleTimeout(10 * idleTimeout);
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch clientLatch = new CountDownLatch(1);
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
clientLatch.countDown();
return true;
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
stream.setIdleTimeout(idleTimeout);
// When the client resets, the server receives the
// corresponding frame and acts by notifying the failure,
// but the response is not sent back to the client.
Assert.assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(clientLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testStartAsyncThenServerSessionIdleTimeout() throws Exception
{
testStartAsyncThenServerIdleTimeout(1000, 10 * 1000);
}
@Test
public void testStartAsyncThenServerStreamIdleTimeout() throws Exception
{
testStartAsyncThenServerIdleTimeout(10 * 1000, 1000);
}
private void testStartAsyncThenServerIdleTimeout(long sessionTimeout, long streamTimeout) throws Exception
{
prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration())
{
@Override
protected ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint)
{
return new HTTPServerSessionListener(connector, endPoint)
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.setIdleTimeout(streamTimeout);
return super.onNewStream(stream, frame);
}
};
}
});
connector.setIdleTimeout(sessionTimeout);
ServletContextHandler context = new ServletContextHandler(server, "/");
long timeout = Math.min(sessionTimeout, streamTimeout);
CountDownLatch errorLatch = new CountDownLatch(1);
context.addServlet(new ServletHolder(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = (AsyncContext)request.getAttribute(AsyncContext.class.getName());
if (asyncContext == null)
{
AsyncContext context = request.startAsync();
context.setTimeout(2 * timeout);
request.setAttribute(AsyncContext.class.getName(), context);
context.addListener(new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event) throws IOException
{
}
@Override
public void onTimeout(AsyncEvent event) throws IOException
{
event.getAsyncContext().complete();
}
@Override
public void onError(AsyncEvent event) throws IOException
{
errorLatch.countDown();
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
}
});
}
else
{
throw new ServletException();
}
}
}), servletPath + "/*");
server.start();
prepareClient();
client.start();
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
CountDownLatch clientLatch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (response.getStatus() == HttpStatus.OK_200 && frame.isEndStream())
clientLatch.countDown();
}
});
// When the server idle times out, but the request has been dispatched
// then the server must ignore the idle timeout as per Servlet semantic.
Assert.assertFalse(errorLatch.await(2 * timeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(clientLatch.await(2 * timeout, TimeUnit.MILLISECONDS));
}
private void sleep(long ms)
{
try
{
Thread.sleep(ms);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
private static class AsyncOnErrorServlet extends HttpServlet implements AsyncListener
{
private final CountDownLatch latch;
public AsyncOnErrorServlet(CountDownLatch latch)
{
this.latch = latch;
}
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = (AsyncContext)request.getAttribute(AsyncContext.class.getName());
if (asyncContext == null)
{
AsyncContext context = request.startAsync();
context.setTimeout(0);
request.setAttribute(AsyncContext.class.getName(), context);
context.addListener(this);
}
else
{
throw new ServletException();
}
}
@Override
public void onComplete(AsyncEvent event) throws IOException
{
}
@Override
public void onTimeout(AsyncEvent event) throws IOException
{
}
@Override
public void onError(AsyncEvent event) throws IOException
{
HttpServletResponse response = (HttpServletResponse)event.getSuppliedResponse();
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
event.getAsyncContext().complete();
latch.countDown();
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
}
}
}

View File

@ -79,8 +79,7 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
latch.countDown();
latch.countDown();
}
});
@ -118,8 +117,7 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
latch.countDown();
latch.countDown();
}
});
@ -214,8 +212,7 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
closeLatch.countDown();
closeLatch.countDown();
}
});
client.setIdleTimeout(idleTimeout);
@ -252,8 +249,7 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
closeLatch.countDown();
closeLatch.countDown();
}
});
client.setIdleTimeout(idleTimeout);
@ -362,10 +358,11 @@ public class IdleTimeoutTest extends AbstractTest
}
@Override
public void onTimeout(Stream stream, Throwable x)
public boolean onIdleTimeout(Stream stream, Throwable x)
{
Assert.assertThat(x, Matchers.instanceOf(TimeoutException.class));
timeoutLatch.countDown();
return true;
}
});
@ -392,9 +389,10 @@ public class IdleTimeoutTest extends AbstractTest
return new Stream.Listener.Adapter()
{
@Override
public void onTimeout(Stream stream, Throwable x)
public boolean onIdleTimeout(Stream stream, Throwable x)
{
timeoutLatch.countDown();
return true;
}
};
}
@ -436,9 +434,10 @@ public class IdleTimeoutTest extends AbstractTest
return new Stream.Listener.Adapter()
{
@Override
public void onTimeout(Stream stream, Throwable x)
public boolean onIdleTimeout(Stream stream, Throwable x)
{
timeoutLatch.countDown();
return true;
}
};
}

View File

@ -134,10 +134,13 @@ public class HTTP2Connection extends AbstractConnection
@Override
public boolean onIdleExpired()
{
boolean close = session.onIdleTimeout();
boolean idle = isFillInterested();
if (close && idle)
session.close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
if (idle)
{
boolean close = session.onIdleTimeout();
if (close)
session.close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
}
return false;
}

View File

@ -419,20 +419,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
control(null, new Callback()
{
@Override
public void succeeded()
{
notifyClose(HTTP2Session.this, frame);
}
@Override
public void failed(Throwable x)
{
notifyClose(HTTP2Session.this, frame);
}
}, new DisconnectFrame());
notifyClose(this, frame);
control(null, Callback.NOOP, new DisconnectFrame());
return;
}
break;

View File

@ -174,15 +174,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getIdleTimeout(), this);
// The stream is now gone, we must close it to
// avoid that its idle timeout is rescheduled.
close();
// Tell the other peer that we timed out.
reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
// Notify the application.
notifyTimeout(this, timeout);
if (notifyIdleTimeout(this, timeout))
{
// Tell the other peer that we timed out.
reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}
}
private ConcurrentMap<String, Object> attributes()
@ -425,18 +422,19 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
}
}
private void notifyTimeout(Stream stream, Throwable failure)
private boolean notifyIdleTimeout(Stream stream, Throwable failure)
{
Listener listener = this.listener;
if (listener == null)
return;
return true;
try
{
listener.onTimeout(stream, failure);
return listener.onIdleTimeout(stream, failure);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return true;
}
}

View File

@ -125,7 +125,7 @@ public interface Stream
/**
* @param idleTimeout the stream idle timeout
* @see #getIdleTimeout()
* @see Stream.Listener#onTimeout(Stream, Throwable)
* @see Stream.Listener#onIdleTimeout(Stream, Throwable)
*/
public void setIdleTimeout(long idleTimeout);
@ -150,7 +150,7 @@ public interface Stream
*
* @param stream the stream
* @param frame the PUSH_PROMISE frame received
* @return a {@link Stream.Listener} that will be notified of pushed stream events
* @return a Stream.Listener that will be notified of pushed stream events
*/
public Listener onPush(Stream stream, PushPromiseFrame frame);
@ -178,8 +178,9 @@ public interface Stream
* @param stream the stream
* @param x the timeout failure
* @see #getIdleTimeout()
* @return true to reset the stream, false to ignore the idle timeout
*/
public void onTimeout(Stream stream, Throwable x);
public boolean onIdleTimeout(Stream stream, Throwable x);
/**
* <p>Empty implementation of {@link Listener}</p>
@ -209,8 +210,9 @@ public interface Stream
}
@Override
public void onTimeout(Stream stream, Throwable x)
public boolean onIdleTimeout(Stream stream, Throwable x)
{
return true;
}
}
}

View File

@ -18,10 +18,9 @@
package org.eclipse.jetty.http2.frames;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.util.BufferUtil;
public class GoAwayFrame extends Frame
{
@ -54,16 +53,15 @@ public class GoAwayFrame extends Frame
public String tryConvertPayload()
{
if (payload == null)
if (payload == null || payload.length == 0)
return "";
ByteBuffer buffer = BufferUtil.toBuffer(payload);
try
{
return BufferUtil.toUTF8String(buffer);
return new String(payload, StandardCharsets.UTF_8);
}
catch (Throwable x)
{
return BufferUtil.toDetailString(buffer);
return "";
}
}

View File

@ -132,9 +132,10 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
}
@Override
public void onTimeout(Stream stream, Throwable failure)
public boolean onIdleTimeout(Stream stream, Throwable x)
{
responseFailure(failure);
responseFailure(x);
return true;
}
private class ContentNotifier extends IteratingCallback

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
@ -145,11 +146,51 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
Runnable task = channel.requestContent(frame, callback);
Runnable task = channel.onRequestContent(frame, callback);
if (task != null)
offerTask(task, false);
}
public boolean onStreamTimeout(IStream stream, Throwable failure)
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = !channel.isRequestHandled();
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", stream, failure);
return result;
}
public void onStreamFailure(IStream stream, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.onFailure(failure);
}
public boolean onSessionTimeout(Throwable failure)
{
ISession session = getSession();
boolean result = true;
for (Stream stream : session.getStreams())
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
result &= !channel.isRequestHandled();
}
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", session, failure);
return result;
}
public void onSessionFailure(Throwable failure)
{
ISession session = getSession();
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure);
for (Stream stream : session.getStreams())
onStreamFailure((IStream)stream, failure);
}
public void push(Connector connector, IStream stream, MetaData.Request request)
{
if (LOG.isDebugEnabled())

View File

@ -18,8 +18,10 @@
package org.eclipse.jetty.http2.server;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Cipher;
@ -28,6 +30,7 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
@ -71,7 +74,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
return acceptable;
}
private class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
protected class HTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
{
private final Connector connector;
private final EndPoint endPoint;
@ -82,7 +85,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
this.endPoint = endPoint;
}
private HTTP2ServerConnection getConnection()
protected HTTP2ServerConnection getConnection()
{
return (HTTP2ServerConnection)endPoint.getConnection();
}
@ -104,7 +107,30 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
getConnection().onNewStream(connector, (IStream)stream, frame);
return frame.isEndStream() ? null : this;
return this;
}
@Override
public boolean onIdleTimeout(Session session)
{
boolean close = super.onIdleTimeout(session);
if (!close)
return false;
long idleTimeout = getConnection().getEndPoint().getIdleTimeout();
return getConnection().onSessionTimeout(new TimeoutException("Session idle timeout " + idleTimeout + " ms"));
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
ErrorCode error = ErrorCode.from(frame.getError());
if (error == null)
error = ErrorCode.STREAM_CLOSED_ERROR;
String reason = frame.tryConvertPayload();
if (reason != null && !reason.isEmpty())
reason = " (" + reason + ")";
getConnection().onSessionFailure(new IOException("HTTP/2 " + error + reason));
}
@Override
@ -131,20 +157,21 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
@Override
public void onReset(Stream stream, ResetFrame frame)
{
// TODO:
ErrorCode error = ErrorCode.from(frame.getError());
if (error == null)
error = ErrorCode.CANCEL_STREAM_ERROR;
getConnection().onStreamFailure((IStream)stream, new IOException("HTTP/2 " + error));
}
@Override
public void onTimeout(Stream stream, Throwable x)
public boolean onIdleTimeout(Stream stream, Throwable x)
{
// TODO
return getConnection().onStreamTimeout((IStream)stream, x);
}
private void close(Stream stream, String reason)
{
final Session session = stream.getSession();
session.close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.NOOP);
stream.getSession().close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.NOOP);
}
}
}

View File

@ -54,6 +54,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
private boolean _expect100Continue;
private boolean _delayedUntilContent;
private boolean _handled;
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
@ -106,6 +107,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
_delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !_expect100Continue;
_handled = !_delayedUntilContent;
if (LOG.isDebugEnabled())
{
@ -173,6 +175,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
{
_expect100Continue = false;
_delayedUntilContent = false;
_handled = false;
super.recycle();
getHttpTransport().recycle();
}
@ -190,7 +193,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
}
public Runnable requestContent(DataFrame frame, final Callback callback)
public Runnable onRequestContent(DataFrame frame, final Callback callback)
{
Stream stream = getStream();
if (stream.isReset())
@ -255,10 +258,22 @@ public class HttpChannelOverHTTP2 extends HttpChannel
boolean delayed = _delayedUntilContent;
_delayedUntilContent = false;
if (delayed)
_handled = true;
return handle || delayed ? this : null;
}
public boolean isRequestHandled()
{
return _handled;
}
public void onFailure(Throwable failure)
{
onEarlyEOF();
getState().asyncError(failure);
}
protected void consumeInput()
{
getRequest().getHttpInput().consumeAll();

View File

@ -126,11 +126,6 @@ public class AsyncContextEvent extends AsyncEvent implements Runnable
return _throwable;
}
// public void setThrowable(Throwable throwable)
// {
// _throwable=throwable;
// }
public void setDispatchContext(ServletContext context)
{
_dispatchContext=context;
@ -165,8 +160,7 @@ public class AsyncContextEvent extends AsyncEvent implements Runnable
{
if (_throwable==null)
_throwable=e;
else
else if (_throwable != e)
_throwable.addSuppressed(e);
}
}

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.server;
import static javax.servlet.RequestDispatcher.ERROR_EXCEPTION;
import static javax.servlet.RequestDispatcher.ERROR_STATUS_CODE;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -42,7 +39,6 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
@ -54,6 +50,9 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import static javax.servlet.RequestDispatcher.ERROR_EXCEPTION;
import static javax.servlet.RequestDispatcher.ERROR_STATUS_CODE;
/**
* HttpChannel represents a single endpoint for HTTP semantic processing.
@ -376,6 +375,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
break;
}
case ASYNC_ERROR:
{
throw _state.getAsyncContextEvent().getThrowable();
}
case READ_CALLBACK:
{
ContextHandler handler=_state.getContextHandler();

View File

@ -62,6 +62,7 @@ public class HttpChannelState
ASYNC_WAIT, // Suspended and waiting
ASYNC_WOKEN, // Dispatch to handle from ASYNC_WAIT
ASYNC_IO, // Dispatched for async IO
ASYNC_ERROR, // Async error from ASYNC_WAIT
COMPLETING, // Response is completable
COMPLETED, // Response is completed
UPGRADED // Request upgraded the connection
@ -75,6 +76,7 @@ public class HttpChannelState
DISPATCH, // handle a normal request dispatch
ASYNC_DISPATCH, // handle an async request dispatch
ERROR_DISPATCH, // handle a normal error
ASYNC_ERROR, // handle an async error
WRITE_CALLBACK, // handle an IO write callback
READ_CALLBACK, // handle an IO read callback
COMPLETE, // Complete the response
@ -253,6 +255,9 @@ public class HttpChannelState
return Action.WAIT;
case ASYNC_ERROR:
return Action.ASYNC_ERROR;
case ASYNC_IO:
case ASYNC_WAIT:
case DISPATCHED:
@ -313,6 +318,45 @@ public class HttpChannelState
}
public void asyncError(Throwable failure)
{
AsyncContextEvent event = null;
try (Locker.Lock lock= _locker.lock())
{
switch (_state)
{
case IDLE:
case DISPATCHED:
case COMPLETING:
case COMPLETED:
case UPGRADED:
case ASYNC_IO:
case ASYNC_WOKEN:
case ASYNC_ERROR:
{
break;
}
case ASYNC_WAIT:
{
_event.addThrowable(failure);
_state=State.ASYNC_ERROR;
event=_event;
break;
}
default:
{
throw new IllegalStateException(getStatusStringLocked());
}
}
}
if (event != null)
{
cancelTimeout(event);
runInContext(event, _channel);
}
}
/**
* Signal that the HttpConnection has finished handling the request.
* For blocking connectors,this call may block if the request has
@ -323,8 +367,6 @@ public class HttpChannelState
protected Action unhandle()
{
Action action;
boolean read_interested=false;
try(Locker.Lock lock= _locker.lock())
{
if(DEBUG)
@ -342,6 +384,7 @@ public class HttpChannelState
case DISPATCHED:
case ASYNC_IO:
case ASYNC_ERROR:
break;
default:
@ -661,8 +704,6 @@ public class HttpChannelState
// Set error on request.
if(_event!=null)
{
if (_event.getThrowable()!=null)
throw new IllegalStateException("Error already set",_event.getThrowable());
_event.addThrowable(failure);
_event.getSuppliedRequest().setAttribute(ERROR_STATUS_CODE,code);
_event.getSuppliedRequest().setAttribute(ERROR_EXCEPTION,failure);