Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2017-10-09 18:17:49 +02:00
commit e2c411ba25
2 changed files with 596 additions and 7 deletions

View File

@ -21,11 +21,15 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.servlet.DispatcherType;
@ -78,6 +82,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
private final Response _response;
private HttpFields _trailers;
private final Supplier<HttpFields> _trailerSupplier = () -> _trailers;
private final List<Listener> _listeners;
private MetaData.Response _committedMetaData;
private RequestLog _requestLog;
private long _oldIdleTimeout;
@ -99,6 +104,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_executor = connector == null ? null : connector.getServer().getThreadPool();
_requestLog = connector == null ? null : connector.getServer().getRequestLog();
List<Listener> listeners = new ArrayList<>();
if (connector != null)
listeners.addAll(connector.getBeans(Listener.class));
_listeners = listeners;
if (LOG.isDebugEnabled())
LOG.debug("new {} -> {},{},{}",
this,
@ -319,6 +329,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
try
{
_request.setDispatcherType(DispatcherType.REQUEST);
notifyRequestBeforeDispatch(_request);
List<HttpConfiguration.Customizer> customizers = _configuration.getCustomizers();
if (!customizers.isEmpty())
@ -336,6 +347,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
}
finally
{
notifyRequestAfterDispatch(_request);
_request.setDispatcherType(null);
}
break;
@ -349,10 +361,12 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
try
{
_request.setDispatcherType(DispatcherType.ASYNC);
notifyRequestBeforeDispatch(_request);
getServer().handleAsync(this);
}
finally
{
notifyRequestAfterDispatch(_request);
_request.setDispatcherType(null);
}
break;
@ -373,10 +387,12 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
try
{
_request.setDispatcherType(DispatcherType.ERROR);
notifyRequestBeforeDispatch(_request);
getServer().handle(this);
}
finally
{
notifyRequestAfterDispatch(_request);
_request.setDispatcherType(null);
}
}
@ -603,6 +619,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
request.setTrailerSupplier(_trailerSupplier);
_request.setMetaData(request);
notifyRequestBegin(_request);
if (LOG.isDebugEnabled())
LOG.debug("REQUEST for {} on {}{}{} {} {}{}{}",request.getURIString(),this,System.lineSeparator(),
request.getMethod(),request.getURIString(),request.getHttpVersion(),System.lineSeparator(),
@ -613,7 +631,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
if (LOG.isDebugEnabled())
LOG.debug("{} onContent {}", this, content);
notifyRequestContent(_request, content.getByteBuffer());
return _request.getHttpInput().addContent(content);
}
@ -621,6 +639,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
if (LOG.isDebugEnabled())
LOG.debug("{} onContentComplete", this);
notifyRequestContentEnd(_request);
return false;
}
@ -629,13 +648,16 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (LOG.isDebugEnabled())
LOG.debug("{} onTrailers {}", this, trailers);
_trailers = trailers;
notifyRequestTrailers(_request);
}
public boolean onRequestComplete()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onRequestComplete", this);
return _request.getHttpInput().eof();
boolean result = _request.getHttpInput().eof();
notifyRequestEnd(_request);
return result;
}
public void onCompleted()
@ -650,6 +672,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (idleTO>=0 && getIdleTimeout()!=_oldIdleTimeout)
setIdleTimeout(_oldIdleTimeout);
notifyComplete(_request);
_transport.onCompleted();
}
@ -663,6 +687,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (status < 400 || status > 599)
status = HttpStatus.BAD_REQUEST_400;
notifyRequestFailure(_request, new BadMessageException(status, reason));
Action action;
try
{
@ -726,7 +752,9 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// wrap callback to process 100 responses
final int status=info.getStatus();
final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback);
final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback, content, complete);
notifyResponseBegin(_request);
// committing write
_transport.send(info, _request.isHead(), content, complete, committed);
@ -734,7 +762,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
else if (info==null)
{
// This is a normal write
_transport.send(null,_request.isHead(), content, complete, callback);
_transport.send(null,_request.isHead(), content, complete, new ContentCallback(callback, content, complete));
}
else
{
@ -828,14 +856,302 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
*/
public void abort(Throwable failure)
{
notifyResponseFailure(_request, failure);
_transport.abort(failure);
}
private void notifyRequestBegin(Request request)
{
notifyEvent1(listener -> listener::onRequestBegin, request);
}
private void notifyRequestBeforeDispatch(Request request)
{
notifyEvent1(listener -> listener::onRequestBeforeDispatch, request);
}
private void notifyRequestAfterDispatch(Request request)
{
notifyEvent1(listener -> listener::onRequestAfterDispatch, request);
}
private void notifyRequestContent(Request request, ByteBuffer content)
{
notifyEvent2(listener -> listener::onRequestContent, request, content);
}
private void notifyRequestContentEnd(Request request)
{
notifyEvent1(listener -> listener::onRequestContentEnd, request);
}
private void notifyRequestTrailers(Request request)
{
notifyEvent1(listener -> listener::onRequestTrailers, request);
}
private void notifyRequestEnd(Request request)
{
notifyEvent1(listener -> listener::onRequestEnd, request);
}
private void notifyRequestFailure(Request request, Throwable failure)
{
notifyEvent2(listener -> listener::onRequestFailure, request, failure);
}
private void notifyResponseBegin(Request request)
{
notifyEvent1(listener -> listener::onResponseBegin, request);
}
private void notifyResponseCommit(Request request)
{
notifyEvent1(listener -> listener::onResponseCommit, request);
}
private void notifyResponseContent(Request request, ByteBuffer content)
{
notifyEvent2(listener -> listener::onResponseContent, request, content);
}
private void notifyResponseEnd(Request request)
{
notifyEvent1(listener -> listener::onResponseEnd, request);
}
private void notifyResponseFailure(Request request, Throwable failure)
{
notifyEvent2(listener -> listener::onResponseFailure, request, failure);
}
private void notifyComplete(Request request)
{
notifyEvent1(listener -> listener::onComplete, request);
}
private void notifyEvent1(Function<Listener, Consumer<Request>> function, Request request)
{
for (Listener listener : _listeners)
{
try
{
function.apply(listener).accept(request);
}
catch (Throwable x)
{
LOG.debug("Failure invoking listener " + listener, x);
}
}
}
private void notifyEvent2(Function<Listener, BiConsumer<Request, ByteBuffer>> function, Request request, ByteBuffer content)
{
for (Listener listener : _listeners)
{
ByteBuffer view = content.slice();
try
{
function.apply(listener).accept(request, view);
}
catch (Throwable x)
{
LOG.debug("Failure invoking listener " + listener, x);
}
}
}
private void notifyEvent2(Function<Listener, BiConsumer<Request, Throwable>> function, Request request, Throwable failure)
{
for (Listener listener : _listeners)
{
try
{
function.apply(listener).accept(request, failure);
}
catch (Throwable x)
{
LOG.debug("Failure invoking listener " + listener, x);
}
}
}
/**
* <p>Listener for {@link HttpChannel} events.</p>
* <p>HttpChannel will emit events for the various phases it goes through while
* processing a HTTP request and response.</p>
* <p>Implementations of this interface may listen to those events to track
* timing and/or other values such as request URI, etc.</p>
* <p>The events parameters, especially the {@link Request} object, may be
* in a transient state depending on the event, and not all properties/features
* of the parameters may be available inside a listener method.</p>
* <p>It is recommended that the event parameters are <em>not</em> acted upon
* in the listener methods, or undefined behavior may result. For example, it
* would be a bad idea to try to read some content from the
* {@link javax.servlet.ServletInputStream} in listener methods. On the other
* hand, it is legit to store request attributes in one listener method that
* may be possibly retrieved in another listener method in a later event.</p>
* <p>Listener methods are invoked synchronously from the thread that is
* performing the request processing, and they should not call blocking code
* (otherwise the request processing will be blocked as well).</p>
*/
public interface Listener
{
/**
* Invoked just after the HTTP request line has been parsed.
*
* @param request the request object
*/
public default void onRequestBegin(Request request)
{
}
/**
* Invoked just before calling the application.
*
* @param request the request object
*/
public default void onRequestBeforeDispatch(Request request)
{
}
/**
* Invoked just after the application returns from the first invocation.
*
* @param request the request object
*/
public default void onRequestAfterDispatch(Request request)
{
}
/**
* Invoked every time a request content chunk has been parsed, just before
* making it available to the application.
*
* @param request the request object
* @param content a {@link ByteBuffer#slice() slice} of the request content chunk
*/
public default void onRequestContent(Request request, ByteBuffer content)
{
}
/**
* Invoked when the end of the request content is detected.
*
* @param request the request object
*/
public default void onRequestContentEnd(Request request)
{
}
/**
* Invoked when the request trailers have been parsed.
*
* @param request the request object
*/
public default void onRequestTrailers(Request request)
{
}
/**
* Invoked when the request has been fully parsed.
*
* @param request the request object
*/
public default void onRequestEnd(Request request)
{
}
/**
* Invoked when the request processing failed.
*
* @param request the request object
* @param failure the request failure
*/
public default void onRequestFailure(Request request, Throwable failure)
{
}
/**
* Invoked just before the response line is written to the network.
*
* @param request the request object
*/
public default void onResponseBegin(Request request)
{
}
/**
* Invoked just after the response is committed (that is, the response
* line, headers and possibly some content have been written to the
* network).
*
* @param request the request object
*/
public default void onResponseCommit(Request request)
{
}
/**
* Invoked after a response content chunk has been written to the network.
*
* @param request the request object
* @param content a {@link ByteBuffer#slice() slice} of the response content chunk
*/
public default void onResponseContent(Request request, ByteBuffer content)
{
}
/**
* Invoked when the response has been fully written.
*
* @param request the request object
*/
public default void onResponseEnd(Request request)
{
}
/**
* Invoked when the response processing failed.
*
* @param request the request object
* @param failure the response failure
*/
public default void onResponseFailure(Request request, Throwable failure)
{
}
/**
* Invoked when the request <em>and</em> response processing are complete.
*
* @param request the request object
*/
public default void onComplete(Request request)
{
}
}
private class CommitCallback extends Callback.Nested
{
private CommitCallback(Callback callback)
private final ByteBuffer _content;
private final boolean _complete;
private CommitCallback(Callback callback, ByteBuffer content, boolean complete)
{
super(callback);
this._content = content == null ? BufferUtil.EMPTY_BUFFER : content.slice();
this._complete = complete;
}
@Override
public void succeeded()
{
super.succeeded();
notifyResponseCommit(_request);
if (_content.hasRemaining())
notifyResponseContent(_request, _content);
if (_complete)
notifyResponseEnd(_request);
}
@Override
@ -875,7 +1191,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
private Commit100Callback(Callback callback)
{
super(callback);
super(callback, null, false);
}
@Override
@ -886,8 +1202,28 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
else
super.failed(new IllegalStateException());
}
}
private class ContentCallback extends Callback.Nested
{
private final ByteBuffer _content;
private final boolean _complete;
private ContentCallback(Callback callback, ByteBuffer content, boolean complete)
{
super(callback);
this._content = content == null ? BufferUtil.EMPTY_BUFFER : content.slice();
this._complete = complete;
}
@Override
public void succeeded()
{
super.succeeded();
if (_content.hasRemaining())
notifyResponseContent(_request, _content);
if (_complete)
notifyResponseEnd(_request);
}
}
}

