NIFI-8304 Refactored InvokeHTTP unit tests using OkHttp MockWebServer

- Replaced Joda Time with java.time for date formatting
- Replaced Guava Files with java.nio.file.Files for cache directory
- Updated PutTCP test server to close connection when testing connection per FlowFile

NIFI-8304 Removed Thread wrapper for TestListenHTTP client requests

NIFI-8304 Disabled InvokeHTTP Connection Pooling for testing

NIFI-8304 Set 60 second timeout for testing TLS connections

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4892.
This commit is contained in:
exceptionfactory 2021-03-11 17:39:40 -06:00 committed by Pierre Villard
parent 57cca88eea
commit b1f513a2ca
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
10 changed files with 1088 additions and 3210 deletions

View File

@ -401,6 +401,12 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -20,7 +20,6 @@ import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -29,7 +28,11 @@ import java.net.Proxy.Type;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.Principal;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -37,7 +40,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@ -47,15 +49,15 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.X509TrustManager;
import okhttp3.Cache;
import okhttp3.ConnectionPool;
import okhttp3.Credentials;
import okhttp3.Handshake;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.MultipartBody.Builder;
@ -90,7 +92,6 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@ -103,8 +104,6 @@ import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.StreamUtils;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@ -114,15 +113,15 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@CapabilityDescription("An HTTP client processor which can interact with a configurable HTTP Endpoint. The destination URL and HTTP Method are configurable."
+ " FlowFile attributes are converted to HTTP headers and the FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH).")
@WritesAttributes({
@WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
@WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
@WritesAttribute(attribute = "invokehttp.response.body", description = "In the instance where the status code received is not a success (2xx) "
@WritesAttribute(attribute = InvokeHTTP.STATUS_CODE, description = "The status code that is returned"),
@WritesAttribute(attribute = InvokeHTTP.STATUS_MESSAGE, description = "The status message that is returned"),
@WritesAttribute(attribute = InvokeHTTP.RESPONSE_BODY, description = "In the instance where the status code received is not a success (2xx) "
+ "then the response body will be put to the 'invokehttp.response.body' attribute of the request FlowFile."),
@WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
@WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
@WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server"),
@WritesAttribute(attribute = "invokehttp.java.exception.class", description = "The Java exception class raised when the processor fails"),
@WritesAttribute(attribute = "invokehttp.java.exception.message", description = "The Java exception message raised when the processor fails"),
@WritesAttribute(attribute = InvokeHTTP.REQUEST_URL, description = "The request URL"),
@WritesAttribute(attribute = InvokeHTTP.TRANSACTION_ID, description = "The transaction ID that is returned after reading the response"),
@WritesAttribute(attribute = InvokeHTTP.REMOTE_DN, description = "The DN of the remote server"),
@WritesAttribute(attribute = InvokeHTTP.EXCEPTION_CLASS, description = "The Java exception class raised when the processor fails"),
@WritesAttribute(attribute = InvokeHTTP.EXCEPTION_MESSAGE, description = "The Java exception message raised when the processor fails"),
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to "
+ "will become the attribute key and the value would be the body of the HTTP response.")})
@DynamicProperties({
@ -137,7 +136,6 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+ " If send message body is false, the flowfile will not be sent, but any other form data will be.")
})
public class InvokeHTTP extends AbstractProcessor {
// flowfile attribute keys returned after reading the response
public final static String STATUS_CODE = "invokehttp.status.code";
public final static String STATUS_MESSAGE = "invokehttp.status.message";
public final static String RESPONSE_BODY = "invokehttp.response.body";
@ -147,7 +145,6 @@ public class InvokeHTTP extends AbstractProcessor {
public final static String EXCEPTION_CLASS = "invokehttp.java.exception.class";
public final static String EXCEPTION_MESSAGE = "invokehttp.java.exception.message";
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
public static final String FORM_BASE = "post:form";
@ -161,13 +158,11 @@ public class InvokeHTTP extends AbstractProcessor {
EXCEPTION_CLASS, EXCEPTION_MESSAGE,
"uuid", "filename", "path")));
// Set of HTTP header names explicitly excluded from requests.
private static final Map<String, String> excludedHeaders = new HashMap<String, String>();
public static final String HTTP = "http";
public static final String HTTPS = "https";
private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post:form:(?<formDataName>.*)$");
private static final String FORM_DATA_NAME_GROUP = "formDataName";
// properties
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
@ -332,9 +327,9 @@ public class InvokeHTTP extends AbstractProcessor {
public static final PropertyDescriptor PROP_FORM_BODY_FORM_NAME = new PropertyDescriptor.Builder()
.name("form-body-form-name")
.displayName("Flowfile Form Data Name")
.description("When Send Message Body is true, and Flowfile Form Data Name is set, "
+ " the Flowfile will be sent as the message body in multipart/form format with this value "
.displayName("FlowFile Form Data Name")
.description("When Send Message Body is true, and FlowFile Form Data Name is set, "
+ " the FlowFile will be sent as the message body in multipart/form format with this value "
+ "as the form data name.")
.required(false)
.addValidator(
@ -344,10 +339,10 @@ public class InvokeHTTP extends AbstractProcessor {
public static final PropertyDescriptor PROP_SET_FORM_FILE_NAME = new PropertyDescriptor.Builder()
.name("set-form-filename")
.displayName("Set Flowfile Form Data File Name")
.displayName("Set FlowFile Form Data File Name")
.description(
"When Send Message Body is true, Flowfile Form Data Name is set, "
+ "and Set Flowfile Form Data File Name is true, the Flowfile's fileName value "
"When Send Message Body is true, FlowFile Form Data Name is set, "
+ "and Set FlowFile Form Data File Name is true, the FlowFile's fileName value "
+ "will be set as the filename property of the form data.")
.required(false)
.defaultValue("true")
@ -558,25 +553,20 @@ public class InvokeHTTP extends AbstractProcessor {
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
// RFC 2616 Date Time Formatter with hard-coded GMT Zone
private static final DateTimeFormatter RFC_2616_DATE_TIME = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'");
// Multiple Header Delimiter
private static final String MULTIPLE_HEADER_DELIMITER = ", ";
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
/**
* Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
* string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
*/
private static final String RFC_1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(RFC_1123).withLocale(Locale.US).withZoneUTC();
private volatile Pattern regexAttributesToSend = null;
private volatile boolean useChunked = false;
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
@Override
protected void init(ProcessorInitializationContext context) {
excludedHeaders.put("Trusted Hostname", "HTTP request header '{}' excluded. " +
"Update processor to use the SSLContextService instead. " +
"See the Access Policies section in the System Administrator's Guide.");
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
@ -591,7 +581,7 @@ public class InvokeHTTP extends AbstractProcessor {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
.description("Form Data " + matcher.group("formDataName"))
.description("Form Data " + matcher.group(FORM_DATA_NAME_GROUP))
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@ -614,9 +604,6 @@ public class InvokeHTTP extends AbstractProcessor {
return RELATIONSHIPS;
}
private volatile Pattern regexAttributesToSend = null;
private volatile boolean useChunked = false;
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.isDynamic()) {
@ -674,20 +661,11 @@ public class InvokeHTTP extends AbstractProcessor {
ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS);
for (String headerKey : validationContext.getProperties().values()) {
if (excludedHeaders.containsKey(headerKey)) {
// We're not using the header message format string here, just this
// static validation message string:
results.add(new ValidationResult.Builder().subject(headerKey).valid(false).explanation("Matches excluded HTTP header name").build());
}
}
// Check for dynamic properties for form components.
// Even if the flowfile is not sent, we may still send form parameters.
boolean hasFormData = false;
Map<String, PropertyDescriptor> propertyDescriptors = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : validationContext.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName());
for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
final Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(descriptor.getName());
if (matcher.matches()) {
hasFormData = true;
break;
@ -699,15 +677,24 @@ public class InvokeHTTP extends AbstractProcessor {
final boolean contentNameSet = validationContext.getProperty(PROP_FORM_BODY_FORM_NAME).isSet();
if (hasFormData) {
if (sendBody && !contentNameSet) {
results.add(new ValidationResult.Builder().subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
.valid(false).explanation(
"If dynamic form data properties are set, and send body is true, Flowfile Form Data Name must be configured.")
final String explanation = String.format("[%s] is required when Form Data properties are configured and [%s] is enabled",
PROP_FORM_BODY_FORM_NAME.getDisplayName(),
PROP_SEND_BODY.getDisplayName());
results.add(new ValidationResult.Builder()
.subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
.valid(false)
.explanation(explanation)
.build());
}
}
if (!sendBody && contentNameSet) {
results.add(new ValidationResult.Builder().subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
.valid(false).explanation("If Flowfile Form Data Name is configured, Send Message Body must be true.")
final String explanation = String.format("[%s] must be [true] when Form Data properties are configured and [%s] is configured",
PROP_SEND_BODY.getDisplayName(),
PROP_FORM_BODY_FORM_NAME.getDisplayName());
results.add(new ValidationResult.Builder()
.subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
.valid(false)
.explanation(explanation)
.build());
}
@ -715,13 +702,11 @@ public class InvokeHTTP extends AbstractProcessor {
}
@OnScheduled
public void setUpClient(final ProcessContext context) throws TlsException {
public void setUpClient(final ProcessContext context) throws TlsException, IOException {
okHttpClientAtomicReference.set(null);
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
// Add a proxy if set
boolean isHttpsProxy = HTTPS.equals(context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue());
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue();
@ -754,11 +739,9 @@ public class InvokeHTTP extends AbstractProcessor {
okHttpClientBuilder.cache(new Cache(getETagCacheDir(), maxCacheSizeBytes));
}
// Configure whether HTTP/2 protocol should be used or not
// Configure whether HTTP/2 protocol should be disabled
if (context.getProperty(DISABLE_HTTP2_PROTOCOL).asBoolean()) {
okHttpClientBuilder.protocols(Arrays.asList(Protocol.HTTP_1_1));
} else {
okHttpClientBuilder.protocols(Arrays.asList(Protocol.HTTP_1_1, Protocol.HTTP_2));
okHttpClientBuilder.protocols(Collections.singletonList(Protocol.HTTP_1_1));
}
// Set timeouts
@ -839,22 +822,14 @@ public class InvokeHTTP extends AbstractProcessor {
final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
final ComponentLog logger = getLogger();
// log ETag cache metrics
final boolean eTagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean();
if (eTagEnabled && logger.isDebugEnabled()) {
final Cache cache = okHttpClient.cache();
logger.debug("OkHttp ETag cache metrics :: Request Count: {} | Network Count: {} | Hit Count: {}",
new Object[]{cache.requestCount(), cache.networkCount(), cache.hitCount()});
}
// Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
final UUID txId = UUID.randomUUID();
FlowFile responseFlowFile = null;
try {
// read the url property from the context
final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
final URL url = new URL(urlstr);
final String urlProperty = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
final URL url = new URL(urlProperty);
Request httpRequest = configureRequest(context, session, requestFlowFile, url);
@ -876,10 +851,6 @@ public class InvokeHTTP extends AbstractProcessor {
int statusCode = responseHttp.code();
String statusMessage = responseHttp.message();
if (statusCode == 0) {
throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
}
// Create a map of the status attributes that are always written to the request and response FlowFiles
Map<String, String> statusAttributes = new HashMap<>();
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
@ -895,7 +866,7 @@ public class InvokeHTTP extends AbstractProcessor {
if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp));
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp));
}
boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
@ -931,14 +902,15 @@ public class InvokeHTTP extends AbstractProcessor {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp));
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp));
// transfer the message body to the payload
// can potentially be null in edge cases
if (bodyExists) {
// write content type attribute to response flowfile if it is available
if (responseBody.contentType() != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), responseBody.contentType().toString());
final MediaType contentType = responseBody.contentType();
if (contentType != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), contentType.toString());
}
if (teeInputStream != null) {
responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
@ -982,14 +954,11 @@ public class InvokeHTTP extends AbstractProcessor {
} finally {
if (outputStreamToRequestAttribute != null) {
outputStreamToRequestAttribute.close();
outputStreamToRequestAttribute = null;
}
if (teeInputStream != null) {
teeInputStream.close();
teeInputStream = null;
} else if (responseBodyStream != null) {
responseBodyStream.close();
responseBodyStream = null;
}
}
@ -1012,21 +981,17 @@ public class InvokeHTTP extends AbstractProcessor {
// cleanup response flowfile, if applicable
try {
if (responseFlowFile != null) {
session.remove(responseFlowFile);
}
} catch (final Exception e1) {
logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
}
}
}
private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) {
Request.Builder requestBuilder = new Request.Builder();
final Request.Builder requestBuilder = new Request.Builder();
requestBuilder = requestBuilder.url(url);
requestBuilder.url(url);
final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
// If the username/password properties are set then check if digest auth is being used
@ -1034,41 +999,41 @@ public class InvokeHTTP extends AbstractProcessor {
final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
String credential = Credentials.basic(authUser, authPass);
requestBuilder = requestBuilder.header("Authorization", credential);
requestBuilder.header("Authorization", credential);
}
// set the request method
String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
switch (method) {
case "GET":
requestBuilder = requestBuilder.get();
requestBuilder.get();
break;
case "POST":
RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder = requestBuilder.post(requestBody);
requestBuilder.post(requestBody);
break;
case "PUT":
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder = requestBuilder.put(requestBody);
requestBuilder.put(requestBody);
break;
case "PATCH":
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder = requestBuilder.patch(requestBody);
requestBuilder.patch(requestBody);
break;
case "HEAD":
requestBuilder = requestBuilder.head();
requestBuilder.head();
break;
case "DELETE":
requestBuilder = requestBuilder.delete();
requestBuilder.delete();
break;
default:
requestBuilder = requestBuilder.method(method, null);
requestBuilder.method(method, null);
}
String userAgent = trimToEmpty(context.getProperty(PROP_USERAGENT).evaluateAttributeExpressions(requestFlowFile).getValue());
requestBuilder.addHeader("User-Agent", userAgent);
requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile);
setHeaderProperties(context, requestBuilder, requestFlowFile);
return requestBuilder.build();
}
@ -1089,7 +1054,7 @@ public class InvokeHTTP extends AbstractProcessor {
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName());
if (matcher.matches()) {
propertyDescriptors.put(matcher.group("formDataName"), entry.getKey());
propertyDescriptors.put(matcher.group(FORM_DATA_NAME_GROUP), entry.getKey());
}
}
@ -1101,7 +1066,7 @@ public class InvokeHTTP extends AbstractProcessor {
}
@Override
public void writeTo(BufferedSink sink) throws IOException {
public void writeTo(BufferedSink sink) {
session.exportTo(requestFlowFile, sink.outputStream());
}
@ -1132,31 +1097,25 @@ public class InvokeHTTP extends AbstractProcessor {
} else if (sendBody) {
return requestBody;
}
return RequestBody.create(null, new byte[0]);
return RequestBody.create(new byte[0], null);
}
private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) {
private void setHeaderProperties(final ProcessContext context, final Request.Builder requestBuilder, final FlowFile requestFlowFile) {
// check if we should send the a Date header with the request
if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
requestBuilder = requestBuilder.addHeader("Date", DATE_FORMAT.print(System.currentTimeMillis()));
final ZonedDateTime universalCoordinatedTimeNow = ZonedDateTime.now(ZoneOffset.UTC);
requestBuilder.addHeader("Date", RFC_2616_DATE_TIME.format(universalCoordinatedTimeNow));
}
final ComponentLog logger = getLogger();
for (String headerKey : dynamicPropertyNames) {
String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue();
// don't include any of the excluded headers, log instead
if (excludedHeaders.containsKey(headerKey)) {
logger.warn(excludedHeaders.get(headerKey), new Object[]{headerKey});
continue;
}
// don't include dynamic form data properties
if (DYNAMIC_FORM_PARAMETER_NAME.matcher(headerKey).matches()) {
continue;
}
requestBuilder = requestBuilder.addHeader(headerKey, headerValue);
requestBuilder.addHeader(headerKey, headerValue);
}
// iterate through the flowfile attributes, adding any attribute that
@ -1178,11 +1137,10 @@ public class InvokeHTTP extends AbstractProcessor {
m.reset(headerKey);
if (m.matches()) {
String headerVal = trimToEmpty(entry.getValue());
requestBuilder = requestBuilder.addHeader(headerKey, headerVal);
requestBuilder.addHeader(headerKey, headerVal);
}
}
}
return requestBuilder;
}
@ -1235,21 +1193,19 @@ public class InvokeHTTP extends AbstractProcessor {
private void logRequest(ComponentLog logger, Request request) {
logger.debug("\nRequest to remote service:\n\t{}\n{}",
new Object[]{request.url().url().toExternalForm(), getLogString(request.headers().toMultimap())});
request.url().url().toExternalForm(), getLogString(request.headers().toMultimap()));
}
private void logResponse(ComponentLog logger, URL url, Response response) {
logger.debug("\nResponse from remote service:\n\t{}\n{}",
new Object[]{url.toExternalForm(), getLogString(response.headers().toMultimap())});
url.toExternalForm(), getLogString(response.headers().toMultimap()));
}
private String getLogString(Map<String, List<String>> map) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
List<String> list = entry.getValue();
if (list.isEmpty()) {
continue;
}
if (!list.isEmpty()) {
sb.append("\t");
sb.append(entry.getKey());
sb.append(": ");
@ -1260,70 +1216,36 @@ public class InvokeHTTP extends AbstractProcessor {
}
sb.append("\n");
}
}
return sb.toString();
}
/**
* Convert a collection of string values into a overly simple comma separated string.
* <p/>
* Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
*/
private String csv(Collection<String> values) {
if (values == null || values.isEmpty()) {
return "";
}
if (values.size() == 1) {
return values.iterator().next();
}
StringBuilder sb = new StringBuilder();
for (String value : values) {
value = value.trim();
if (value.isEmpty()) {
continue;
}
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(value);
}
return sb.toString().trim();
}
/**
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
*/
private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp) {
private Map<String, String> convertAttributesFromHeaders(final Response responseHttp) {
// create a new hashmap to store the values from the connection
Map<String, String> map = new HashMap<>();
responseHttp.headers().names().forEach((key) -> {
if (key == null) {
return;
}
List<String> values = responseHttp.headers().values(key);
final Map<String, String> attributes = new HashMap<>();
final Headers headers = responseHttp.headers();
headers.names().forEach((key) -> {
final List<String> values = headers.values(key);
// we ignore any headers with no actual values (rare)
if (values == null || values.isEmpty()) {
return;
}
if (!values.isEmpty()) {
// create a comma separated string from the values, this is stored in the map
String value = csv(values);
// put the csv into the map
map.put(key, value);
final String value = StringUtils.join(values, MULTIPLE_HEADER_DELIMITER);
attributes.put(key, value);
}
});
if (responseHttp.request().isHttps()) {
Principal principal = responseHttp.handshake().peerPrincipal();
final Handshake handshake = responseHttp.handshake();
if (handshake != null) {
final Principal principal = handshake.peerPrincipal();
if (principal != null) {
map.put(REMOTE_DN, principal.getName());
attributes.put(REMOTE_DN, principal.getName());
}
}
return map;
return attributes;
}
private Charset getCharsetFromMediaType(MediaType contentType) {
@ -1339,26 +1261,7 @@ public class InvokeHTTP extends AbstractProcessor {
*
* @return the directory in which the ETag cache should be written
*/
private static File getETagCacheDir() {
return Files.createTempDir();
}
private static class OverrideHostnameVerifier implements HostnameVerifier {
private final String trustedHostname;
private final HostnameVerifier delegate;
private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
this.trustedHostname = trustedHostname;
this.delegate = delegate;
}
@Override
public boolean verify(String hostname, SSLSession session) {
if (trustedHostname.equalsIgnoreCase(hostname)) {
return true;
}
return delegate.verify(hostname, session);
}
private static File getETagCacheDir() throws IOException {
return Files.createTempDirectory(InvokeHTTP.class.getSimpleName()).toFile();
}
}

View File

@ -0,0 +1,859 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_MOVED_TEMP;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class InvokeHTTPTest {
private static final String HTTP_LOCALHOST_URL = "http://localhost";
private static final String LOCALHOST = "localhost";
private static final String BASE_PATH = "/";
private static final String POST_FORM_PARAMETER_KEY = "post:form:parameter";
private static final String DATE_HEADER = "Date";
private static final String ACCEPT_HEADER = "Accept";
private static final String AUTHORIZATION_HEADER = "Authorization";
private static final String CONTENT_LENGTH_HEADER = "Content-Length";
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String LOCATION_HEADER = "Location";
private static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
private static final String USER_AGENT_HEADER = "User-Agent";
private static final String AUTHENTICATE_HEADER = "WWW-Authenticate";
private static final String REPEATED_HEADER = "Repeated";
private static final String GET_METHOD = "GET";
private static final String DELETE_METHOD = "DELETE";
private static final String HEAD_METHOD = "HEAD";
private static final String OPTIONS_METHOD = "OPTIONS";
private static final String POST_METHOD = "POST";
private static final String PATCH_METHOD = "PATCH";
private static final String PUT_METHOD = "PUT";
private static final String TEXT_PLAIN = "text/plain";
private static final String FLOW_FILE_CONTENT = String.class.getName();
private static final int TAKE_REQUEST_COMPLETED_TIMEOUT = 1;
private static final String TLS_CONNECTION_TIMEOUT = "60 s";
private static TlsConfiguration generatedTlsConfiguration;
private static TlsConfiguration truststoreTlsConfiguration;
private MockWebServer mockWebServer;
private TestRunner runner;
@BeforeClass
public static void setStores() throws IOException, GeneralSecurityException {
generatedTlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
truststoreTlsConfiguration = new StandardTlsConfiguration(
null,
null,
null,
generatedTlsConfiguration.getTruststorePath(),
generatedTlsConfiguration.getTruststorePassword(),
generatedTlsConfiguration.getTruststoreType()
);
}
@AfterClass
public static void deleteStores() throws IOException {
Files.deleteIfExists(Paths.get(generatedTlsConfiguration.getKeystorePath()));
Files.deleteIfExists(Paths.get(generatedTlsConfiguration.getTruststorePath()));
}
@Before
public void setRunner() {
mockWebServer = new MockWebServer();
runner = TestRunners.newTestRunner(new InvokeHTTP());
// Disable Connection Pooling
runner.setProperty(InvokeHTTP.PROP_MAX_IDLE_CONNECTIONS, Integer.toString(0));
}
@After
public void shutdownServer() throws IOException {
mockWebServer.shutdown();
}
@Test
public void testNotValidWithDefaultProperties() {
runner.assertNotValid();
}
@Test
public void testNotValidWithProxyTypeInvalid() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.setProperty(InvokeHTTP.PROP_PROXY_TYPE, String.class.getSimpleName());
runner.assertNotValid();
}
@Test
public void testNotValidWithProxyHostWithoutProxyPort() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, String.class.getSimpleName());
runner.assertNotValid();
}
@Test
public void testNotValidWithProxyUserWithoutProxyPassword() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.setProperty(InvokeHTTP.PROP_PROXY_USER, String.class.getSimpleName());
runner.assertNotValid();
}
@Test
public void testNotValidWithProxyUserAndPasswordWithoutProxyHost() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.setProperty(InvokeHTTP.PROP_PROXY_USER, String.class.getSimpleName());
runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, String.class.getSimpleName());
runner.assertNotValid();
}
@Test
public void testNotValidWithHttpsProxyTypeWithoutSslContextService() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.setProperty(InvokeHTTP.PROP_PROXY_TYPE, InvokeHTTP.HTTPS);
runner.assertNotValid();
}
@Test
public void testNotValidWithPostFormPropertyWithoutFormBodyFormName() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.setProperty(POST_FORM_PARAMETER_KEY, String.class.getSimpleName());
runner.assertNotValid();
}
@Test
public void testNotValidWithPostFormPropertyAndFormBodyFormNameWithoutSendBodyEnabled() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.setProperty(POST_FORM_PARAMETER_KEY, String.class.getSimpleName());
runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME, String.class.getSimpleName());
runner.setProperty(InvokeHTTP.PROP_SEND_BODY, Boolean.FALSE.toString());
runner.assertNotValid();
}
@Test
public void testValidWithMinimumProperties() {
runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
runner.assertValid();
}
@Test
public void testRunNoIncomingConnectionsWithNonLoopConnections() {
runner.setIncomingConnection(false);
runner.setNonLoopConnection(true);
setUrlProperty();
runner.run();
runner.assertQueueEmpty();
}
@Test
public void testRunNoIncomingConnectionsPostMethod() {
runner.setIncomingConnection(false);
runner.setNonLoopConnection(false);
setUrlProperty();
runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
runner.run();
runner.assertQueueEmpty();
}
@Test
public void testRunGetMalformedUrlExceptionFailureNoIncomingConnections() {
runner.setIncomingConnection(false);
runner.setNonLoopConnection(false);
runner.setProperty(InvokeHTTP.PROP_URL, "${file.name}");
runner.run();
final List<LogMessage> errorMessages = runner.getLogger().getErrorMessages();
assertFalse(errorMessages.isEmpty());
}
@Test
public void testRunGetMalformedUrlExceptionFailure() {
final String urlAttributeKey = "request.url";
runner.setProperty(InvokeHTTP.PROP_URL, String.format("${%s}", urlAttributeKey));
final Map<String, String> attributes = new HashMap<>();
attributes.put(urlAttributeKey, String.class.getSimpleName());
runner.enqueue(FLOW_FILE_CONTENT, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(InvokeHTTP.REL_FAILURE);
runner.assertPenalizeCount(1);
final MockFlowFile flowFile = getFailureFlowFile();
flowFile.assertAttributeEquals(InvokeHTTP.EXCEPTION_CLASS, MalformedURLException.class.getName());
flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_MESSAGE);
}
@Test
public void testRunGetMethodIllegalArgumentExceptionFailure() {
setUrlProperty();
final String methodAttributeKey = "request.method";
runner.setProperty(InvokeHTTP.PROP_METHOD, String.format("${%s}", methodAttributeKey));
final Map<String, String> attributes = new HashMap<>();
attributes.put(methodAttributeKey, null);
runner.enqueue(FLOW_FILE_CONTENT, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(InvokeHTTP.REL_FAILURE);
runner.assertPenalizeCount(1);
final MockFlowFile flowFile = getFailureFlowFile();
flowFile.assertAttributeEquals(InvokeHTTP.EXCEPTION_CLASS, IllegalArgumentException.class.getName());
flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_MESSAGE);
}
@Test
public void testRunGetHttp200Success() throws InterruptedException {
assertRequestMethodSuccess(GET_METHOD);
}
@Test
public void testRunGetHttp200SuccessIgnoreResponseContentEnabled() throws InterruptedException {
runner.setProperty(InvokeHTTP.IGNORE_RESPONSE_CONTENT, Boolean.TRUE.toString());
assertRequestMethodSuccess(GET_METHOD);
final MockFlowFile responseFlowFile = getResponseFlowFile();
assertEquals(StringUtils.EMPTY, responseFlowFile.getContent());
}
@Test
public void testRunGetHttp200SuccessOutputBodyAttribute() {
final String outputAttributeKey = String.class.getSimpleName();
runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE, outputAttributeKey);
setUrlProperty();
final String body = String.class.getName();
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK).setBody(body));
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_SUCCESS_REQ, HTTP_OK);
final MockFlowFile flowFile = getRequestFlowFile();
flowFile.assertAttributeEquals(outputAttributeKey, body);
}
@Test
public void testRunGetHttp200SuccessOutputBodyAttributeNoIncomingConnections() {
final String outputAttributeKey = String.class.getSimpleName();
runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE, outputAttributeKey);
setUrlProperty();
runner.setIncomingConnection(false);
runner.setNonLoopConnection(false);
final String body = String.class.getName();
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK).setBody(body));
runner.run();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_SUCCESS_REQ, HTTP_OK);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).iterator().next();
flowFile.assertAttributeEquals(outputAttributeKey, body);
}
@Test
public void testRunGetHttp200SuccessNoIncomingConnections() {
runner.setIncomingConnection(false);
runner.setNonLoopConnection(false);
setUrlProperty();
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
runner.run();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
}
@Test
public void testRunGetHttp200SuccessProxyHostPortConfigured() throws InterruptedException {
final String mockWebServerUrl = getMockWebServerUrl();
final URI uri = URI.create(mockWebServerUrl);
runner.setProperty(InvokeHTTP.PROP_URL, mockWebServerUrl);
runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, uri.getHost());
runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, Integer.toString(uri.getPort()));
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String requestLine = request.getRequestLine();
final String proxyRequestLine = String.format("%s %s HTTP/1.1", GET_METHOD, mockWebServerUrl);
assertEquals(proxyRequestLine, requestLine);
}
@Test
public void testRunGetHttp200SuccessProxyHostPortUserPasswordConfigured() throws InterruptedException {
final String mockWebServerUrl = getMockWebServerUrl();
final URI uri = URI.create(mockWebServerUrl);
runner.setProperty(InvokeHTTP.PROP_URL, mockWebServerUrl);
runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, uri.getHost());
runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, Integer.toString(uri.getPort()));
runner.setProperty(InvokeHTTP.PROP_PROXY_USER, String.class.getSimpleName());
runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, String.class.getName());
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String requestLine = request.getRequestLine();
final String proxyRequestLine = String.format("%s %s HTTP/1.1", GET_METHOD, mockWebServerUrl);
assertEquals(proxyRequestLine, requestLine);
}
@Test
public void testRunGetHttp200SuccessContentTypeHeaderMimeType() {
final MockResponse response = new MockResponse().setResponseCode(HTTP_OK).setHeader(CONTENT_TYPE_HEADER, TEXT_PLAIN);
mockWebServer.enqueue(response);
setUrlProperty();
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final MockFlowFile responseFlowFile = getResponseFlowFile();
responseFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), TEXT_PLAIN);
}
@Test
public void testRunGetHttp200SuccessRequestDateHeader() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_DATE_HEADER, StringUtils.capitalize(Boolean.TRUE.toString()));
enqueueResponseCodeAndRun(HTTP_OK);
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String dateHeader = request.getHeader(DATE_HEADER);
assertNotNull("Request Date not found", dateHeader);
final Pattern rfcDatePattern = Pattern.compile("^.+? \\d{4} \\d{2}:\\d{2}:\\d{2} GMT$");
assertTrue("Request Date RFC 2616 not matched", rfcDatePattern.matcher(dateHeader).matches());
final ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateHeader, DateTimeFormatter.RFC_1123_DATE_TIME);
assertNotNull("Request Date Parsing Failed", zonedDateTime);
}
@Test
public void testRunGetHttp200SuccessSendAttributesAndDynamicProperties() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, String.format("^%s$", ACCEPT_HEADER));
final String defaultContentTypeHeader = "Default-Content-Type";
runner.setProperty(defaultContentTypeHeader, InvokeHTTP.DEFAULT_CONTENT_TYPE);
setUrlProperty();
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
final Map<String, String> attributes = new HashMap<>();
attributes.put(ACCEPT_HEADER, TEXT_PLAIN);
runner.enqueue(FLOW_FILE_CONTENT, attributes);
runner.run();
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String acceptHeader = request.getHeader(ACCEPT_HEADER);
assertEquals(TEXT_PLAIN, acceptHeader);
final String contentType = request.getHeader(defaultContentTypeHeader);
assertEquals(InvokeHTTP.DEFAULT_CONTENT_TYPE, contentType);
runner.removeProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND);
runner.removeProperty(defaultContentTypeHeader);
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
runner.enqueue(FLOW_FILE_CONTENT, attributes);
runner.run();
final RecordedRequest secondRequest = takeRequestCompleted();
assertNull("Accept Header found", secondRequest.getHeader(ACCEPT_HEADER));
assertNull("Default-Content-Type Header found", secondRequest.getHeader(defaultContentTypeHeader));
}
@Test
public void testRunGetHttp200SuccessResponseHeaderRequestFlowFileAttributes() {
setUrlProperty();
runner.setProperty(InvokeHTTP.PROP_ADD_HEADERS_TO_REQUEST, Boolean.TRUE.toString());
final String firstHeader = String.class.getSimpleName();
final String secondHeader = Integer.class.getSimpleName();
final MockResponse response = new MockResponse()
.setResponseCode(HTTP_OK)
.addHeader(REPEATED_HEADER, firstHeader)
.addHeader(REPEATED_HEADER, secondHeader);
mockWebServer.enqueue(response);
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final MockFlowFile requestFlowFile = getRequestFlowFile();
requestFlowFile.assertAttributeEquals(CONTENT_LENGTH_HEADER, Integer.toString(0));
final String repeatedHeaders = String.format("%s, %s", firstHeader, secondHeader);
requestFlowFile.assertAttributeEquals(REPEATED_HEADER, repeatedHeaders);
}
@Test
public void testRunGetHttp200SuccessCacheTagEnabled() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_USE_ETAG, Boolean.TRUE.toString());
assertRequestMethodSuccess(GET_METHOD);
}
@Test
public void testRunGetHttp200SuccessBasicAuthentication() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, String.class.getSimpleName());
runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, String.class.getName());
enqueueResponseCodeAndRun(HTTP_OK);
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String authorization = request.getHeader(AUTHORIZATION_HEADER);
assertNotNull("Authorization Header not found", authorization);
final Pattern basicAuthPattern = Pattern.compile("^Basic [^\\s]+$");
assertTrue("Basic Authentication not matched", basicAuthPattern.matcher(authorization).matches());
}
@Test
public void testRunGetHttp200SuccessDigestAuthentication() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, String.class.getSimpleName());
runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, String.class.getName());
runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH, Boolean.TRUE.toString());
final String realm = UUID.randomUUID().toString();
final String nonce = UUID.randomUUID().toString();
final String digestHeader = String.format("Digest realm=\"%s\", nonce=\"%s\"", realm, nonce);
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_UNAUTHORIZED).setHeader(AUTHENTICATE_HEADER, digestHeader));
enqueueResponseCodeAndRun(HTTP_OK);
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
assertNull("Authorization Header not found", request.getHeader(AUTHORIZATION_HEADER));
final RecordedRequest authenticatedRequest = takeRequestCompleted();
final String authorization = authenticatedRequest.getHeader(AUTHORIZATION_HEADER);
assertNotNull("Authorization Header not found", authorization);
assertTrue("Digest Realm not found", authorization.contains(realm));
assertTrue("Digest Nonce not found", authorization.contains(nonce));
}
@Test
public void testRunGetHttp200SuccessSslContextServiceServerTrusted() throws InitializationException, GeneralSecurityException {
assertResponseSuccessSslContextConfigured(generatedTlsConfiguration, truststoreTlsConfiguration);
}
@Test
public void testRunGetHttp200SuccessSslContextServiceMutualTrusted() throws InitializationException, GeneralSecurityException {
assertResponseSuccessSslContextConfigured(generatedTlsConfiguration, generatedTlsConfiguration);
}
@Test
public void testRunGetSslContextServiceMutualTrustedClientCertificateMissing() throws InitializationException, GeneralSecurityException {
runner.setProperty(InvokeHTTP.DISABLE_HTTP2_PROTOCOL, StringUtils.capitalize(Boolean.TRUE.toString()));
setSslContextConfiguration(generatedTlsConfiguration, truststoreTlsConfiguration);
mockWebServer.requireClientAuth();
setUrlProperty();
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
runner.assertAllFlowFilesTransferred(InvokeHTTP.REL_FAILURE);
final MockFlowFile flowFile = getFailureFlowFile();
flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_CLASS);
flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_MESSAGE);
}
@Test
public void testRunGetHttp200SuccessUserAgentConfigured() throws InterruptedException {
final String userAgent = UUID.randomUUID().toString();
runner.setProperty(InvokeHTTP.PROP_USERAGENT, userAgent);
enqueueResponseCodeAndRun(HTTP_OK);
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String userAgentHeader = request.getHeader(USER_AGENT_HEADER);
assertEquals(userAgent, userAgentHeader);
}
@Test
public void testRunGetHttp302NoRetryFollowRedirectsDefaultEnabled() {
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_MOVED_TEMP).setHeader(LOCATION_HEADER, getMockWebServerUrl()));
enqueueResponseCodeAndRun(HTTP_OK);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
}
@Test
public void testRunGetHttp302NoRetryFollowRedirectsDisabled() {
runner.setProperty(InvokeHTTP.PROP_FOLLOW_REDIRECTS, StringUtils.capitalize(Boolean.FALSE.toString()));
enqueueResponseCodeAndRun(HTTP_MOVED_TEMP);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_NO_RETRY, HTTP_MOVED_TEMP);
}
@Test
public void testRunGetHttp400NoRetryMinimumProperties() {
enqueueResponseCodeAndRun(HTTP_BAD_REQUEST);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_NO_RETRY, HTTP_BAD_REQUEST);
}
@Test
public void testRunGetHttp400NoRetryPenalizeNoRetry() {
runner.setProperty(InvokeHTTP.PROP_PENALIZE_NO_RETRY, Boolean.TRUE.toString());
enqueueResponseCodeAndRun(HTTP_BAD_REQUEST);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
runner.assertPenalizeCount(1);
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_NO_RETRY, HTTP_BAD_REQUEST);
}
@Test
public void testRunGetHttp500RetryMinimumProperties() {
enqueueResponseCodeAndRun(HTTP_INTERNAL_ERROR);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RETRY, HTTP_INTERNAL_ERROR);
}
@Test
public void testRunGetHttp500RetryOutputResponseRegardless() {
runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS, Boolean.TRUE.toString());
enqueueResponseCodeAndRun(HTTP_INTERNAL_ERROR);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RETRY, HTTP_INTERNAL_ERROR);
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_INTERNAL_ERROR);
}
@Test
public void testRunDeleteHttp200Success() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, DELETE_METHOD);
assertRequestMethodSuccess(DELETE_METHOD);
}
@Test
public void testRunHeadHttp200Success() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, HEAD_METHOD);
assertRequestMethodSuccess(HEAD_METHOD);
}
@Test
public void testRunOptionsHttp200Success() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, OPTIONS_METHOD);
assertRequestMethodSuccess(OPTIONS_METHOD);
}
@Test
public void testRunPatchHttp200Success() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, PATCH_METHOD);
assertRequestMethodSuccess(PATCH_METHOD);
}
@Test
public void testRunPostHttp200Success() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
assertRequestMethodSuccess(POST_METHOD);
}
@Test
public void testRunPostHttp200SuccessChunkedEncoding() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
runner.setProperty(InvokeHTTP.PROP_USE_CHUNKED_ENCODING, Boolean.TRUE.toString());
enqueueResponseCodeAndRun(HTTP_OK);
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String contentLength = request.getHeader(CONTENT_LENGTH_HEADER);
assertNull("Content-Length Request Header found", contentLength);
final String transferEncoding = request.getHeader(TRANSFER_ENCODING_HEADER);
assertEquals("chunked", transferEncoding);
}
@Test
public void testRunPostHttp200SuccessFormData() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
final String formName = "multipart-form";
runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME, formName);
final String formDataParameter = String.class.getName();
final String formDataParameterName = "label";
final String formDataPropertyName = String.format("%s:%s", InvokeHTTP.FORM_BASE, formDataParameterName);
runner.setProperty(formDataPropertyName, formDataParameter);
setUrlProperty();
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
final String contentType = request.getHeader(CONTENT_TYPE_HEADER);
assertNotNull("Content Type not found", contentType);
final Pattern multipartPattern = Pattern.compile("^multipart/form-data.+$");
assertTrue("Content Type not matched", multipartPattern.matcher(contentType).matches());
final String body = request.getBody().readUtf8();
assertTrue("Form Data Parameter not found", body.contains(formDataParameter));
}
@Test
public void testRunPutHttp200Success() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, PUT_METHOD);
assertRequestMethodSuccess(PUT_METHOD);
}
private void setUrlProperty() {
runner.setProperty(InvokeHTTP.PROP_URL, getMockWebServerUrl());
}
private String getMockWebServerUrl() {
return mockWebServer.url(BASE_PATH).newBuilder().host(LOCALHOST).build().toString();
}
private void enqueueResponseCodeAndRun(final int responseCode) {
setUrlProperty();
mockWebServer.enqueue(new MockResponse().setResponseCode(responseCode));
runner.enqueue(FLOW_FILE_CONTENT);
runner.run();
}
private RecordedRequest takeRequestCompleted() throws InterruptedException {
final RecordedRequest request = mockWebServer.takeRequest(TAKE_REQUEST_COMPLETED_TIMEOUT, TimeUnit.SECONDS);
assertNotNull("Request not found", request);
return request;
}
private MockFlowFile getFailureFlowFile() {
return runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).iterator().next();
}
private MockFlowFile getRequestFlowFile() {
return runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).iterator().next();
}
private MockFlowFile getResponseFlowFile() {
return runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).iterator().next();
}
private void assertRequestMethodSuccess(final String method) throws InterruptedException {
enqueueResponseCodeAndRun(HTTP_OK);
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
assertEquals(method, request.getMethod());
}
private void assertRelationshipStatusCodeEquals(final Relationship relationship, final int statusCode) {
final List<MockFlowFile> responseFlowFiles = runner.getFlowFilesForRelationship(relationship);
final String message = String.format("FlowFiles not found for Relationship [%s]", relationship);
assertFalse(message, responseFlowFiles.isEmpty());
final MockFlowFile responseFlowFile = responseFlowFiles.iterator().next();
assertStatusCodeEquals(responseFlowFile, statusCode);
}
private void assertStatusCodeEquals(final MockFlowFile flowFile, final int statusCode) {
flowFile.assertAttributeEquals(InvokeHTTP.STATUS_CODE, Integer.toString(statusCode));
flowFile.assertAttributeExists(InvokeHTTP.STATUS_MESSAGE);
flowFile.assertAttributeExists(InvokeHTTP.TRANSACTION_ID);
flowFile.assertAttributeExists(InvokeHTTP.REQUEST_URL);
}
private void assertResponseSuccessRelationships() {
final List<LogMessage> errorMessages = runner.getLogger().getErrorMessages();
final Optional<LogMessage> errorMessage = errorMessages.stream().findFirst();
if (errorMessage.isPresent()) {
final String message = String.format("Error Message Logged: %s", errorMessage.get().getMsg());
assertFalse(message, errorMessages.isEmpty());
}
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
}
private void assertResponseSuccessSslContextConfigured(final TlsConfiguration serverTlsConfiguration, final TlsConfiguration clientTlsConfiguration) throws InitializationException, TlsException {
setSslContextConfiguration(serverTlsConfiguration, clientTlsConfiguration);
enqueueResponseCodeAndRun(HTTP_OK);
assertResponseSuccessRelationships();
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).iterator().next();
flowFile.assertAttributeExists(InvokeHTTP.REMOTE_DN);
}
private void setSslContextConfiguration(final TlsConfiguration serverTlsConfiguration, final TlsConfiguration clientTlsConfiguration) throws InitializationException, TlsException {
final SSLContextService sslContextService = setSslContextService();
final SSLContext serverSslContext = getSslContext(serverTlsConfiguration);
setMockWebServerSslSocketFactory(serverSslContext);
final SSLContext clientSslContext = getSslContext(clientTlsConfiguration);
when(sslContextService.createContext()).thenReturn(clientSslContext);
when(sslContextService.createTlsConfiguration()).thenReturn(clientTlsConfiguration);
}
private SSLContextService setSslContextService() throws InitializationException {
final String serviceIdentifier = SSLContextService.class.getName();
final SSLContextService sslContextService = mock(SSLContextService.class);
when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
runner.addControllerService(serviceIdentifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier);
runner.setProperty(InvokeHTTP.PROP_READ_TIMEOUT, TLS_CONNECTION_TIMEOUT);
runner.setProperty(InvokeHTTP.PROP_CONNECT_TIMEOUT, TLS_CONNECTION_TIMEOUT);
return sslContextService;
}
private void setMockWebServerSslSocketFactory(final SSLContext sslContext) {
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
if (sslSocketFactory == null) {
throw new IllegalArgumentException("Socket Factory not found");
}
mockWebServer.useHttps(sslSocketFactory, false);
}
private SSLContext getSslContext(final TlsConfiguration configuration) throws TlsException {
final SSLContext sslContext = SslContextFactory.createSslContext(configuration);
if (sslContext == null) {
throw new IllegalArgumentException("SSLContext not found for TLS Configuration");
}
return sslContext;
}
}

