Issue #12185 implementation/test of max suspended requests in QoSHandler
This commit is contained in:
parent
dbb982108b
commit
b7591d546f
|
@ -52,7 +52,10 @@ import org.slf4j.LoggerFactory;
|
|||
* If more requests are received, they are suspended (that is, not
|
||||
* forwarded to the child {@code Handler}) and stored in a priority
|
||||
* queue.
|
||||
* Priorities are determined via {@link #getPriority(Request)},
|
||||
* Maximum number of suspended request can be set {@link #setMaxSuspendedRequestCount(int)} to avoid
|
||||
* out of memory error. When this limit is reached, the request will fail fast
|
||||
* with status code {@code 503} (not available).</p>
|
||||
* <p>Priorities are determined via {@link #getPriority(Request)},
|
||||
* that should return values between {@code 0} (the lowest priority)
|
||||
* and positive numbers, typically in the range {@code 0-10}.</p>
|
||||
* <p>When a request that is being processed completes, the suspended
|
||||
|
@ -82,6 +85,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
private final Set<Integer> priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder());
|
||||
private CyclicTimeouts<Entry> timeouts;
|
||||
private int maxRequests;
|
||||
private int maxSuspendedRequests = Integer.MAX_VALUE;
|
||||
private Duration maxSuspend = Duration.ZERO;
|
||||
|
||||
public QoSHandler()
|
||||
|
@ -119,6 +123,29 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
this.maxRequests = maxRequests;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the max number of suspended requests
|
||||
*/
|
||||
@ManagedAttribute(value = "The maximum number of suspended requests", readonly = true)
|
||||
public int getMaxSuspendedRequestCount()
|
||||
{
|
||||
return maxSuspendedRequests;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Sets the max number of suspended requests.</p>
|
||||
* <p>Once the max suspended request limit is reached, the request is failed with a HTTP
|
||||
* status of {@code 503 Service unavailable}.</p>
|
||||
*
|
||||
* @param maxSuspendedRequests the max number of suspended requests
|
||||
*/
|
||||
public void setMaxSuspendedRequestCount(int maxSuspendedRequests)
|
||||
{
|
||||
if (isStarted())
|
||||
throw new IllegalStateException("Cannot change maxSuspendedRequests: " + this);
|
||||
this.maxSuspendedRequests = maxSuspendedRequests;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the max duration of time a request may stay suspended.
|
||||
* @return the max duration of time a request may stay suspended
|
||||
|
@ -194,6 +221,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
LOG.debug("{} processing {}", this, request);
|
||||
|
||||
boolean expired = false;
|
||||
boolean tooManyRequests = false;
|
||||
|
||||
// The read lock allows concurrency with resume(),
|
||||
// which is the common case, but not with expire().
|
||||
|
@ -203,7 +231,14 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
int permits = state.decrementAndGet();
|
||||
if (permits < 0)
|
||||
{
|
||||
if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
|
||||
if (Math.abs(permits) > getMaxSuspendedRequestCount())
|
||||
{
|
||||
// Reached the limit of number of suspended requests,
|
||||
// complete request with 503, service unavailable
|
||||
state.incrementAndGet();
|
||||
tooManyRequests = true;
|
||||
}
|
||||
else if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
|
||||
{
|
||||
// Cover this race condition:
|
||||
// T1 in this method may find no permits, so it will suspend the request.
|
||||
|
@ -228,13 +263,15 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (!expired)
|
||||
return handleWithPermit(request, response, callback);
|
||||
|
||||
if (expired || tooManyRequests)
|
||||
{
|
||||
notAvailable(response, callback);
|
||||
return true;
|
||||
}
|
||||
|
||||
return handleWithPermit(request, response, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onConditionsNotMet(Request request, Response response, Callback callback) throws Exception
|
||||
{
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.server.handler;
|
|||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
|
@ -407,4 +408,57 @@ public class QoSHandlerTest
|
|||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxSuspendedRequests() throws Exception
|
||||
{
|
||||
int delay = 100;
|
||||
QoSHandler qosHandler = new QoSHandler();
|
||||
qosHandler.setMaxRequestCount(2);
|
||||
qosHandler.setMaxSuspendedRequestCount(2);
|
||||
qosHandler.setHandler(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(delay);
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (Throwable x) {
|
||||
callback.failed(x);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
});
|
||||
start(qosHandler);
|
||||
|
||||
int parallelism = 8;
|
||||
Vector<Integer> statusCodes = new Vector<>(); // due to synchronized List
|
||||
IntStream.range(0, parallelism).parallel().forEach(i ->
|
||||
{
|
||||
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
|
||||
GET /%d HTTP/1.1
|
||||
Host: localhost
|
||||
|
||||
""".formatted(i))) {
|
||||
String text = endPoint.getResponse(false, parallelism * delay * 5, TimeUnit.MILLISECONDS);
|
||||
HttpTester.Response response = HttpTester.parseResponse(text);
|
||||
statusCodes.add(response.getStatus());
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
fail(x);
|
||||
}
|
||||
});
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).until(statusCodes::size, is(8));
|
||||
// expectation is that
|
||||
// 2 requests will be handled straight away
|
||||
// 2 will be suspended and eventually handled
|
||||
// 4 will hit the max suspended request limit
|
||||
assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.OK_200).count());
|
||||
assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.SERVICE_UNAVAILABLE_503).count());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue