Add `EventsHandler` API (#9901)

* #8885 add EventsHandler API

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2023-06-15 17:31:11 +02:00 committed by GitHub
parent 9e4dc46a3e
commit 8e79c1b58b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1796 additions and 350 deletions

View File

@ -53,7 +53,6 @@ public class HttpStreamOverFCGI implements HttpStream
private final ServerGenerator _generator;
private final HttpChannel _httpChannel;
private final int _id;
private final long _nanoTime;
private String _method;
private HostPortHttpField hostPort;
private String _path;
@ -71,7 +70,6 @@ public class HttpStreamOverFCGI implements HttpStream
_generator = generator;
_httpChannel = httpChannel;
_id = id;
_nanoTime = NanoTime.now();
}
public HttpChannel getHttpChannel()
@ -85,12 +83,6 @@ public class HttpStreamOverFCGI implements HttpStream
return String.valueOf(_id);
}
@Override
public long getNanoTime()
{
return _nanoTime;
}
public void onHeader(HttpField field)
{
String name = field.getName();
@ -114,7 +106,7 @@ public class HttpStreamOverFCGI implements HttpStream
{
String pathQuery = URIUtil.addPathQuery(_path, _query);
HttpScheme scheme = StringUtil.isEmpty(_secure) ? HttpScheme.HTTP : HttpScheme.HTTPS;
MetaData.Request request = new MetaData.Request(_method, scheme.asString(), hostPort, pathQuery, HttpVersion.fromString(_version), _headers, -1);
MetaData.Request request = new MetaData.Request(NanoTime.now(), _method, scheme.asString(), hostPort, pathQuery, HttpVersion.fromString(_version), _headers, -1); // TODO #9900 make beginNanoTime accurate
Runnable task = _httpChannel.onRequest(request);
_allHeaders.forEach(field -> _httpChannel.getRequest().setAttribute(field.getName(), field.getValue()));
// TODO: here we just execute the task.

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpTokens.EndOfContent;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.Index;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.slf4j.Logger;
@ -235,6 +236,7 @@ public class HttpParser
private final FieldCache _fieldCache = new FieldCache();
private HttpField _field;
private HttpHeader _header;
private long _beginNanoTime;
private String _headerString;
private String _valueString;
private int _responseStatus;
@ -310,6 +312,11 @@ public class HttpParser
_complianceListener = (ComplianceViolation.Listener)(_handler instanceof ComplianceViolation.Listener ? _handler : null);
}
public long getBeginNanoTime()
{
return _beginNanoTime;
}
public HttpHandler getHandler()
{
return _handler;
@ -1519,6 +1526,8 @@ public class HttpParser
_methodString = null;
_endOfContent = EndOfContent.UNKNOWN_CONTENT;
_header = null;
if (buffer.hasRemaining())
_beginNanoTime = NanoTime.now(); // TODO #9900 check beginNanoTime's accuracy
if (quickStart(buffer))
return true;
}

View File

@ -17,6 +17,8 @@ import java.util.Iterator;
import java.util.Objects;
import java.util.function.Supplier;
import org.eclipse.jetty.util.NanoTime;
/**
* <p>Immutable common HTTP information for requests and responses.</p>
* <p>Specific HTTP request information is captured by {@link Request}.</p>
@ -128,29 +130,53 @@ public class MetaData implements Iterable<HttpField>
{
private final String _method;
private final HttpURI _uri;
private final long _beginNanoTime;
public Request(String method, HttpURI uri, HttpVersion version, HttpFields headers)
{
this(method, uri, version, headers, -1);
this(NanoTime.now(), method, uri, version, headers, -1);
}
public Request(String method, HttpURI uri, HttpVersion version, HttpFields headers, long contentLength)
public Request(long beginNanoTime, String method, HttpURI uri, HttpVersion version, HttpFields headers)
{
this(method, uri, version, headers, contentLength, null);
this(beginNanoTime, method, uri, version, headers, -1);
}
public Request(String method, String scheme, HostPortHttpField authority, String uri, HttpVersion version, HttpFields headers, long contentLength)
{
this(method,
this(NanoTime.now(), method,
HttpURI.build().scheme(scheme).host(authority == null ? null : authority.getHost()).port(authority == null ? -1 : authority.getPort()).pathQuery(uri),
version, headers, contentLength);
}
public Request(long beginNanoTime, String method, String scheme, HostPortHttpField authority, String uri, HttpVersion version, HttpFields headers, long contentLength)
{
this(beginNanoTime, method,
HttpURI.build().scheme(scheme).host(authority == null ? null : authority.getHost()).port(authority == null ? -1 : authority.getPort()).pathQuery(uri),
version, headers, contentLength);
}
public Request(String method, HttpURI uri, HttpVersion version, HttpFields headers, long contentLength)
{
this(NanoTime.now(), method, uri, version, headers, contentLength, null);
}
public Request(long beginNanoTime, String method, HttpURI uri, HttpVersion version, HttpFields headers, long contentLength)
{
this(beginNanoTime, method, uri, version, headers, contentLength, null);
}
public Request(String method, HttpURI uri, HttpVersion version, HttpFields headers, long contentLength, Supplier<HttpFields> trailers)
{
this(NanoTime.now(), method, uri, version, headers, contentLength, trailers);
}
public Request(long beginNanoTime, String method, HttpURI uri, HttpVersion version, HttpFields headers, long contentLength, Supplier<HttpFields> trailers)
{
super(version, headers, contentLength, trailers);
_method = Objects.requireNonNull(method);
_uri = Objects.requireNonNull(uri);
_beginNanoTime = beginNanoTime;
}
@Override
@ -159,6 +185,11 @@ public class MetaData implements Iterable<HttpField>
return true;
}
public long getBeginNanoTime()
{
return _beginNanoTime;
}
/**
* @return the HTTP method
*/
@ -201,12 +232,22 @@ public class MetaData implements Iterable<HttpField>
public ConnectRequest(HttpScheme scheme, HostPortHttpField authority, String path, HttpFields headers, String protocol)
{
this(scheme == null ? null : scheme.asString(), authority, path, headers, protocol);
this(NanoTime.now(), scheme == null ? null : scheme.asString(), authority, path, headers, protocol);
}
public ConnectRequest(long beginNanoTime, HttpScheme scheme, HostPortHttpField authority, String path, HttpFields headers, String protocol)
{
this(beginNanoTime, scheme == null ? null : scheme.asString(), authority, path, headers, protocol);
}
public ConnectRequest(String scheme, HostPortHttpField authority, String path, HttpFields headers, String protocol)
{
super(HttpMethod.CONNECT.asString(),
this(NanoTime.now(), scheme, authority, path, headers, protocol);
}
public ConnectRequest(long beginNanoTime, String scheme, HostPortHttpField authority, String path, HttpFields headers, String protocol)
{
super(beginNanoTime, HttpMethod.CONNECT.asString(),
HttpURI.build().scheme(scheme).host(authority == null ? null : authority.getHost()).port(authority == null ? -1 : authority.getPort()).pathQuery(path),
HttpVersion.HTTP_2, headers, -1, null);
_protocol = protocol;

View File

@ -23,6 +23,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.http2.hpack.HpackException.SessionException;
import org.eclipse.jetty.util.NanoTime;
public class MetaDataBuilder
{
@ -249,10 +250,11 @@ public class MetaDataBuilder
throw new HpackException.StreamException("No Path");
}
if (isConnect)
return new MetaData.ConnectRequest(_scheme, _authority, _path, fields, _protocol);
return new MetaData.ConnectRequest(NanoTime.now(), _scheme, _authority, _path, fields, _protocol); // TODO #9900 make beginNanoTime accurate
else
return new MetaData.Request(
_method,
NanoTime.now(), // TODO #9900 make beginNanoTime accurate
_method,
_scheme == null ? HttpScheme.HTTP.asString() : _scheme.asString(),
_authority,
_path,

View File

@ -44,7 +44,6 @@ import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.server.TunnelSupport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
@ -59,7 +58,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
private final HTTP2ServerConnection _connection;
private final HttpChannel _httpChannel;
private final HTTP2Stream _stream;
private final long _nanoTime;
private MetaData.Request _requestMetaData;
private MetaData.Response _responseMetaData;
private TunnelSupport tunnelSupport;
@ -73,7 +71,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
_connection = connection;
_httpChannel = httpChannel;
_stream = stream;
_nanoTime = NanoTime.now();
}
@Override
@ -82,12 +79,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
return String.valueOf(_stream.getId());
}
@Override
public long getNanoTime()
{
return _nanoTime;
}
public Runnable onRequest(HeadersFrame frame)
{
try

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.http3.HTTP3ErrorCode;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.NanoTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,6 +44,7 @@ public class MessageParser
private BodyParser unknownBodyParser;
private State state = State.HEADER;
protected boolean dataMode;
private long beginNanoTime;
public MessageParser(ParserListener listener, QpackDecoder decoder, long streamId, BooleanSupplier isLast)
{
@ -52,6 +54,11 @@ public class MessageParser
this.isLast = isLast;
}
public long getBeginNanoTime()
{
return beginNanoTime;
}
public void init(UnaryOperator<ParserListener> wrapper)
{
ParserListener listener = wrapper.apply(this.listener);
@ -116,6 +123,7 @@ public class MessageParser
{
BodyParser bodyParser = null;
long frameType = headerParser.getFrameType();
beginNanoTime = NanoTime.now(); // TODO #9900 check beginNanoTime's accuracy
if (frameType >= 0 && frameType < bodyParsers.length)
bodyParser = bodyParsers[(int)frameType];

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.util.NanoTime;
import static org.eclipse.jetty.http3.qpack.QpackException.H3_GENERAL_PROTOCOL_ERROR;
@ -247,9 +248,10 @@ public class MetaDataBuilder
throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, "No Path");
}
if (isConnect)
return new MetaData.ConnectRequest(_scheme, _authority, _path, fields, _protocol);
return new MetaData.ConnectRequest(NanoTime.now(), _scheme, _authority, _path, fields, _protocol); // TODO #9900 make beginNanoTime accurate
else
return new MetaData.Request(
NanoTime.now(), // TODO #9900 make beginNanoTime accurate
_method,
_scheme.asString(),
_authority,

View File

@ -39,7 +39,6 @@ import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
@ -50,7 +49,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
private static final Logger LOG = LoggerFactory.getLogger(HttpStreamOverHTTP3.class);
private final AutoLock lock = new AutoLock();
private final long nanoTime = NanoTime.now();
private final ServerHTTP3StreamConnection connection;
private final HttpChannel httpChannel;
private final HTTP3StreamServer stream;
@ -73,12 +71,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
return String.valueOf(stream.getId());
}
@Override
public long getNanoTime()
{
return nanoTime;
}
public Runnable onRequest(HeadersFrame frame)
{
try

View File

@ -664,6 +664,16 @@ public class Content
return length;
}
/**
* @return an immutable version of this Chunk
*/
default Chunk asReadOnly()
{
if (!canRetain())
return this;
return asChunk(getByteBuffer().asReadOnlyBuffer(), isLast(), this);
}
/**
* <p>A chunk that wraps a failure.</p>
* <p>Error Chunks are always last and have no bytes to read,

View File

@ -206,7 +206,7 @@ public class DigestAuthenticator extends LoginAuthenticator
byte[] nounce = new byte[24];
_random.nextBytes(nounce);
nonce = new Nonce(Base64.getEncoder().encodeToString(nounce), request.getTimeStamp(), getMaxNonceCount());
nonce = new Nonce(Base64.getEncoder().encodeToString(nounce), Request.getTimeStamp(request), getMaxNonceCount());
}
while (_nonceMap.putIfAbsent(nonce._nonce, nonce) != null);
_nonceQueue.add(nonce);
@ -222,7 +222,7 @@ public class DigestAuthenticator extends LoginAuthenticator
private int checkNonce(Digest digest, Request request)
{
// firstly let's expire old nonces
long expired = request.getTimeStamp() - getMaxNonceAge();
long expired = Request.getTimeStamp(request) - getMaxNonceAge();
Nonce nonce = _nonceQueue.peek();
while (nonce != null && nonce._ts < expired)
{

View File

@ -1063,7 +1063,7 @@ public class CustomRequestLog extends ContainerLifeCycle implements RequestLog
private static void logRequestTime(DateCache dateCache, StringBuilder b, Request request, Response response)
{
b.append('[');
append(b, dateCache.format(request.getTimeStamp()));
append(b, dateCache.format(Request.getTimeStamp(request)));
b.append(']');
}
@ -1087,7 +1087,7 @@ public class CustomRequestLog extends ContainerLifeCycle implements RequestLog
private static void logLatency(StringBuilder b, Request request, TimeUnit unit)
{
b.append(unit.convert(NanoTime.since(request.getNanoTime()), TimeUnit.NANOSECONDS));
b.append(unit.convert(NanoTime.since(request.getBeginNanoTime()), TimeUnit.NANOSECONDS));
}
@SuppressWarnings("unused")

View File

@ -46,11 +46,6 @@ public interface HttpStream extends Callback
*/
String getId();
/**
* @return the nanoTime when this HttpStream was created
*/
long getNanoTime();
/**
* <p>Reads a chunk of content, with the same semantic as {@link Content.Source#read()}.</p>
* <p>This method is called from the implementation of {@link Request#read()}.</p>
@ -160,12 +155,6 @@ public interface HttpStream extends Callback
return getWrapped().getId();
}
@Override
public final long getNanoTime()
{
return getWrapped().getNanoTime();
}
@Override
public Content.Chunk read()
{

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.UrlEncoded;
@ -202,20 +203,29 @@ public interface Request extends Attributes, Content.Source
HttpFields getTrailers();
/**
* <p>Get the millisecond timestamp at which the request was created, obtained via {@link System#currentTimeMillis()}.
* This method should be used for wall clock time, rather than {@link #getNanoTime()},
* <p>Get the millisecond timestamp at which the request was created, obtained with {@link System#currentTimeMillis()}.
* This method should be used for wall clock time, rather than {@link #getHeadersNanoTime()},
* which is appropriate for measuring latencies.</p>
* @return The timestamp that the request was received/created in milliseconds
*/
long getTimeStamp();
static long getTimeStamp(Request request)
{
return System.currentTimeMillis() - NanoTime.millisSince(request.getHeadersNanoTime());
}
/**
* <p>Get the nanoTime at which the request was created, obtained via {@link System#nanoTime()}.
* This method should be used when measuring latencies, rather than {@link #getTimeStamp()},
* which is appropriate for wall clock time.</p>
* <p>Get the nanoTime at which the request arrived to a connector, obtained via {@link System#nanoTime()}.
* This method can be used when measuring latencies.</p>
* @return The nanoTime at which the request was received/created in nanoseconds
*/
long getNanoTime();
long getBeginNanoTime();
/**
* <p>Get the nanoTime at which the request headers were parsed, obtained via {@link System#nanoTime()}.
* This method can be used when measuring latencies.</p>
* @return The nanoTime at which the request was ready in nanoseconds
*/
long getHeadersNanoTime();
// TODO: see above.
boolean isSecure();
@ -300,6 +310,41 @@ public interface Request extends Attributes, Content.Source
*/
Session getSession(boolean create);
/**
* Returns a copy of the request that throws {@link UnsupportedOperationException}
* from all mutative methods.
* @return a copy of the request
*/
static Request asReadOnly(Request request)
{
return new Request.Wrapper(request)
{
@Override
public void addHttpStreamWrapper(Function<HttpStream, HttpStream> wrapper)
{
throw new UnsupportedOperationException();
}
@Override
public Content.Chunk read()
{
throw new UnsupportedOperationException();
}
@Override
public void demand(Runnable demandCallback)
{
throw new UnsupportedOperationException();
}
@Override
public void fail(Throwable failure)
{
throw new UnsupportedOperationException();
}
};
}
static String getLocalAddr(Request request)
{
if (request == null)
@ -628,15 +673,15 @@ public interface Request extends Attributes, Content.Source
}
@Override
public long getTimeStamp()
public long getBeginNanoTime()
{
return getWrapped().getTimeStamp();
return getWrapped().getBeginNanoTime();
}
@Override
public long getNanoTime()
public long getHeadersNanoTime()
{
return getWrapped().getNanoTime();
return getWrapped().getHeadersNanoTime();
}
@Override

View File

@ -1,88 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A <code>Handler</code> that allows recording the latency of the requests executed by the wrapped handler.</p>
* <p>The latency reported by {@link #onRequestComplete(long)} is the delay between the first notice of the request
* (obtained from {@link HttpStream#getNanoTime()}) until the stream completion event has been handled by
* {@link HttpStream#succeeded()} or {@link HttpStream#failed(Throwable)}.</p>
*/
public abstract class AbstractLatencyRecordingHandler extends Handler.Wrapper
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractLatencyRecordingHandler.class);
public AbstractLatencyRecordingHandler()
{
}
private HttpStream recordingWrapper(HttpStream httpStream)
{
return new HttpStream.Wrapper(httpStream)
{
@Override
public void succeeded()
{
// Take the httpStream nano timestamp before calling super.
long begin = httpStream.getNanoTime();
super.succeeded();
fireOnRequestComplete(begin);
}
@Override
public void failed(Throwable x)
{
// Take the httpStream nano timestamp before calling super.
long begin = httpStream.getNanoTime();
super.failed(x);
fireOnRequestComplete(begin);
}
private void fireOnRequestComplete(long begin)
{
try
{
onRequestComplete(NanoTime.since(begin));
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.debug("Error thrown by onRequestComplete", t);
}
}
};
}
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
request.addHttpStreamWrapper(this::recordingWrapper);
return super.handle(request, response, callback);
}
/**
* Called back for each completed request with its execution's latency.
* @param durationInNs the duration in nanoseconds of the completed request
*/
protected abstract void onRequestComplete(long durationInNs);
}

View File

@ -0,0 +1,376 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link Handler.Wrapper} that fires events during the processing of the requests.</p>
* <p>EventsHandler will emit events for the various phases the server goes through while
* processing an HTTP request and response.</p>
* <p>Subclasses may listen to those events to track timing and/or other values such as
* request URI, etc.</p>
* <p>The events parameters, especially the {@link Request} object, may be
* in a transient state depending on the event, and not all properties/features
* of the parameters may be available inside a listener method.</p>
* <p>It is recommended that the event parameters are <em>not</em> acted upon
* in the listener methods, or undefined behavior may result. On the other
* hand, it is legit to store request attributes in one listener method that
* may be possibly retrieved in another listener method in a later event.</p>
* <p>Listener methods are invoked synchronously from the thread that is
* performing the request processing, and they should not call blocking code
* (otherwise the request processing will be blocked as well).</p>
* <p>The kind of chunk passed to {@link #onRequestRead(Request, Content.Chunk)} depends on
* the parent of this handler. For instance, if the parent is the Server, then raw chunks
* are always passed. If somewhere in the parent chain is the {@code GzipHandler} then
* unzipped chunks are passed.</p>
*/
public abstract class EventsHandler extends Handler.Wrapper
{
private static final Logger LOG = LoggerFactory.getLogger(EventsHandler.class);
public EventsHandler()
{
}
public EventsHandler(Handler handler)
{
super(handler);
}
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
Request roRequest = Request.asReadOnly(request);
notifyOnBeforeHandling(roRequest);
try
{
EventsRequest wrappedRequest = new EventsRequest(request, roRequest);
EventsResponse wrappedResponse = new EventsResponse(roRequest, response);
request.addHttpStreamWrapper(stream -> new HttpStream.Wrapper(stream)
{
@Override
public void succeeded()
{
notifyOnResponseBegin(roRequest, wrappedResponse);
notifyOnResponseTrailersComplete(roRequest, wrappedResponse);
notifyOnComplete(roRequest, null);
super.succeeded();
}
@Override
public void failed(Throwable x)
{
notifyOnResponseBegin(roRequest, wrappedResponse);
notifyOnResponseTrailersComplete(roRequest, wrappedResponse);
notifyOnComplete(roRequest, x);
super.failed(x);
}
});
boolean handled = super.handle(wrappedRequest, wrappedResponse, callback);
notifyOnAfterHandling(roRequest, handled, null);
return handled;
}
catch (Throwable x)
{
notifyOnAfterHandling(roRequest, false, x);
throw x;
}
}
private void notifyOnBeforeHandling(Request request)
{
try
{
onBeforeHandling(request);
}
catch (Throwable x)
{
LOG.info("Error firing onBeforeHandling", x);
}
}
private void notifyOnRequestRead(Request wrapped, Content.Chunk chunk)
{
try
{
onRequestRead(wrapped, chunk == null ? null : chunk.asReadOnly());
}
catch (Throwable x)
{
LOG.info("Error firing onRequestRead", x);
}
}
private void notifyOnAfterHandling(Request request, boolean handled, Throwable failure)
{
try
{
onAfterHandling(request, handled, failure);
}
catch (Throwable x)
{
LOG.info("Error firing onAfterHandling", x);
}
}
private void notifyOnResponseBegin(Request request, EventsResponse response)
{
try
{
if (!response.notifiedOnResponseBegin)
{
onResponseBegin(request, response.getStatus(), response.getHeaders().asImmutable());
response.notifiedOnResponseBegin = true;
}
}
catch (Throwable x)
{
LOG.info("Error firing onResponseBegin", x);
}
}
private void notifyOnResponseWrite(Request request, boolean last, ByteBuffer content)
{
try
{
onResponseWrite(request, last, content.asReadOnlyBuffer());
}
catch (Throwable x)
{
LOG.info("Error firing onResponseWrite", x);
}
}
private void notifyOnResponseWriteComplete(Request request, Throwable failure)
{
try
{
onResponseWriteComplete(request, failure);
}
catch (Throwable x)
{
LOG.info("Error firing onResponseWriteComplete", x);
}
}
private void notifyOnResponseTrailersComplete(Request request, EventsResponse response)
{
try
{
if (response.suppliedTrailers != null)
onResponseTrailersComplete(request, response.suppliedTrailers);
}
catch (Throwable x)
{
LOG.info("Error firing onResponseTrailersComplete", x);
}
}
private void notifyOnComplete(Request request, Throwable failure)
{
try
{
onComplete(request, failure);
}
catch (Throwable x)
{
LOG.info("Error firing onComplete", x);
}
}
/**
* Invoked just before calling the server handler tree (i.e. just before the {@link Runnable}
* returned from {@link org.eclipse.jetty.server.HttpChannel#onRequest(MetaData.Request)} is run).
*
* <p>
* This is the final state of the request before the handlers are called.
* This includes any request customization.
* </p>
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @see org.eclipse.jetty.server.HttpChannel#onRequest(MetaData.Request)
*/
protected void onBeforeHandling(Request request)
{
if (LOG.isDebugEnabled())
LOG.debug("onBeforeHandling of {}", request);
}
/**
* Invoked every time a request content chunk has been parsed, just before
* making it available to the application (i.e. from within a call to
* {@link Request#read()}).
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param chunk a potentially null request content chunk, including {@link org.eclipse.jetty.io.Content.Chunk.Error}
* and {@link org.eclipse.jetty.http.Trailers} chunks.
* If a reference to the chunk (or its {@link ByteBuffer}) is kept,
* then {@link Content.Chunk#retain()} must be called.
* @see Request#read()
*/
protected void onRequestRead(Request request, Content.Chunk chunk)
{
if (LOG.isDebugEnabled())
LOG.debug("onRequestRead of {} and {}", request, chunk);
}
/**
* Invoked after application handling (i.e. just after the call to the {@link Runnable} returned from
* {@link org.eclipse.jetty.server.HttpChannel#onRequest(MetaData.Request)} returns).
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param handled if the server handlers handled the request
* @param failure the exception thrown by the application
* @see org.eclipse.jetty.server.HttpChannel#onRequest(MetaData.Request)
*/
protected void onAfterHandling(Request request, boolean handled, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("onAfterHandling of {} handled={}", request, handled, failure);
}
/**
* Invoked just before the response is line written to the network (i.e. from
* within the first call to {@link Response#write(boolean, ByteBuffer, Callback)}).
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param status the response status
* @param headers the immutable fields of the response object
* @see Response#write(boolean, ByteBuffer, Callback)
*/
protected void onResponseBegin(Request request, int status, HttpFields headers)
{
if (LOG.isDebugEnabled())
LOG.debug("onResponseBegin of {} status={} headers={}", request, status, headers);
}
/**
* Invoked before each response content chunk has been written (i.e. from
* within the any call to {@link Response#write(boolean, ByteBuffer, Callback)}).
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param last indicating last write
* @param content The {@link ByteBuffer} of the response content chunk (readonly).
* @see Response#write(boolean, ByteBuffer, Callback)
*/
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
if (LOG.isDebugEnabled())
LOG.debug("onResponseWrite of {} last={} content={}", request, last, BufferUtil.toDetailString(content));
}
/**
* Invoked after each response content chunk has been written
* (i.e. immediately before calling the {@link Callback} passed to
* {@link Response#write(boolean, ByteBuffer, Callback)}).
* This will always fire <em>before</em> {@link #onResponseTrailersComplete(Request, HttpFields)} is fired.
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param failure if there was a failure to write the given content
* @see Response#write(boolean, ByteBuffer, Callback)
*/
protected void onResponseWriteComplete(Request request, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("onResponseWriteComplete of {}", request, failure);
}
/**
* Invoked after the response trailers have been written <em>and</em> the final {@link #onResponseWriteComplete(Request, Throwable)} event was fired.
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param trailers the written trailers.
*/
protected void onResponseTrailersComplete(Request request, HttpFields trailers)
{
if (LOG.isDebugEnabled())
LOG.debug("onResponseTrailersComplete of {}, trailers={}", request, trailers);
}
/**
* Invoked when the request <em>and</em> response processing are complete,
* just before the request and response will be recycled (i.e. after the
* {@link Runnable} return from {@link org.eclipse.jetty.server.HttpChannel#onRequest(MetaData.Request)}
* has returned and the {@link Callback} passed to {@link Handler#handle(Request, Response, Callback)}
* has been completed).
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param failure if there was a failure to complete
*/
protected void onComplete(Request request, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("onComplete of {}", request, failure);
}
private class EventsResponse extends Response.Wrapper
{
private boolean notifiedOnResponseBegin;
private HttpFields suppliedTrailers;
public EventsResponse(Request roRequest, Response response)
{
super(roRequest, response);
}
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
notifyOnResponseBegin(getRequest(), this);
notifyOnResponseWrite(getRequest(), last, byteBuffer);
super.write(last, byteBuffer, Callback.from(callback, (x) -> notifyOnResponseWriteComplete(getRequest(), x)));
}
@Override
public void setTrailersSupplier(Supplier<HttpFields> trailers)
{
super.setTrailersSupplier(trailers == null ? null : () -> suppliedTrailers = trailers.get());
}
}
private class EventsRequest extends Request.Wrapper
{
private final Request roRequest;
public EventsRequest(Request request, Request roRequest)
{
super(request);
this.roRequest = roRequest;
}
@Override
public Content.Chunk read()
{
Content.Chunk chunk = super.read();
notifyOnRequestRead(roRequest, chunk);
return chunk;
}
}
}

View File

@ -0,0 +1,50 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.NanoTime;
/**
* <p>A <code>Handler</code> that helps recording the total latency of the requests executed by the wrapped handler.</p>
* <p>The latency reported by {@link #onRequestComplete(String, long)} is the delay between when {@link Request#getBeginNanoTime()
* the request arrived to a connector} until {@link EventsHandler#onComplete(Request, Throwable) the completion of that
* request}.</p>
*/
public abstract class LatencyRecordingHandler extends EventsHandler
{
public LatencyRecordingHandler()
{
}
public LatencyRecordingHandler(Handler handler)
{
super(handler);
}
@Override
protected final void onComplete(Request request, Throwable failure)
{
onRequestComplete(request.getId(), NanoTime.since(request.getBeginNanoTime()));
}
/**
* Called back for each completed request with its execution's latency.
*
* @param requestId the ID of the request
* @param durationInNs the duration in nanoseconds of the completed request
*/
protected abstract void onRequestComplete(String requestId, long durationInNs);
}

View File

@ -18,10 +18,9 @@ import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.BufferUtil;
@ -33,7 +32,7 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.statistic.CounterStatistic;
import org.eclipse.jetty.util.statistic.SampleStatistic;
public class StatisticsHandler extends Handler.Wrapper
public class StatisticsHandler extends EventsHandler
{
private final CounterStatistic _requestStats = new CounterStatistic(); // how many requests are being handled (full lifecycle)
private final SampleStatistic _requestTimeStats = new SampleStatistic(); // latencies of requests (full lifecycle)
@ -46,28 +45,63 @@ public class StatisticsHandler extends Handler.Wrapper
private final LongAdder _bytesRead = new LongAdder();
private final LongAdder _bytesWritten = new LongAdder();
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
public StatisticsHandler()
{
Handler next = getHandler();
if (next == null)
return false;
}
StatisticsRequest statisticsRequest = newStatisticsRequest(request);
try
{
if (next.handle(statisticsRequest, response, callback))
return true;
statisticsRequest.notHandled();
return false;
}
catch (Throwable t)
{
public StatisticsHandler(Handler handler)
{
super(handler);
}
@Override
protected void onBeforeHandling(Request request)
{
_requestStats.increment();
}
@Override
protected void onAfterHandling(Request request, boolean handled, Throwable failure)
{
if (failure != null)
_handlingFailures.increment();
throw t;
}
@Override
protected void onRequestRead(Request request, Content.Chunk chunk)
{
if (chunk != null)
_bytesRead.add(chunk.remaining());
}
@Override
protected void onResponseBegin(Request request, int status, HttpFields headers)
{
switch (status / 100)
{
case 1 -> _responses1xx.increment();
case 2 -> _responses2xx.increment();
case 3 -> _responses3xx.increment();
case 4 -> _responses4xx.increment();
case 5 -> _responses5xx.increment();
}
}
@Override
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
int length = BufferUtil.length(content);
if (length > 0)
_bytesWritten.add(length);
}
@Override
protected void onComplete(Request request, Throwable failure)
{
_requestTimeStats.record(NanoTime.since(request.getBeginNanoTime()));
_requestStats.decrement();
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
@ -85,11 +119,6 @@ public class StatisticsHandler extends Handler.Wrapper
);
}
protected StatisticsRequest newStatisticsRequest(Request request)
{
return new StatisticsRequest(request);
}
@ManagedOperation(value = "resets the statistics", impact = "ACTION")
public void reset()
{
@ -195,85 +224,6 @@ public class StatisticsHandler extends Handler.Wrapper
return _bytesWritten.longValue();
}
public class StatisticsRequest extends Request.Wrapper
{
public StatisticsRequest(Request request)
{
super(request);
_requestStats.increment();
addHttpStreamWrapper(this::asHttpStream);
}
public HttpStream asHttpStream(HttpStream httpStream)
{
return new StatisticsHttpStream(httpStream);
}
/**
* Creating a {@link StatisticsRequest} increments the {@link #getRequests() request counter} before its gets a chance
* of figuring out if the request is going to be handled by the {@link StatisticsHandler#getHandler() wrapped handler}.
* In case the wrapped handler did not handle the request, calling this method decrements the request counter to
* compensate for the unneeded increment.
*/
protected void notHandled()
{
_requestStats.decrement();
}
public class StatisticsHttpStream extends HttpStream.Wrapper
{
public StatisticsHttpStream(HttpStream httpStream)
{
super(httpStream);
}
@Override
public Content.Chunk read()
{
Content.Chunk chunk = super.read();
if (chunk != null)
_bytesRead.add(chunk.remaining());
return chunk;
}
@Override
public void send(MetaData.Request request, MetaData.Response response, boolean last, ByteBuffer content, Callback callback)
{
if (response != null)
{
switch (response.getStatus() / 100)
{
case 1 -> _responses1xx.increment();
case 2 -> _responses2xx.increment();
case 3 -> _responses3xx.increment();
case 4 -> _responses4xx.increment();
case 5 -> _responses5xx.increment();
}
}
int length = BufferUtil.length(content);
if (length > 0)
_bytesWritten.add(length);
super.send(request, response, last, content, callback);
}
@Override
public void succeeded()
{
_requestStats.decrement();
_requestTimeStats.record(NanoTime.since(getNanoTime()));
super.succeeded();
}
@Override
public void failed(Throwable x)
{
_requestStats.decrement();
_requestTimeStats.record(NanoTime.since(getNanoTime()));
super.failed(x);
}
}
}
/**
* Checks that the wrapped handler can read/write at a minimal rate of N bytes per second.
* When reading or writing does not conform to the specified rates, this handler prevents
@ -295,13 +245,28 @@ public class StatisticsHandler extends Handler.Wrapper
_minimumWriteRate = minimumWriteRate;
}
@Override
protected StatisticsRequest newStatisticsRequest(Request request)
/**
* Creates a {@code MinimumDataRateHandler} with the specified read and write rates.
* @param minimumReadRate the minimum number of bytes to be read per second, or 0 for not checking the read rate.
* @param minimumWriteRate the minimum number of bytes to be written per second, or 0 for not checking the write rate.
* @param handler the handler to wrap.
*/
public MinimumDataRateHandler(long minimumReadRate, long minimumWriteRate, Handler handler)
{
return new MinimumDataRateRequest(request);
super(handler);
_minimumReadRate = minimumReadRate;
_minimumWriteRate = minimumWriteRate;
}
protected class MinimumDataRateRequest extends StatisticsRequest
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
MinimumDataRateRequest wrappedRequest = new MinimumDataRateRequest(request);
MinimumDataRateResponse wrappedResponse = new MinimumDataRateResponse(wrappedRequest, response);
return super.handle(wrappedRequest, wrappedResponse, callback);
}
protected class MinimumDataRateRequest extends Request.Wrapper
{
private Content.Chunk.Error _errorContent;
@ -314,7 +279,7 @@ public class StatisticsHandler extends Handler.Wrapper
{
if (dataCount == 0L)
return 0L;
long delayInNs = NanoTime.since(getNanoTime());
long delayInNs = NanoTime.since(getHeadersNanoTime());
// If you read 1 byte or more in 0ns or less, you have infinite bandwidth.
if (delayInNs <= 0L)
return Long.MAX_VALUE;
@ -342,33 +307,40 @@ public class StatisticsHandler extends Handler.Wrapper
{
return _errorContent != null ? _errorContent : super.read();
}
}
protected class MinimumDataRateResponse extends Response.Wrapper
{
public MinimumDataRateResponse(MinimumDataRateRequest request, Response wrapped)
{
super(request, wrapped);
}
@Override
public HttpStream asHttpStream(HttpStream httpStream)
public MinimumDataRateRequest getRequest()
{
return new StatisticsHttpStream(httpStream)
return (MinimumDataRateRequest)super.getRequest();
}
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
if (_minimumWriteRate > 0)
{
@Override
public void send(MetaData.Request request, MetaData.Response response, boolean last, ByteBuffer content, Callback callback)
long bytesWritten = getBytesWritten();
if (bytesWritten > 0L)
{
if (_minimumWriteRate > 0)
long wr = getRequest().dataRatePerSecond(bytesWritten);
if (wr < _minimumWriteRate)
{
long bytesWritten = getBytesWritten();
if (bytesWritten > 0L)
{
long wr = dataRatePerSecond(bytesWritten);
if (wr < _minimumWriteRate)
{
TimeoutException cause = new TimeoutException("write rate is too low: " + wr);
_errorContent = Content.Chunk.from(cause);
callback.failed(cause);
return;
}
}
TimeoutException cause = new TimeoutException("write rate is too low: " + wr);
getRequest()._errorContent = Content.Chunk.from(cause);
callback.failed(cause);
return;
}
super.send(request, response, last, content, callback);
}
};
}
super.write(last, byteBuffer, callback);
}
}
}

