NIFI-4597 This closes #2283. correcting imports, checkstyle issues, and adding validator

Add a property to override HTTP return code in ListenHTTP processor

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
sbouchex 2017-11-21 20:23:26 +01:00 committed by joewitt
parent 3df3ff6c61
commit 33281300cd
3 changed files with 97 additions and 34 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.servlet.Servlet; import javax.servlet.Servlet;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -126,6 +127,12 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.required(false) .required(false)
.build(); .build();
public static final PropertyDescriptor RETURN_CODE = new PropertyDescriptor.Builder()
.name("Return Code")
.description("The HTTP return code returned after every HTTP call")
.defaultValue(String.valueOf(HttpServletResponse.SC_OK))
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@ -136,6 +143,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath"; public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
private volatile Server server = null; private volatile Server server = null;
private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
@ -156,6 +164,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
descriptors.add(AUTHORIZED_DN_PATTERN); descriptors.add(AUTHORIZED_DN_PATTERN);
descriptors.add(MAX_UNCONFIRMED_TIME); descriptors.add(MAX_UNCONFIRMED_TIME);
descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX); descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
descriptors.add(RETURN_CODE);
this.properties = Collections.unmodifiableList(descriptors); this.properties = Collections.unmodifiableList(descriptors);
} }
@ -203,6 +212,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
final int returnCode = context.getProperty(RETURN_CODE).asInteger();
throttlerRef.set(streamThrottler); throttlerRef.set(streamThrottler);
final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null; final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null;
@ -284,6 +294,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE,returnCode);
if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));

View File

@ -95,6 +95,7 @@ public class ListenHTTPServlet extends HttpServlet {
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap; private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
private StreamThrottler streamThrottler; private StreamThrottler streamThrottler;
private String basePath; private String basePath;
private int returnCode;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
@ -108,6 +109,7 @@ public class ListenHTTPServlet extends HttpServlet {
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH); this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
} }
@Override @Override
@ -301,7 +303,7 @@ public class ListenHTTPServlet extends HttpServlet {
new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
} }
} else { } else {
response.setStatus(HttpServletResponse.SC_OK); response.setStatus(this.returnCode);
logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}",
new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile}); new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile});

View File

