NIFI-3266 Added EL support for basePath and port in ListenHTTP

This closes #1373.
This commit is contained in:
Davy De Waele 2016-12-29 23:24:21 +01:00 committed by Pierre Villard
parent 0d14db72f5
commit 55f4716f3d
2 changed files with 194 additions and 18 deletions

View File

@ -16,22 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.servlet.Servlet;
import javax.ws.rs.Path;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -64,6 +48,21 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import javax.servlet.Servlet;
import javax.ws.rs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"ingest", "http", "https", "rest", "listen"})
@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
@ -81,6 +80,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.name("Base Path")
.description("Base path for incoming connections")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("contentListener")
.addValidator(StandardValidators.URI_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
@ -89,6 +89,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.name("Listening Port")
.description("The Port to listen on for incoming connections")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
@ -196,7 +197,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
}
private void createHttpServerFromService(final ProcessContext context) throws Exception {
final String basePath = context.getProperty(BASE_PATH).getValue();
final String basePath = context.getProperty(BASE_PATH).evaluateAttributeExpressions().getValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
@ -232,7 +233,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final Server server = new Server(threadPool);
// get the configured port
final int port = context.getProperty(PORT).asInteger();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final ServerConnector connector;
final HttpConfiguration httpConfiguration = new HttpConfiguration();

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
import static org.junit.Assert.fail;
public class TestListenHTTP {
private static final String HTTP_POST_METHOD = "POST";
private static final String HTTP_BASE_PATH = "basePath";
private final static String PORT_VARIABLE = "HTTP_PORT";
private final static String HTTP_SERVER_PORT_EL = "${" + PORT_VARIABLE + "}";
private final static String BASEPATH_VARIABLE = "HTTP_BASEPATH";
private final static String HTTP_SERVER_BASEPATH_EL = "${" + BASEPATH_VARIABLE + "}";
private ListenHTTP proc;
private TestRunner runner;
private int availablePort;
@Before
public void setup() throws IOException {
proc = new ListenHTTP();
runner = TestRunners.newTestRunner(proc);
availablePort = findAvailablePort();
runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort));
runner.setVariable(BASEPATH_VARIABLE,HTTP_BASE_PATH);
}
@Test
public void testPOSTRequestsReceivedWithoutEL() throws Exception {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
testPOSTRequestsReceived();
}
@Test
public void testPOSTRequestsReceivedWithEL() throws Exception {
runner.setProperty(ListenHTTP.PORT, HTTP_SERVER_PORT_EL);
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL);
testPOSTRequestsReceived();
}
private int executePOST(String message) throws Exception {
URL url= new URL("http://localhost:" + availablePort + "/" + HTTP_BASE_PATH);
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod(HTTP_POST_METHOD);
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
if (message!=null) {
wr.writeBytes(message);
}
wr.flush();
wr.close();
return con.getResponseCode();
}
private void testPOSTRequestsReceived() throws Exception {
final List<String> messages = new ArrayList<>();
messages.add("payload 1");
messages.add("");
messages.add(null);
messages.add("payload 2");
startWebServerAndSendMessages(messages);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
runner.assertTransferCount(RELATIONSHIP_SUCCESS,4);
mockFlowFiles.get(0).assertContentEquals("payload 1");
mockFlowFiles.get(1).assertContentEquals("");
mockFlowFiles.get(2).assertContentEquals("");
mockFlowFiles.get(3).assertContentEquals("payload 2");
}
private void startWebServerAndSendMessages(final List<String> messages)
throws Exception {
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.createHttpServer(context);
Runnable sendMessagestoWebServer = () -> {
try {
for (final String message : messages) {
if (executePOST(message)!=200) fail("HTTP POST failed.");
}
} catch (Exception e) {
e.printStackTrace();
fail("Not expecting error here.");
}
};
new Thread(sendMessagestoWebServer).start();
long responseTimeout = 10000;
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
Thread.sleep(100);
}
runner.assertTransferCount(ListenTCP.REL_SUCCESS, messages.size());
}
private int findAvailablePort() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
socket.setReuseAddress(true);
return socket.getLocalPort();
}
}
private SSLContextService configureProcessorSslContextService() throws InitializationException {
final SSLContextService sslContextService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
runner.enableControllerService(sslContextService);
runner.setProperty(ListenTCP.SSL_CONTEXT_SERVICE, "ssl-context");
return sslContextService;
}
}