View File

@ -62,6 +62,7 @@ import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
@ -96,7 +97,6 @@ public class HttpChannelState implements HttpChannel, Components
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelState.class);
private static final Throwable DO_NOT_SEND = new Throwable("No Send");
private static final MetaData.Request ERROR_REQUEST = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_1_0, HttpFields.EMPTY);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
@ -392,7 +392,8 @@ public class HttpChannelState implements HttpChannel, Components
{
// If the channel doesn't have a request, then the error must have occurred during the parsing of
// the request line / headers, so make a temp request for logging and producing an error response.
_request = new ChannelRequest(this, ERROR_REQUEST);
MetaData.Request errorRequest = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_1_0, HttpFields.EMPTY);
_request = new ChannelRequest(this, errorRequest);
_response = new ChannelResponse(_request);
}
@ -733,7 +734,7 @@ public class HttpChannelState implements HttpChannel, Components
public static class ChannelRequest implements Attributes, Request
{
private final long _timeStamp = System.currentTimeMillis();
private final long _headersNanoTime = NanoTime.now();
private final ChannelCallback _callback = new ChannelCallback(this);
private final String _id;
private final ConnectionMetaData _connectionMetaData;
@ -883,18 +884,15 @@ public class HttpChannelState implements HttpChannel, Components
}
@Override
public long getTimeStamp()
public long getBeginNanoTime()
{
return _timeStamp;
return _metaData.getBeginNanoTime();
}
@Override
public long getNanoTime()
public long getHeadersNanoTime()
{
HttpStream stream = _httpChannelState.getHttpStream();
if (stream != null)
return stream.getNanoTime();
throw new IllegalStateException();
return _headersNanoTime;
}
@Override

