NIFI-1086 Provide refactoring of InvokeHTTP

NIFI-980 Add support for HTTP Digest authentication to InvokeHttp
NIFI-1080 Provide additional InvokeHttp unit tests
NIFI-1133 InvokeHTTP Processor does not save Location header for 3xx responses
NIFI-1009 InvokeHTTP should be able to be scheduled without any incoming connection for GET operations
NIFI-61 Multiple improvements for InvokeHTTP inclusive of providing unique tx.id across clusters, dynamic HTTP header properties

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Joseph Percivall 2015-11-02 15:45:20 -05:00 committed by Aldrin Piri
parent fb335ea282
commit 8c2323dc8d
6 changed files with 1662 additions and 788 deletions

View File

@ -124,11 +124,6 @@ language governing permissions and limitations under the License. -->
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-socket-utils</artifactId>
@ -137,11 +132,6 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-load-distribution-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
@ -154,20 +144,10 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
@ -189,7 +169,37 @@ language governing permissions and limitations under the License. -->
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.burgstaller</groupId>
<artifactId>okhttp-digest</artifactId>
<version>0.4</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>

View File

@ -19,22 +19,27 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestInvokeHTTP extends TestInvokeHttpCommon {
@BeforeClass
@ -72,41 +77,99 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
return new TestServer();
}
@Test
public void testSslSetHttpRequest() throws Exception {
final Map<String, String> sslProperties = new HashMap<>();
sslProperties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
sslProperties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
sslProperties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
sslProperties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
sslProperties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
sslProperties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
runner = TestRunners.newTestRunner(InvokeHTTP.class);
final StandardSSLContextService sslService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslService, sslProperties);
runner.enableControllerService(sslService);
runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
addHandler(new GetOrHeadHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// expected in request status.code and status.message
// original flow file (+attributes)
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
// expected in response
// status code, status message, all headers from server response --> ff attributes
// server response message body into payload of ff
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
bundle1.assertAttributeEquals("OkHttp-Selected-Protocol", "http/1.1");
}
// Currently InvokeHttp does not support Proxy via Https
@Test
public void testProxy() throws Exception {
addHandler(new MyProxyHandler());
URL proxyURL = new URL(url);
runner.setProperty(InvokeHTTP.Config.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
runner.setProperty(InvokeHTTP.Config.PROP_PROXY_HOST, proxyURL.getHost());
runner.setProperty(InvokeHTTP.Config.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, proxyURL.getHost());
try{
runner.run();
Assert.fail();
} catch (AssertionError e){
// Expect assetion error when proxy port isn't set but host is.
}
runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
//expected in request status.code and status.message
//original flow file (+attributes)
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
//expected in response
//status code, status message, all headers from server response --> ff attributes
//server response message body into payload of ff
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("http://nifi.apache.org/".getBytes("UTF-8"));
bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.processors.standard.InvokeHTTP.Config;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.TestRunners;
@ -63,7 +62,7 @@ public class TestInvokeHttpSSL extends TestInvokeHttpCommon {
final StandardSSLContextService sslService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslService, sslProperties);
runner.enableControllerService(sslService);
runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
server.clearHandlers();
}

38
pom.xml
View File

@ -97,6 +97,44 @@
<hadoop.guava.version>12.0.1</hadoop.guava.version>
<yammer.metrics.version>2.2.0</yammer.metrics.version>
</properties>
<repositories>
<repository>
<id>central</id>
<!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution -->
<name>Maven Repository</name>
<url>https://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>apache-repo</id>
<name>Apache Repository</name>
<url>https://repository.apache.org/content/repositories/releases</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>jcenter</id>
<url>http://jcenter.bintray.com </url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>