View File

@ -1,448 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.net.ssl.SSLContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestInvokeHTTP extends TestInvokeHttpCommon {
private static final Logger logger = LoggerFactory.getLogger(TestInvokeHTTP.class);
private static TlsConfiguration tlsConfiguration;
@BeforeClass
public static void beforeClass() throws Exception {
// generate new keystore and truststore
tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
configureServer(null, null);
}
@AfterClass
public static void afterClass() throws Exception {
if (tlsConfiguration != null) {
try {
if (StringUtils.isNotBlank(tlsConfiguration.getKeystorePath())) {
Files.deleteIfExists(Paths.get(tlsConfiguration.getKeystorePath()));
}
} catch (IOException e) {
throw new IOException("There was an error deleting a keystore: " + e.getMessage(), e);
}
try {
if (StringUtils.isNotBlank(tlsConfiguration.getTruststorePath())) {
Files.deleteIfExists(Paths.get(tlsConfiguration.getTruststorePath()));
}
} catch (IOException e) {
throw new IOException("There was an error deleting a truststore: " + e.getMessage(), e);
}
}
}
@Before
public void before() throws Exception {
runner = TestRunners.newTestRunner(InvokeHTTP.class);
}
@Test
public void testSslSetHttpRequest() throws Exception {
final String serviceIdentifier = SSLContextService.class.getName();
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
Mockito.when(sslContextService.createTlsConfiguration()).thenReturn(tlsConfiguration);
runner = TestRunners.newTestRunner(InvokeHTTP.class);
runner.addControllerService(serviceIdentifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier);
addHandler(new GetOrHeadHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
// expected in response
// status code, status message, all headers from server response --> ff attributes
// server response message body into payload of ff
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("/status/200".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
}
// Currently InvokeHttp does not support Proxy via Https
@Test
public void testProxy() throws Exception {
addHandler(new MyProxyHandler());
URL proxyURL = new URL(url);
runner.setVariable("proxy.host", proxyURL.getHost());
runner.setVariable("proxy.port", String.valueOf(proxyURL.getPort()));
runner.setVariable("proxy.username", "username");
runner.setVariable("proxy.password", "password");
runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, "${proxy.host}");
try {
runner.run();
Assert.fail();
} catch (AssertionError e) {
// Expect assertion error when proxy port isn't set but host is.
}
runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, "${proxy.port}");
runner.setProperty(InvokeHTTP.PROP_PROXY_USER, "${proxy.username}");
try {
runner.run();
Assert.fail();
} catch (AssertionError e) {
// Expect assertion error when proxy password isn't set but host is.
}
runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, "${proxy.password}");
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
//expected in response
//status code, status message, all headers from server response --> ff attributes
//server response message body into payload of ff
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("http://nifi.apache.org/".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
}
@Test
public void testFailingHttpRequest() throws Exception {
runner = TestRunners.newTestRunner(InvokeHTTP.class);
// Remember: we expect that connecting to the following URL should raise a Java exception
runner.setProperty(InvokeHTTP.PROP_URL, "http://127.0.0.1:0");
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
runner.assertPenalizeCount(1);
// expected in request java.exception
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
bundle.assertAttributeEquals(InvokeHTTP.EXCEPTION_CLASS, "java.lang.IllegalArgumentException");
}
public static class MyProxyHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
if ("Get".equalsIgnoreCase(request.getMethod())) {
response.setStatus(200);
String proxyPath = baseRequest.getHttpURI().toString();
response.setContentLength(proxyPath.length());
response.setContentType("text/plain");
try (PrintWriter writer = response.getWriter()) {
writer.print(proxyPath);
writer.flush();
}
} else {
response.setStatus(404);
response.setContentType("text/plain");
response.setContentLength(0);
}
}
}
@Test
public void testOnPropertyModified() throws Exception {
final InvokeHTTP processor = new InvokeHTTP();
final Field regexAttributesToSendField = InvokeHTTP.class.getDeclaredField("regexAttributesToSend");
regexAttributesToSendField.setAccessible(true);
assertNull(regexAttributesToSendField.get(processor));
// Set Attributes to Send.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, null, "uuid");
assertNotNull(regexAttributesToSendField.get(processor));
// Null clear Attributes to Send. NIFI-1125: Throws NullPointerException.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "uuid", null);
assertNull(regexAttributesToSendField.get(processor));
// Set Attributes to Send.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, null, "uuid");
assertNotNull(regexAttributesToSendField.get(processor));
// Clear Attributes to Send with empty string.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "uuid", "");
assertNull(regexAttributesToSendField.get(processor));
}
@Test
public void testEmptyGzipHttpReponse() throws Exception {
addHandler(new EmptyGzipResponseHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url);
runner.setProperty(InvokeHTTP.IGNORE_RESPONSE_CONTENT, "true");
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
//expected empty content in response FlowFile
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle.assertContentEquals(new byte[0]);
bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
bundle.assertAttributeEquals("Content-Type", "text/plain");
}
@Test
public void testShouldAllowExtension() {
// Arrange
class ExtendedInvokeHTTP extends InvokeHTTP {
private final int extendedNumber;
public ExtendedInvokeHTTP(int num) {
super();
this.extendedNumber = num;
}
public int extendedMethod() {
return this.extendedNumber;
}
}
int num = Double.valueOf(Math.random() * 100).intValue();
// Act
ExtendedInvokeHTTP eih = new ExtendedInvokeHTTP(num);
// Assert
assertEquals(num, eih.extendedMethod());
}
public static class EmptyGzipResponseHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) {
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentLength(0);
response.setContentType("text/plain");
response.setHeader("Content-Encoding", "gzip");
}
}
@Test
public void testShouldNotSendUserAgentByDefault() throws Exception {
// Arrange
addHandler(new EchoUserAgentHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url);
createFlowFiles(runner);
// Act
runner.run();
// Assert
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
String content = new String(response.toByteArray(), UTF_8);
logger.info("Returned flowfile content: " + content);
assertTrue(content.isEmpty());
response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
}
@Test
public void testShouldSetUserAgentExplicitly() throws Exception {
addHandler(new EchoUserAgentHandler());
runner.setProperty(InvokeHTTP.PROP_USERAGENT, "Apache NiFi For The Win");
runner.setProperty(InvokeHTTP.PROP_URL, url);
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
String content = new String(response.toByteArray(), UTF_8);
assertTrue(content.startsWith("Apache NiFi For The Win"));
response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
}
@Test
public void testShouldSetUserAgentWithExpressionLanguage() throws Exception {
addHandler(new EchoUserAgentHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url);
runner.setProperty(InvokeHTTP.PROP_USERAGENT, "${literal('And now for something completely different...')}");
createFlowFiles(runner);
runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
// One check to verify a custom value and that the expression language actually works.
response.assertContentEquals("And now for something completely different...");
response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
}
public static class EchoUserAgentHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
if ("Get".equalsIgnoreCase(request.getMethod())) {
response.setStatus(200);
String useragent = request.getHeader("User-agent");
response.setContentLength(useragent.length());
response.setContentType("text/plain");
try (PrintWriter writer = response.getWriter()) {
writer.print(useragent);
writer.flush();
}
} else {
response.setStatus(404);
response.setContentType("text/plain");
response.setContentLength(0);
}
}
}
}

