Merge remote-tracking branch 'origin/jetty-12.0.x' into jetty-12.0.x-ee9-ContextHandlerClassLoading
This commit is contained in:
commit
d00391e86e
|
@ -150,9 +150,9 @@ public class Socks4Proxy extends ProxyConfiguration.Proxy
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeout)
|
||||
{
|
||||
failed(new TimeoutException("Idle timeout expired"));
|
||||
failed(timeout);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -185,9 +185,9 @@ public class Socks5Proxy extends Proxy
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeout)
|
||||
{
|
||||
fail(new TimeoutException("Idle timeout expired"));
|
||||
fail(timeout);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -173,12 +173,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeout)
|
||||
{
|
||||
long idleTimeout = getEndPoint().getIdleTimeout();
|
||||
boolean close = onIdleTimeout(idleTimeout);
|
||||
if (close)
|
||||
close(new TimeoutException("Idle timeout " + idleTimeout + " ms"));
|
||||
close(timeout);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.junit.jupiter.params.provider.MethodSource;
|
|||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
@ -186,10 +187,12 @@ public class HttpReceiverOverHTTPTest
|
|||
// ByteArrayEndPoint has an idle timeout of 0 by default,
|
||||
// so to simulate an idle timeout is enough to wait a bit.
|
||||
Thread.sleep(100);
|
||||
connection.onIdleExpired();
|
||||
TimeoutException timeoutException = new TimeoutException();
|
||||
connection.onIdleExpired(timeoutException);
|
||||
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
|
||||
assertThat(e.getCause(), instanceOf(TimeoutException.class));
|
||||
assertThat(e.getCause(), sameInstance(timeoutException));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
|
@ -217,13 +217,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
long idleTimeout = getEndPoint().getIdleTimeout();
|
||||
TimeoutException failure = new TimeoutException("Idle timeout " + idleTimeout + " ms");
|
||||
boolean close = delegate.onIdleTimeout(idleTimeout, failure);
|
||||
boolean close = delegate.onIdleTimeout(idleTimeout, timeoutException);
|
||||
if (close)
|
||||
close(failure);
|
||||
close(timeoutException);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ package org.eclipse.jetty.fcgi.server.internal;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.fcgi.FCGI;
|
||||
import org.eclipse.jetty.fcgi.generator.Flusher;
|
||||
|
@ -301,6 +302,18 @@ public class HttpStreamOverFCGI implements HttpStream
|
|||
_generator.generateResponseContent(accumulator, _id, buffer, last, _aborted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return _connection.getEndPoint().getIdleTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIdleTimeout(long idleTimeoutMs)
|
||||
{
|
||||
_connection.getEndPoint().setIdleTimeout(idleTimeoutMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCommitted()
|
||||
{
|
||||
|
@ -328,9 +341,9 @@ public class HttpStreamOverFCGI implements HttpStream
|
|||
_connection.onCompleted(x);
|
||||
}
|
||||
|
||||
public boolean onIdleTimeout(Throwable timeout)
|
||||
public boolean onIdleTimeout(TimeoutException timeout)
|
||||
{
|
||||
Runnable task = _httpChannel.onFailure(timeout);
|
||||
Runnable task = _httpChannel.onIdleTimeout(timeout);
|
||||
if (task != null)
|
||||
execute(task);
|
||||
return false;
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.fcgi.server.internal;
|
|||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.fcgi.FCGI;
|
||||
import org.eclipse.jetty.fcgi.generator.Flusher;
|
||||
|
@ -291,7 +292,7 @@ public class ServerFCGIConnection extends AbstractConnection implements Connecti
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
protected boolean onReadTimeout(TimeoutException timeout)
|
||||
{
|
||||
if (stream != null)
|
||||
return stream.onIdleTimeout(timeout);
|
||||
|
@ -323,6 +324,15 @@ public class ServerFCGIConnection extends AbstractConnection implements Connecti
|
|||
getFlusher().shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException);
|
||||
if (task != null)
|
||||
getExecutor().execute(task);
|
||||
return false;
|
||||
}
|
||||
|
||||
private class ServerListener implements ServerParser.Listener
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -522,6 +522,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
@Override
|
||||
public boolean handle(org.eclipse.jetty.server.Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
|
||||
{
|
||||
// Handler says it will handle the idletimeout
|
||||
request.addIdleTimeoutListener(t -> false);
|
||||
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
|
||||
callback.succeeded();
|
||||
return true;
|
||||
|
@ -530,7 +532,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
|
||||
connector.setIdleTimeout(idleTimeout);
|
||||
|
||||
// Request does not fail because idle timeouts while dispatched are ignored.
|
||||
// Request does not fail because handler says it will handle it.
|
||||
ContentResponse response1 = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.idleTimeout(4 * idleTimeout, TimeUnit.MILLISECONDS)
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
|
||||
package org.eclipse.jetty.http2.client.transport.internal;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.http2.HTTP2Channel;
|
||||
import org.eclipse.jetty.http2.HTTP2Stream;
|
||||
import org.eclipse.jetty.http2.HTTP2StreamEndPoint;
|
||||
|
@ -38,13 +40,13 @@ public class ClientHTTP2StreamEndPoint extends HTTP2StreamEndPoint implements HT
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(Throwable failure, Promise<Boolean> promise)
|
||||
public void onTimeout(TimeoutException timeout, Promise<Boolean> promise)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("idle timeout on {}: {}", this, failure);
|
||||
LOG.debug("idle timeout on {}", this, timeout);
|
||||
Connection connection = getConnection();
|
||||
if (connection != null)
|
||||
promise.succeeded(connection.onIdleExpired());
|
||||
promise.succeeded(connection.onIdleExpired(timeout));
|
||||
else
|
||||
promise.succeeded(true);
|
||||
}
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
|
||||
package org.eclipse.jetty.http2.client.transport.internal;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.client.Result;
|
||||
import org.eclipse.jetty.client.transport.HttpChannel;
|
||||
import org.eclipse.jetty.client.transport.HttpExchange;
|
||||
|
@ -200,7 +202,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
|
||||
channel.onTimeout(x, promise);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.http2.client.transport.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.eclipse.jetty.client.HttpUpgrader;
|
||||
|
@ -220,7 +221,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(Throwable failure, Promise<Boolean> promise)
|
||||
public void onTimeout(TimeoutException failure, Promise<Boolean> promise)
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http2;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
|
@ -34,7 +35,7 @@ public interface HTTP2Channel
|
|||
{
|
||||
public void onDataAvailable();
|
||||
|
||||
public void onTimeout(Throwable failure, Promise<Boolean> promise);
|
||||
public void onTimeout(TimeoutException failure, Promise<Boolean> promise);
|
||||
|
||||
public void onFailure(Throwable failure, Callback callback);
|
||||
}
|
||||
|
@ -54,7 +55,7 @@ public interface HTTP2Channel
|
|||
// TODO: review the signature because the serialization done by HttpChannel.onError()
|
||||
// is now failing the callback which fails the HttpStream, which should decide whether
|
||||
// to reset the HTTP/2 stream, so we may not need the boolean return type.
|
||||
public void onTimeout(Throwable failure, BiConsumer<Runnable, Boolean> consumer);
|
||||
public void onTimeout(TimeoutException timeout, BiConsumer<Runnable, Boolean> consumer);
|
||||
|
||||
// TODO: can it be simplified? The callback seems to only be succeeded, which
|
||||
// means it can be converted into a Runnable which may just be the return type
|
||||
|
|
|
@ -18,6 +18,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
|
@ -174,7 +175,7 @@ public class HTTP2Connection extends AbstractConnection implements Parser.Listen
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
boolean idle = isFillInterested();
|
||||
if (idle)
|
||||
|
|
|
@ -437,16 +437,12 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
|
|||
}
|
||||
}
|
||||
|
||||
if (getListener() != null)
|
||||
{
|
||||
if (offer(data))
|
||||
processData();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (updateClose(data.frame().isEndStream(), CloseState.Event.RECEIVED))
|
||||
session.removeStream(this);
|
||||
}
|
||||
boolean listenerPresent = getListener() != null;
|
||||
boolean endStream = data.frame().isEndStream();
|
||||
if ((listenerPresent || endStream) && offer(data))
|
||||
processData();
|
||||
if (!listenerPresent && updateClose(endStream, CloseState.Event.RECEIVED))
|
||||
session.removeStream(this);
|
||||
}
|
||||
|
||||
private boolean offer(Data data)
|
||||
|
@ -839,7 +835,7 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
|
|||
}
|
||||
}
|
||||
|
||||
private void notifyIdleTimeout(Stream stream, Throwable failure, Promise<Boolean> promise)
|
||||
private void notifyIdleTimeout(Stream stream, TimeoutException failure, Promise<Boolean> promise)
|
||||
{
|
||||
Listener listener = this.listener;
|
||||
if (listener != null)
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.http2.api;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
|
@ -218,7 +219,7 @@ public interface Stream
|
|||
/**
|
||||
* @param idleTimeout the stream idle timeout
|
||||
* @see #getIdleTimeout()
|
||||
* @see Stream.Listener#onIdleTimeout(Stream, Throwable, Promise)
|
||||
* @see Stream.Listener#onIdleTimeout(Stream, TimeoutException, Promise)
|
||||
*/
|
||||
public void setIdleTimeout(long idleTimeout);
|
||||
|
||||
|
@ -369,7 +370,7 @@ public interface Stream
|
|||
* @param promise the promise to complete
|
||||
* @see #getIdleTimeout()
|
||||
*/
|
||||
public default void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public default void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
promise.succeeded(true);
|
||||
}
|
||||
|
|
|
@ -168,7 +168,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
getConnection().onStreamTimeout(stream, x, promise);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import java.util.Base64;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.http.BadMessageException;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
|
@ -155,14 +156,14 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
}
|
||||
}
|
||||
|
||||
public void onStreamTimeout(Stream stream, Throwable failure, Promise<Boolean> promise)
|
||||
public void onStreamTimeout(Stream stream, TimeoutException timeout, Promise<Boolean> promise)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Idle timeout on {}", stream, failure);
|
||||
LOG.debug("Idle timeout on {}", stream, timeout);
|
||||
HTTP2Channel.Server channel = (HTTP2Channel.Server)((HTTP2Stream)stream).getAttachment();
|
||||
if (channel != null)
|
||||
{
|
||||
channel.onTimeout(failure, (task, timedOut) ->
|
||||
channel.onTimeout(timeout, (task, timedOut) ->
|
||||
{
|
||||
if (task != null)
|
||||
offerTask(task, true);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.http2.server.internal;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -451,6 +452,18 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
|
|||
return MetaData.isTunnel(request.getMethod(), response.getStatus());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return _stream.getIdleTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIdleTimeout(long idleTimeoutMs)
|
||||
{
|
||||
_stream.setIdleTimeout(idleTimeoutMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void push(MetaData.Request resource)
|
||||
{
|
||||
|
@ -558,9 +571,9 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(Throwable failure, BiConsumer<Runnable, Boolean> consumer)
|
||||
public void onTimeout(TimeoutException timeout, BiConsumer<Runnable, Boolean> consumer)
|
||||
{
|
||||
Runnable task = _httpChannel.onFailure(failure);
|
||||
Runnable task = _httpChannel.onIdleTimeout(timeout);
|
||||
boolean idle = !_httpChannel.isRequestHandled();
|
||||
consumer.accept(task, idle);
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http2.server.internal;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.eclipse.jetty.http2.HTTP2Channel;
|
||||
|
@ -48,19 +49,19 @@ public class ServerHTTP2StreamEndPoint extends HTTP2StreamEndPoint implements HT
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(Throwable failure, BiConsumer<Runnable, Boolean> consumer)
|
||||
public void onTimeout(TimeoutException timeout, BiConsumer<Runnable, Boolean> consumer)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("idle timeout on {}: {}", this, failure);
|
||||
LOG.debug("idle timeout on {}", this, timeout);
|
||||
boolean result = true;
|
||||
Connection connection = getConnection();
|
||||
if (connection != null)
|
||||
result = connection.onIdleExpired();
|
||||
result = connection.onIdleExpired(timeout);
|
||||
Runnable r = null;
|
||||
if (result)
|
||||
{
|
||||
processFailure(failure);
|
||||
r = () -> close(failure);
|
||||
processFailure(timeout);
|
||||
r = () -> close(timeout);
|
||||
}
|
||||
consumer.accept(r, result);
|
||||
}
|
||||
|
@ -69,7 +70,7 @@ public class ServerHTTP2StreamEndPoint extends HTTP2StreamEndPoint implements HT
|
|||
public Runnable onFailure(Throwable failure, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failure on {}: {}", this, failure);
|
||||
LOG.debug("failure on {}", this, failure);
|
||||
processFailure(failure);
|
||||
close(failure);
|
||||
return callback::succeeded;
|
||||
|
|
|
@ -392,7 +392,7 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
assertThat(x, Matchers.instanceOf(TimeoutException.class));
|
||||
timeoutLatch.countDown();
|
||||
|
@ -429,7 +429,7 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
return new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
timeoutLatch.countDown();
|
||||
promise.succeeded(true);
|
||||
|
@ -476,7 +476,7 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
return new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
timeoutLatch.countDown();
|
||||
promise.succeeded(true);
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Random;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
|
@ -521,7 +522,7 @@ public class RawHTTP2ProxyTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
if (LOGGER.isDebugEnabled())
|
||||
LOGGER.debug("CPS idle timeout for {}", stream);
|
||||
|
@ -684,7 +685,7 @@ public class RawHTTP2ProxyTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onIdleTimeout(Stream stream, Throwable x, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
|
||||
{
|
||||
if (LOGGER.isDebugEnabled())
|
||||
LOGGER.debug("SPC idle timeout for {}", stream);
|
||||
|
|
|
@ -17,6 +17,7 @@ import java.io.IOException;
|
|||
import java.io.UncheckedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
|
@ -84,7 +85,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
protected boolean onReadTimeout(TimeoutException timeout)
|
||||
{
|
||||
// Idle timeouts are handled by HTTP3Stream.
|
||||
return false;
|
||||
|
@ -476,7 +477,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("received {}#{}", frame, streamId);
|
||||
Runnable delegate = () -> super.onData(streamId, frame);
|
||||
if (!HTTP3StreamConnection.this.action.compareAndSet(null, () -> processData(frame, delegate)))
|
||||
Runnable action = () -> processData(frame, delegate);
|
||||
if (!HTTP3StreamConnection.this.action.compareAndSet(null, action))
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.api;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.frames.DataFrame;
|
||||
|
@ -354,7 +355,7 @@ public interface Stream
|
|||
* @param promise the promise to complete with true to reset the stream,
|
||||
* false to ignore the idle timeout
|
||||
*/
|
||||
public default void onIdleTimeout(Server stream, Throwable failure, Promise<Boolean> promise)
|
||||
public default void onIdleTimeout(Server stream, TimeoutException failure, Promise<Boolean> promise)
|
||||
{
|
||||
promise.succeeded(true);
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.http3.server;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
|
@ -142,10 +143,10 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onIdleTimeout(Stream.Server stream, Throwable failure, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream.Server stream, TimeoutException timeout, Promise<Boolean> promise)
|
||||
{
|
||||
HTTP3Stream http3Stream = (HTTP3Stream)stream;
|
||||
getConnection().onIdleTimeout((HTTP3Stream)stream, failure, (task, timedOut) ->
|
||||
getConnection().onIdleTimeout((HTTP3Stream)stream, timeout, (task, timedOut) ->
|
||||
{
|
||||
if (task != null)
|
||||
{
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.server.internal;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -471,6 +472,18 @@ public class HttpStreamOverHTTP3 implements HttpStream
|
|||
return stream.trailer(frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return stream.getIdleTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIdleTimeout(long idleTimeoutMs)
|
||||
{
|
||||
stream.setIdleTimeout(idleTimeoutMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCommitted()
|
||||
{
|
||||
|
@ -514,9 +527,9 @@ public class HttpStreamOverHTTP3 implements HttpStream
|
|||
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
|
||||
}
|
||||
|
||||
public void onIdleTimeout(Throwable failure, BiConsumer<Runnable, Boolean> consumer)
|
||||
public void onIdleTimeout(TimeoutException failure, BiConsumer<Runnable, Boolean> consumer)
|
||||
{
|
||||
Runnable runnable = httpChannel.onFailure(failure);
|
||||
Runnable runnable = httpChannel.onIdleTimeout(failure);
|
||||
boolean idle = !httpChannel.isRequestHandled();
|
||||
consumer.accept(runnable, idle);
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.server.internal;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
|
@ -69,10 +70,10 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection implement
|
|||
return httpStream.onTrailer(frame);
|
||||
}
|
||||
|
||||
public void onIdleTimeout(HTTP3Stream stream, Throwable failure, BiConsumer<Runnable, Boolean> consumer)
|
||||
public void onIdleTimeout(HTTP3Stream stream, TimeoutException timeout, BiConsumer<Runnable, Boolean> consumer)
|
||||
{
|
||||
HttpStreamOverHTTP3 httpStream = (HttpStreamOverHTTP3)stream.getAttachment();
|
||||
httpStream.onIdleTimeout(failure, consumer);
|
||||
httpStream.onIdleTimeout(timeout, consumer);
|
||||
}
|
||||
|
||||
public Runnable onFailure(HTTP3Stream stream, Throwable failure)
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.tests;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
|
@ -157,7 +158,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
|
|||
return new Stream.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onIdleTimeout(Stream.Server stream, Throwable failure, Promise<Boolean> promise)
|
||||
public void onIdleTimeout(Stream.Server stream, TimeoutException failure, Promise<Boolean> promise)
|
||||
{
|
||||
serverIdleLatch.countDown();
|
||||
promise.succeeded(true);
|
||||
|
|
|
@ -174,8 +174,8 @@ public abstract class AbstractConnection implements Connection, Invocable
|
|||
if (_endPoint.isOpen())
|
||||
{
|
||||
boolean close = true;
|
||||
if (cause instanceof TimeoutException)
|
||||
close = onReadTimeout(cause);
|
||||
if (cause instanceof TimeoutException timeout)
|
||||
close = onReadTimeout(timeout);
|
||||
if (close)
|
||||
{
|
||||
if (_endPoint.isOutputShutdown())
|
||||
|
@ -195,7 +195,7 @@ public abstract class AbstractConnection implements Connection, Invocable
|
|||
* @param timeout the cause of the read timeout
|
||||
* @return true to signal that the endpoint must be closed, false to keep the endpoint open
|
||||
*/
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
protected boolean onReadTimeout(TimeoutException timeout)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ public abstract class AbstractConnection implements Connection, Invocable
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -392,30 +392,27 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
|
|||
protected void onIdleExpired(TimeoutException timeout)
|
||||
{
|
||||
Connection connection = _connection;
|
||||
if (connection != null && !connection.onIdleExpired())
|
||||
if (connection != null && !connection.onIdleExpired(timeout))
|
||||
return;
|
||||
|
||||
boolean outputShutdown = isOutputShutdown();
|
||||
boolean inputShutdown = isInputShutdown();
|
||||
boolean fillFailed = _fillInterest.onFail(timeout);
|
||||
boolean writeFailed = _writeFlusher.onFail(timeout);
|
||||
boolean isOpen = isOpen();
|
||||
|
||||
// If the endpoint is half closed and there was no fill/write handling, then close here.
|
||||
// This handles the situation where the connection has completed its close handling
|
||||
// and the endpoint is half closed, but the other party does not complete the close.
|
||||
// This perhaps should not check for half closed, however the servlet spec case allows
|
||||
// for a dispatched servlet or suspended request to extend beyond the connections idle
|
||||
// time. So if this test would always close an idle endpoint that is not handled, then
|
||||
// we would need a mode to ignore timeouts for some HTTP states
|
||||
if (isOpen() && (inputShutdown || outputShutdown) && !(fillFailed || writeFailed))
|
||||
close();
|
||||
else
|
||||
LOG.debug("handled idle inputShutdown={} outputShutdown={} fillFailed={} writeFailed={} for {}",
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("handled idle isOpen={} inputShutdown={} outputShutdown={} fillFailed={} writeFailed={} for {}",
|
||||
isOpen,
|
||||
inputShutdown,
|
||||
outputShutdown,
|
||||
fillFailed,
|
||||
writeFailed,
|
||||
this);
|
||||
|
||||
// If the endpoint is open and there was no fill/write handling, then close here.
|
||||
if (isOpen && !(fillFailed || writeFailed))
|
||||
close(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.io;
|
|||
import java.io.Closeable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.EventListener;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.util.component.Container;
|
||||
|
||||
|
@ -82,7 +83,7 @@ public interface Connection extends Closeable
|
|||
* @return true to let the EndPoint handle the idle timeout,
|
||||
* false to tell the EndPoint to halt the handling of the idle timeout.
|
||||
*/
|
||||
boolean onIdleExpired();
|
||||
boolean onIdleExpired(TimeoutException timeoutException);
|
||||
|
||||
long getMessagesIn();
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.ToIntFunction;
|
||||
|
@ -348,9 +349,9 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
return getSslEndPoint().getConnection().onIdleExpired();
|
||||
return getSslEndPoint().getConnection().onIdleExpired(timeoutException);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,11 +17,9 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.NanoTime;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.eclipse.jetty.util.thread.TimerScheduler;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
|
@ -30,9 +28,6 @@ import org.junit.jupiter.api.Test;
|
|||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -270,33 +265,6 @@ public class ByteArrayEndPointTest
|
|||
|
||||
assertTrue(endp.isOpen());
|
||||
Thread.sleep(oneAndHalfIdleTimeout);
|
||||
// Still open because it has not been oshut or closed explicitly
|
||||
// and there are no callbacks, so idle timeout is ignored.
|
||||
assertTrue(endp.isOpen());
|
||||
|
||||
// Normal read is immediate, since there is data to read.
|
||||
ByteBuffer buffer = BufferUtil.allocate(1024);
|
||||
FutureCallback fcb = new FutureCallback();
|
||||
endp.fillInterested(fcb);
|
||||
fcb.get(idleTimeout, TimeUnit.MILLISECONDS);
|
||||
assertTrue(fcb.isDone());
|
||||
assertEquals(4, endp.fill(buffer));
|
||||
assertEquals("test", BufferUtil.toString(buffer));
|
||||
|
||||
// Wait for a read timeout.
|
||||
long start = NanoTime.now();
|
||||
fcb = new FutureCallback();
|
||||
endp.fillInterested(fcb);
|
||||
try
|
||||
{
|
||||
fcb.get();
|
||||
fail("Expected ExecutionException");
|
||||
}
|
||||
catch (ExecutionException t)
|
||||
{
|
||||
assertThat(t.getCause(), instanceOf(TimeoutException.class));
|
||||
}
|
||||
assertThat(NanoTime.millisSince(start), greaterThan(halfIdleTimeout));
|
||||
assertThat("Endpoint open", endp.isOpen(), is(true));
|
||||
assertFalse(endp.isOpen());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.io.ClientConnector;
|
||||
import org.eclipse.jetty.io.DatagramChannelEndPoint;
|
||||
|
@ -159,7 +160,7 @@ public class ClientQuicConnection extends QuicConnection
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
boolean idle = isFillInterested();
|
||||
long idleTimeout = getEndPoint().getIdleTimeout();
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
|
@ -171,7 +172,7 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
}
|
||||
|
||||
@Override
|
||||
public abstract boolean onIdleExpired();
|
||||
public abstract boolean onIdleExpired(TimeoutException timeoutException);
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
|
|
|
@ -18,6 +18,7 @@ import java.net.InetSocketAddress;
|
|||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.CyclicTimeouts;
|
||||
|
@ -99,7 +100,7 @@ public class ServerQuicConnection extends QuicConnection
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
// The current server architecture only has one listening
|
||||
// DatagramChannelEndPoint, so we ignore idle timeouts.
|
||||
|
|
|
@ -13,6 +13,10 @@
|
|||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.server.internal.HttpChannelState;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
|
@ -70,14 +74,25 @@ public interface HttpChannel extends Invocable
|
|||
*/
|
||||
Runnable onContentAvailable();
|
||||
|
||||
/**
|
||||
* <p>Notifies this {@code HttpChannel} that an idle timeout happened.</p>
|
||||
*
|
||||
* @param idleTimeout the timeout.
|
||||
* @return a {@code Runnable} that performs the timeout action, or {@code null}
|
||||
* if no action need be performed by the calling thread
|
||||
* @see Request#addIdleTimeoutListener(Predicate)
|
||||
*/
|
||||
Runnable onIdleTimeout(TimeoutException idleTimeout);
|
||||
|
||||
/**
|
||||
* <p>Notifies this {@code HttpChannel} that an asynchronous failure happened.</p>
|
||||
* <p>Typical failure examples could be idle timeouts, I/O read failures or
|
||||
* <p>Typical failure examples could be HTTP/2 resets or
|
||||
* protocol failures (for example, invalid request bytes).</p>
|
||||
*
|
||||
* @param failure the failure cause.
|
||||
* @return a {@code Runnable} that performs the failure action, or {@code null}
|
||||
* if no failure action should be performed by the caller thread
|
||||
* if no failure action need be performed by the calling thread
|
||||
* @see Request#addFailureListener(Consumer)
|
||||
*/
|
||||
Runnable onFailure(Throwable failure);
|
||||
|
||||
|
|
|
@ -99,6 +99,10 @@ public interface HttpStream extends Callback
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
long getIdleTimeout();
|
||||
|
||||
void setIdleTimeout(long idleTimeoutMs);
|
||||
|
||||
boolean isCommitted();
|
||||
|
||||
default TunnelSupport getTunnelSupport()
|
||||
|
@ -192,6 +196,18 @@ public interface HttpStream extends Callback
|
|||
getWrapped().push(resource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return getWrapped().getIdleTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIdleTimeout(long idleTimeoutMs)
|
||||
{
|
||||
getWrapped().setIdleTimeout(idleTimeoutMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isCommitted()
|
||||
{
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.nio.charset.Charset;
|
|||
import java.security.Principal;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -249,18 +251,41 @@ public interface Request extends Attributes, Content.Source
|
|||
}
|
||||
|
||||
/**
|
||||
* <p>Adds a listener for asynchronous errors.</p>
|
||||
* <p>Adds a listener for idle timeouts.</p>
|
||||
* <p>The listener is a predicate function that should return {@code true} to indicate
|
||||
* that the function will complete (either successfully or with a failure) the callback
|
||||
* received from {@link org.eclipse.jetty.server.Handler#handle(Request, Response, Callback)}, or
|
||||
* {@code false} otherwise.</p>
|
||||
* that the idle timeout should be handled by the container as a hard failure
|
||||
* (see {@link #addFailureListener(Consumer)}); or {@code false} to ignore that specific timeout and for another timeout
|
||||
* to occur after another idle period.</p>
|
||||
* <p>Any pending {@link #demand(Runnable)} or {@link Response#write(boolean, ByteBuffer, Callback)} operations
|
||||
* are not affected by this call. Applications need to be mindful of any such pending operations if attempting
|
||||
* to make new operations.</p>
|
||||
* <p>Listeners are processed in sequence, and the first that returns {@code true}
|
||||
* stops the processing of subsequent listeners, which are therefore not invoked.</p>
|
||||
*
|
||||
* @param onError the predicate function
|
||||
* @return true if the listener completes the callback, false otherwise
|
||||
* @param onIdleTimeout the predicate function
|
||||
* @see #addFailureListener(Consumer)
|
||||
*/
|
||||
boolean addErrorListener(Predicate<Throwable> onError);
|
||||
void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout);
|
||||
|
||||
/**
|
||||
* <p>Adds a listener for asynchronous hard errors.</p>
|
||||
* <p>When a listener is called, the effects of the error will already have taken place:</p>
|
||||
* <ul>
|
||||
* <li>Pending {@link #demand(Runnable)} will be woken up.</li>
|
||||
* <li>Calls to {@link #read()} will return the {@code Throwable}.</li>
|
||||
* <li>Pending and new {@link Response#write(boolean, ByteBuffer, Callback)} calls will be failed by
|
||||
* calling {@link Callback#failed(Throwable)} on the callback passed to {@code write(...)}.</li>
|
||||
* <li>Any call to {@link Callback#succeeded()} on the callback passed to
|
||||
* {@link Handler#handle(Request, Response, Callback)} will effectively be a call to {@link Callback#failed(Throwable)}
|
||||
* with the notified {@link Throwable}.</li>
|
||||
* </ul>
|
||||
* <p>Listeners are processed in sequence. When all listeners are invoked then {@link Callback#failed(Throwable)}
|
||||
* will be called on the callback passed to {@link Handler#handle(Request, Response, Callback)}.</p>
|
||||
*
|
||||
* @param onFailure the consumer function
|
||||
* @see #addIdleTimeoutListener(Predicate)
|
||||
*/
|
||||
void addFailureListener(Consumer<Throwable> onFailure);
|
||||
|
||||
TunnelSupport getTunnelSupport();
|
||||
|
||||
|
@ -657,9 +682,15 @@ public interface Request extends Attributes, Content.Source
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addErrorListener(Predicate<Throwable> onError)
|
||||
public void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout)
|
||||
{
|
||||
return getWrapped().addErrorListener(onError);
|
||||
getWrapped().addIdleTimeoutListener(onIdleTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFailureListener(Consumer<Throwable> onFailure)
|
||||
{
|
||||
getWrapped().addFailureListener(onFailure);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
|
@ -1202,6 +1203,23 @@ public class ContextHandler extends Handler.Wrapper implements Attributes, Alias
|
|||
}
|
||||
}
|
||||
|
||||
public <T> boolean test(Predicate<T> predicate, T t, Request request)
|
||||
{
|
||||
Context lastContext = __context.get();
|
||||
if (lastContext == this)
|
||||
return predicate.test(t);
|
||||
|
||||
ClassLoader lastLoader = enterScope(request);
|
||||
try
|
||||
{
|
||||
return predicate.test(t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
exitScope(request, lastContext, lastLoader);
|
||||
}
|
||||
}
|
||||
|
||||
public void accept(Consumer<Throwable> consumer, Throwable t, Request request)
|
||||
{
|
||||
Context lastContext = __context.get();
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
|
||||
package org.eclipse.jetty.server.handler;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.eclipse.jetty.server.Context;
|
||||
|
@ -40,15 +42,15 @@ public class ContextRequest extends Request.Wrapper implements Invocable
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addErrorListener(Predicate<Throwable> onError)
|
||||
public void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout)
|
||||
{
|
||||
return super.addErrorListener(t ->
|
||||
{
|
||||
// TODO: implement the line below
|
||||
// return _context.apply(onError::test, t, ContextRequest.this);
|
||||
_context.accept(onError::test, t, ContextRequest.this);
|
||||
return true;
|
||||
});
|
||||
super.addIdleTimeoutListener(t -> _context.test(onIdleTimeout, t, ContextRequest.this));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFailureListener(Consumer<Throwable> onFailure)
|
||||
{
|
||||
super.addFailureListener(t -> _context.accept(onFailure, t, ContextRequest.this));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,9 @@ import java.util.HashMap;
|
|||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
@ -110,12 +112,23 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
private boolean _callbackCompleted = false;
|
||||
private ChannelRequest _request;
|
||||
private ChannelResponse _response;
|
||||
private long _oldIdleTimeout;
|
||||
private HttpStream _stream;
|
||||
private long _committedContentLength = -1;
|
||||
private Runnable _onContentAvailable;
|
||||
private Content.Chunk.Error _error;
|
||||
private Throwable _failure;
|
||||
private Predicate<Throwable> _onError;
|
||||
private Predicate<TimeoutException> _onIdleTimeout;
|
||||
/**
|
||||
* Failure passed to {@link #onFailure(Throwable)}
|
||||
*/
|
||||
private Content.Chunk.Error _failure;
|
||||
/**
|
||||
* Listener for {@link #onFailure(Throwable)} events
|
||||
*/
|
||||
private Consumer<Throwable> _onFailure;
|
||||
/**
|
||||
* Failure passed to {@link ChannelCallback#failed(Throwable)}
|
||||
*/
|
||||
private Throwable _callbackFailure;
|
||||
private Attributes _cache;
|
||||
|
||||
public HttpChannelState(ConnectionMetaData connectionMetaData)
|
||||
|
@ -149,11 +162,11 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_handling = null;
|
||||
_handled = false;
|
||||
_callbackCompleted = false;
|
||||
_failure = null;
|
||||
_callbackFailure = null;
|
||||
_committedContentLength = -1;
|
||||
_onContentAvailable = null;
|
||||
_error = null;
|
||||
_onError = null;
|
||||
_failure = null;
|
||||
_onFailure = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,13 +275,20 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_response = new ChannelResponse(_request);
|
||||
|
||||
HttpFields.Mutable responseHeaders = _response.getHeaders();
|
||||
if (getHttpConfiguration().getSendServerVersion())
|
||||
HttpConfiguration httpConfiguration = getHttpConfiguration();
|
||||
if (httpConfiguration.getSendServerVersion())
|
||||
responseHeaders.add(SERVER_VERSION);
|
||||
if (getHttpConfiguration().getSendXPoweredBy())
|
||||
if (httpConfiguration.getSendXPoweredBy())
|
||||
responseHeaders.add(POWERED_BY);
|
||||
if (getHttpConfiguration().getSendDateHeader())
|
||||
if (httpConfiguration.getSendDateHeader())
|
||||
responseHeaders.add(getConnectionMetaData().getConnector().getServer().getDateField());
|
||||
|
||||
long idleTO = httpConfiguration.getIdleTimeout();
|
||||
_oldIdleTimeout = _stream.getIdleTimeout();
|
||||
if (idleTO >= 0 && _oldIdleTimeout != idleTO)
|
||||
_stream.setIdleTimeout(idleTO);
|
||||
|
||||
|
||||
// This is deliberately not serialized to allow a handler to block.
|
||||
return _handlerInvoker;
|
||||
}
|
||||
|
@ -326,15 +346,43 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
return Invocable.getInvocationType(onContent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable onIdleTimeout(TimeoutException t)
|
||||
{
|
||||
Predicate<TimeoutException> onIdleTimeout;
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onIdleTimeout {}", this, t);
|
||||
onIdleTimeout = _onIdleTimeout;
|
||||
}
|
||||
|
||||
if (onIdleTimeout != null)
|
||||
{
|
||||
Runnable onIdle = () ->
|
||||
{
|
||||
if (onIdleTimeout.test(t))
|
||||
{
|
||||
Runnable task = onFailure(t);
|
||||
if (task != null)
|
||||
task.run();
|
||||
}
|
||||
};
|
||||
return _serializedInvoker.offer(onIdle);
|
||||
}
|
||||
return onFailure(t); // TODO can we avoid double lock?
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable onFailure(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onFailure {}", this, x);
|
||||
|
||||
HttpStream stream;
|
||||
Runnable task;
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onFailure {}", this, x);
|
||||
|
||||
// If the channel doesn't have a stream, then the error is ignored.
|
||||
if (_stream == null)
|
||||
return null;
|
||||
|
@ -349,14 +397,13 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
|
||||
// Set the error to arrange for any subsequent reads, demands or writes to fail.
|
||||
if (_error == null)
|
||||
if (_failure == null)
|
||||
{
|
||||
_error = Content.Chunk.from(x);
|
||||
_failure = Content.Chunk.from(x);
|
||||
}
|
||||
else if (_error.getCause() != x)
|
||||
else if (ExceptionUtil.areNotAssociated(_failure.getCause(), x) && _failure.getCause().getClass() != x.getClass())
|
||||
{
|
||||
_error.getCause().addSuppressed(x);
|
||||
return null;
|
||||
_failure.getCause().addSuppressed(x);
|
||||
}
|
||||
|
||||
// If not handled, then we just fail the request callback
|
||||
|
@ -377,18 +424,17 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
ChannelRequest request = _request;
|
||||
Runnable invokeListeners = () ->
|
||||
{
|
||||
Predicate<Throwable> onError;
|
||||
Consumer<Throwable> onFailure;
|
||||
try (AutoLock ignore = _lock.lock())
|
||||
{
|
||||
onError = _onError;
|
||||
onFailure = _onFailure;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("invokeListeners {} {}", HttpChannelState.this, onError, x);
|
||||
if (onError.test(x))
|
||||
return;
|
||||
LOG.debug("invokeListeners {} {}", HttpChannelState.this, onFailure, x);
|
||||
onFailure.accept(x);
|
||||
}
|
||||
catch (Throwable throwable)
|
||||
{
|
||||
|
@ -586,7 +632,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
stream = _stream;
|
||||
_handling = null;
|
||||
_handled = true;
|
||||
failure = _failure;
|
||||
failure = _callbackFailure;
|
||||
callbackCompleted = _callbackCompleted;
|
||||
lastStreamSendComplete = lockedIsLastStreamSendCompleted();
|
||||
completeStream = callbackCompleted && lastStreamSendComplete;
|
||||
|
@ -640,7 +686,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_streamSendState = StreamSendState.LAST_COMPLETE;
|
||||
completeStream = _handling == null;
|
||||
stream = _stream;
|
||||
failure = _failure = ExceptionUtil.combine(_failure, failure);
|
||||
failure = _callbackFailure = ExceptionUtil.combine(_callbackFailure, failure);
|
||||
}
|
||||
if (completeStream)
|
||||
completeStream(stream, failure);
|
||||
|
@ -663,6 +709,10 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
Parts parts = (Parts)_request.getAttribute(Parts.class.getName());
|
||||
if (parts != null)
|
||||
parts.close();
|
||||
|
||||
long idleTO = getHttpConfiguration().getIdleTimeout();
|
||||
if (idleTO > 0 && _oldIdleTimeout != idleTO)
|
||||
stream.setIdleTimeout(_oldIdleTimeout);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@ -867,7 +917,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
HttpChannelState httpChannel = lockedGetHttpChannelState();
|
||||
|
||||
Content.Chunk error = httpChannel._error;
|
||||
Content.Chunk error = httpChannel._failure;
|
||||
if (error != null)
|
||||
return error;
|
||||
|
||||
|
@ -913,7 +963,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("demand {}", httpChannel);
|
||||
|
||||
error = httpChannel._error != null;
|
||||
error = httpChannel._failure != null;
|
||||
if (!error)
|
||||
{
|
||||
if (httpChannel._onContentAvailable != null)
|
||||
|
@ -944,30 +994,66 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addErrorListener(Predicate<Throwable> onError)
|
||||
public void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout)
|
||||
{
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
HttpChannelState httpChannel = lockedGetHttpChannelState();
|
||||
|
||||
if (httpChannel._error != null)
|
||||
return false;
|
||||
if (httpChannel._failure != null)
|
||||
return;
|
||||
|
||||
if (httpChannel._onError == null)
|
||||
if (httpChannel._onIdleTimeout == null)
|
||||
{
|
||||
httpChannel._onError = onError;
|
||||
httpChannel._onIdleTimeout = onIdleTimeout;
|
||||
}
|
||||
else
|
||||
{
|
||||
Predicate<Throwable> previous = httpChannel._onError;
|
||||
httpChannel._onError = throwable ->
|
||||
Predicate<TimeoutException> previous = httpChannel._onIdleTimeout;
|
||||
httpChannel._onIdleTimeout = throwable ->
|
||||
{
|
||||
if (!previous.test(throwable))
|
||||
return onError.test(throwable);
|
||||
return onIdleTimeout.test(throwable);
|
||||
return true;
|
||||
};
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFailureListener(Consumer<Throwable> onFailure)
|
||||
{
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
HttpChannelState httpChannel = lockedGetHttpChannelState();
|
||||
|
||||
if (httpChannel._failure != null)
|
||||
return;
|
||||
|
||||
if (httpChannel._onFailure == null)
|
||||
{
|
||||
httpChannel._onFailure = onFailure;
|
||||
}
|
||||
else
|
||||
{
|
||||
Consumer<Throwable> previous = httpChannel._onFailure;
|
||||
httpChannel._onFailure = throwable ->
|
||||
{
|
||||
try
|
||||
{
|
||||
previous.accept(throwable);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
if (ExceptionUtil.areNotAssociated(throwable, t))
|
||||
throwable.addSuppressed(t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
onFailure.accept(throwable);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1005,33 +1091,26 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
public static class ChannelResponse implements Response, Callback
|
||||
{
|
||||
private final ChannelRequest _request;
|
||||
private int _status;
|
||||
private final ResponseHttpFields _httpFields;
|
||||
protected int _status;
|
||||
private long _contentBytesWritten;
|
||||
private Supplier<HttpFields> _trailers;
|
||||
private Callback _writeCallback;
|
||||
protected boolean _errorMode;
|
||||
|
||||
private ChannelResponse(ChannelRequest request)
|
||||
{
|
||||
_request = request;
|
||||
_httpFields = getResponseHttpFields(_request.lockedGetHttpChannelState());
|
||||
}
|
||||
|
||||
private void lockedPrepareErrorResponse()
|
||||
protected ResponseHttpFields getResponseHttpFields(HttpChannelState httpChannelState)
|
||||
{
|
||||
// reset the response state, so we can generate an error response,
|
||||
// remembering any server or date headers (probably a nicer way of doing this).
|
||||
HttpChannelState httpChannelState = _request.lockedGetHttpChannelState();
|
||||
HttpField serverField = httpChannelState._responseHeaders.getField(HttpHeader.SERVER);
|
||||
HttpField dateField = httpChannelState._responseHeaders.getField(HttpHeader.DATE);
|
||||
httpChannelState._responseHeaders.reset();
|
||||
httpChannelState._committedContentLength = -1;
|
||||
reset();
|
||||
if (serverField != null)
|
||||
httpChannelState._responseHeaders.put(serverField);
|
||||
if (dateField != null)
|
||||
httpChannelState._responseHeaders.put(dateField);
|
||||
setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
|
||||
_errorMode = true;
|
||||
return httpChannelState._responseHeaders;
|
||||
}
|
||||
|
||||
protected ResponseHttpFields getResponseHttpFields()
|
||||
{
|
||||
return _httpFields;
|
||||
}
|
||||
|
||||
private boolean lockedIsWriting()
|
||||
|
@ -1075,7 +1154,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
@Override
|
||||
public HttpFields.Mutable getHeaders()
|
||||
{
|
||||
return _request.getHttpChannelState()._responseHeaders;
|
||||
return _httpFields;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1115,19 +1194,21 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
|
||||
if (_writeCallback != null)
|
||||
failure = new IllegalStateException("write pending");
|
||||
else if (!_errorMode && httpChannelState._error != null)
|
||||
failure = httpChannelState._error.getCause();
|
||||
else if (contentLength >= 0)
|
||||
else
|
||||
{
|
||||
// If the content length were not compatible with what was written, then we need to abort.
|
||||
String lengthError = (totalWritten > contentLength) ? "written %d > %d content-length"
|
||||
: (last && totalWritten < contentLength) ? "written %d < %d content-length" : null;
|
||||
if (lengthError != null)
|
||||
failure = getFailure(httpChannelState);
|
||||
if (failure == null && contentLength >= 0)
|
||||
{
|
||||
String message = lengthError.formatted(totalWritten, contentLength);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("fail {} {}", callback, message);
|
||||
failure = new IOException(message);
|
||||
// If the content length were not compatible with what was written, then we need to abort.
|
||||
String lengthError = (totalWritten > contentLength) ? "written %d > %d content-length"
|
||||
: (last && totalWritten < contentLength) ? "written %d < %d content-length" : null;
|
||||
if (lengthError != null)
|
||||
{
|
||||
String message = lengthError.formatted(totalWritten, contentLength);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("fail {} {}", callback, message);
|
||||
failure = new IOException(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1151,7 +1232,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_writeCallback = callback;
|
||||
_contentBytesWritten = totalWritten;
|
||||
stream = httpChannelState._stream;
|
||||
if (httpChannelState._responseHeaders.commit())
|
||||
if (_httpFields.commit())
|
||||
responseMetaData = lockedPrepareResponse(httpChannelState, last);
|
||||
}
|
||||
}
|
||||
|
@ -1164,6 +1245,12 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
}
|
||||
|
||||
protected Throwable getFailure(HttpChannelState httpChannelState)
|
||||
{
|
||||
Content.Chunk.Error failure = httpChannelState._failure;
|
||||
return failure == null ? null : failure.getCause();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the call to
|
||||
* {@link HttpStream#send(MetaData.Request, MetaData.Response, boolean, ByteBuffer, Callback)}
|
||||
|
@ -1227,7 +1314,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
@Override
|
||||
public boolean isCommitted()
|
||||
{
|
||||
return _request.getHttpChannelState()._responseHeaders.isCommitted();
|
||||
return _httpFields.isCommitted();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1238,7 +1325,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (_request._httpChannelState == null)
|
||||
return false;
|
||||
|
||||
return _request._httpChannelState._callbackCompleted && _request._httpChannelState._failure == null;
|
||||
return _request._httpChannelState._callbackCompleted && _request._httpChannelState._callbackFailure == null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1276,7 +1363,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_status = HttpStatus.OK_200;
|
||||
|
||||
// Can we set the content length?
|
||||
HttpFields.Mutable mutableHeaders = httpChannel._responseHeaders.getMutableHttpFields();
|
||||
HttpFields.Mutable mutableHeaders = _httpFields.getMutableHttpFields();
|
||||
httpChannel._committedContentLength = mutableHeaders.getLongField(HttpHeader.CONTENT_LENGTH);
|
||||
if (last && httpChannel._committedContentLength < 0L)
|
||||
{
|
||||
|
@ -1288,7 +1375,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
|
||||
return new MetaData.Response(
|
||||
_status, null, httpChannel.getConnectionMetaData().getHttpVersion(),
|
||||
httpChannel._responseHeaders,
|
||||
_httpFields,
|
||||
httpChannel._committedContentLength,
|
||||
getTrailersSupplier()
|
||||
);
|
||||
|
@ -1326,6 +1413,8 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
ChannelResponse response;
|
||||
MetaData.Response responseMetaData = null;
|
||||
boolean completeStream;
|
||||
ErrorResponse errorResponse = null;
|
||||
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
request = _request;
|
||||
|
@ -1343,7 +1432,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (lockedCompleteCallback())
|
||||
return;
|
||||
|
||||
assert httpChannelState._failure == null;
|
||||
assert httpChannelState._callbackFailure == null;
|
||||
|
||||
needLastStreamSend = httpChannelState.lockedLastStreamSend();
|
||||
completeStream = !needLastStreamSend && httpChannelState._handling == null && httpChannelState.lockedIsLastStreamSendCompleted();
|
||||
|
@ -1367,9 +1456,9 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
|
||||
if (failure != null)
|
||||
{
|
||||
httpChannelState._failure = failure;
|
||||
httpChannelState._callbackFailure = failure;
|
||||
if (!stream.isCommitted())
|
||||
response.lockedPrepareErrorResponse();
|
||||
errorResponse = new ErrorResponse(request);
|
||||
else
|
||||
completeStream = true;
|
||||
}
|
||||
|
@ -1378,8 +1467,8 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("succeeded: failure={} needLastStreamSend={} {}", failure, needLastStreamSend, this);
|
||||
|
||||
if (failure != null)
|
||||
Response.writeError(request, response, new ErrorCallback(request, stream, failure), failure);
|
||||
if (errorResponse != null)
|
||||
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
|
||||
else if (needLastStreamSend)
|
||||
stream.send(_request._metaData, responseMetaData, true, null, httpChannelState._handlerInvoker);
|
||||
else if (completeStream)
|
||||
|
@ -1398,21 +1487,19 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
// Called when the request/response cycle is completing with a failure.
|
||||
HttpStream stream;
|
||||
ChannelRequest request;
|
||||
ChannelResponse response;
|
||||
HttpChannelState httpChannelState;
|
||||
boolean writeError;
|
||||
ErrorResponse errorResponse = null;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
httpChannelState = _request._httpChannelState;
|
||||
stream = httpChannelState._stream;
|
||||
request = _request;
|
||||
response = httpChannelState._response;
|
||||
|
||||
if (lockedCompleteCallback())
|
||||
return;
|
||||
assert httpChannelState._failure == null;
|
||||
assert httpChannelState._callbackFailure == null;
|
||||
|
||||
httpChannelState._failure = failure;
|
||||
httpChannelState._callbackFailure = failure;
|
||||
|
||||
// Consume any input.
|
||||
Throwable unconsumed = stream.consumeAvailable();
|
||||
|
@ -1422,13 +1509,12 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", httpChannelState._stream.isCommitted(), httpChannelState._response.isCommitted(), this);
|
||||
|
||||
writeError = !stream.isCommitted();
|
||||
if (writeError)
|
||||
response.lockedPrepareErrorResponse();
|
||||
if (!stream.isCommitted())
|
||||
errorResponse = new ErrorResponse(request);
|
||||
}
|
||||
|
||||
if (writeError)
|
||||
Response.writeError(request, response, new ErrorCallback(request, stream, failure), failure);
|
||||
if (errorResponse != null)
|
||||
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
|
||||
else
|
||||
_request.getHttpChannelState()._handlerInvoker.failed(failure);
|
||||
}
|
||||
|
@ -1471,6 +1557,53 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used as the {@link Response} when writing the error response
|
||||
* from {@link HttpChannelState.ChannelCallback#failed(Throwable)}.
|
||||
*/
|
||||
private static class ErrorResponse extends ChannelResponse
|
||||
{
|
||||
public ErrorResponse(ChannelRequest request)
|
||||
{
|
||||
super(request);
|
||||
_status = HttpStatus.INTERNAL_SERVER_ERROR_500;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Throwable getFailure(HttpChannelState httpChannelState)
|
||||
{
|
||||
// we ignore channel failures so we can try to generate an error response.
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ResponseHttpFields getResponseHttpFields(HttpChannelState httpChannelState)
|
||||
{
|
||||
httpChannelState._committedContentLength = -1;
|
||||
HttpFields original = super.getResponseHttpFields(httpChannelState);
|
||||
ResponseHttpFields httpFields = new ResponseHttpFields();
|
||||
|
||||
for (HttpField field : original)
|
||||
{
|
||||
HttpHeader header = field.getHeader();
|
||||
if (header == HttpHeader.SERVER || header == HttpHeader.DATE)
|
||||
httpFields.add(field);
|
||||
}
|
||||
return httpFields;
|
||||
}
|
||||
|
||||
@Override
|
||||
MetaData.Response lockedPrepareResponse(HttpChannelState httpChannelState, boolean last)
|
||||
{
|
||||
MetaData.Response httpFields = super.lockedPrepareResponse(httpChannelState, last);
|
||||
httpChannelState._response._status = _status;
|
||||
HttpFields.Mutable originalResponseFields = httpChannelState._responseHeaders.getMutableHttpFields();
|
||||
originalResponseFields.clear();
|
||||
originalResponseFields.add(getResponseHttpFields());
|
||||
return httpFields;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used as the {@link Response} and {@link Callback} when writing the error response
|
||||
* from {@link HttpChannelState.ChannelCallback#failed(Throwable)}.
|
||||
|
@ -1478,12 +1611,14 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
private static class ErrorCallback implements Callback
|
||||
{
|
||||
private final ChannelRequest _request;
|
||||
private final ErrorResponse _errorResponse;
|
||||
private final HttpStream _stream;
|
||||
private final Throwable _failure;
|
||||
|
||||
public ErrorCallback(ChannelRequest request, HttpStream stream, Throwable failure)
|
||||
public ErrorCallback(ChannelRequest request, ErrorResponse response, HttpStream stream, Throwable failure)
|
||||
{
|
||||
_request = request;
|
||||
_errorResponse = response;
|
||||
_stream = stream;
|
||||
_failure = failure;
|
||||
}
|
||||
|
@ -1507,8 +1642,8 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
|
||||
// Did the ErrorHandler do the last write?
|
||||
needLastWrite = httpChannelState.lockedLastStreamSend();
|
||||
if (needLastWrite && httpChannelState._responseHeaders.commit())
|
||||
responseMetaData = httpChannelState._response.lockedPrepareResponse(httpChannelState, true);
|
||||
if (needLastWrite && _errorResponse.getResponseHttpFields().commit())
|
||||
responseMetaData = _errorResponse.lockedPrepareResponse(httpChannelState, true);
|
||||
}
|
||||
|
||||
if (needLastWrite)
|
||||
|
@ -1538,13 +1673,16 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ErrorWrite failed: {}", this, x);
|
||||
Throwable failure;
|
||||
HttpChannelState httpChannelState;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
failure = _failure;
|
||||
httpChannelState = _request.lockedGetHttpChannelState();
|
||||
httpChannelState._response._status = _errorResponse._status;
|
||||
}
|
||||
if (ExceptionUtil.areNotAssociated(failure, x))
|
||||
failure.addSuppressed(x);
|
||||
_request.getHttpChannelState()._handlerInvoker.failed(failure);
|
||||
httpChannelState._handlerInvoker.failed(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1566,7 +1704,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
callbackCompleted = _callbackCompleted;
|
||||
request = _request;
|
||||
error = _request == null ? null : _error;
|
||||
error = _request == null ? null : _failure;
|
||||
}
|
||||
|
||||
if (request == null || callbackCompleted)
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -657,6 +658,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
|
|||
super.onFillInterestedFailed(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired(TimeoutException timeout)
|
||||
{
|
||||
if (_httpChannel.getRequest() == null)
|
||||
return true;
|
||||
Runnable task = _httpChannel.onIdleTimeout(timeout);
|
||||
if (task != null)
|
||||
getExecutor().execute(task);
|
||||
return false; // We've handle the exception
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
|
@ -1448,6 +1460,18 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
|
|||
_sendCallback.iterate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return getEndPoint().getIdleTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIdleTimeout(long idleTimeoutMs)
|
||||
{
|
||||
getEndPoint().setIdleTimeout(idleTimeoutMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCommitted()
|
||||
{
|
||||
|
|
|
@ -583,15 +583,25 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
|
|||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback) throws Exception
|
||||
{
|
||||
request.addIdleTimeoutListener(t -> false);
|
||||
response.setStatus(200);
|
||||
try
|
||||
{
|
||||
Thread.sleep(2000);
|
||||
Thread.sleep(MAX_IDLE_TIME * 3 / 2);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
// TODO what do we do about the failing write?
|
||||
// should timeout errors be non persistent?
|
||||
Callback ocb = callback;
|
||||
callback = Callback.from(ocb::succeeded, t ->
|
||||
{
|
||||
t.printStackTrace();
|
||||
ocb.failed(t);
|
||||
});
|
||||
Content.Sink.write(response, true, "Hello World\r\n", callback);
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -1175,13 +1175,9 @@ public class HttpChannelTest
|
|||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
handling.set(response);
|
||||
request.addErrorListener(t -> false);
|
||||
request.addErrorListener(t -> !error.compareAndSet(null, t));
|
||||
request.addErrorListener(t ->
|
||||
{
|
||||
callback.failed(t);
|
||||
return true;
|
||||
});
|
||||
request.addFailureListener(t -> error.set(null));
|
||||
request.addFailureListener(t -> error.compareAndSet(null, t));
|
||||
request.addFailureListener(t -> error.compareAndSet(null, new Throwable("WRONG")));
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.eclipse.jetty.http.UriCompliance;
|
|||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.logging.StacklessLogging;
|
||||
import org.eclipse.jetty.server.handler.DumpHandler;
|
||||
import org.eclipse.jetty.server.internal.HttpChannelState;
|
||||
import org.eclipse.jetty.server.internal.HttpConnection;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
@ -1158,24 +1157,13 @@ public class HttpConnectionTest
|
|||
});
|
||||
_server.start();
|
||||
|
||||
String response = null;
|
||||
try (StacklessLogging stackless = new StacklessLogging(HttpChannelState.class))
|
||||
{
|
||||
LOG.info("Expect IOException: Response header too large...");
|
||||
response = _connector.getResponse("GET / HTTP/1.1\r\n" +
|
||||
"Host: localhost\r\n" +
|
||||
"\r\n"
|
||||
);
|
||||
String response = _connector.getResponse("GET / HTTP/1.1\r\n" +
|
||||
"Host: localhost\r\n" +
|
||||
"\r\n"
|
||||
);
|
||||
|
||||
checkContains(response, 0, "HTTP/1.1 500");
|
||||
assertTrue(checkError.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
if (response != null)
|
||||
System.err.println(response);
|
||||
throw e;
|
||||
}
|
||||
checkContains(response, 0, "HTTP/1.1 500");
|
||||
assertTrue(checkError.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -218,6 +218,17 @@ public class MockHttpStream implements HttpStream
|
|||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIdleTimeout(long idleTimeoutMs)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCommitted()
|
||||
{
|
||||
|
|
|
@ -13,8 +13,13 @@
|
|||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -26,6 +31,7 @@ import org.eclipse.jetty.io.Connection;
|
|||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.QuietException;
|
||||
import org.eclipse.jetty.server.handler.ContextHandler;
|
||||
import org.eclipse.jetty.server.internal.HttpChannelState;
|
||||
import org.eclipse.jetty.server.internal.HttpConnection;
|
||||
import org.eclipse.jetty.util.Blocker;
|
||||
|
@ -41,12 +47,17 @@ import org.junit.jupiter.params.provider.MethodSource;
|
|||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ServerTest
|
||||
{
|
||||
private static final long IDLE_TIMEOUT = 1000L;
|
||||
private Server _server;
|
||||
private ContextHandler _context;
|
||||
private LocalConnector _connector;
|
||||
private final AtomicReference<Runnable> _afterHandle = new AtomicReference<>();
|
||||
|
||||
|
@ -54,6 +65,8 @@ public class ServerTest
|
|||
public void prepare() throws Exception
|
||||
{
|
||||
_server = new Server();
|
||||
_context = new ContextHandler("/");
|
||||
_server.setHandler(_context);
|
||||
_connector = new LocalConnector(_server, new HttpConnectionFactory()
|
||||
{
|
||||
@Override
|
||||
|
@ -95,7 +108,8 @@ public class ServerTest
|
|||
return configure(connection, connector, endPoint);
|
||||
}
|
||||
});
|
||||
_connector.setIdleTimeout(60000);
|
||||
_connector.setIdleTimeout(IDLE_TIMEOUT);
|
||||
_connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setIdleTimeout(IDLE_TIMEOUT);
|
||||
_server.addConnector(_connector);
|
||||
}
|
||||
|
||||
|
@ -109,10 +123,10 @@ public class ServerTest
|
|||
@Test
|
||||
public void testSimpleGET() throws Exception
|
||||
{
|
||||
_server.setHandler(new Handler.Abstract()
|
||||
_context.setHandler(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback) throws Exception
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
response.getHeaders().put(HttpHeader.CONTENT_TYPE, "text/plain");
|
||||
Content.Sink.write(response, true, "Hello", callback);
|
||||
|
@ -154,7 +168,7 @@ public class ServerTest
|
|||
@MethodSource("completionScenarios")
|
||||
public void testCompletion(boolean succeeded, boolean handling, boolean written, boolean last) throws Exception
|
||||
{
|
||||
_server.setHandler(new Handler.Abstract(Invocable.InvocationType.BLOCKING)
|
||||
_context.setHandler(new Handler.Abstract(Invocable.InvocationType.BLOCKING)
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback) throws Exception
|
||||
|
@ -186,8 +200,6 @@ public class ServerTest
|
|||
\r
|
||||
""";
|
||||
String rawResponse = _connector.getResponse(request);
|
||||
// System.err.printf("succeeded=%b handling=%b written=%b last=%b%n", succeeded, handling, written, last);
|
||||
// System.err.println(rawResponse);
|
||||
|
||||
if (succeeded || written)
|
||||
assertThat(rawResponse, containsString("HTTP/1.1 200 OK"));
|
||||
|
@ -212,4 +224,187 @@ public class ServerTest
|
|||
assertThat(rawResponse, containsString("Content-Length:"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdleTimeoutNoListener() throws Exception
|
||||
{
|
||||
// See ServerTimeoutsTest for more complete idle timeout testing.
|
||||
_context.setHandler(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
// Handler never completes the callback
|
||||
return true;
|
||||
}
|
||||
});
|
||||
_server.start();
|
||||
|
||||
String request = """
|
||||
GET /path HTTP/1.0\r
|
||||
Host: hostname\r
|
||||
\r
|
||||
""";
|
||||
String rawResponse = _connector.getResponse(request);
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
|
||||
assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdleTimeoutNoListenerHttpConfigurationOnly() throws Exception
|
||||
{
|
||||
// See ServerTimeoutsTest for more complete idle timeout testing.
|
||||
|
||||
_context.setHandler(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
// Handler never completes the callback
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
_connector.setIdleTimeout(10 * IDLE_TIMEOUT);
|
||||
_connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setIdleTimeout(IDLE_TIMEOUT);
|
||||
|
||||
_server.start();
|
||||
|
||||
String request = """
|
||||
GET /path HTTP/1.0\r
|
||||
Host: hostname\r
|
||||
\r
|
||||
""";
|
||||
String rawResponse = _connector.getResponse(request);
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
|
||||
assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdleTimeoutFalseListener() throws Exception
|
||||
{
|
||||
// See ServerTimeoutsTest for more complete idle timeout testing.
|
||||
CompletableFuture<Callback> callbackOnTimeout = new CompletableFuture<>();
|
||||
_context.setHandler(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
request.addIdleTimeoutListener(t -> !callbackOnTimeout.complete(callback));
|
||||
return true;
|
||||
}
|
||||
});
|
||||
_server.start();
|
||||
|
||||
String request = """
|
||||
GET /path HTTP/1.0\r
|
||||
Host: hostname\r
|
||||
\r
|
||||
""";
|
||||
|
||||
try (LocalConnector.LocalEndPoint localEndPoint = _connector.executeRequest(request))
|
||||
{
|
||||
callbackOnTimeout.get(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS).succeeded();
|
||||
String rawResponse = localEndPoint.getResponse();
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdleTimeoutWriteCallback() throws Exception
|
||||
{
|
||||
CompletableFuture<Throwable> onTimeout = new CompletableFuture<>();
|
||||
CompletableFuture<Throwable> writeFail = new CompletableFuture<>();
|
||||
_context.setHandler(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
Runnable write = new Runnable()
|
||||
{
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(128 * 1024 * 1024);
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
response.write(false, buffer, Callback.from(this,
|
||||
t ->
|
||||
{
|
||||
writeFail.complete(t);
|
||||
callback.failed(t);
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
request.addIdleTimeoutListener(t ->
|
||||
{
|
||||
request.getComponents().getThreadPool().execute(write);
|
||||
return onTimeout.complete(t);
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
});
|
||||
_server.start();
|
||||
|
||||
String request = """
|
||||
GET /path HTTP/1.0\r
|
||||
Host: localhost\r
|
||||
\r
|
||||
""";
|
||||
try (LocalConnector.LocalEndPoint ignored = _connector.executeRequest(request))
|
||||
{
|
||||
Throwable x = onTimeout.get(2 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
assertThat(x, instanceOf(TimeoutException.class));
|
||||
x = writeFail.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
|
||||
assertThat(x, instanceOf(TimeoutException.class));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListenersInContext() throws Exception
|
||||
{
|
||||
CountDownLatch latch = new CountDownLatch(3);
|
||||
_context.setHandler(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext()));
|
||||
latch.countDown();
|
||||
|
||||
request.addIdleTimeoutListener(t ->
|
||||
{
|
||||
assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext()));
|
||||
latch.countDown();
|
||||
return true;
|
||||
});
|
||||
|
||||
request.addFailureListener(t ->
|
||||
{
|
||||
assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext()));
|
||||
latch.countDown();
|
||||
});
|
||||
return true;
|
||||
}
|
||||
});
|
||||
_server.start();
|
||||
|
||||
String request = """
|
||||
GET /path HTTP/1.0\r
|
||||
Host: hostname\r
|
||||
\r
|
||||
""";
|
||||
|
||||
try (LocalConnector.LocalEndPoint localEndPoint = _connector.executeRequest(request))
|
||||
{
|
||||
assertTrue(latch.await(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
|
||||
String rawResponse = localEndPoint.getResponse();
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -15,6 +15,8 @@ package org.eclipse.jetty.session;
|
|||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
@ -161,9 +163,13 @@ public class TestableRequest implements Request
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addErrorListener(Predicate<Throwable> onError)
|
||||
public void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFailureListener(Consumer<Throwable> onFailure)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -62,6 +62,11 @@
|
|||
<artifactId>jetty-http3-client-transport</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-slf4j-impl</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
|
|
|
@ -246,7 +246,7 @@ public class HttpClientTimeoutTest extends AbstractTest
|
|||
return new SslConnection(bufferPool, executor, endPoint, engine)
|
||||
{
|
||||
@Override
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
protected boolean onReadTimeout(TimeoutException timeout)
|
||||
{
|
||||
sslIdle.set(true);
|
||||
return super.onReadTimeout(timeout);
|
||||
|
|
|
@ -13,8 +13,200 @@
|
|||
|
||||
package org.eclipse.jetty.test.client.transport;
|
||||
|
||||
// TODO: similar to eeX ServerTimeoutsTest but with Handler semantic.
|
||||
// For example, we may decide to not ignore the timeouts if there is a thread dispatched to the Handler.
|
||||
public class ServerTimeoutsTest
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.eclipse.jetty.client.AsyncRequestContent;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.FutureResponseListener;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Response;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsStringIgnoringCase;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ServerTimeoutsTest extends AbstractTest
|
||||
{
|
||||
private static final long IDLE_TIMEOUT = 1000L;
|
||||
|
||||
@Override
|
||||
protected void prepareServer(Transport transport, Handler handler) throws Exception
|
||||
{
|
||||
super.prepareServer(transport, handler);
|
||||
setStreamIdleTimeout(IDLE_TIMEOUT);
|
||||
}
|
||||
|
||||
public static Stream<Arguments> transportsAndTrueIdleTimeoutListeners()
|
||||
{
|
||||
Collection<Transport> transports = transports();
|
||||
return Stream.concat(
|
||||
transports.stream().map(t -> Arguments.of(t, false)),
|
||||
transports.stream().map(t -> Arguments.arguments(t, true)));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsAndTrueIdleTimeoutListeners")
|
||||
public void testIdleTimeout(Transport transport, boolean listener) throws Exception
|
||||
{
|
||||
AtomicBoolean listenerCalled = new AtomicBoolean();
|
||||
start(transport, new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
|
||||
if (listener)
|
||||
request.addIdleTimeoutListener(t -> listenerCalled.compareAndSet(false, true));
|
||||
|
||||
// Do not complete the callback, so it idle times out.
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
ContentResponse response = client.newRequest(newURI(transport))
|
||||
.timeout(5 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
.send();
|
||||
|
||||
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
|
||||
assertThat(response.getContentAsString(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout"));
|
||||
if (listener)
|
||||
assertTrue(listenerCalled.get());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsAndTrueIdleTimeoutListeners")
|
||||
public void testIdleTimeoutWithDemand(Transport transport, boolean listener) throws Exception
|
||||
{
|
||||
AtomicBoolean listenerCalled = new AtomicBoolean();
|
||||
CountDownLatch demanded = new CountDownLatch(1);
|
||||
AtomicReference<Request> requestRef = new AtomicReference<>();
|
||||
AtomicReference<Callback> callbackRef = new AtomicReference<>();
|
||||
start(transport, new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
|
||||
if (listener)
|
||||
request.addIdleTimeoutListener(t -> listenerCalled.compareAndSet(false, true));
|
||||
requestRef.set(request);
|
||||
callbackRef.set(callback);
|
||||
request.demand(demanded::countDown);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
// The response will not be completed, so use a specialized listener.
|
||||
AsyncRequestContent content = new AsyncRequestContent();
|
||||
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport))
|
||||
.timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS)
|
||||
.headers(f -> f.put(HttpHeader.CONTENT_LENGTH, 10))
|
||||
.onResponseSuccess(s ->
|
||||
content.close())
|
||||
.body(content);
|
||||
FutureResponseListener futureResponse = new FutureResponseListener(request);
|
||||
request.send(futureResponse);
|
||||
|
||||
// Demand is invoked by the idle timeout
|
||||
assertTrue(demanded.await(2 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
|
||||
|
||||
// Reads should yield the idle timeout.
|
||||
Content.Chunk chunk = requestRef.get().read();
|
||||
assertThat(chunk, instanceOf(Content.Chunk.Error.class));
|
||||
Throwable cause = ((Content.Chunk.Error)chunk).getCause();
|
||||
assertThat(cause, instanceOf(TimeoutException.class));
|
||||
|
||||
// Complete the callback as the error listener promised.
|
||||
callbackRef.get().failed(cause);
|
||||
|
||||
ContentResponse response = futureResponse.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
|
||||
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
|
||||
assertThat(response.getContentAsString(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout"));
|
||||
if (listener)
|
||||
assertTrue(listenerCalled.get());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transports")
|
||||
public void testIdleTimeoutErrorListenerReturnsFalse(Transport transport) throws Exception
|
||||
{
|
||||
AtomicReference<Response> responseRef = new AtomicReference<>();
|
||||
CompletableFuture<Callback> callbackOnTimeout = new CompletableFuture<>();
|
||||
start(transport, new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
responseRef.set(response);
|
||||
request.addIdleTimeoutListener(t ->
|
||||
{
|
||||
callbackOnTimeout.complete(callback);
|
||||
return false; // ignore timeout
|
||||
});
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport))
|
||||
.timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS);
|
||||
FutureResponseListener futureResponse = new FutureResponseListener(request);
|
||||
request.send(futureResponse);
|
||||
|
||||
// Get the callback as promised by the error listener.
|
||||
Callback callback = callbackOnTimeout.get(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
assertNotNull(callback);
|
||||
Content.Sink.write(responseRef.get(), true, "OK", callback);
|
||||
|
||||
ContentResponse response = futureResponse.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(response.getContentAsString(), is("OK"));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsNoFCGI")
|
||||
public void testIdleTimeoutErrorListenerReturnsFalseThenTrue(Transport transport) throws Exception
|
||||
{
|
||||
// TODO fix FCGI for multiple timeouts
|
||||
AtomicReference<Throwable> error = new AtomicReference<>();
|
||||
start(transport, new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
request.addIdleTimeoutListener(t -> error.getAndSet(t) != null);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
ContentResponse response = client.newRequest(newURI(transport))
|
||||
.timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS)
|
||||
.send();
|
||||
|
||||
// The first time the listener returns true, but does not complete the callback,
|
||||
// so another idle timeout elapses.
|
||||
// The second time the listener returns false and the implementation produces the response.
|
||||
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
|
||||
assertThat(response.getContentAsString(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout"));
|
||||
assertThat(error.get(), instanceOf(TimeoutException.class));
|
||||
}
|
||||
|
||||
// TODO write side tests
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.security.SecureRandom;
|
|||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
|
@ -218,13 +219,13 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
public boolean onIdleExpired(TimeoutException timeoutException)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onIdleExpired()");
|
||||
|
||||
// treat as a handler error because socket is still open
|
||||
coreSession.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout"), Callback.NOOP);
|
||||
coreSession.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout", timeoutException), Callback.NOOP);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -234,7 +235,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
* @return true to signal that the endpoint must be closed, false to keep the endpoint open
|
||||
*/
|
||||
@Override
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
protected boolean onReadTimeout(TimeoutException timeout)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onReadTimeout()");
|
||||
|
|
|
@ -32,8 +32,6 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -64,7 +62,6 @@ import org.eclipse.jetty.http.content.VirtualHttpContentFactory;
|
|||
import org.eclipse.jetty.io.ByteBufferInputStream;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.server.Context;
|
||||
import org.eclipse.jetty.server.HttpStream;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.ResourceService;
|
||||
import org.eclipse.jetty.server.Response;
|
||||
|
@ -633,17 +630,6 @@ public class DefaultServlet extends HttpServlet
|
|||
return _servletRequest.isSecure();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addErrorListener(Predicate<Throwable> onError)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addHttpStreamWrapper(Function<HttpStream, HttpStream> wrapper)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object removeAttribute(String name)
|
||||
{
|
||||
|
|
|
@ -83,7 +83,6 @@ public class ServletChannel
|
|||
private final Listener _combinedListener;
|
||||
private volatile ServletContextRequest _servletContextRequest;
|
||||
private volatile boolean _expects100Continue;
|
||||
private volatile long _oldIdleTimeout;
|
||||
private volatile Callback _callback;
|
||||
// Bytes written after interception (e.g. after compression).
|
||||
private volatile long _written;
|
||||
|
@ -380,7 +379,6 @@ public class ServletChannel
|
|||
_servletContextRequest = null;
|
||||
_callback = null;
|
||||
_written = 0;
|
||||
_oldIdleTimeout = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -838,10 +836,6 @@ public class ServletChannel
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onCompleted for {} written={}", apiRequest.getRequestURI(), getBytesWritten());
|
||||
|
||||
long idleTO = _configuration.getIdleTimeout();
|
||||
if (idleTO >= 0 && getIdleTimeout() != _oldIdleTimeout)
|
||||
setIdleTimeout(_oldIdleTimeout);
|
||||
|
||||
if (getServer().getRequestLog() instanceof CustomRequestLog)
|
||||
{
|
||||
CustomRequestLog.LogDetail logDetail = new CustomRequestLog.LogDetail(
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.EventListener;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import jakarta.servlet.AsyncListener;
|
||||
import jakarta.servlet.ServletRequest;
|
||||
|
@ -108,6 +109,7 @@ public class ServletContextRequest extends ContextRequest
|
|||
_matchedPath = matchedResource.getMatchedPath();
|
||||
_response = newServletContextResponse(response);
|
||||
_sessionManager = sessionManager;
|
||||
addIdleTimeoutListener(this::onIdleTimeout);
|
||||
}
|
||||
|
||||
protected ServletApiRequest newServletApiRequest()
|
||||
|
@ -131,6 +133,11 @@ public class ServletContextRequest extends ContextRequest
|
|||
return new ServletContextResponse(_servletChannel, this, response);
|
||||
}
|
||||
|
||||
private boolean onIdleTimeout(TimeoutException timeout)
|
||||
{
|
||||
return _servletChannel.getState().onIdleTimeout(timeout);
|
||||
}
|
||||
|
||||
public String getDecodedPathInContext()
|
||||
{
|
||||
return _decodedPathInContext;
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.ee10.servlet;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import jakarta.servlet.AsyncListener;
|
||||
import jakarta.servlet.ServletContext;
|
||||
|
@ -728,6 +729,18 @@ public class ServletRequestState
|
|||
}
|
||||
}
|
||||
|
||||
public boolean onIdleTimeout(TimeoutException timeout)
|
||||
{
|
||||
try (AutoLock ignored = lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onIdleTimeout {}", getStatusStringLocked(), timeout);
|
||||
// TODO this is almost always returning false?!? what about read/write timeouts???
|
||||
// return _state == State.IDLE;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
protected void onError(Throwable th)
|
||||
{
|
||||
final AsyncContextEvent asyncEvent;
|
||||
|
|
|
@ -13,9 +13,11 @@
|
|||
|
||||
package org.eclipse.jetty.ee10.test.client.transport;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
@ -51,17 +53,23 @@ import org.eclipse.jetty.server.SecureRequestCustomizer;
|
|||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.SslConnectionFactory;
|
||||
import org.eclipse.jetty.toolchain.test.jupiter.WorkDir;
|
||||
import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension;
|
||||
import org.eclipse.jetty.unixdomain.server.UnixDomainServerConnector;
|
||||
import org.eclipse.jetty.util.SocketAddressResolver;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ExtendWith(WorkDirExtension.class)
|
||||
public class AbstractTest
|
||||
{
|
||||
public WorkDir workDir;
|
||||
|
||||
protected final HttpConfiguration httpConfig = new HttpConfiguration();
|
||||
protected SslContextFactory.Server sslContextFactoryServer;
|
||||
protected Server server;
|
||||
|
@ -140,16 +148,27 @@ public class AbstractTest
|
|||
return new Server(serverThreads);
|
||||
}
|
||||
|
||||
protected SslContextFactory.Server newSslContextFactoryServer()
|
||||
protected SslContextFactory.Server newSslContextFactoryServer() throws Exception
|
||||
{
|
||||
SslContextFactory.Server ssl = new SslContextFactory.Server();
|
||||
ssl.setKeyStorePath("src/test/resources/keystore.p12");
|
||||
ssl.setKeyStorePassword("storepwd");
|
||||
ssl.setUseCipherSuitesOrder(true);
|
||||
ssl.setCipherComparator(HTTP2Cipher.COMPARATOR);
|
||||
configureSslContextFactory(ssl);
|
||||
return ssl;
|
||||
}
|
||||
|
||||
private void configureSslContextFactory(SslContextFactory sslContextFactory) throws Exception
|
||||
{
|
||||
KeyStore keystore = KeyStore.getInstance("PKCS12");
|
||||
try (InputStream is = Files.newInputStream(Path.of("src/test/resources/keystore.p12")))
|
||||
{
|
||||
keystore.load(is, "storepwd".toCharArray());
|
||||
}
|
||||
sslContextFactory.setTrustStore(keystore);
|
||||
sslContextFactory.setKeyStore(keystore);
|
||||
sslContextFactory.setKeyStorePassword("storepwd");
|
||||
sslContextFactory.setUseCipherSuitesOrder(true);
|
||||
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
|
||||
}
|
||||
|
||||
protected void startClient(Transport transport) throws Exception
|
||||
{
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
|
@ -167,7 +186,11 @@ public class AbstractTest
|
|||
case HTTP, HTTPS, H2C, H2, FCGI ->
|
||||
new ServerConnector(server, 1, 1, newServerConnectionFactory(transport));
|
||||
case H3 ->
|
||||
new HTTP3ServerConnector(server, sslContextFactoryServer, newServerConnectionFactory(transport));
|
||||
{
|
||||
HTTP3ServerConnector connector = new HTTP3ServerConnector(server, sslContextFactoryServer, newServerConnectionFactory(transport));
|
||||
connector.getQuicConfiguration().setPemWorkDirectory(workDir.getEmptyPathDir());
|
||||
yield connector;
|
||||
}
|
||||
case UNIX_DOMAIN ->
|
||||
{
|
||||
UnixDomainServerConnector connector = new UnixDomainServerConnector(server, 1, 1, newServerConnectionFactory(transport));
|
||||
|
@ -215,16 +238,15 @@ public class AbstractTest
|
|||
return list.toArray(ConnectionFactory[]::new);
|
||||
}
|
||||
|
||||
protected SslContextFactory.Client newSslContextFactoryClient()
|
||||
protected SslContextFactory.Client newSslContextFactoryClient() throws Exception
|
||||
{
|
||||
SslContextFactory.Client ssl = new SslContextFactory.Client();
|
||||
ssl.setKeyStorePath("src/test/resources/keystore.p12");
|
||||
ssl.setKeyStorePassword("storepwd");
|
||||
configureSslContextFactory(ssl);
|
||||
ssl.setEndpointIdentificationAlgorithm(null);
|
||||
return ssl;
|
||||
}
|
||||
|
||||
protected HttpClientTransport newHttpClientTransport(Transport transport)
|
||||
protected HttpClientTransport newHttpClientTransport(Transport transport) throws Exception
|
||||
{
|
||||
return switch (transport)
|
||||
{
|
||||
|
|
|
@ -60,13 +60,13 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
|||
// since they may be ignored, so we don't want to remember errors if they are ignored.
|
||||
// However, this behavior is historically so because of Servlets, and we
|
||||
// may decide differently for Handlers.
|
||||
@Disabled
|
||||
public class ServerTimeoutsTest extends AbstractTest
|
||||
{
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsNoFCGI")
|
||||
public void testBlockingReadWithDelayedFirstContentWithUndelayedDispatchIdleTimeoutFires(Transport transport) throws Exception
|
||||
{
|
||||
assumeTrue(transport != Transport.H3 && transport != Transport.H2C && transport != Transport.H2); // TODO Fix
|
||||
testBlockingReadWithDelayedFirstContentIdleTimeoutFires(transport, false);
|
||||
}
|
||||
|
||||
|
@ -74,6 +74,7 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
@MethodSource("transportsNoFCGI")
|
||||
public void testBlockingReadWithDelayedFirstContentWithDelayedDispatchIdleTimeoutFires(Transport transport) throws Exception
|
||||
{
|
||||
assumeTrue(transport != Transport.H3 && transport != Transport.H2C && transport != Transport.H2); // TODO Fix
|
||||
testBlockingReadWithDelayedFirstContentIdleTimeoutFires(transport, true);
|
||||
}
|
||||
|
||||
|
@ -369,6 +370,8 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
@MethodSource("transportsNoFCGI")
|
||||
public void testBlockingReadWithMinimumDataRateAboveLimit(Transport transport) throws Exception
|
||||
{
|
||||
assumeTrue(transport != Transport.H3 && transport != Transport.H2C && transport != Transport.H2); // TODO Fix
|
||||
|
||||
int bytesPerSecond = 20;
|
||||
httpConfig.setMinRequestDataRate(bytesPerSecond);
|
||||
CountDownLatch handlerLatch = new CountDownLatch(1);
|
||||
|
@ -413,6 +416,8 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
@MethodSource("transportsNoFCGI")
|
||||
public void testBlockingReadHttpIdleTimeoutOverridesIdleTimeout(Transport transport) throws Exception
|
||||
{
|
||||
assumeTrue(transport != Transport.H3); // TODO Fix H3
|
||||
|
||||
long httpIdleTimeout = 2500;
|
||||
long idleTimeout = 3 * httpIdleTimeout;
|
||||
httpConfig.setIdleTimeout(httpIdleTimeout);
|
||||
|
@ -444,7 +449,7 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
@MethodSource("transportsNoFCGI")
|
||||
public void testAsyncReadHttpIdleTimeoutOverridesIdleTimeout(Transport transport) throws Exception
|
||||
{
|
||||
long httpIdleTimeout = 2500;
|
||||
long httpIdleTimeout = 2000;
|
||||
long idleTimeout = 3 * httpIdleTimeout;
|
||||
httpConfig.setIdleTimeout(httpIdleTimeout);
|
||||
CountDownLatch handlerLatch = new CountDownLatch(1);
|
||||
|
@ -503,6 +508,7 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Disabled
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsNoFCGI")
|
||||
public void testIdleTimeoutBeforeReadIsIgnored(Transport transport) throws Exception
|
||||
|
@ -556,6 +562,7 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Disabled
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsNoFCGI")
|
||||
public void testBlockingWriteWithMinimumDataRateBelowLimit(Transport transport) throws Exception
|
||||
|
|
Binary file not shown.
|
@ -521,7 +521,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
_response.recycle();
|
||||
_committedMetaData = null;
|
||||
_written = 0;
|
||||
_oldIdleTimeout = 0;
|
||||
_transientListeners.clear();
|
||||
}
|
||||
|
||||
|
@ -948,9 +947,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
_coreCallback = coreCallback;
|
||||
|
||||
long idleTO = _configuration.getIdleTimeout();
|
||||
_oldIdleTimeout = getIdleTimeout();
|
||||
if (idleTO >= 0 && _oldIdleTimeout != idleTO)
|
||||
setIdleTimeout(idleTO);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
|
|
|
@ -37,8 +37,8 @@ import java.util.Set;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import jakarta.servlet.MultipartConfigElement;
|
||||
|
@ -2439,9 +2439,8 @@ public class RequestTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addErrorListener(Predicate<Throwable> onError)
|
||||
public void addFailureListener(Consumer<Throwable> onFailure)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,6 +31,8 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
@ -2387,9 +2389,13 @@ public class ResponseTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addErrorListener(Predicate<Throwable> onError)
|
||||
public void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFailureListener(Consumer<Throwable> onFailure)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@ import java.nio.file.Path;
|
|||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import jakarta.servlet.http.HttpServlet;
|
||||
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
|
||||
|
@ -112,6 +113,11 @@ public class AbstractTest
|
|||
}
|
||||
|
||||
protected void prepareServer(Transport transport, HttpServlet servlet) throws Exception
|
||||
{
|
||||
prepareServer(transport, servlet, "/");
|
||||
}
|
||||
|
||||
protected void prepareServer(Transport transport, HttpServlet servlet, String path) throws Exception
|
||||
{
|
||||
if (transport == Transport.UNIX_DOMAIN)
|
||||
{
|
||||
|
@ -126,11 +132,16 @@ public class AbstractTest
|
|||
connector = newConnector(transport, server);
|
||||
server.addConnector(connector);
|
||||
servletContextHandler = new ServletContextHandler();
|
||||
servletContextHandler.setContextPath("/");
|
||||
addServlet(servlet, path);
|
||||
server.setHandler(servletContextHandler);
|
||||
}
|
||||
|
||||
protected void addServlet(HttpServlet servlet, String path) throws Exception
|
||||
{
|
||||
Objects.requireNonNull(servletContextHandler);
|
||||
ServletHolder holder = new ServletHolder(servlet);
|
||||
holder.setAsyncSupported(true);
|
||||
servletContextHandler.addServlet(holder, "/*");
|
||||
server.setHandler(servletContextHandler);
|
||||
servletContextHandler.getServletHandler().addServletWithMapping(holder, path);
|
||||
}
|
||||
|
||||
protected Server newServer()
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
package org.eclipse.jetty.ee9.test.client.transport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -21,15 +23,20 @@ import java.util.concurrent.TimeUnit;
|
|||
import jakarta.servlet.http.HttpServlet;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import jakarta.servlet.http.PushBuilder;
|
||||
import org.eclipse.jetty.client.BufferingResponseListener;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.Result;
|
||||
import org.eclipse.jetty.ee9.servlet.DefaultServlet;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.NetworkConnector;
|
||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class PushedResourcesTest extends AbstractTest
|
||||
|
@ -105,6 +112,227 @@ public class PushedResourcesTest extends AbstractTest
|
|||
assertTrue(latch2.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsWithPushSupport")
|
||||
public void testPushedResourcesSomewhatLikeTCK(Transport transport) throws Exception
|
||||
{
|
||||
Random random = new Random();
|
||||
byte[] bytes = new byte[512];
|
||||
random.nextBytes(bytes);
|
||||
byte[] pushBytes1 = new byte[1024];
|
||||
random.nextBytes(pushBytes1);
|
||||
byte[] pushBytes2 = new byte[2048];
|
||||
random.nextBytes(pushBytes2);
|
||||
|
||||
String path1 = "/secondary1";
|
||||
String path2 = "/secondary2";
|
||||
start(transport, new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
String target = request.getRequestURI();
|
||||
if (target.equals(path1))
|
||||
{
|
||||
response.getOutputStream().write(pushBytes1);
|
||||
}
|
||||
else if (target.equals(path2))
|
||||
{
|
||||
response.getOutputStream().write(pushBytes2);
|
||||
}
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
PushBuilder pb = request.newPushBuilder();
|
||||
pb.push();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Expected error empty push builder");
|
||||
}
|
||||
|
||||
PushBuilder pb1 = request.newPushBuilder();
|
||||
pb1.path(path1);
|
||||
pb1.push();
|
||||
PushBuilder pb2 = request.newPushBuilder();
|
||||
pb2.path(path2);
|
||||
pb2.push();
|
||||
|
||||
try
|
||||
{
|
||||
pb2.push();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Expected error no path reset");
|
||||
}
|
||||
|
||||
PushBuilder pb3 = request.newPushBuilder();
|
||||
try
|
||||
{
|
||||
pb3.method(null);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Expected error null method");
|
||||
}
|
||||
|
||||
String[] methods = {
|
||||
"", "POST", "PUT", "DELETE",
|
||||
"CONNECT", "OPTIONS", "TRACE"
|
||||
};
|
||||
|
||||
for (String m : methods)
|
||||
{
|
||||
try
|
||||
{
|
||||
pb3.method(m);
|
||||
System.err.println("Fail " + m);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Pass " + m);
|
||||
}
|
||||
}
|
||||
response.getOutputStream().write(bytes);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
CountDownLatch latch2 = new CountDownLatch(1);
|
||||
ContentResponse response = client.newRequest(newURI(transport))
|
||||
.onPush((mainRequest, pushedRequest) -> new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
if (pushedRequest.getPath().equals(path1))
|
||||
{
|
||||
assertArrayEquals(pushBytes1, getContent());
|
||||
latch1.countDown();
|
||||
}
|
||||
else if (pushedRequest.getPath().equals(path2))
|
||||
{
|
||||
assertArrayEquals(pushBytes2, getContent());
|
||||
latch2.countDown();
|
||||
}
|
||||
}
|
||||
})
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
assertArrayEquals(bytes, response.getContent());
|
||||
assertTrue(latch1.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(latch2.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsWithPushSupport")
|
||||
public void testPushedResourcesLikeTCK(Transport transport) throws Exception
|
||||
{
|
||||
String path1 = "/secondary1.html";
|
||||
|
||||
prepareServer(transport, new DefaultServlet());
|
||||
Path staticDir = MavenTestingUtils.getTestResourcePath("serverpushtck");
|
||||
assertNotNull(staticDir);
|
||||
servletContextHandler.setBaseResourceAsPath(staticDir);
|
||||
addServlet(
|
||||
new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
PushBuilder pb = request.newPushBuilder();
|
||||
pb.push();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Expected error empty push builder");
|
||||
}
|
||||
|
||||
PushBuilder pb1 = request.newPushBuilder();
|
||||
pb1.path(path1);
|
||||
pb1.push();
|
||||
|
||||
try
|
||||
{
|
||||
pb1.push();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Expected error no path reset");
|
||||
}
|
||||
|
||||
PushBuilder pb3 = request.newPushBuilder();
|
||||
try
|
||||
{
|
||||
pb3.method(null);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Expected error null method");
|
||||
}
|
||||
|
||||
String[] methods = {
|
||||
"", "POST", "PUT", "DELETE",
|
||||
"CONNECT", "OPTIONS", "TRACE"
|
||||
};
|
||||
|
||||
for (String m : methods)
|
||||
{
|
||||
try
|
||||
{
|
||||
pb3.method(m);
|
||||
System.err.println("Fail " + m);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
System.err.println("Pass " + m);
|
||||
}
|
||||
}
|
||||
response.getWriter().println("TEST FINISHED");
|
||||
}
|
||||
},
|
||||
"/serverpushtck/*");
|
||||
|
||||
server.start();
|
||||
startClient(transport);
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
|
||||
String scheme = transport.isSecure() ? "https" : "http";
|
||||
String uri = scheme + "://localhost";
|
||||
if (connector instanceof NetworkConnector networkConnector)
|
||||
uri += ":" + networkConnector.getLocalPort();
|
||||
URI theURI = URI.create(uri + "/serverpushtck/foo");
|
||||
|
||||
ContentResponse response = client.newRequest(theURI)
|
||||
.onPush((mainRequest, pushedRequest) -> new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
if (pushedRequest.getPath().equals(path1))
|
||||
{
|
||||
assertTrue(getContentAsString().contains("SECONDARY 1"));
|
||||
latch1.countDown();
|
||||
}
|
||||
}
|
||||
})
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
assertTrue(response.getContentAsString().contains("TEST FINISHED"));
|
||||
assertTrue(latch1.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsWithPushSupport")
|
||||
public void testPushedResourceRedirect(Transport transport) throws Exception
|
||||
|
@ -132,7 +360,7 @@ public class PushedResourcesTest extends AbstractTest
|
|||
});
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
;
|
||||
|
||||
ContentResponse response = client.newRequest(newURI(transport))
|
||||
.onPush((mainRequest, pushedRequest) -> new BufferingResponseListener()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
<html>
|
||||
<h1>SECONDARY 1</h1>
|
||||
</html>
|
Loading…
Reference in New Issue