From 55f4716f3d2bed47f422251c97155abcf197a480 Mon Sep 17 00:00:00 2001 From: Davy De Waele Date: Thu, 29 Dec 2016 23:24:21 +0100 Subject: [PATCH] NIFI-3266 Added EL support for basePath and port in ListenHTTP This closes #1373. --- .../nifi/processors/standard/ListenHTTP.java | 37 ++-- .../processors/standard/TestListenHTTP.java | 175 ++++++++++++++++++ 2 files changed, 194 insertions(+), 18 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 3e8576d2b1..82eee927a5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -16,22 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; - -import javax.servlet.Servlet; -import javax.ws.rs.Path; - import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -64,6 +48,21 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import javax.servlet.Servlet; +import javax.ws.rs.Path; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"ingest", "http", "https", "rest", "listen"}) @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener") @@ -81,6 +80,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .name("Base Path") .description("Base path for incoming connections") .required(true) + .expressionLanguageSupported(true) .defaultValue("contentListener") .addValidator(StandardValidators.URI_VALIDATOR) .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with / @@ -89,6 +89,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .name("Listening Port") .description("The Port to listen on for incoming connections") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder() @@ -196,7 +197,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { } private void createHttpServerFromService(final ProcessContext context) throws Exception { - final String basePath = context.getProperty(BASE_PATH).getValue(); + final String basePath = context.getProperty(BASE_PATH).evaluateAttributeExpressions().getValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); @@ -232,7 +233,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final Server server = new Server(threadPool); // get the configured port - final int port = context.getProperty(PORT).asInteger(); + final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); final ServerConnector connector; final HttpConfiguration httpConfiguration = new HttpConfiguration(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java new file mode 100644 index 0000000000..020535b15e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.ServerSocket; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS; +import static org.junit.Assert.fail; + +public class TestListenHTTP { + + private static final String HTTP_POST_METHOD = "POST"; + private static final String HTTP_BASE_PATH = "basePath"; + + private final static String PORT_VARIABLE = "HTTP_PORT"; + private final static String HTTP_SERVER_PORT_EL = "${" + PORT_VARIABLE + "}"; + + private final static String BASEPATH_VARIABLE = "HTTP_BASEPATH"; + private final static String HTTP_SERVER_BASEPATH_EL = "${" + BASEPATH_VARIABLE + "}"; + + private ListenHTTP proc; + private TestRunner runner; + + private int availablePort; + + @Before + public void setup() throws IOException { + proc = new ListenHTTP(); + runner = TestRunners.newTestRunner(proc); + availablePort = findAvailablePort(); + runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort)); + runner.setVariable(BASEPATH_VARIABLE,HTTP_BASE_PATH); + + } + + @Test + public void testPOSTRequestsReceivedWithoutEL() throws Exception { + + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + + testPOSTRequestsReceived(); + } + + @Test + public void testPOSTRequestsReceivedWithEL() throws Exception { + + runner.setProperty(ListenHTTP.PORT, HTTP_SERVER_PORT_EL); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL); + + testPOSTRequestsReceived(); + } + + private int executePOST(String message) throws Exception { + + URL url= new URL("http://localhost:" + availablePort + "/" + HTTP_BASE_PATH); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + + con.setRequestMethod(HTTP_POST_METHOD); + con.setDoOutput(true); + DataOutputStream wr = new DataOutputStream(con.getOutputStream()); + if (message!=null) { + wr.writeBytes(message); + } + wr.flush(); + wr.close(); + + return con.getResponseCode(); + + } + private void testPOSTRequestsReceived() throws Exception { + final List messages = new ArrayList<>(); + messages.add("payload 1"); + messages.add(""); + messages.add(null); + messages.add("payload 2"); + + startWebServerAndSendMessages(messages); + + List mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS); + + runner.assertTransferCount(RELATIONSHIP_SUCCESS,4); + mockFlowFiles.get(0).assertContentEquals("payload 1"); + mockFlowFiles.get(1).assertContentEquals(""); + mockFlowFiles.get(2).assertContentEquals(""); + mockFlowFiles.get(3).assertContentEquals("payload 2"); + } + + private void startWebServerAndSendMessages(final List messages) + throws Exception { + + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.createHttpServer(context); + + Runnable sendMessagestoWebServer = () -> { + try { + for (final String message : messages) { + if (executePOST(message)!=200) fail("HTTP POST failed."); + } + } catch (Exception e) { + e.printStackTrace(); + fail("Not expecting error here."); + } + }; + new Thread(sendMessagestoWebServer).start(); + + long responseTimeout = 10000; + + int numTransferred = 0; + long startTime = System.currentTimeMillis(); + while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) { + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size(); + Thread.sleep(100); + } + + runner.assertTransferCount(ListenTCP.REL_SUCCESS, messages.size()); + + } + + private int findAvailablePort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + return socket.getLocalPort(); + } + } + + private SSLContextService configureProcessorSslContextService() throws InitializationException { + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(ListenTCP.SSL_CONTEXT_SERVICE, "ssl-context"); + return sslContextService; + } + +}