View File

@ -1,117 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.mockito.Mockito;
/**
* Executes the same tests as TestInvokeHttp but with one-way SSL enabled. The Jetty server created for these tests
* will not require client certificates and the client will not use keystore properties in the SSLContextService.
*/
public class TestInvokeHttpSSL extends TestInvokeHttpCommon {
private static final String HTTP_CONNECT_TIMEOUT = "30 s";
private static final String HTTP_READ_TIMEOUT = "30 s";
protected static TlsConfiguration serverConfiguration;
private static SSLContext truststoreSslContext;
private static TlsConfiguration truststoreConfiguration;
@BeforeClass
public static void beforeClass() throws Exception {
// generate new keystore and truststore
serverConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
truststoreConfiguration = new StandardTlsConfiguration(
null,
null,
null,
serverConfiguration.getTruststorePath(),
serverConfiguration.getTruststorePassword(),
serverConfiguration.getTruststoreType()
);
final SSLContext serverContext = SslContextFactory.createSslContext(serverConfiguration);
configureServer(serverContext, ClientAuth.NONE);
truststoreSslContext = SslContextFactory.createSslContext(truststoreConfiguration);
}
@AfterClass
public static void afterClass() throws Exception {
if (serverConfiguration != null) {
try {
if (StringUtils.isNotBlank(serverConfiguration.getKeystorePath())) {
Files.deleteIfExists(Paths.get(serverConfiguration.getKeystorePath()));
}
} catch (IOException e) {
throw new IOException("There was an error deleting a keystore: " + e.getMessage(), e);
}
try {
if (StringUtils.isNotBlank(serverConfiguration.getTruststorePath())) {
Files.deleteIfExists(Paths.get(serverConfiguration.getTruststorePath()));
}
} catch (IOException e) {
throw new IOException("There was an error deleting a truststore: " + e.getMessage(), e);
}
}
}
@Before
public void before() throws Exception {
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
final String serviceIdentifier = SSLContextService.class.getName();
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
Mockito.when(sslContextService.createContext()).thenReturn(getClientSslContext());
Mockito.when(sslContextService.createTlsConfiguration()).thenReturn(getClientConfiguration());
runner = TestRunners.newTestRunner(InvokeHTTP.class);
runner.addControllerService(serviceIdentifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier);
runner.setProperty(InvokeHTTP.PROP_CONNECT_TIMEOUT, HTTP_CONNECT_TIMEOUT);
runner.setProperty(InvokeHTTP.PROP_READ_TIMEOUT, HTTP_READ_TIMEOUT);
}
protected SSLContext getClientSslContext() {
return truststoreSslContext;
}
protected TlsConfiguration getClientConfiguration() {
return truststoreConfiguration;
}
}