View File

@ -70,7 +70,6 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.Invocable;
@ -1080,7 +1079,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
HttpURI uri = stream._uri;
if (uri.hasViolations())
uri = HttpURI.from("/badURI");
_httpChannel.onRequest(new MetaData.Request(stream._method, uri, stream._version, HttpFields.EMPTY));
_httpChannel.onRequest(new MetaData.Request(_parser.getBeginNanoTime(), stream._method, uri, stream._version, HttpFields.EMPTY));
}
Runnable task = _httpChannel.onFailure(_failure);
@ -1123,7 +1122,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
protected class HttpStreamOverHTTP1 implements HttpStream
{
private final long _nanoTime = NanoTime.now();
private final String _id;
private final String _method;
private final HttpURI.Mutable _uri;
@ -1269,7 +1267,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
_uri.path("/");
}
_request = new MetaData.Request(_method, _uri.asImmutable(), _version, _headerBuilder, _contentLength);
_request = new MetaData.Request(_parser.getBeginNanoTime(), _method, _uri.asImmutable(), _version, _headerBuilder, _contentLength);
Runnable handle = _httpChannel.onRequest(_request);
++_requests;
@ -1363,12 +1361,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
return _id;
}
@Override
public long getNanoTime()
{
return _nanoTime;
}
@Override
public Content.Chunk read()
{

View File

@ -520,7 +520,7 @@ public class CustomRequestLogTest
@Override
public boolean handle(Request request, Response response, Callback callback)
{
requestTimeRef.set(request.getTimeStamp());
requestTimeRef.set(Request.getTimeStamp(request));
callback.succeeded();
return true;
}
@ -545,7 +545,7 @@ public class CustomRequestLogTest
@Override
public boolean handle(Request request, Response response, Callback callback)
{
requestTimeRef.set(request.getTimeStamp());
requestTimeRef.set(Request.getTimeStamp(request));
callback.succeeded();
return true;
}
@ -578,7 +578,7 @@ public class CustomRequestLogTest
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
requestTimeRef.set(request.getNanoTime());
requestTimeRef.set(request.getBeginNanoTime());
Thread.sleep(delay);
callback.succeeded();
return true;

View File

@ -20,8 +20,8 @@ import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.eclipse.jetty.logging.JettyLevel;
import org.eclipse.jetty.logging.JettyLogger;
import org.eclipse.jetty.server.handler.AbstractLatencyRecordingHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.LatencyRecordingHandler;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -60,10 +60,10 @@ public class LatencyRecordingHandlerTest
return true;
}
};
AbstractLatencyRecordingHandler latencyRecordingHandler = new AbstractLatencyRecordingHandler()
LatencyRecordingHandler latencyRecordingHandler = new LatencyRecordingHandler()
{
@Override
protected void onRequestComplete(long durationInNs)
protected void onRequestComplete(String requestId, long durationInNs)
{
_latencies.add(durationInNs);
}

View File

@ -28,7 +28,6 @@ import org.eclipse.jetty.io.ByteBufferAccumulator;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
public class MockHttpStream implements HttpStream
{
@ -47,7 +46,6 @@ public class MockHttpStream implements HttpStream
return false;
}
};
private final long _nanoTime = NanoTime.now();
private final AtomicReference<Content.Chunk> _content = new AtomicReference<>();
private final AtomicReference<Throwable> _complete = new AtomicReference<>();
private final CountDownLatch _completed = new CountDownLatch(1);
@ -139,12 +137,6 @@ public class MockHttpStream implements HttpStream
return "teststream";
}
@Override
public long getNanoTime()
{
return _nanoTime;
}
@Override
public Content.Chunk read()
{

View File

@ -22,14 +22,20 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
@ -41,6 +47,7 @@ import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.Components;
import org.eclipse.jetty.server.ConnectionMetaData;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Context;
@ -54,6 +61,8 @@ import org.eclipse.jetty.server.MockHttpStream;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Session;
import org.eclipse.jetty.server.TunnelSupport;
import org.eclipse.jetty.server.internal.HttpChannelState;
import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
@ -873,7 +882,7 @@ public class ContextHandlerTest
private static class ScopeListener implements ContextHandler.ContextScopeListener
{
private static final Request NULL = new Request.Wrapper(null);
private static final Request NULL = new Request.Wrapper(new TestableRequest());
private final ThreadLocal<Context> _context = new ThreadLocal<>();
private final ThreadLocal<Request> _request = new ThreadLocal<>();
@ -902,6 +911,164 @@ public class ContextHandlerTest
}
}
private static class TestableRequest implements Request
{
@Override
public Object removeAttribute(String name)
{
return null;
}
@Override
public Object setAttribute(String name, Object attribute)
{
return null;
}
@Override
public Object getAttribute(String name)
{
return null;
}
@Override
public Set<String> getAttributeNameSet()
{
return null;
}
@Override
public void clearAttributes()
{
}
@Override
public String getId()
{
return null;
}
@Override
public Components getComponents()
{
return null;
}
@Override
public ConnectionMetaData getConnectionMetaData()
{
return null;
}
@Override
public String getMethod()
{
return null;
}
@Override
public HttpURI getHttpURI()
{
return null;
}
@Override
public Context getContext()
{
return null;
}
@Override
public HttpFields getHeaders()
{
return null;
}
@Override
public HttpFields getTrailers()
{
return null;
}
public List<HttpCookie> getCookies()
{
return null;
}
@Override
public long getBeginNanoTime()
{
return 0;
}
@Override
public long getHeadersNanoTime()
{
return 0;
}
@Override
public boolean isSecure()
{
return false;
}
@Override
public long getLength()
{
return 0;
}
@Override
public Content.Chunk read()
{
return null;
}
@Override
public boolean consumeAvailable()
{
return false;
}
@Override
public void demand(Runnable demandCallback)
{
}
@Override
public void fail(Throwable failure)
{
}
@Override
public void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout)
{
}
@Override
public void addFailureListener(Consumer<Throwable> onFailure)
{
}
@Override
public TunnelSupport getTunnelSupport()
{
return null;
}
@Override
public void addHttpStreamWrapper(Function<HttpStream, HttpStream> wrapper)
{
}
@Override
public Session getSession(boolean create)
{
return null;
}
}
@Test
public void testGraceful() throws Exception
{

View File

@ -0,0 +1,147 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.NanoTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
public class EventsHandlerTest
{
private Server server;
private LocalConnector connector;
@BeforeEach
public void setUp() throws Exception
{
server = new Server();
connector = new LocalConnector(server);
server.addConnector(connector);
}
@AfterEach
public void tearDown() throws Exception
{
server.stop();
}
private void startServer(Handler handler) throws Exception
{
server.setHandler(handler);
server.start();
}
@Test
public void testRequestAttributesAreMutable() throws Exception
{
AtomicReference<String> attribute = new AtomicReference<>();
EventsHandler eventsHandler = new EventsHandler(new EchoHandler())
{
static final String ATTRIBUTE_NAME = EventsHandlerTest.class.getName();
@Override
protected void onBeforeHandling(Request request)
{
request.setAttribute(ATTRIBUTE_NAME, "testModifyRequestAttributes-1");
}
@Override
protected void onAfterHandling(Request request, boolean handled, Throwable failure)
{
request.setAttribute(ATTRIBUTE_NAME, request.getAttribute(ATTRIBUTE_NAME) + "2");
}
@Override
protected void onResponseBegin(Request request, int status, HttpFields headers)
{
request.setAttribute(ATTRIBUTE_NAME, request.getAttribute(ATTRIBUTE_NAME) + "3");
}
@Override
protected void onComplete(Request request, Throwable failure)
{
attribute.set((String)request.getAttribute(ATTRIBUTE_NAME));
}
};
startServer(eventsHandler);
String rawRequest = """
GET / HTTP/1.1\r
Host: localhost\r
Connection: close\r
\r
""";
String response = connector.getResponse(rawRequest);
assertThat(response, containsString("HTTP/1.1 200 OK"));
await().atMost(3, TimeUnit.SECONDS).until(attribute::get, is("testModifyRequestAttributes-123"));
}
@Test
public void testNanoTimestamps() throws Exception
{
AtomicReference<Long> beginNanoTime = new AtomicReference<>();
AtomicReference<Long> readyNanoTime = new AtomicReference<>();
EventsHandler eventsHandler = new EventsHandler(new EchoHandler())
{
@Override
protected void onComplete(Request request, Throwable failure)
{
beginNanoTime.set(request.getBeginNanoTime());
readyNanoTime.set(request.getHeadersNanoTime());
}
};
startServer(eventsHandler);
String reqLine = "POST / HTTP/1.1\r\n";
String headers = """
Host: localhost\r
Content-length: 6\r
Content-type: application/octet-stream\r
Connection: close\r
\r
""";
String body = "ABCDEF";
try (LocalConnector.LocalEndPoint endPoint = connector.connect())
{
endPoint.addInput(reqLine);
Thread.sleep(500);
endPoint.addInput(headers);
Thread.sleep(500);
endPoint.addInput(body);
String response = endPoint.getResponse();
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(NanoTime.millisSince(beginNanoTime.get()), greaterThan(900L));
assertThat(NanoTime.millisSince(readyNanoTime.get()), greaterThan(450L));
}
}
}

View File

@ -23,7 +23,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.logging.StacklessLogging;
@ -158,9 +157,8 @@ public class StatisticsHandlerTest
{
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
StatisticsHandler.MinimumDataRateHandler mdrh = new StatisticsHandler.MinimumDataRateHandler(0, 1100);
int expectedContentLength = 1000;
mdrh.setHandler(new Handler.Abstract.NonBlocking()
StatisticsHandler.MinimumDataRateHandler mdrh = new StatisticsHandler.MinimumDataRateHandler(0, 1000, new Handler.Abstract.NonBlocking()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
@ -235,10 +233,9 @@ public class StatisticsHandlerTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
AtomicInteger statusHolder = new AtomicInteger();
ByteBuffer byteBuffer = endPoint.waitForResponse(false, 5, TimeUnit.SECONDS, statusHolder::set);
assertThat(statusHolder.get(), is(200));
assertThat(statusHolder.get(), is(500));
assertThat(exceptionRef.get(), instanceOf(TimeoutException.class));
assertThat(exceptionRef.get().getMessage(), startsWith("write rate is too low"));
assertThat(byteBuffer.remaining(), lessThan(expectedContentLength));
}
@Test
@ -408,8 +405,8 @@ public class StatisticsHandlerTest
assertThat(response, containsString("HTTP/1.1 500 Server Error"));
}
await().atMost(5, TimeUnit.SECONDS).until(_statsHandler::getRequestsActive, is(0));
assertEquals(1, _statsHandler.getRequests(), "stats.requests");
assertEquals(0, _statsHandler.getRequestsActive(), "stats.requestActive");
assertEquals(1, _statsHandler.getRequestsActiveMax(), "stats.requestsActiveMax");
// We get no recorded status, but we get a recorded thrown response.
@ -446,7 +443,7 @@ public class StatisticsHandlerTest
assertThat(response, containsString("HTTP/1.1 200 OK"));
}
Awaitility.waitAtMost(Duration.ofSeconds(10)).until(() -> _statsHandler.getRequestsActive() == 0);
await().atMost(Duration.ofSeconds(5)).until(_statsHandler::getRequestsActive, is(0));
assertEquals(1, _statsHandler.getRequests());
assertEquals(0, _statsHandler.getRequestsActive());
assertEquals(1, _statsHandler.getRequestsActiveMax());

View File

@ -117,13 +117,13 @@ public class TestableRequest implements Request
}
@Override
public long getTimeStamp()
public long getBeginNanoTime()
{
return 0;
}
@Override
public long getNanoTime()
public long getHeadersNanoTime()
{
return 0;
}

