Issue #12185 implementation/test of max suspended requests in QoSHandler

This commit is contained in:
Lars Krog-Jensen 2024-08-22 10:32:29 +02:00
parent 0644aaf88c
commit 497da2da0a
2 changed files with 97 additions and 6 deletions

View File

@ -52,7 +52,10 @@ import org.slf4j.LoggerFactory;
* If more requests are received, they are suspended (that is, not
* forwarded to the child {@code Handler}) and stored in a priority
* queue.
* Priorities are determined via {@link #getPriority(Request)},
* The maximum number of suspended requests can be set via
* {@link #setMaxSuspendedRequestCount(int)} to avoid out of memory errors.
* When this limit is reached, further requests fail fast
* with status code {@code 503} (service unavailable).</p>
* <p>Priorities are determined via {@link #getPriority(Request)},
* that should return values between {@code 0} (the lowest priority)
* and positive numbers, typically in the range {@code 0-10}.</p>
* <p>When a request that is being processed completes, the suspended
@ -82,6 +85,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
// Concurrent sorted set of integer priorities, iterated from the highest value to the lowest.
private final Set<Integer> priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder());
// NOTE(review): presumably tracks expiration of suspended entries; assigned outside this view - confirm.
private CyclicTimeouts<Entry> timeouts;
// NOTE(review): presumably the limit on concurrently processed requests - confirm against its accessor.
private int maxRequests;
// Limit on how many requests may be suspended at once; the default Integer.MAX_VALUE leaves it effectively unbounded.
private int maxSuspendedRequests = Integer.MAX_VALUE;
// NOTE(review): presumably the max time a request may stay suspended; the meaning of Duration.ZERO is not visible here - confirm.
private Duration maxSuspend = Duration.ZERO;
public QoSHandler()
@ -119,6 +123,29 @@ public class QoSHandler extends ConditionalHandler.Abstract
this.maxRequests = maxRequests;
}
/**
 * <p>Gets the max number of suspended requests.</p>
 * <p>This is the value last configured via
 * {@link #setMaxSuspendedRequestCount(int)}.</p>
 *
 * @return the max number of suspended requests
 */
@ManagedAttribute(value = "The maximum number of suspended requests", readonly = true)
public int getMaxSuspendedRequestCount()
{
    return this.maxSuspendedRequests;
}
/**
 * <p>Sets the max number of suspended requests.</p>
 * <p>Once the max suspended request limit is reached, further requests
 * are failed with an HTTP status of {@code 503 Service Unavailable}
 * instead of being suspended.</p>
 * <p>The limit can only be changed while this handler is not started.</p>
 *
 * @param maxSuspendedRequests the max number of suspended requests
 * @throws IllegalStateException if this handler is already started
 */
public void setMaxSuspendedRequestCount(int maxSuspendedRequests)
{
    // Reconfiguration is rejected once the handler is running.
    if (isStarted())
    {
        throw new IllegalStateException("Cannot change maxSuspendedRequests: " + this);
    }
    this.maxSuspendedRequests = maxSuspendedRequests;
}
/**
* Get the max duration of time a request may stay suspended.
* @return the max duration of time a request may stay suspended
@ -194,6 +221,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
LOG.debug("{} processing {}", this, request);
boolean expired = false;
boolean tooManyRequests = false;
// The read lock allows concurrency with resume(),
// which is the common case, but not with expire().
@ -203,7 +231,14 @@ public class QoSHandler extends ConditionalHandler.Abstract
int permits = state.decrementAndGet();
if (permits < 0)
{
if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
if (Math.abs(permits) > getMaxSuspendedRequestCount())
{
// Reached the limit of number of suspended requests,
// complete request with 503, service unavailable
state.incrementAndGet();
tooManyRequests = true;
}
else if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
{
// Cover this race condition:
// T1 in this method may find no permits, so it will suspend the request.
@ -228,11 +263,13 @@ public class QoSHandler extends ConditionalHandler.Abstract
lock.readLock().unlock();
}
if (!expired)
return handleWithPermit(request, response, callback);
if (expired || tooManyRequests)
{
notAvailable(response, callback);
return true;
}
notAvailable(response, callback);
return true;
return handleWithPermit(request, response, callback);
}
@Override

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.server.handler;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@ -407,4 +408,57 @@ public class QoSHandlerTest
assertEquals(HttpStatus.OK_200, response.getStatus());
}
/**
 * Verifies that requests exceeding the max suspended request limit
 * are rejected with {@code 503}, while requests within the limit
 * are suspended and eventually served with {@code 200}.
 */
@Test
public void testMaxSuspendedRequests() throws Exception
{
    int delay = 100;
    QoSHandler qosHandler = new QoSHandler();
    qosHandler.setMaxRequestCount(2);
    qosHandler.setMaxSuspendedRequestCount(2);
    qosHandler.setHandler(new Handler.Abstract()
    {
        @Override
        public boolean handle(Request request, Response response, Callback callback)
        {
            try
            {
                // Hold the permit for a while, so that the
                // other requests pile up behind this one.
                Thread.sleep(delay);
                callback.succeeded();
            }
            catch (Throwable x)
            {
                callback.failed(x);
            }
            return true;
        }
    });
    start(qosHandler);

    int parallelism = 8;

    // Submit all the requests up front, before reading any response,
    // so that they are all in flight at the same time regardless of
    // how many threads the common ForkJoinPool has on this machine.
    List<LocalConnector.LocalEndPoint> endPoints = new ArrayList<>();
    for (int i = 0; i < parallelism; i++)
    {
        endPoints.add(connector.executeRequest("""
            GET /%d HTTP/1.1
            Host: localhost

            """.formatted(i)));
    }

    // Synchronized list, because it is filled from the parallel stream threads.
    List<Integer> statusCodes = new Vector<>();
    endPoints.parallelStream().forEach(pending ->
    {
        try (LocalConnector.LocalEndPoint endPoint = pending)
        {
            String text = endPoint.getResponse(false, parallelism * delay * 5, TimeUnit.MILLISECONDS);
            HttpTester.Response response = HttpTester.parseResponse(text);
            statusCodes.add(response.getStatus());
        }
        catch (Exception x)
        {
            fail(x);
        }
    });

    // The forEach() above only returns when every response has been read.
    assertEquals(parallelism, statusCodes.size());

    // Expectation is that:
    // 2 requests will be handled straight away,
    // 2 will be suspended and eventually handled,
    // 4 will hit the max suspended request limit.
    assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.OK_200).count());
    assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.SERVICE_UNAVAILABLE_503).count());
}
}