@ -39,12 +39,13 @@ import java.util.List;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.servlet.http.HttpServletResponse;
import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS; import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class TestListenHTTP { public class TestListenHTTP {
private static final String SSL_CONTEXT_SERVICE_IDENTIFIER = "ssl-context"; private static final String SSL_CONTEXT_SERVICE_IDENTIFIER = "ssl-context";
private static final String HTTP_POST_METHOD = "POST"; private static final String HTTP_POST_METHOD = "POST";
@ -67,7 +68,7 @@ public class TestListenHTTP {
runner = TestRunners.newTestRunner(proc); runner = TestRunners.newTestRunner(proc);
availablePort = NetworkUtils.availablePort();; availablePort = NetworkUtils.availablePort();;
runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort)); runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort));
runner.setVariable(BASEPATH_VARIABLE,HTTP_BASE_PATH); runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
} }
@ -81,7 +82,16 @@ public class TestListenHTTP {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
testPOSTRequestsReceived(); testPOSTRequestsReceived(HttpServletResponse.SC_OK);
}
@Test
public void testPOSTRequestsReceivedReturnCodeWithoutEL() throws Exception {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT));
testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT);
} }
@Test @Test
@ -90,7 +100,17 @@ public class TestListenHTTP {
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL); runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL);
runner.assertValid(); runner.assertValid();
testPOSTRequestsReceived(); testPOSTRequestsReceived(HttpServletResponse.SC_OK);
}
@Test
public void testPOSTRequestsReturnCodeReceivedWithEL() throws Exception {
runner.setProperty(ListenHTTP.PORT, HTTP_SERVER_PORT_EL);
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL);
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT));
runner.assertValid();
testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT);
} }
@Test @Test
@ -103,7 +123,21 @@ public class TestListenHTTP {
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.assertValid(); runner.assertValid();
testPOSTRequestsReceived(); testPOSTRequestsReceived(HttpServletResponse.SC_OK);
}
@Test
public void testSecurePOSTRequestsReturnCodeReceivedWithoutEL() throws Exception {
SSLContextService sslContextService = configureProcessorSslContextService();
runner.setProperty(sslContextService, StandardRestrictedSSLContextService.RESTRICTED_SSL_ALGORITHM, "TLSv1.2");
runner.enableControllerService(sslContextService);
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT));
runner.assertValid();
testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT);
} }
@Test @Test
@ -116,7 +150,21 @@ public class TestListenHTTP {
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL); runner.setProperty(ListenHTTP.BASE_PATH, HTTP_SERVER_BASEPATH_EL);
runner.assertValid(); runner.assertValid();
testPOSTRequestsReceived(); testPOSTRequestsReceived(HttpServletResponse.SC_OK);
}
@Test
public void testSecurePOSTRequestsReturnCodeReceivedWithEL() throws Exception {
SSLContextService sslContextService = configureProcessorSslContextService();
runner.setProperty(sslContextService, StandardRestrictedSSLContextService.RESTRICTED_SSL_ALGORITHM, "TLSv1.2");
runner.enableControllerService(sslContextService);
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT));
runner.assertValid();
testPOSTRequestsReceived(HttpServletResponse.SC_NO_CONTENT);
} }
@Test @Test
@ -137,7 +185,7 @@ public class TestListenHTTP {
final URL url = new URL(scheme + "://localhost:" + availablePort + "/" + HTTP_BASE_PATH); final URL url = new URL(scheme + "://localhost:" + availablePort + "/" + HTTP_BASE_PATH);
HttpURLConnection connection; HttpURLConnection connection;
if(secure) { if (secure) {
final HttpsURLConnection sslCon = (HttpsURLConnection) url.openConnection(); final HttpsURLConnection sslCon = (HttpsURLConnection) url.openConnection();
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.WANT); final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.WANT);
sslCon.setSSLSocketFactory(sslContext.getSocketFactory()); sslCon.setSSLSocketFactory(sslContext.getSocketFactory());
@ -151,7 +199,7 @@ public class TestListenHTTP {
final DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); final DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
if (message!=null) { if (message != null) {
wr.writeBytes(message); wr.writeBytes(message);
} }
wr.flush(); wr.flush();
@ -159,54 +207,56 @@ public class TestListenHTTP {
return connection.getResponseCode(); return connection.getResponseCode();
} }
private void testPOSTRequestsReceived() throws Exception { private void testPOSTRequestsReceived(int returnCode) throws Exception {
final List<String> messages = new ArrayList<>(); final List<String> messages = new ArrayList<>();
messages.add("payload 1"); messages.add("payload 1");
messages.add(""); messages.add("");
messages.add(null); messages.add(null);
messages.add("payload 2"); messages.add("payload 2");
startWebServerAndSendMessages(messages); startWebServerAndSendMessages(messages, returnCode);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS); List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
runner.assertTransferCount(RELATIONSHIP_SUCCESS,4); runner.assertTransferCount(RELATIONSHIP_SUCCESS, 4);
mockFlowFiles.get(0).assertContentEquals("payload 1"); mockFlowFiles.get(0).assertContentEquals("payload 1");
mockFlowFiles.get(1).assertContentEquals(""); mockFlowFiles.get(1).assertContentEquals("");
mockFlowFiles.get(2).assertContentEquals(""); mockFlowFiles.get(2).assertContentEquals("");
mockFlowFiles.get(3).assertContentEquals("payload 2"); mockFlowFiles.get(3).assertContentEquals("payload 2");
} }
private void startWebServerAndSendMessages(final List<String> messages) private void startWebServerAndSendMessages(final List<String> messages, int returnCode)
throws Exception { throws Exception {
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext(); final ProcessContext context = runner.getProcessContext();
proc.createHttpServer(context); proc.createHttpServer(context);
Runnable sendMessagestoWebServer = () -> { Runnable sendMessagestoWebServer = () -> {
try { try {
for (final String message : messages) { for (final String message : messages) {
if (executePOST(message)!=200) fail("HTTP POST failed."); if (executePOST(message) != returnCode) {
fail("HTTP POST failed.");
} }
} catch (Exception e) {
e.printStackTrace();
fail("Not expecting error here.");
} }
}; } catch (Exception e) {
new Thread(sendMessagestoWebServer).start(); e.printStackTrace();
fail("Not expecting error here.");
long responseTimeout = 10000;
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
Thread.sleep(100);
} }
};
new Thread(sendMessagestoWebServer).start();
runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size()); long responseTimeout = 10000;
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
Thread.sleep(100);
}
runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size());
} }