View File

@ -0,0 +1,59 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test.client.transport;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
public class EchoHandler extends Handler.Abstract.NonBlocking
{
public EchoHandler()
{
}
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.setStatus(200);
long contentLength = -1;
for (HttpField field : request.getHeaders())
{
if (field.getHeader() != null)
{
switch (field.getHeader())
{
case CONTENT_LENGTH ->
{
response.getHeaders().add(field);
contentLength = field.getLongValue();
}
case CONTENT_TYPE -> response.getHeaders().add(field);
case TRAILER -> response.setTrailersSupplier(HttpFields.build());
}
}
}
if (contentLength != 0)
Content.copy(request, response, Response.newTrailersChunkProcessor(response), callback);
else
callback.succeeded();
return true;
}
}

View File

@ -0,0 +1,486 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test.client.transport;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.StringRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.handler.EventsHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.NanoTime;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
public class EventsHandlerTest extends AbstractTest
{
@ParameterizedTest
@MethodSource("transports")
public void testEventsBufferAndChunkAreReadOnly(Transport transport) throws Exception
{
List<Throwable> onRequestReadExceptions = new CopyOnWriteArrayList<>();
List<Throwable> onResponseWriteExceptions = new CopyOnWriteArrayList<>();
EventsHandler eventsHandler = new EventsHandler(new EchoHandler())
{
@Override
protected void onRequestRead(Request request, Content.Chunk chunk)
{
try
{
if (chunk != null)
{
chunk.getByteBuffer().put((byte)0);
}
}
catch (ReadOnlyBufferException e)
{
onRequestReadExceptions.add(e);
throw e;
}
if (chunk != null)
chunk.skip(chunk.remaining());
}
@Override
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
try
{
if (content != null)
content.put((byte)0);
}
catch (ReadOnlyBufferException e)
{
onResponseWriteExceptions.add(e);
throw e;
}
}
};
startServer(transport, eventsHandler);
startClient(transport);
ContentResponse response = client.POST(newURI(transport))
.body(new StringRequestContent("ABCDEF"))
.send();
assertThat(response.getStatus(), is(200));
assertThat(response.getContentAsString(), is("ABCDEF"));
assertThat(onRequestReadExceptions.size(), greaterThan(0));
assertThat(onResponseWriteExceptions.size(), greaterThan(0));
}
@ParameterizedTest
@MethodSource("transports")
public void testMultipleEventsHandlerChaining(Transport transport) throws Exception
{
String longString = "A".repeat(65536);
StringBuffer innerStringBuffer = new StringBuffer();
EventsHandler innerEventsHandler = new EventsHandler(new Handler.Abstract.NonBlocking()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.write(true, ByteBuffer.wrap(longString.getBytes(StandardCharsets.US_ASCII)), callback);
return true;
}
})
{
@Override
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
if (content != null)
innerStringBuffer.append(BufferUtil.toString(content));
}
};
GzipHandler gzipHandler = new GzipHandler();
gzipHandler.setHandler(innerEventsHandler);
AtomicInteger outerBytesCounter = new AtomicInteger();
EventsHandler outerEventsHandler = new EventsHandler(gzipHandler)
{
@Override
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
if (content != null)
outerBytesCounter.addAndGet(content.remaining());
}
};
startServer(transport, outerEventsHandler);
startClient(transport);
ContentResponse response = client.GET(newURI(transport));
assertThat(response.getStatus(), is(200));
assertThat(response.getContentAsString(), is(longString));
assertThat(innerStringBuffer.toString(), is(longString));
assertThat(outerBytesCounter.get(), both(greaterThan(0)).and(lessThan(longString.length())));
}
@ParameterizedTest
@MethodSource("transports")
public void testWriteNullBuffer(Transport transport) throws Exception
{
StringBuffer stringBuffer = new StringBuffer();
List<Throwable> failures = new CopyOnWriteArrayList<>();
EventsHandler eventsHandler = new EventsHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.write(false, ByteBuffer.wrap("ABCDEF".getBytes(StandardCharsets.US_ASCII)),
Callback.from(() -> response.write(false, null,
Callback.from(() -> response.write(true, null, callback), callback::failed))));
return true;
}
})
{
@Override
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
if (content != null)
stringBuffer.append(BufferUtil.toString(content));
}
@Override
protected void onResponseWriteComplete(Request request, Throwable failure)
{
if (failure != null)
failures.add(failure);
}
};
startServer(transport, eventsHandler);
startClient(transport);
ContentResponse response = client.GET(newURI(transport));
assertThat(response.getStatus(), is(200));
assertThat(response.getContentAsString(), is("ABCDEF"));
assertThat(stringBuffer.toString(), is("ABCDEF"));
assertThat(failures.size(), is(0));
}
@ParameterizedTest
@MethodSource("transports")
public void testUsingEventsResponseAsContentSourceFails(Transport transport) throws Exception
{
TestForbiddenMethodsEventsHandler eventsHandler = new TestForbiddenMethodsEventsHandler(new EchoHandler());
startServer(transport, eventsHandler);
startClient(transport);
ContentResponse response = client.POST(newURI(transport))
.body(new StringRequestContent("ABCDEF"))
.send();
assertThat(response.getStatus(), is(200));
int events = switch (transport)
{
// Two reads, two writes, two writes complete.
case HTTP -> 10;
case HTTPS -> 10;
case FCGI -> 10;
case UNIX_DOMAIN -> 10;
// One read, one write, one write complete.
case H2 -> 7;
case H2C -> 7;
case H3 -> 7;
};
await().atMost(1, TimeUnit.SECONDS).until(eventsHandler.exceptions::size, is(4 * events));
}
@ParameterizedTest
@MethodSource("transports")
public void testUsingEventsResponseAsContentSourceFailsWithTrailers(Transport transport) throws Exception
{
TestForbiddenMethodsEventsHandler eventsHandler = new TestForbiddenMethodsEventsHandler(new EchoHandler());
startServer(transport, eventsHandler);
startClient(transport);
AtomicInteger status = new AtomicInteger();
AsyncRequestContent asyncRequestContent = new AsyncRequestContent();
CountDownLatch latch = new CountDownLatch(1);
client.POST(newURI(transport))
.body(asyncRequestContent)
.trailersSupplier(() -> HttpFields.build().put("Extra-Stuff", "xyz"))
.send(result ->
{
status.set(result.getResponse().getStatus());
latch.countDown();
});
asyncRequestContent.write(ByteBuffer.wrap("ABCDEF".getBytes(StandardCharsets.US_ASCII)), Callback.NOOP);
asyncRequestContent.close();
assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
assertThat(status.get(), is(200));
int events = switch (transport)
{
// Reads return data, trailers.
case HTTP -> 10;
case HTTPS -> 10;
case FCGI -> 10;
case UNIX_DOMAIN -> 10;
// Reads return data, null, trailers.
case H2 -> 11;
case H2C -> 11;
case H3 -> 11;
};
await().atMost(1, TimeUnit.SECONDS).until(eventsHandler.exceptions::size, is(4 * events));
}
@ParameterizedTest
@MethodSource("transports")
public void testDelayedEvents(Transport transport) throws Exception
{
TestEventsRecordingHandler eventsHandler = new TestEventsRecordingHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Fields query = Request.extractQueryParameters(request);
sleep(query, "handling");
new Thread(() ->
{
sleep(query, "succeeding");
callback.succeeded();
}).start();
return true;
}
private void sleep(Fields query, String fieldName)
{
Fields.Field delayField = query.get(fieldName);
if (delayField == null)
return;
long delay = Long.parseLong(delayField.getValue());
try
{
Thread.sleep(delay);
}
catch (InterruptedException e)
{
// ignore
}
}
});
startServer(transport, eventsHandler);
startClient(transport);
URI uri = URI.create(newURI(transport).toASCIIString() + "?handling=500&succeeding=500");
ContentResponse response = client.GET(uri);
assertThat(response.getStatus(), is(200));
await().atMost(1, TimeUnit.SECONDS).until(() -> eventsHandler.getEvents().size(), is(4));
assertThat(eventsHandler.getEvents().get(0).name, equalTo("onBeforeHandling"));
assertThat(eventsHandler.getEvents().get(0).delayInNs, greaterThan(0L));
assertThat(eventsHandler.getEvents().get(1).name, equalTo("onAfterHandling"));
assertThat(eventsHandler.getEvents().get(1).delayInNs - eventsHandler.getEvents().get(0).delayInNs, both(greaterThan(500_000_000L)).and(lessThan(600_000_000L)));
assertThat(eventsHandler.getEvents().get(2).name, equalTo("onResponseBegin"));
assertThat(eventsHandler.getEvents().get(2).delayInNs - eventsHandler.getEvents().get(1).delayInNs, both(greaterThan(500_000_000L)).and(lessThan(600_000_000L)));
assertThat(eventsHandler.getEvents().get(3).name, equalTo("onComplete"));
assertThat(eventsHandler.getEvents().get(3).delayInNs - eventsHandler.getEvents().get(2).delayInNs, greaterThan(0L));
}
private static class TestEventsRecordingHandler extends EventsHandler
{
private final long begin;
private final List<Event> events = new CopyOnWriteArrayList<>();
public TestEventsRecordingHandler(Handler handler)
{
super(handler);
this.begin = NanoTime.now();
}
private void addEvent(String name)
{
events.add(new Event(name, NanoTime.since(begin)));
}
public List<Event> getEvents()
{
return events;
}
@Override
protected void onBeforeHandling(Request request)
{
addEvent("onBeforeHandling");
}
@Override
protected void onRequestRead(Request request, Content.Chunk chunk)
{
addEvent("onRequestRead");
}
@Override
protected void onAfterHandling(Request request, boolean handled, Throwable failure)
{
addEvent("onAfterHandling");
}
@Override
protected void onResponseBegin(Request request, int status, HttpFields headers)
{
addEvent("onResponseBegin");
}
@Override
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
addEvent("onResponseWrite");
}
@Override
protected void onResponseWriteComplete(Request request, Throwable failure)
{
addEvent("onResponseWriteComplete");
}
@Override
protected void onComplete(Request request, Throwable failure)
{
addEvent("onComplete");
}
record Event(String name, long delayInNs)
{
}
}
private static class TestForbiddenMethodsEventsHandler extends EventsHandler
{
private final List<Throwable> exceptions = new CopyOnWriteArrayList<>();
public TestForbiddenMethodsEventsHandler(Handler handler)
{
super(handler);
}
@Override
protected void onBeforeHandling(Request request)
{
// System.out.println("onBeforeHandling");
useForbiddenMethods(request, exceptions);
}
@Override
protected void onRequestRead(Request request, Content.Chunk chunk)
{
// System.out.println("onRequestRead " + chunk);
useForbiddenMethods(request, exceptions);
}
@Override
protected void onAfterHandling(Request request, boolean handled, Throwable failure)
{
// System.out.println("onAfterHandling");
useForbiddenMethods(request, exceptions);
}
@Override
protected void onResponseBegin(Request request, int status, HttpFields headers)
{
// System.out.println("onResponseBegin");
useForbiddenMethods(request, exceptions);
}
@Override
protected void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
// System.out.println("onResponseWrite");
useForbiddenMethods(request, exceptions);
}
@Override
protected void onResponseWriteComplete(Request request, Throwable failure)
{
// System.out.println("onResponseWriteComplete");
useForbiddenMethods(request, exceptions);
}
@Override
protected void onResponseTrailersComplete(Request request, HttpFields trailers)
{
// System.out.println("onResponseTrailersComplete");
useForbiddenMethods(request, exceptions);
}
@Override
protected void onComplete(Request request, Throwable failure)
{
// System.out.println("onComplete");
useForbiddenMethods(request, exceptions);
}
private static void useForbiddenMethods(Request request, List<Throwable> exceptions)
{
try
{
request.read();
}
catch (Throwable x)
{
exceptions.add(x);
}
try
{
request.demand(() -> {});
}
catch (Throwable x)
{
exceptions.add(x);
}
try
{
request.fail(new Throwable());
}
catch (Throwable x)
{
exceptions.add(x);
}
try
{
request.addHttpStreamWrapper(httpStream -> null);
}
catch (Throwable x)
{
exceptions.add(x);
}
}
}
}

