mirror of https://github.com/apache/nifi.git
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
This commit is contained in:
commit
322ac6fba6
|
@ -27,6 +27,7 @@ import java.io.OutputStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -47,6 +48,7 @@ import javax.net.ssl.HttpsURLConnection;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import javax.net.ssl.SSLSession;
|
import javax.net.ssl.SSLSession;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
@ -244,6 +246,34 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||||
.identifiesControllerService(SSLContextService.class)
|
.identifiesControllerService(SSLContextService.class)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
// Per RFC 7235, 2617, and 2616.
|
||||||
|
// basic-credentials = base64-user-pass
|
||||||
|
// base64-user-pass = userid ":" password
|
||||||
|
// userid = *<TEXT excluding ":">
|
||||||
|
// password = *TEXT
|
||||||
|
//
|
||||||
|
// OCTET = <any 8-bit sequence of data>
|
||||||
|
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
|
||||||
|
// LWS = [CRLF] 1*( SP | HT )
|
||||||
|
// TEXT = <any OCTET except CTLs but including LWS>
|
||||||
|
//
|
||||||
|
// Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
|
||||||
|
public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("Basic Authentication Username")
|
||||||
|
.displayName("Basic Authentication Username")
|
||||||
|
.description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
|
||||||
|
.name("Basic Authentication Password")
|
||||||
|
.displayName("Basic Authentication Password")
|
||||||
|
.description("The password to be used by the client to authenticate against the Remote URL.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
||||||
PROP_METHOD,
|
PROP_METHOD,
|
||||||
PROP_URL,
|
PROP_URL,
|
||||||
|
@ -252,7 +282,9 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||||
PROP_READ_TIMEOUT,
|
PROP_READ_TIMEOUT,
|
||||||
PROP_DATE_HEADER,
|
PROP_DATE_HEADER,
|
||||||
PROP_FOLLOW_REDIRECTS,
|
PROP_FOLLOW_REDIRECTS,
|
||||||
PROP_ATTRIBUTES_TO_SEND
|
PROP_ATTRIBUTES_TO_SEND,
|
||||||
|
PROP_BASIC_AUTH_USERNAME,
|
||||||
|
PROP_BASIC_AUTH_PASSWORD
|
||||||
));
|
));
|
||||||
|
|
||||||
// property to allow the hostname verifier to be overridden
|
// property to allow the hostname verifier to be overridden
|
||||||
|
@ -383,9 +415,21 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||||
// read the url property from the context
|
// read the url property from the context
|
||||||
String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
|
String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
|
||||||
URL url = new URL(urlstr);
|
URL url = new URL(urlstr);
|
||||||
|
String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
|
||||||
|
String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
|
||||||
|
|
||||||
|
String authstrencoded = null;
|
||||||
|
if (!authuser.isEmpty()) {
|
||||||
|
String authstr = authuser + ":" + authpass;
|
||||||
|
byte[] bytestrencoded = Base64.encodeBase64(authstr.getBytes(StandardCharsets.UTF_8));
|
||||||
|
authstrencoded = new String(bytestrencoded, StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
|
|
||||||
// create the connection
|
// create the connection
|
||||||
conn = (HttpURLConnection) url.openConnection();
|
conn = (HttpURLConnection) url.openConnection();
|
||||||
|
if (authstrencoded != null) {
|
||||||
|
conn.setRequestProperty("Authorization", "Basic " + authstrencoded);
|
||||||
|
}
|
||||||
|
|
||||||
// set the request method
|
// set the request method
|
||||||
String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase();
|
String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase();
|
||||||
|
|
|
@ -42,6 +42,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import static org.apache.commons.codec.binary.Base64.encodeBase64;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -169,6 +171,89 @@ public class TestInvokeHTTP {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test200auth() throws Exception {
|
||||||
|
addHandler(new BasicAuthHandler());
|
||||||
|
|
||||||
|
String username = "basic_user";
|
||||||
|
String password = "basic_password";
|
||||||
|
|
||||||
|
runner.setProperty(Config.PROP_URL, url + "/status/200");
|
||||||
|
runner.setProperty(Config.PROP_BASIC_AUTH_USERNAME, username);
|
||||||
|
runner.setProperty(Config.PROP_BASIC_AUTH_PASSWORD, password);
|
||||||
|
byte[] creds = String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8);
|
||||||
|
final String expAuth = String.format("Basic %s\n", new String(encodeBase64(creds)));
|
||||||
|
|
||||||
|
createFlowFiles(runner);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1);
|
||||||
|
runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1);
|
||||||
|
runner.assertTransferCount(Config.REL_RETRY, 0);
|
||||||
|
runner.assertTransferCount(Config.REL_NO_RETRY, 0);
|
||||||
|
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
//expected in request status.code and status.message
|
||||||
|
//original flow file (+attributes)??????????
|
||||||
|
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
|
||||||
|
bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
|
||||||
|
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
|
||||||
|
bundle.assertAttributeEquals("Foo", "Bar");
|
||||||
|
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
|
||||||
|
final String expected = "Hello";
|
||||||
|
Assert.assertEquals(expected, actual);
|
||||||
|
|
||||||
|
//expected in response
|
||||||
|
//status code, status message, all headers from server response --> ff attributes
|
||||||
|
//server response message body into payload of ff
|
||||||
|
//should not contain any original ff attributes
|
||||||
|
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
|
||||||
|
bundle1.assertContentEquals(expAuth.getBytes("UTF-8"));
|
||||||
|
bundle1.assertAttributeEquals(Config.STATUS_CODE, "200");
|
||||||
|
bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
|
||||||
|
bundle1.assertAttributeEquals("Foo", "Bar");
|
||||||
|
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
|
||||||
|
final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
|
||||||
|
Assert.assertEquals(expAuth, actual1);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test401notauth() throws Exception {
|
||||||
|
addHandler(new BasicAuthHandler());
|
||||||
|
|
||||||
|
String username = "basic_user";
|
||||||
|
String password = "basic_password";
|
||||||
|
|
||||||
|
runner.setProperty(Config.PROP_URL, url + "/status/401");
|
||||||
|
runner.setProperty(Config.PROP_BASIC_AUTH_USERNAME, username);
|
||||||
|
runner.setProperty(Config.PROP_BASIC_AUTH_PASSWORD, password);
|
||||||
|
|
||||||
|
createFlowFiles(runner);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0);
|
||||||
|
runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0);
|
||||||
|
runner.assertTransferCount(Config.REL_RETRY, 0);
|
||||||
|
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
|
||||||
|
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
//expected in request status.code and status.message
|
||||||
|
//original flow file (+attributes)??????????
|
||||||
|
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
|
||||||
|
bundle.assertAttributeEquals(Config.STATUS_CODE, "401");
|
||||||
|
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Unauthorized");
|
||||||
|
bundle.assertAttributeEquals("Foo", "Bar");
|
||||||
|
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
|
||||||
|
final String expected = "Hello";
|
||||||
|
Assert.assertEquals(expected, actual);
|
||||||
|
|
||||||
|
String response = bundle.getAttribute(Config.RESPONSE_BODY);
|
||||||
|
assertEquals(response, "Get off my lawn!");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test500() throws Exception {
|
public void test500() throws Exception {
|
||||||
addHandler(new GetOrHeadHandler());
|
addHandler(new GetOrHeadHandler());
|
||||||
|
@ -543,4 +628,28 @@ public class TestInvokeHTTP {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class BasicAuthHandler extends AbstractHandler {
|
||||||
|
|
||||||
|
private String authString;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
|
||||||
|
authString = request.getHeader("Authorization");
|
||||||
|
|
||||||
|
int status = Integer.valueOf(target.substring("/status".length() + 1));
|
||||||
|
|
||||||
|
if (status == 200) {
|
||||||
|
response.setStatus(status);
|
||||||
|
response.setContentType("text/plain");
|
||||||
|
response.getWriter().println(authString);
|
||||||
|
} else {
|
||||||
|
response.setStatus(status);
|
||||||
|
response.setContentType("text/plain");
|
||||||
|
response.getWriter().println("Get off my lawn!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue