NIFI-10569 Added Maximum Thread property to HandleHttpRequest

- Added simple unit test for HandleHttpRequest to run without requests
- Removed cleanup integration test method for HandleHttpRequest

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #6465.
This commit is contained in:
exceptionfactory 2022-09-30 21:46:27 -05:00 committed by Nathan Gough
parent 1ebeb2db7a
commit 97dc4b67aa
3 changed files with 96 additions and 95 deletions

View File

@ -52,6 +52,7 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import javax.net.ssl.SSLContext;
import javax.servlet.AsyncContext;
@ -250,6 +251,14 @@ public class HandleHttpRequest extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor MAXIMUM_THREADS = new PropertyDescriptor.Builder()
.name("Maximum Threads")
.displayName("Maximum Threads")
.description("The maximum number of threads that the embedded HTTP server will use for handling requests.")
.required(true)
.defaultValue("200")
.addValidator(StandardValidators.createLongValidator(8, 1000, true))
.build();
public static final PropertyDescriptor ADDITIONAL_METHODS = new PropertyDescriptor.Builder()
.name("Additional HTTP Methods")
.description("A comma-separated list of non-standard HTTP Methods that should be allowed")
@ -318,6 +327,7 @@ public class HandleHttpRequest extends AbstractProcessor {
descriptors.add(ALLOW_DELETE);
descriptors.add(ALLOW_HEAD);
descriptors.add(ALLOW_OPTIONS);
descriptors.add(MAXIMUM_THREADS);
descriptors.add(ADDITIONAL_METHODS);
descriptors.add(CLIENT_AUTH);
descriptors.add(CONTAINER_QUEUE_SIZE);
@ -362,7 +372,7 @@ public class HandleHttpRequest extends AbstractProcessor {
final long requestTimeout = httpContextMap.getRequestTimeout(TimeUnit.MILLISECONDS);
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final Server server = new Server();
final Server server = createServer(context);
final StandardServerConnectorFactory serverConnectorFactory = new StandardServerConnectorFactory(server, port);
final boolean needClientAuth = CLIENT_NEED.getValue().equals(clientAuthValue);
@ -838,6 +848,13 @@ public class HandleHttpRequest extends AbstractProcessor {
}
}
private Server createServer(final ProcessContext context) {
final int maximumThreads = context.getProperty(MAXIMUM_THREADS).asInteger();
final QueuedThreadPool queuedThreadPool = new QueuedThreadPool(maximumThreads);
queuedThreadPool.setName(String.format("%s[id=%s] Server", getClass().getSimpleName(), getIdentifier()));
return new Server(queuedThreadPool);
}
private static class HttpRequestContainer {
private final HttpServletRequest request;
private final HttpServletResponse response;

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class HandleHttpRequestTest {
private static final String CONTEXT_MAP_ID = HttpContextMap.class.getSimpleName();
private static final String MINIMUM_THREADS = "8";
@Mock
HttpContextMap httpContextMap;
TestRunner runner;
@BeforeEach
void setRunner() throws InitializationException {
runner = TestRunners.newTestRunner(HandleHttpRequest.class);
when(httpContextMap.getIdentifier()).thenReturn(CONTEXT_MAP_ID);
runner.addControllerService(CONTEXT_MAP_ID, httpContextMap);
runner.enableControllerService(httpContextMap);
}
@AfterEach
void shutdown() {
runner.shutdown();
}
@Test
void testSetRequiredProperties() {
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.assertValid();
}
@Test
void testRun() {
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpRequest.MAXIMUM_THREADS, MINIMUM_THREADS);
final int port = NetworkUtils.getAvailableTcpPort();
runner.setProperty(HandleHttpRequest.PORT, Integer.toString(port));
runner.run();
runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
}
}

View File

@ -30,7 +30,6 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
@ -43,7 +42,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.servlet.AsyncContext;
@ -508,98 +506,6 @@ public class ITestHandleHttpRequest {
assertEquals(503, responseCode[0]);
}
@Test
public void testCleanup() throws Exception {
// GIVEN
int nrOfRequests = 5;
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(nrOfRequests);
CountDownLatch cleanupDone = new CountDownLatch(nrOfRequests - 1);
processor = new HandleHttpRequest() {
@Override
synchronized void initializeServer(ProcessContext context) throws Exception {
super.initializeServer(context);
serverReady.countDown();
requestSent.await();
while (getRequestQueueSize() < nrOfRequests) {
Thread.sleep(200);
}
}
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
List<Response> responses = new ArrayList<>(nrOfRequests);
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
OkHttpClient client =
new OkHttpClient.Builder()
.readTimeout(3000, TimeUnit.MILLISECONDS)
.writeTimeout(3000, TimeUnit.MILLISECONDS)
.build();
client.dispatcher().setMaxRequests(nrOfRequests);
client.dispatcher().setMaxRequestsPerHost(nrOfRequests);
Callback callback = new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
// Will only happen once for the first non-rejected request, but not important
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) {
responses.add(response);
cleanupDone.countDown();
}
};
IntStream.rangeClosed(1, nrOfRequests).forEach(
requestCounter -> {
Request request = new Request.Builder()
.url(String.format("http://localhost:%s/my/" + requestCounter, port))
.get()
.build();
sendRequest(client, request, callback, requestSent);
}
);
} catch (final Throwable t) {
// Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
// WHEN
httpThread.start();
runner.run(1, false);
cleanupDone.await();
// THEN
int nrOfPendingRequests = processor.getRequestQueueSize();
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
assertEquals(1, contextMap.size());
assertEquals(0, nrOfPendingRequests);
assertEquals(responses.size(), nrOfRequests - 1);
for (Response response : responses) {
assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code());
}
}
@Test
@Timeout(value = 15)
public void testOnPrimaryNodeChangePrimaryNodeRevoked() throws Exception {