View File

@ -165,7 +165,7 @@ public interface Attributes
public Wrapper(Attributes wrapped)
{
_wrapped = wrapped;
_wrapped = Objects.requireNonNull(wrapped);
}
public Attributes getWrapped()

View File

@ -244,6 +244,46 @@ public interface Callback extends Invocable
};
}
/**
* Creates a nested callback that runs completed after
* completing the nested callback.
*
* @param callback The nested callback
* @param completed The completion to run after the nested callback is completed
* @return a new callback.
*/
static Callback from(Callback callback, Consumer<Throwable> completed)
{
return new Callback()
{
@Override
public void succeeded()
{
try
{
callback.succeeded();
}
finally
{
completed.accept(null);
}
}
@Override
public void failed(Throwable x)
{
try
{
callback.failed(x);
}
finally
{
completed.accept(x);
}
}
};
}
/**
* Creates a nested callback that runs completed before
* completing the nested callback.

View File

@ -124,7 +124,7 @@ class PushBuilderImpl implements PushBuilder
}
HttpURI pushURI = HttpURI.build(_request.getHttpURI(), pushPath, pushParam, pushQuery).normalize();
MetaData.Request push = new MetaData.Request(_method, pushURI, _request.getConnectionMetaData().getHttpVersion(), _headers);
MetaData.Request push = new MetaData.Request(_request.getBeginNanoTime(), _method, pushURI, _request.getConnectionMetaData().getHttpVersion(), _headers);
_request.push(push);
_path = null;

View File

@ -794,7 +794,7 @@ public class ServletChannel
hashCode());
}
long timeStamp = _servletContextRequest.getTimeStamp();
long timeStamp = Request.getTimeStamp(_servletContextRequest);
return String.format("%s@%x{s=%s,r=%s,c=%b/%b,a=%s,uri=%s,age=%d}",
getClass().getSimpleName(),
hashCode(),

View File

@ -32,9 +32,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -80,7 +78,7 @@ public class HTTP1Servlet extends HttpServlet
String contextPath = request.getContextPath();
ServletOutputStream output = response.getOutputStream();
AsyncContext asyncContext = request.startAsync();
http2Client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener() {}, new Promise<Session>()
http2Client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener() {}, new Promise<>()
{
@Override
public void succeeded(Session session)

View File

@ -179,7 +179,7 @@ public class PushBuilderImpl implements PushBuilder
}
HttpURI uri = HttpURI.build(_request.getHttpURI(), path, param, query).normalize();
MetaData.Request push = new MetaData.Request(_method, uri, _request.getHttpVersion(), _fields);
MetaData.Request push = new MetaData.Request(_request.getCoreRequest().getBeginNanoTime(), _method, uri, _request.getHttpVersion(), _fields);
if (LOG.isDebugEnabled())
LOG.debug("Push {} {} inm={} ims={}", _method, uri, _fields.get(HttpHeader.IF_NONE_MATCH), _fields.get(HttpHeader.IF_MODIFIED_SINCE));

View File

@ -1485,9 +1485,10 @@ public class Request implements HttpServletRequest
_channel.getResponse().getHttpOutput().reopen();
_coreRequest = coreRequest;
setTimeStamp(coreRequest.getTimeStamp());
setTimeStamp(org.eclipse.jetty.server.Request.getTimeStamp(coreRequest));
_metaData = new MetaData.Request(
coreRequest.getBeginNanoTime(),
coreRequest.getMethod(),
coreRequest.getHttpURI(),
coreRequest.getConnectionMetaData().getHttpVersion(),

View File

@ -0,0 +1,190 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.ee9.nested;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Components;
import org.eclipse.jetty.server.ConnectionMetaData;
import org.eclipse.jetty.server.Context;
import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.server.Session;
import org.eclipse.jetty.server.TunnelSupport;
public class MockCoreRequest implements org.eclipse.jetty.server.Request
{
@Override
public Object removeAttribute(String name)
{
return null;
}
@Override
public Object setAttribute(String name, Object attribute)
{
return null;
}
@Override
public Object getAttribute(String name)
{
return null;
}
@Override
public Set<String> getAttributeNameSet()
{
return null;
}
@Override
public void clearAttributes()
{
}
@Override
public String getId()
{
return null;
}
@Override
public Components getComponents()
{
return null;
}
@Override
public ConnectionMetaData getConnectionMetaData()
{
return null;
}
@Override
public String getMethod()
{
return null;
}
@Override
public HttpURI getHttpURI()
{
return null;
}
@Override
public Context getContext()
{
return null;
}
@Override
public HttpFields getHeaders()
{
return null;
}
@Override
public HttpFields getTrailers()
{
return null;
}
public List<HttpCookie> getCookies()
{
return null;
}
@Override
public long getBeginNanoTime()
{
return 0;
}
@Override
public long getHeadersNanoTime()
{
return 0;
}
@Override
public boolean isSecure()
{
return false;
}
@Override
public long getLength()
{
return 0;
}
@Override
public Content.Chunk read()
{
return null;
}
@Override
public boolean consumeAvailable()
{
return false;
}
@Override
public void demand(Runnable demandCallback)
{
}
@Override
public void fail(Throwable failure)
{
}
@Override
public void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout)
{
}
@Override
public void addFailureListener(Consumer<Throwable> onFailure)
{
}
@Override
public TunnelSupport getTunnelSupport()
{
return null;
}
@Override
public void addHttpStreamWrapper(Function<HttpStream, HttpStream> wrapper)
{
}
@Override
public Session getSession(boolean create)
{
return null;
}
}

View File

@ -2338,7 +2338,7 @@ public class RequestTest
public TestCoreRequest(String uri, HttpFields.Mutable fields)
{
super(null, null, null);
super(new MockCoreRequest(), null, null);
_uri = uri;
_fields = fields;
_connectionMetaData = new MockConnectionMetaData();
@ -2393,13 +2393,7 @@ public class RequestTest
}
@Override
public long getTimeStamp()
{
return 0;
}
@Override
public long getNanoTime()
public long getHeadersNanoTime()
{
return 0;
}

View File

@ -2260,11 +2260,9 @@ public class ResponseTest
{
_channel.recycle();
long now = System.currentTimeMillis();
MetaData.Request reqMeta = new MetaData.Request("GET", HttpURI.from("http://myhost:8888/path/info"), version, HttpFields.EMPTY);
org.eclipse.jetty.server.Request coreRequest = new MockRequest(reqMeta, now, _context.getServletContext().getCoreContext());
org.eclipse.jetty.server.Request coreRequest = new MockRequest(reqMeta, _context.getServletContext().getCoreContext());
org.eclipse.jetty.server.Response coreResponse = new MockResponse(coreRequest);
_channel.onRequest(new ContextHandler.CoreContextRequest(coreRequest, _context.getCoreContextHandler().getContext(), _channel));
@ -2277,19 +2275,17 @@ public class ResponseTest
private class MockRequest extends Attributes.Mapped implements org.eclipse.jetty.server.Request
{
private final MetaData.Request _reqMeta;
private final long _now;
private final long _nanoTime = NanoTime.now();
private final Context _context;
public MockRequest(MetaData.Request reqMeta, long now)
public MockRequest(MetaData.Request reqMeta)
{
this(reqMeta, now, null);
this(reqMeta, null);
}
public MockRequest(MetaData.Request reqMeta, long now, Context context)
public MockRequest(MetaData.Request reqMeta, Context context)
{
_reqMeta = reqMeta;
_now = now;
_context = context == null ? _server.getContext() : context;
}
@ -2342,13 +2338,13 @@ public class ResponseTest
}
@Override
public long getTimeStamp()
public long getBeginNanoTime()
{
return _now;
return _nanoTime;
}
@Override
public long getNanoTime()
public long getHeadersNanoTime()
{
return _nanoTime;
}

View File

@ -32,9 +32,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -80,7 +78,7 @@ public class HTTP1Servlet extends HttpServlet
String contextPath = request.getContextPath();
ServletOutputStream output = response.getOutputStream();
AsyncContext asyncContext = request.startAsync();
http2Client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener(){}, new Promise<Session>()
http2Client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener(){}, new Promise<>()
{
@Override
public void succeeded(Session session)