From 97dc4b67aa86c35ec86227384a01b88980f8e13d Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Fri, 30 Sep 2022 21:46:27 -0500 Subject: [PATCH] NIFI-10569 Added Maximum Thread property to HandleHttpRequest - Added simple unit test for HandleHttpRequest to run without requests - Removed cleanup integration test method for HandleHttpRequest Signed-off-by: Nathan Gough This closes #6465. --- .../standard/HandleHttpRequest.java | 19 +++- .../standard/HandleHttpRequestTest.java | 78 +++++++++++++++ .../standard/ITestHandleHttpRequest.java | 94 ------------------- 3 files changed, 96 insertions(+), 95 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/HandleHttpRequestTest.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index b6dea590a3..85dc8be688 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -52,6 +52,7 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import javax.net.ssl.SSLContext; import javax.servlet.AsyncContext; @@ -250,6 +251,14 @@ public class HandleHttpRequest extends AbstractProcessor { .allowableValues("true", "false") .defaultValue("false") .build(); + public static final PropertyDescriptor MAXIMUM_THREADS = new PropertyDescriptor.Builder() + .name("Maximum Threads") + .displayName("Maximum Threads") + .description("The maximum number of threads that the embedded HTTP server will use for handling requests.") + .required(true) + .defaultValue("200") + .addValidator(StandardValidators.createLongValidator(8, 1000, true)) + .build(); public static final PropertyDescriptor ADDITIONAL_METHODS = new PropertyDescriptor.Builder() .name("Additional HTTP Methods") .description("A comma-separated list of non-standard HTTP Methods that should be allowed") @@ -318,6 +327,7 @@ public class HandleHttpRequest extends AbstractProcessor { descriptors.add(ALLOW_DELETE); descriptors.add(ALLOW_HEAD); descriptors.add(ALLOW_OPTIONS); + descriptors.add(MAXIMUM_THREADS); descriptors.add(ADDITIONAL_METHODS); descriptors.add(CLIENT_AUTH); descriptors.add(CONTAINER_QUEUE_SIZE); @@ -362,7 +372,7 @@ public class HandleHttpRequest extends AbstractProcessor { final long requestTimeout = httpContextMap.getRequestTimeout(TimeUnit.MILLISECONDS); final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); - final Server server = new Server(); + final Server server = createServer(context); final StandardServerConnectorFactory serverConnectorFactory = new StandardServerConnectorFactory(server, port); final boolean needClientAuth = CLIENT_NEED.getValue().equals(clientAuthValue); @@ -838,6 +848,13 @@ public class HandleHttpRequest extends AbstractProcessor { } } + private Server createServer(final ProcessContext context) { + final int maximumThreads = context.getProperty(MAXIMUM_THREADS).asInteger(); + final QueuedThreadPool queuedThreadPool = new QueuedThreadPool(maximumThreads); + queuedThreadPool.setName(String.format("%s[id=%s] Server", getClass().getSimpleName(), getIdentifier())); + return new Server(queuedThreadPool); + } + private static class HttpRequestContainer { private final HttpServletRequest request; private final HttpServletResponse response; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/HandleHttpRequestTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/HandleHttpRequestTest.java new file mode 100644 index 0000000000..afda282267 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/HandleHttpRequestTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.http.HttpContextMap; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class HandleHttpRequestTest { + + private static final String CONTEXT_MAP_ID = HttpContextMap.class.getSimpleName(); + + private static final String MINIMUM_THREADS = "8"; + + @Mock + HttpContextMap httpContextMap; + + TestRunner runner; + + @BeforeEach + void setRunner() throws InitializationException { + runner = TestRunners.newTestRunner(HandleHttpRequest.class); + + when(httpContextMap.getIdentifier()).thenReturn(CONTEXT_MAP_ID); + runner.addControllerService(CONTEXT_MAP_ID, httpContextMap); + runner.enableControllerService(httpContextMap); + } + + @AfterEach + void shutdown() { + runner.shutdown(); + } + + @Test + void testSetRequiredProperties() { + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); + + runner.assertValid(); + } + + @Test + void testRun() { + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); + runner.setProperty(HandleHttpRequest.MAXIMUM_THREADS, MINIMUM_THREADS); + + final int port = NetworkUtils.getAvailableTcpPort(); + runner.setProperty(HandleHttpRequest.PORT, Integer.toString(port)); + + runner.run(); + + runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java index cffcab0242..3d5a6dd929 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java @@ -30,7 +30,6 @@ import java.io.InputStream; import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.URL; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Random; @@ -43,7 +42,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.IntStream; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.servlet.AsyncContext; @@ -508,98 +506,6 @@ public class ITestHandleHttpRequest { assertEquals(503, responseCode[0]); } - @Test - public void testCleanup() throws Exception { - // GIVEN - int nrOfRequests = 5; - - CountDownLatch serverReady = new CountDownLatch(1); - CountDownLatch requestSent = new CountDownLatch(nrOfRequests); - CountDownLatch cleanupDone = new CountDownLatch(nrOfRequests - 1); - - processor = new HandleHttpRequest() { - @Override - synchronized void initializeServer(ProcessContext context) throws Exception { - super.initializeServer(context); - serverReady.countDown(); - - requestSent.await(); - while (getRequestQueueSize() < nrOfRequests) { - Thread.sleep(200); - } - } - }; - - final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(HandleHttpRequest.PORT, "0"); - - final MockHttpContextMap contextMap = new MockHttpContextMap(); - runner.addControllerService("http-context-map", contextMap); - runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); - - List responses = new ArrayList<>(nrOfRequests); - final Thread httpThread = new Thread(new Runnable() { - @Override - public void run() { - try { - serverReady.await(); - - final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); - - OkHttpClient client = - new OkHttpClient.Builder() - .readTimeout(3000, TimeUnit.MILLISECONDS) - .writeTimeout(3000, TimeUnit.MILLISECONDS) - .build(); - client.dispatcher().setMaxRequests(nrOfRequests); - client.dispatcher().setMaxRequestsPerHost(nrOfRequests); - - Callback callback = new Callback() { - @Override - public void onFailure(@NotNull Call call, @NotNull IOException e) { - // Will only happen once for the first non-rejected request, but not important - } - - @Override - public void onResponse(@NotNull Call call, @NotNull Response response) { - responses.add(response); - cleanupDone.countDown(); - } - }; - IntStream.rangeClosed(1, nrOfRequests).forEach( - requestCounter -> { - Request request = new Request.Builder() - .url(String.format("http://localhost:%s/my/" + requestCounter, port)) - .get() - .build(); - sendRequest(client, request, callback, requestSent); - } - ); - } catch (final Throwable t) { - // Do nothing as HandleHttpRequest doesn't respond normally - } - } - }); - - // WHEN - httpThread.start(); - runner.run(1, false); - cleanupDone.await(); - - // THEN - int nrOfPendingRequests = processor.getRequestQueueSize(); - - runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); - - assertEquals(1, contextMap.size()); - assertEquals(0, nrOfPendingRequests); - assertEquals(responses.size(), nrOfRequests - 1); - for (Response response : responses) { - assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code()); - } - } - @Test @Timeout(value = 15) public void testOnPrimaryNodeChangePrimaryNodeRevoked() throws Exception {