NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4847.
This commit is contained in:
Peter Gyori 2021-02-25 18:31:38 +01:00 committed by Pierre Villard
parent a54f739229
commit 4a27b23b1f
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 85 additions and 1 deletions

View File

@ -226,6 +226,20 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.defaultValue(ClientAuthentication.AUTO.name())
.dependsOn(SSL_CONTEXT_SERVICE)
.build();
public static final PropertyDescriptor MAX_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
.name("max-thread-pool-size")
.displayName("Maximum Thread Pool Size")
.description("The maximum number of threads to be used by the embedded Jetty server. "
+ "The value can be set between 8 and 1000. "
+ "The value of this property affects the performance of the flows and the operating system, therefore "
+ "the default value should only be changed in justified cases. "
+ "A value that is less than the default value may be suitable "
+ "if only a small number of HTTP clients connect to the server. A greater value may be suitable "
+ "if a large number of HTTP clients are expected to make requests to the server simultaneously.")
.required(true)
.addValidator(StandardValidators.createLongValidator(8L, 1000L, true))
.defaultValue("200")
.build();
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@ -289,6 +303,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
descriptors.add(RETURN_CODE);
descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
descriptors.add(MULTIPART_READ_BUFFER_SIZE);
descriptors.add(MAX_THREAD_POOL_SIZE);
this.properties = Collections.unmodifiableList(descriptors);
}
@ -321,6 +336,10 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
shutdownHttpServer(toShutdown);
}
Server getServer() {
return this.server;
}
private void shutdownHttpServer(Server toShutdown) {
try {
toShutdown.stop();
@ -344,6 +363,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final int returnCode = context.getProperty(RETURN_CODE).asInteger();
long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
int maxThreadPoolSize = context.getProperty(MAX_THREAD_POOL_SIZE).asInteger();
throttlerRef.set(streamThrottler);
final boolean sslRequired = sslContextService != null;
@ -351,7 +371,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final ClientAuthentication clientAuthentication = getClientAuthentication(sslContextService, clientAuthenticationProperty);
// thread pool for the jetty instance
final QueuedThreadPool threadPool = new QueuedThreadPool();
final QueuedThreadPool threadPool = new QueuedThreadPool(maxThreadPoolSize);
threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier()));
// create the server instance

View File

@ -67,6 +67,8 @@ import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -405,6 +407,68 @@ public class TestListenHTTP {
assertThrows(SSLHandshakeException.class, sslSocket::startHandshake);
}
@Test
public void testMaxThreadPoolSizeTooLow() {
// GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "7");
// THEN
runner.assertNotValid();
}
@Test
public void testMaxThreadPoolSizeTooHigh() {
// GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1001");
// THEN
runner.assertNotValid();
}
@Test
public void testMaxThreadPoolSizeOkLowerBound() {
// GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "8");
// THEN
runner.assertValid();
}
@Test
public void testMaxThreadPoolSizeOkUpperBound() {
// GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1000");
// THEN
runner.assertValid();
}
@Test
public void testMaxThreadPoolSizeSpecifiedInThePropertyIsSetInTheServerInstance() {
// GIVEN
int maxThreadPoolSize = 201;
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, Integer.toString(maxThreadPoolSize));
// WHEN
startWebServer();
// THEN
Server server = proc.getServer();
ThreadPool threadPool = server.getThreadPool();
ThreadPool.SizedThreadPool sizedThreadPool = (ThreadPool.SizedThreadPool) threadPool;
assertEquals(maxThreadPoolSize, sizedThreadPool.getMaxThreads());
}
private void startSecureServer() {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);