diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 9ef4b74c13..c441104824 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import javax.servlet.Servlet; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Path; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -126,6 +127,12 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .required(false) .build(); + public static final PropertyDescriptor RETURN_CODE = new PropertyDescriptor.Builder() + .name("Return Code") + .description("The HTTP return code returned after every HTTP call") + .defaultValue(String.valueOf(HttpServletResponse.SC_OK)) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; @@ -136,6 +143,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath"; + public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode"; private volatile Server server = null; private final ConcurrentMap flowFileMap = new ConcurrentHashMap<>(); @@ -156,6 +164,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { descriptors.add(AUTHORIZED_DN_PATTERN); descriptors.add(MAX_UNCONFIRMED_TIME); descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX); + descriptors.add(RETURN_CODE); this.properties = Collections.unmodifiableList(descriptors); } @@ -203,6 +212,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); + final int returnCode = context.getProperty(RETURN_CODE).asInteger(); throttlerRef.set(streamThrottler); final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null; @@ -284,6 +294,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE,returnCode); if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index b38a872295..fc0644b7e6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -95,6 +95,7 @@ public class ListenHTTPServlet extends HttpServlet { private ConcurrentMap flowFileMap; private StreamThrottler streamThrottler; private String basePath; + private int returnCode; @SuppressWarnings("unchecked") @Override @@ -108,6 +109,7 @@ public class ListenHTTPServlet extends HttpServlet { this.flowFileMap = (ConcurrentMap) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH); + this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE); } @Override @@ -301,7 +303,7 @@ public class ListenHTTPServlet extends HttpServlet { new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); } } else { - response.setStatus(HttpServletResponse.SC_OK); + response.setStatus(this.returnCode); logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java index c46d4cded6..799d1b7aad 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java @@ -39,12 +39,13 @@ import java.util.List; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; +import javax.servlet.http.HttpServletResponse; import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS; import static org.junit.Assert.fail; - public class TestListenHTTP { + private static final String SSL_CONTEXT_SERVICE_IDENTIFIER = "ssl-context"; private static final String HTTP_POST_METHOD = "POST"; @@ -67,7 +68,7 @@ public class TestListenHTTP { runner = TestRunners.newTestRunner(proc); availablePort = NetworkUtils.availablePort();; runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort)); - runner.setVariable(BASEPATH_VARIABLE,HTTP_BASE_PATH); + runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH); } @@ -81,7 +82,16 @@ public class TestListenHTTP { runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); - testPOSTRequestsReceived(); + testPOSTRequestsReceived(HttpServletResponse.SC_OK); + } + + @Test + public void testPOSTRequestsReceivedReturnCodeWithoutEL() throws Exception { + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT)); + + testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT); } @Test @@ -90,7 +100,17 @@ public class TestListenHTTP { runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL); runner.assertValid(); - testPOSTRequestsReceived(); + testPOSTRequestsReceived(HttpServletResponse.SC_OK); + } + + @Test + public void testPOSTRequestsReturnCodeReceivedWithEL() throws Exception { + runner.setProperty(ListenHTTP.PORT, HTTP_SERVER_PORT_EL); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL); + runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT)); + runner.assertValid(); + + testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT); } @Test @@ -103,7 +123,21 @@ public class TestListenHTTP { runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); runner.assertValid(); - testPOSTRequestsReceived(); + testPOSTRequestsReceived(HttpServletResponse.SC_OK); + } + + @Test + public void testSecurePOSTRequestsReturnCodeReceivedWithoutEL() throws Exception { + SSLContextService sslContextService = configureProcessorSslContextService(); + runner.setProperty(sslContextService, StandardRestrictedSSLContextService.RESTRICTED_SSL_ALGORITHM, "TLSv1.2"); + runner.enableControllerService(sslContextService); + + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT)); + runner.assertValid(); + + testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT); } @Test @@ -116,7 +150,21 @@ public class TestListenHTTP { runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL); runner.assertValid(); - testPOSTRequestsReceived(); + testPOSTRequestsReceived(HttpServletResponse.SC_OK); + } + + @Test + public void testSecurePOSTRequestsReturnCodeReceivedWithEL() throws Exception { + SSLContextService sslContextService = configureProcessorSslContextService(); + runner.setProperty(sslContextService, StandardRestrictedSSLContextService.RESTRICTED_SSL_ALGORITHM, "TLSv1.2"); + runner.enableControllerService(sslContextService); + + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT)); + runner.assertValid(); + + testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT); } @Test @@ -137,7 +185,7 @@ public class TestListenHTTP { final URL url = new URL(scheme + "://localhost:" + availablePort + "/" + HTTP_BASE_PATH); HttpURLConnection connection; - if(secure) { + if (secure) { final HttpsURLConnection sslCon = (HttpsURLConnection) url.openConnection(); final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.WANT); sslCon.setSSLSocketFactory(sslContext.getSocketFactory()); @@ -151,7 +199,7 @@ public class TestListenHTTP { final DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); - if (message!=null) { + if (message != null) { wr.writeBytes(message); } wr.flush(); @@ -159,54 +207,56 @@ public class TestListenHTTP { return connection.getResponseCode(); } - private void testPOSTRequestsReceived() throws Exception { + private void testPOSTRequestsReceived(int returnCode) throws Exception { final List messages = new ArrayList<>(); messages.add("payload 1"); messages.add(""); messages.add(null); messages.add("payload 2"); - startWebServerAndSendMessages(messages); + startWebServerAndSendMessages(messages, returnCode); List mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS); - runner.assertTransferCount(RELATIONSHIP_SUCCESS,4); + runner.assertTransferCount(RELATIONSHIP_SUCCESS, 4); mockFlowFiles.get(0).assertContentEquals("payload 1"); mockFlowFiles.get(1).assertContentEquals(""); mockFlowFiles.get(2).assertContentEquals(""); mockFlowFiles.get(3).assertContentEquals("payload 2"); } - private void startWebServerAndSendMessages(final List messages) + private void startWebServerAndSendMessages(final List messages, int returnCode) throws Exception { - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); proc.createHttpServer(context); - Runnable sendMessagestoWebServer = () -> { - try { - for (final String message : messages) { - if (executePOST(message)!=200) fail("HTTP POST failed."); + Runnable sendMessagestoWebServer = () -> { + try { + for (final String message : messages) { + if (executePOST(message) != returnCode) { + fail("HTTP POST failed."); } - } catch (Exception e) { - e.printStackTrace(); - fail("Not expecting error here."); } - }; - new Thread(sendMessagestoWebServer).start(); - - long responseTimeout = 10000; - - int numTransferred = 0; - long startTime = System.currentTimeMillis(); - while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) { - proc.onTrigger(context, processSessionFactory); - numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size(); - Thread.sleep(100); + } catch (Exception e) { + e.printStackTrace(); + fail("Not expecting error here."); } + }; + new Thread(sendMessagestoWebServer).start(); - runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size()); + long responseTimeout = 10000; + + int numTransferred = 0; + long startTime = System.currentTimeMillis(); + while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) { + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size(); + Thread.sleep(100); + } + + runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size()); }