diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java index 2dee11d9923..69722c13d40 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java @@ -51,9 +51,10 @@ import org.slf4j.LoggerFactory; * to the number configured via {@link #setMaxRequestCount(int)}. * If more requests are received, they are suspended (that is, not * forwarded to the child {@code Handler}) and stored in a priority - * queue. - * Maximum number of suspended request can be set {@link #setMaxSuspendedRequestCount(int)} to avoid - * out of memory error. When this limit is reached, the request will fail fast + * queue.

+ *

The maximum number of suspended request can be set with + * {@link #setMaxSuspendedRequestCount(int)} to avoid out of memory errors. + * When this limit is reached, the request will fail fast * with status code {@code 503} (not available).

*

Priorities are determined via {@link #getPriority(Request)}, * that should return values between {@code 0} (the lowest priority) @@ -85,7 +86,7 @@ public class QoSHandler extends ConditionalHandler.Abstract private final Set priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder()); private CyclicTimeouts timeouts; private int maxRequests; - private int maxSuspendedRequests = Integer.MAX_VALUE; + private int maxSuspendedRequests = 1024; private Duration maxSuspend = Duration.ZERO; public QoSHandler() @@ -134,8 +135,11 @@ public class QoSHandler extends ConditionalHandler.Abstract /** *

Sets the max number of suspended requests.

- *

Once the max suspended request limit is reached, the request is failed with a HTTP - * status of {@code 503 Service unavailable}.

+ *

Once the max suspended request limit is reached, + * the request is failed with a HTTP status of + * {@code 503 Service unavailable}.

+ *

A negative value indicate an unlimited number + * of suspended requests.

* * @param maxSuspendedRequests the max number of suspended requests */ @@ -171,7 +175,7 @@ public class QoSHandler extends ConditionalHandler.Abstract } @ManagedAttribute("The number of suspended requests") - public long getSuspendedRequestCount() + public int getSuspendedRequestCount() { int permits = state.get(); return Math.max(0, -permits); @@ -231,10 +235,11 @@ public class QoSHandler extends ConditionalHandler.Abstract int permits = state.decrementAndGet(); if (permits < 0) { - if (Math.abs(permits) > getMaxSuspendedRequestCount()) + int maxSuspended = getMaxSuspendedRequestCount(); + if (maxSuspended >= 0 && Math.abs(permits) > maxSuspended) { - // Reached the limit of number of suspended requests, - // complete request with 503, service unavailable + // Reached the limit of suspended requests, + // complete the request with 503 unavailable. state.incrementAndGet(); tooManyRequests = true; } @@ -278,8 +283,10 @@ public class QoSHandler extends ConditionalHandler.Abstract return nextHandler(request, response, callback); } - private static void notAvailable(Response response, Callback callback) + private void notAvailable(Response response, Callback callback) { + if (LOG.isDebugEnabled()) + LOG.debug("{} rejecting {}", this, response.getRequest()); response.setStatus(HttpStatus.SERVICE_UNAVAILABLE_503); if (response.isCommitted()) callback.failed(new IllegalStateException("Response already committed")); diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java index d4ad06b7af9..76f67fd2095 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java @@ -16,9 +16,8 @@ package org.eclipse.jetty.server.handler; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Vector; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.eclipse.jetty.http.HttpStatus; @@ -412,10 +411,11 @@ public class QoSHandlerTest @Test public void testMaxSuspendedRequests() throws Exception { - int delay = 500; + int delay = 1000; QoSHandler qosHandler = new QoSHandler(); qosHandler.setMaxRequestCount(2); qosHandler.setMaxSuspendedRequestCount(2); + AtomicInteger handling = new AtomicInteger(); qosHandler.setHandler(new Handler.Abstract() { @Override @@ -423,6 +423,7 @@ public class QoSHandlerTest { try { + handling.incrementAndGet(); Thread.sleep(delay); callback.succeeded(); } @@ -435,40 +436,51 @@ public class QoSHandlerTest }); start(qosHandler); - int parallelism = 8; - Vector statusCodes = new Vector<>(); // due to synchronized List - ForkJoinPool fjp = new ForkJoinPool(parallelism); - try + List endPoints = new ArrayList<>(); + // Send 2 requests that should pass through QoSHandler. + for (int i = 0; i < 2; i++) { - fjp.submit(() -> IntStream.range(0, parallelism).parallel().forEach(i -> - { - try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest(""" - GET /%d HTTP/1.1 - Host: localhost - - """.formatted(i))) - { - String text = endPoint.getResponse(false, parallelism * delay * 5, TimeUnit.MILLISECONDS); - HttpTester.Response response = HttpTester.parseResponse(text); - statusCodes.add(response.getStatus()); - } - catch (Exception x) - { - fail(x); - } - })); - await().atMost(5, TimeUnit.SECONDS).until(statusCodes::size, is(8)); - // expectation is that - // 2 requests will be handled straight away - // 2 will be suspended and eventually handled - // 4 will hit the max suspended request limit - assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.OK_200).count()); - assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.SERVICE_UNAVAILABLE_503).count()); + LocalConnector.LocalEndPoint endPoint = connector.executeRequest(""" + GET /pass/%d HTTP/1.1 + Host: localhost + + """.formatted(i)); + endPoints.add(endPoint); } - finally + await().atMost(5, TimeUnit.SECONDS).until(handling::get, is(2)); + // Send 2 requests that should be suspended by QoSHandler. + for (int i = 0; i < 2; i++) { - fjp.shutdown(); + LocalConnector.LocalEndPoint endPoint = connector.executeRequest(""" + GET /suspend/%d HTTP/1.1 + Host: localhost + + """.formatted(i)); + endPoints.add(endPoint); } + await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(2)); + // Send 2 requests that should be failed immediately by QoSHandler. + for (int i = 0; i < 2; i++) + { + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + GET /rejected/%d HTTP/1.1 + Host: localhost + + """.formatted(i))); + assertEquals(HttpStatus.SERVICE_UNAVAILABLE_503, response.getStatus()); + } + // Wait for the other requests to finish normally. + endPoints.forEach(endPoint -> + { + try + { + HttpTester.Response response = HttpTester.parseResponse(endPoint.getResponse(false, 2 * delay, TimeUnit.MILLISECONDS)); + assertEquals(HttpStatus.OK_200, response.getStatus()); + } + catch (Exception x) + { + fail(x); + } + }); } - }