diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java index eb7442c6229..2dee11d9923 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java @@ -52,7 +52,10 @@ import org.slf4j.LoggerFactory; * If more requests are received, they are suspended (that is, not * forwarded to the child {@code Handler}) and stored in a priority * queue. - * Priorities are determined via {@link #getPriority(Request)}, + * The maximum number of suspended requests can be set via {@link #setMaxSuspendedRequestCount(int)} to avoid + * out-of-memory errors. When this limit is reached, the request will fail fast + * with status code {@code 503} (Service Unavailable).

+ *

Priorities are determined via {@link #getPriority(Request)}, * that should return values between {@code 0} (the lowest priority) * and positive numbers, typically in the range {@code 0-10}.

*

When a request that is being processed completes, the suspended @@ -82,6 +85,7 @@ public class QoSHandler extends ConditionalHandler.Abstract private final Set priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder()); private CyclicTimeouts timeouts; private int maxRequests; + private int maxSuspendedRequests = Integer.MAX_VALUE; private Duration maxSuspend = Duration.ZERO; public QoSHandler() @@ -119,6 +123,29 @@ public class QoSHandler extends ConditionalHandler.Abstract this.maxRequests = maxRequests; } + /** + * @return the max number of suspended requests + */ + @ManagedAttribute(value = "The maximum number of suspended requests", readonly = true) + public int getMaxSuspendedRequestCount() + { + return maxSuspendedRequests; + } + + /** + *

Sets the max number of suspended requests.

+ *

Once the max suspended request limit is reached, the request is failed with an HTTP + * status of {@code 503 Service Unavailable}.

+ * + * @param maxSuspendedRequests the max number of suspended requests + */ + public void setMaxSuspendedRequestCount(int maxSuspendedRequests) + { + if (isStarted()) + throw new IllegalStateException("Cannot change maxSuspendedRequests: " + this); + this.maxSuspendedRequests = maxSuspendedRequests; + } + /** * Get the max duration of time a request may stay suspended. * @return the max duration of time a request may stay suspended @@ -194,6 +221,7 @@ public class QoSHandler extends ConditionalHandler.Abstract LOG.debug("{} processing {}", this, request); boolean expired = false; + boolean tooManyRequests = false; // The read lock allows concurrency with resume(), // which is the common case, but not with expire(). @@ -203,7 +231,14 @@ public class QoSHandler extends ConditionalHandler.Abstract int permits = state.decrementAndGet(); if (permits < 0) { - if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null) + if (Math.abs(permits) > getMaxSuspendedRequestCount()) + { + // Reached the limit of number of suspended requests, + // complete request with 503, service unavailable + state.incrementAndGet(); + tooManyRequests = true; + } + else if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null) { // Cover this race condition: // T1 in this method may find no permits, so it will suspend the request. 
@@ -228,11 +263,13 @@ public class QoSHandler extends ConditionalHandler.Abstract lock.readLock().unlock(); } - if (!expired) - return handleWithPermit(request, response, callback); + if (expired || tooManyRequests) + { + notAvailable(response, callback); + return true; + } - notAvailable(response, callback); - return true; + return handleWithPermit(request, response, callback); } @Override diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java index 25c66f469aa..10d85ed0c35 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.server.handler; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Vector; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -407,4 +408,57 @@ public class QoSHandlerTest assertEquals(HttpStatus.OK_200, response.getStatus()); } + @Test + public void testMaxSuspendedRequests() throws Exception + { + int delay = 100; + QoSHandler qosHandler = new QoSHandler(); + qosHandler.setMaxRequestCount(2); + qosHandler.setMaxSuspendedRequestCount(2); + qosHandler.setHandler(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + try + { + Thread.sleep(delay); + callback.succeeded(); + } + catch (Throwable x) { + callback.failed(x); + } + return true; + } + }); + start(qosHandler); + + int parallelism = 8; + Vector statusCodes = new Vector<>(); // due to synchronized List + IntStream.range(0, parallelism).parallel().forEach(i -> + { + try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest(""" + GET /%d HTTP/1.1 + Host: localhost + + """.formatted(i))) { + String text = 
endPoint.getResponse(false, parallelism * delay * 5, TimeUnit.MILLISECONDS); + HttpTester.Response response = HttpTester.parseResponse(text); + statusCodes.add(response.getStatus()); + } + catch (Exception x) + { + fail(x); + } + }); + + await().atMost(5, TimeUnit.SECONDS).until(statusCodes::size, is(8)); + // expectation is that + // 2 requests will be handled straight away + // 2 will be suspended and eventually handled + // 4 will hit the max suspended request limit + assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.OK_200).count()); + assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.SERVICE_UNAVAILABLE_503).count()); + } + }