View File

@ -1,83 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TlsConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
* This is probably overkill but in keeping with the same pattern as the TestInvokeHttp and TestInvokeHttpSSL class,
* we will execute the same tests using two-way SSL. The Jetty server created for these tests will require client
* certificates and the client will utilize keystore properties in the SSLContextService.
*/
public class TestInvokeHttpTwoWaySSL extends TestInvokeHttpSSL {
private static TlsConfiguration serverConfig;
private static SSLContext clientSslContext;
@BeforeClass
public static void beforeClass() throws Exception {
// generate new keystore and truststore
serverConfig = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
final SSLContext serverContext = SslContextFactory.createSslContext(serverConfig);
configureServer(serverContext, ClientAuth.REQUIRED);
clientSslContext = SslContextFactory.createSslContext(serverConfig);
}
@AfterClass
public static void afterClass() throws Exception {
if (serverConfig != null) {
try {
if (StringUtils.isNotBlank(serverConfig.getKeystorePath())) {
Files.deleteIfExists(Paths.get(serverConfig.getKeystorePath()));
}
} catch (IOException e) {
throw new IOException("There was an error deleting a keystore: " + e.getMessage(), e);
}
try {
if (StringUtils.isNotBlank(serverConfig.getTruststorePath())) {
Files.deleteIfExists(Paths.get(serverConfig.getTruststorePath()));
}
} catch (IOException e) {
throw new IOException("There was an error deleting a truststore: " + e.getMessage(), e);
}
}
}
@Override
protected SSLContext getClientSslContext() {
return clientSslContext;
}
@Override
protected TlsConfiguration getClientConfiguration() {
return serverConfig;
}
}