View File

@ -0,0 +1,253 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HttpChannelEventTest
{
private Server server;
private LocalConnector connector;
public void start(Handler handler) throws Exception
{
server = new Server();
connector = new LocalConnector(server);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}
@After
public void dispose() throws Exception
{
if (server != null)
server.stop();
}
@Test
public void testRequestContentSlice() throws Exception
{
int data = 'x';
CountDownLatch applicationLatch = new CountDownLatch(1);
start(new TestHandler()
{
@Override
protected void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
ServletInputStream input = request.getInputStream();
int content = input.read();
Assert.assertEquals(data, content);
applicationLatch.countDown();
}
});
CountDownLatch listenerLatch = new CountDownLatch(1);
connector.addBean(new HttpChannel.Listener()
{
@Override
public void onRequestContent(Request request, ByteBuffer content)
{
// Consume the buffer to verify it's a slice.
content.position(content.limit());
listenerLatch.countDown();
}
});
HttpTester.Request request = HttpTester.newRequest();
request.setHeader("Host", "localhost");
request.setContent(new byte[]{(byte)data});
ByteBuffer buffer = connector.getResponse(request.generate(), 5, TimeUnit.SECONDS);
// Listener event happens before the application.
Assert.assertTrue(listenerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(applicationLatch.await(5, TimeUnit.SECONDS));
HttpTester.Response response = HttpTester.parseResponse(buffer);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
}
@Test
public void testResponseContentSlice() throws Exception
{
byte[] data = new byte[]{'y'};
start(new TestHandler()
{
@Override
protected void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
response.getOutputStream().write(data);
}
});
CountDownLatch latch = new CountDownLatch(1);
connector.addBean(new HttpChannel.Listener()
{
@Override
public void onResponseContent(Request request, ByteBuffer content)
{
Assert.assertTrue(content.hasRemaining());
latch.countDown();
}
});
HttpTester.Request request = HttpTester.newRequest();
request.setHeader("Host", "localhost");
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(request.toString(), 5, TimeUnit.SECONDS));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertArrayEquals(data, response.getContentBytes());
}
@Test
public void testRequestFailure() throws Exception
{
start(new TestHandler());
CountDownLatch latch = new CountDownLatch(2);
connector.addBean(new HttpChannel.Listener()
{
@Override
public void onRequestFailure(Request request, Throwable failure)
{
latch.countDown();
}
@Override
public void onComplete(Request request)
{
latch.countDown();
}
});
// No Host header, request will fail.
String request = HttpTester.newRequest().toString();
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(request, 5, TimeUnit.SECONDS));
Assert.assertEquals(HttpStatus.BAD_REQUEST_400, response.getStatus());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testResponseFailure() throws Exception
{
start(new TestHandler()
{
@Override
protected void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
// Closes all connections, response will fail.
connector.getConnectedEndPoints().forEach(EndPoint::close);
}
});
CountDownLatch latch = new CountDownLatch(2);
connector.addBean(new HttpChannel.Listener()
{
@Override
public void onResponseFailure(Request request, Throwable failure)
{
latch.countDown();
}
@Override
public void onComplete(Request request)
{
latch.countDown();
}
});
HttpTester.Request request = HttpTester.newRequest();
request.setHeader("Host", "localhost");
HttpTester.parseResponse(connector.getResponse(request.toString(), 5, TimeUnit.SECONDS));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testExchangeTimeRecording() throws Exception
{
start(new TestHandler());
CountDownLatch latch = new CountDownLatch(1);
AtomicLong elapsed = new AtomicLong();
connector.addBean(new HttpChannel.Listener()
{
private final String attribute = getClass().getName() + ".begin";
@Override
public void onRequestBegin(Request request)
{
request.setAttribute(attribute, System.nanoTime());
}
@Override
public void onComplete(Request request)
{
long endTime = System.nanoTime();
long beginTime = (Long)request.getAttribute(attribute);
elapsed.set(endTime - beginTime);
latch.countDown();
}
});
HttpTester.Request request = HttpTester.newRequest();
request.setHeader("Host", "localhost");
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(request.toString(), 5, TimeUnit.SECONDS));
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertThat(elapsed.get(), Matchers.greaterThan(0L));
}
private static class TestHandler extends AbstractHandler.ErrorDispatchHandler
{
@Override
protected final void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
jettyRequest.setHandled(true);
handle(request, response);
}
protected void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
}
}
}