View File

@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@ -28,12 +26,12 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HttpsURLConnection;
@ -58,7 +56,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.KeystoreType;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
@ -81,7 +78,6 @@ import org.mockito.Mockito;
import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
public class TestListenHTTP {
@ -95,22 +91,12 @@ public class TestListenHTTP {
private final static String BASEPATH_VARIABLE = "HTTP_BASEPATH";
private final static String HTTP_SERVER_BASEPATH_EL = "${" + BASEPATH_VARIABLE + "}";
private static final String KEYSTORE = "src/test/resources/keystore.jks";
private static final String KEYSTORE_PASSWORD = "passwordpassword";
private static final KeystoreType KEYSTORE_TYPE = KeystoreType.JKS;
private static final String TRUSTSTORE = "src/test/resources/truststore.jks";
private static final String TRUSTSTORE_PASSWORD = "passwordpassword";
private static final KeystoreType TRUSTSTORE_TYPE = KeystoreType.JKS;
private static final String CLIENT_KEYSTORE = "src/test/resources/client-keystore.p12";
private static final KeystoreType CLIENT_KEYSTORE_TYPE = KeystoreType.PKCS12;
private static final String MULTIPART_ATTRIBUTE = "http.multipart.name";
private static final String TLS_1_3 = "TLSv1.3";
private static final String TLS_1_2 = "TLSv1.2";
private static final String LOCALHOST = "localhost";
private static final long SEND_REQUEST_SLEEP = 150;
private static final long RESPONSE_TIMEOUT = 1200000;
private static final int SOCKET_CONNECT_TIMEOUT = 100;
private static final long SERVER_START_TIMEOUT = 1200000;
@ -217,9 +203,8 @@ public class TestListenHTTP {
}
@After
public void teardown() {
public void shutdownServer() {
proc.shutdownHttpServer();
new File("my-file-text.txt").delete();
}
@Test
@ -484,15 +469,18 @@ public class TestListenHTTP {
return connection;
}
private int executePOST(String message, boolean secure, boolean twoWaySsl) throws Exception {
private int postMessage(String message, boolean secure, boolean clientAuthRequired) throws Exception {
String endpointUrl = buildUrl(secure);
final URL url = new URL(endpointUrl);
HttpURLConnection connection;
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
if (secure) {
connection = buildSecureConnection(twoWaySsl, url);
if (connection instanceof HttpsURLConnection) {
final HttpsURLConnection httpsConnection = (HttpsURLConnection) connection;
if (clientAuthRequired) {
httpsConnection.setSSLSocketFactory(keyStoreSslContext.getSocketFactory());
} else {
connection = (HttpURLConnection) url.openConnection();
httpsConnection.setSSLSocketFactory(trustStoreSslContext.getSocketFactory());
}
}
connection.setRequestMethod(HTTP_POST_METHOD);
connection.setDoOutput(true);
@ -507,18 +495,6 @@ public class TestListenHTTP {
return connection.getResponseCode();
}
private static HttpsURLConnection buildSecureConnection(boolean twoWaySsl, URL url) throws IOException {
final HttpsURLConnection connection = (HttpsURLConnection) url.openConnection();
if (twoWaySsl) {
// Use a client certificate, do not reuse the server's keystore
connection.setSSLSocketFactory(keyStoreSslContext.getSocketFactory());
} else {
// With one-way SSL, the client still needs a truststore
connection.setSSLSocketFactory(trustStoreSslContext.getSocketFactory());
}
return connection;
}
private String buildUrl(final boolean secure) {
return String.format("%s://localhost:%s/%s", secure ? "https" : "http", availablePort, HTTP_BASE_PATH);
}
@ -572,41 +548,14 @@ public class TestListenHTTP {
}
}
private void startWebServerAndSendRequests(Runnable sendRequestToWebserver, int numberOfExpectedFlowFiles) throws Exception {
private void startWebServerAndSendMessages(final List<String> messages, final int expectedStatusCode, final boolean secure, final boolean clientAuthRequired) throws Exception {
startWebServer();
new Thread(sendRequestToWebserver).start();
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < numberOfExpectedFlowFiles && (System.currentTimeMillis() - startTime < RESPONSE_TIMEOUT)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
Thread.sleep(SEND_REQUEST_SLEEP);
}
runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, numberOfExpectedFlowFiles);
}
private void startWebServerAndSendMessages(final List<String> messages, int returnCode, boolean secure, boolean twoWaySsl)
throws Exception {
Runnable sendMessagesToWebServer = () -> {
try {
for (final String message : messages) {
if (executePOST(message, secure, twoWaySsl) != returnCode) {
fail("HTTP POST failed.");
final int statusCode = postMessage(message, secure, clientAuthRequired);
assertEquals("HTTP Status Code not matched", expectedStatusCode, statusCode);
}
}
} catch (Exception e) {
e.printStackTrace();
fail("Not expecting error here.");
}
};
startWebServerAndSendRequests(sendMessagesToWebServer, messages.size());
}
private void configureProcessorSslContextService(final ListenHTTP.ClientAuthentication clientAuthentication,
final TlsConfiguration tlsConfiguration) throws InitializationException {
@ -627,28 +576,25 @@ public class TestListenHTTP {
runner.enableControllerService(sslContextService);
}
@Test
public void testMultipartFormDataRequest() throws Exception {
public void testMultipartFormDataRequest() throws IOException {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK));
final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
final boolean isSecure = (sslContextService != null);
startWebServer();
Runnable sendRequestToWebserver = () -> {
try {
File file1 = createTextFile("my-file-text-", ".txt", "Hello", "World");
File file2 = createTextFile("my-file-text-", ".txt", "{ \"name\":\"John\", \"age\":30 }");
File file1 = createTextFile("Hello", "World");
File file2 = createTextFile("{ \"name\":\"John\", \"age\":30 }");
MultipartBody multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM)
.addFormDataPart("p1", "v1")
.addFormDataPart("p2", "v2")
.addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), file1))
.addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), file2))
.addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
.addFormDataPart("file1", "my-file-text.txt", RequestBody.create(file1, MediaType.parse("text/plain")))
.addFormDataPart("file2", "my-file-data.json", RequestBody.create(file2, MediaType.parse("application/json")))
.addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
.build();
Request request =
@ -666,36 +612,30 @@ public class TestListenHTTP {
try (Response response = client.newCall(request).execute()) {
Files.deleteIfExists(Paths.get(String.valueOf(file1)));
Files.deleteIfExists(Paths.get(String.valueOf(file2)));
Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body()), response.isSuccessful());
}
} catch (final Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
}
};
startWebServerAndSendRequests(sendRequestToWebserver, 5);
runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS);
// Part fragments are not processed in the order we submitted them.
// We cannot rely on the order we sent them in.
MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
MockFlowFile mff = findFlowFile(flowFilesForRelationship, "p1");
mff.assertAttributeEquals("http.multipart.name", "p1");
mff.assertAttributeExists("http.multipart.size");
mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
mff = findFlowFile(flowFilesForRelationship, "p2");
mff.assertAttributeEquals("http.multipart.name", "p2");
mff.assertAttributeExists("http.multipart.size");
mff.assertAttributeExists("http.multipart.fragments.sequence.number");
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
mff = findFlowFile(flowFilesForRelationship, "file1");
mff.assertAttributeEquals("http.multipart.name", "file1");
mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
@ -704,7 +644,7 @@ public class TestListenHTTP {
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
mff = findFlowFile(flowFilesForRelationship, "file2");
mff.assertAttributeEquals("http.multipart.name", "file2");
mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
@ -713,7 +653,7 @@ public class TestListenHTTP {
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
mff = findFlowFile(flowFilesForRelationship, "file3");
mff.assertAttributeEquals("http.multipart.name", "file3");
mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
@ -723,23 +663,24 @@ public class TestListenHTTP {
mff.assertAttributeExists("http.headers.multipart.content-disposition");
}
private byte[] generateRandomBinaryData(int i) {
private byte[] generateRandomBinaryData() {
byte[] bytes = new byte[100];
new Random().nextBytes(bytes);
return bytes;
}
private File createTextFile(String prefix, String extension, String...lines) throws IOException {
Path file = Files.createTempFile(prefix, extension);
try (FileOutputStream fos = new FileOutputStream(file.toFile())) {
private File createTextFile(String...lines) throws IOException {
final File textFile = Files.createTempFile(TestListenHTTP.class.getSimpleName(), ".txt").toFile();
textFile.deleteOnExit();
try (FileOutputStream fos = new FileOutputStream(textFile)) {
IOUtils.writeLines(Arrays.asList(lines), System.lineSeparator(), fos, Charsets.UTF_8);
}
return file.toFile();
return textFile;
}
protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue));
Assert.assertTrue(optional.isPresent());
return optional.get();
protected MockFlowFile findFlowFile(final List<MockFlowFile> flowFiles, final String attributeValue) {
final Optional<MockFlowFile> foundFlowFile = flowFiles.stream().filter(flowFile -> flowFile.getAttribute(MULTIPART_ATTRIBUTE).equals(attributeValue)).findFirst();
return foundFlowFile.orElseThrow(() -> new NullPointerException(MULTIPART_ATTRIBUTE));
}
}

View File

@ -24,27 +24,26 @@ import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ServerSocketFactory;
public class TCPTestServer implements Runnable {
private final InetAddress ipAddress;
private int port;
private final String messageDelimiter;
private final ArrayBlockingQueue<List<Byte>> queue;
private final AtomicInteger totalNumConnections = new AtomicInteger();
private final boolean closeOnMessageReceived;
private volatile ServerSocket serverSocket;
private final ArrayBlockingQueue<List<Byte>> recvQueue;
private volatile Socket connectionSocket;
public final static String DEFAULT_MESSAGE_DELIMITER = "\n";
private volatile int totalNumConnections = 0;
private int port;
public TCPTestServer(final InetAddress ipAddress, final ArrayBlockingQueue<List<Byte>> recvQueue) {
this(ipAddress, recvQueue, DEFAULT_MESSAGE_DELIMITER);
}
public TCPTestServer(final InetAddress ipAddress, final ArrayBlockingQueue<List<Byte>> recvQueue, final String messageDelimiter) {
public TCPTestServer(final InetAddress ipAddress, final ArrayBlockingQueue<List<Byte>> queue, final String messageDelimiter, final boolean closeOnMessageReceived) {
this.ipAddress = ipAddress;
this.recvQueue = recvQueue;
this.queue = queue;
this.messageDelimiter = messageDelimiter;
this.closeOnMessageReceived = closeOnMessageReceived;
}
public synchronized void startServer(final ServerSocketFactory serverSocketFactory) throws Exception {
@ -91,7 +90,10 @@ public class TCPTestServer implements Runnable {
}
private void storeReceivedMessage(final List<Byte> message) {
recvQueue.add(message);
queue.add(message);
if (closeOnMessageReceived) {
shutdownConnection();
}
}
private boolean isServerRunning() {
@ -102,18 +104,14 @@ public class TCPTestServer implements Runnable {
return connectionSocket != null && !connectionSocket.isClosed();
}
public List<Byte> getReceivedMessage() {
return recvQueue.poll();
}
public int getTotalNumConnections() {
return totalNumConnections;
return totalNumConnections.get();
}
protected boolean isDelimiterPresent(final List<Byte> message) {
if (messageDelimiter != null && message.size() >= messageDelimiter.length()) {
for (int i = 1; i <= messageDelimiter.length(); i++) {
if (message.get(message.size() - i).byteValue() == messageDelimiter.charAt(messageDelimiter.length() - i)) {
if (message.get(message.size() - i) == messageDelimiter.charAt(messageDelimiter.length() - i)) {
if (i == messageDelimiter.length()) {
return true;
}
@ -142,12 +140,12 @@ public class TCPTestServer implements Runnable {
try {
while (isServerRunning()) {
connectionSocket = serverSocket.accept();
totalNumConnections++;
InputStream in = connectionSocket.getInputStream();
totalNumConnections.incrementAndGet();
final InputStream inputStream = connectionSocket.getInputStream();
while (isConnected()) {
final List<Byte> message = new ArrayList<Byte>();
final List<Byte> message = new ArrayList<>();
while (true) {
final int c = in.read();
final int c = inputStream.read();
if (c < 0) {
if (!message.isEmpty()) {
storeReceivedMessage(message);

View File

@ -65,7 +65,7 @@ public abstract class TestPutTCPCommon {
private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
private TCPTestServer server;
private int tcp_server_port;
private int port;
private ArrayBlockingQueue<List<Byte>> recvQueue;
public ServerSocketFactory serverSocketFactory;
@ -82,20 +82,24 @@ public abstract class TestPutTCPCommon {
@Before
public void setup() throws Exception {
recvQueue = new ArrayBlockingQueue<List<Byte>>(BUFFER_SIZE);
recvQueue = new ArrayBlockingQueue<>(BUFFER_SIZE);
runner = TestRunners.newTestRunner(PutTCP.class);
runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
}
private synchronized TCPTestServer createTestServer(final String address, final ArrayBlockingQueue<List<Byte>> recvQueue, final String delimiter) throws Exception {
TCPTestServer server = new TCPTestServer(InetAddress.getByName(address), recvQueue, delimiter);
private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter, final boolean closeOnMessageReceived) throws Exception {
TCPTestServer server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), queue, delimiter, closeOnMessageReceived);
server.startServer(serverSocketFactory);
tcp_server_port = server.getPort();
port = server.getPort();
return server;
}
private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter) throws Exception {
return createTestServer(queue, delimiter, false);
}
@After
public void cleanup() throws Exception {
public void cleanup() {
runner.shutdown();
removeTestServer(server);
}
@ -103,14 +107,13 @@ public abstract class TestPutTCPCommon {
private void removeTestServer(TCPTestServer server) {
if (server != null) {
server.shutdown();
server = null;
}
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFiles() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@ -119,8 +122,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFilesEL() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS_EL, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS_EL, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@ -129,8 +132,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testPruneSenders() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(VALID_FILES.length, 0);
@ -149,18 +152,18 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testMultiCharDelimiter() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testConnectionPerFlowFile() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, true, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER, true);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@ -169,8 +172,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testConnectionFailure() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@ -182,8 +185,8 @@ public abstract class TestPutTCPCommon {
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@ -192,8 +195,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testEmptyFile() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(EMPTY_FILE);
Thread.sleep(10);
checkRelationships(EMPTY_FILE.length, 0);
@ -203,9 +206,9 @@ public abstract class TestPutTCPCommon {
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testlargeValidFile() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, true, true);
public void testLargeValidFile() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData);
checkReceivedAllData(recvQueue, testData);
@ -216,8 +219,8 @@ public abstract class TestPutTCPCommon {
@Ignore("This test is failing intermittently as documented in NIFI-4288")
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testInvalidIPAddress() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(INVALID_IP_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(INVALID_IP_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
@ -229,8 +232,8 @@ public abstract class TestPutTCPCommon {
@Ignore("This test is failing intermittently as documented in NIFI-4288")
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testUnknownHostname() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(UNKNOWN_HOST, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(UNKNOWN_HOST, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
@ -249,10 +252,10 @@ public abstract class TestPutTCPCommon {
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testLoadTest() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
Thread.sleep(1000);
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
checkReceivedAllData(recvQueue, testData, LOAD_TEST_ITERATIONS);
checkInputQueueIsEmpty();
@ -276,7 +279,7 @@ public abstract class TestPutTCPCommon {
for (String item : testData) {
runner.enqueue(item.getBytes());
}
runner.run(testData.length, false, i == 0 ? true : false);
runner.run(testData.length, false, i == 0);
}
}
@ -292,7 +295,10 @@ public abstract class TestPutTCPCommon {
private void checkEmptyMessageReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
Thread.sleep(DATA_WAIT_PERIOD);
assertEquals(0, recvQueue.poll().size());
final List<Byte> message = recvQueue.poll();
assertNotNull(message);
assertEquals(0, message.size());
}
private void checkInputQueueIsEmpty() {