NIFI-830 Added FlowFile Naming Strategy to InvokeHTTP

This closes #5475

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2021-10-20 00:35:40 +02:00 committed by exceptionfactory
parent fe4161b1c2
commit 6fd1f03bd6
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 188 additions and 54 deletions

View File

@ -81,6 +81,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -96,6 +97,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
import org.apache.nifi.proxy.ProxyConfiguration;
@ -166,6 +168,14 @@ public class InvokeHTTP extends AbstractProcessor {
public static final String HTTP = "http";
public static final String HTTPS = "https";
public static final String GET_METHOD = "GET";
public static final String POST_METHOD = "POST";
public static final String PUT_METHOD = "PUT";
public static final String PATCH_METHOD = "PATCH";
public static final String DELETE_METHOD = "DELETE";
public static final String HEAD_METHOD = "HEAD";
public static final String OPTIONS_METHOD = "OPTIONS";
private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post:form:(?<formDataName>.*)$");
private static final String FORM_DATA_NAME_GROUP = "formDataName";
@ -175,7 +185,7 @@ public class InvokeHTTP extends AbstractProcessor {
.description("HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported. "
+ "Methods other than POST, PUT and PATCH will be sent without a message body.")
.required(true)
.defaultValue("GET")
.defaultValue(GET_METHOD)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.build();
@ -484,6 +494,19 @@ public class InvokeHTTP extends AbstractProcessor {
.allowableValues("True", "False")
.build();
public static final PropertyDescriptor FLOW_FILE_NAMING_STRATEGY = new PropertyDescriptor.Builder()
.name("flow-file-naming-strategy")
.description("Determines the strategy used for setting the filename attribute of the FlowFile.")
.displayName("FlowFile Naming Strategy")
.required(true)
.defaultValue(FlowFileNamingStrategy.RANDOM.name())
.allowableValues(
Arrays.stream(FlowFileNamingStrategy.values()).map(strategy ->
new AllowableValue(strategy.name(), strategy.name(), strategy.getDescription())
).toArray(AllowableValue[]::new)
)
.build();
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
@ -499,6 +522,7 @@ public class InvokeHTTP extends AbstractProcessor {
PROP_DATE_HEADER,
PROP_FOLLOW_REDIRECTS,
DISABLE_HTTP2_PROTOCOL,
FLOW_FILE_NAMING_STRATEGY,
PROP_ATTRIBUTES_TO_SEND,
PROP_USERAGENT,
PROP_BASIC_AUTH_USERNAME,
@ -817,7 +841,7 @@ public class InvokeHTTP extends AbstractProcessor {
}
String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase();
if ("POST".equals(request) || "PUT".equals(request) || "PATCH".equals(request)) {
if (POST_METHOD.equals(request) || PUT_METHOD.equals(request) || PATCH_METHOD.equals(request)) {
return;
} else if (putToAttribute) {
requestFlowFile = session.create();
@ -912,6 +936,14 @@ public class InvokeHTTP extends AbstractProcessor {
// this will overwrite any existing flowfile attributes
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp));
// update FlowFile's filename attribute with an extracted value from the remote URL
if (FlowFileNamingStrategy.URL_PATH.equals(getFlowFileNamingStrategy(context)) && GET_METHOD.equals(httpRequest.method())) {
String fileName = getFileNameFromUrl(url);
if (fileName != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.FILENAME.key(), fileName);
}
}
// transfer the message body to the payload
// can potentially be null in edge cases
if (bodyExists) {
@ -995,7 +1027,6 @@ public class InvokeHTTP extends AbstractProcessor {
}
}
private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) {
final Request.Builder requestBuilder = new Request.Builder();
@ -1013,25 +1044,25 @@ public class InvokeHTTP extends AbstractProcessor {
// set the request method
String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
switch (method) {
case "GET":
case GET_METHOD:
requestBuilder.get();
break;
case "POST":
case POST_METHOD:
RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder.post(requestBody);
break;
case "PUT":
case PUT_METHOD:
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder.put(requestBody);
break;
case "PATCH":
case PATCH_METHOD:
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder.patch(requestBody);
break;
case "HEAD":
case HEAD_METHOD:
requestBuilder.head();
break;
case "DELETE":
case DELETE_METHOD:
requestBuilder.delete();
break;
default:
@ -1151,7 +1182,6 @@ public class InvokeHTTP extends AbstractProcessor {
}
}
private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode) {
// check if we should yield the processor
if (!isSuccess(statusCode) && request == null) {
@ -1272,4 +1302,20 @@ public class InvokeHTTP extends AbstractProcessor {
private static File getETagCacheDir() throws IOException {
return Files.createTempDirectory(InvokeHTTP.class.getSimpleName()).toFile();
}
private FlowFileNamingStrategy getFlowFileNamingStrategy(final ProcessContext context) {
final String strategy = context.getProperty(FLOW_FILE_NAMING_STRATEGY).getValue();
return FlowFileNamingStrategy.valueOf(strategy);
}
private String getFileNameFromUrl(URL url) {
String fileName = null;
String path = StringUtils.removeEnd(url.getPath(), "/");
if (!StringUtils.isEmpty(path)) {
fileName = path.substring(path.lastIndexOf('/') + 1);
}
return fileName;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.http;
public enum FlowFileNamingStrategy {
RANDOM("FlowFile filename attribute will be a random value."),
URL_PATH("FlowFile filename attribute will be extracted from the remote URL path. "
+ "The attribute may contain URL encoded characters. "
+ "If the path doesn't exist, the attribute will be a random value.");
private final String description;
FlowFileNamingStrategy(final String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}

View File

@ -22,6 +22,7 @@ import okhttp3.mockwebserver.RecordedRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
@ -33,16 +34,13 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
@ -53,17 +51,33 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_MOVED_TEMP;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.apache.nifi.processors.standard.InvokeHTTP.GET_METHOD;
import static org.apache.nifi.processors.standard.InvokeHTTP.POST_METHOD;
import static org.apache.nifi.processors.standard.InvokeHTTP.PUT_METHOD;
import static org.apache.nifi.processors.standard.InvokeHTTP.PATCH_METHOD;
import static org.apache.nifi.processors.standard.InvokeHTTP.DELETE_METHOD;
import static org.apache.nifi.processors.standard.InvokeHTTP.HEAD_METHOD;
import static org.apache.nifi.processors.standard.InvokeHTTP.OPTIONS_METHOD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -96,24 +110,12 @@ public class InvokeHTTPTest {
private static final String REPEATED_HEADER = "Repeated";
private static final String GET_METHOD = "GET";
private static final String DELETE_METHOD = "DELETE";
private static final String HEAD_METHOD = "HEAD";
private static final String OPTIONS_METHOD = "OPTIONS";
private static final String POST_METHOD = "POST";
private static final String PATCH_METHOD = "PATCH";
private static final String PUT_METHOD = "PUT";
private static final String TEXT_PLAIN = "text/plain";
private static final String FLOW_FILE_CONTENT = String.class.getName();
private static final String FLOW_FILE_INITIAL_FILENAME = Double.class.getName();
private static final int TAKE_REQUEST_COMPLETED_TIMEOUT = 1;
private static final String TLS_CONNECTION_TIMEOUT = "60 s";
@ -126,7 +128,7 @@ public class InvokeHTTPTest {
private TestRunner runner;
@BeforeClass
@BeforeAll
public static void setStores() {
generatedTlsConfiguration = new TemporaryKeyStoreBuilder().build();
truststoreTlsConfiguration = new StandardTlsConfiguration(
@ -139,7 +141,7 @@ public class InvokeHTTPTest {
);
}
@Before
@BeforeEach
public void setRunner() {
mockWebServer = new MockWebServer();
runner = TestRunners.newTestRunner(new InvokeHTTP());
@ -147,7 +149,7 @@ public class InvokeHTTPTest {
runner.setProperty(InvokeHTTP.PROP_MAX_IDLE_CONNECTIONS, Integer.toString(0));
}
@After
@AfterEach
public void shutdownServer() throws IOException {
mockWebServer.shutdown();
}
@ -418,13 +420,13 @@ public class InvokeHTTPTest {
final RecordedRequest request = takeRequestCompleted();
final String dateHeader = request.getHeader(DATE_HEADER);
assertNotNull("Request Date not found", dateHeader);
assertNotNull(dateHeader, "Request Date not found");
final Pattern rfcDatePattern = Pattern.compile("^.+? \\d{4} \\d{2}:\\d{2}:\\d{2} GMT$");
assertTrue("Request Date RFC 2616 not matched", rfcDatePattern.matcher(dateHeader).matches());
assertTrue(rfcDatePattern.matcher(dateHeader).matches(), "Request Date RFC 2616 not matched");
final ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateHeader, DateTimeFormatter.RFC_1123_DATE_TIME);
assertNotNull("Request Date Parsing Failed", zonedDateTime);
assertNotNull(zonedDateTime, "Request Date Parsing Failed");
}
@Test
@ -457,8 +459,8 @@ public class InvokeHTTPTest {
runner.run();
final RecordedRequest secondRequest = takeRequestCompleted();
assertNull("Accept Header found", secondRequest.getHeader(ACCEPT_HEADER));
assertNull("Default-Content-Type Header found", secondRequest.getHeader(defaultContentTypeHeader));
assertNull(secondRequest.getHeader(ACCEPT_HEADER), "Accept Header found");
assertNull(secondRequest.getHeader(defaultContentTypeHeader), "Default-Content-Type Header found");
}
@Test
@ -506,10 +508,10 @@ public class InvokeHTTPTest {
final RecordedRequest request = takeRequestCompleted();
final String authorization = request.getHeader(AUTHORIZATION_HEADER);
assertNotNull("Authorization Header not found", authorization);
assertNotNull(authorization, "Authorization Header not found");
final Pattern basicAuthPattern = Pattern.compile("^Basic [^\\s]+$");
assertTrue("Basic Authentication not matched", basicAuthPattern.matcher(authorization).matches());
assertTrue(basicAuthPattern.matcher(authorization).matches(), "Basic Authentication not matched");
}
@Test
@ -529,13 +531,13 @@ public class InvokeHTTPTest {
assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
final RecordedRequest request = takeRequestCompleted();
assertNull("Authorization Header not found", request.getHeader(AUTHORIZATION_HEADER));
assertNull(request.getHeader(AUTHORIZATION_HEADER), "Authorization Header found");
final RecordedRequest authenticatedRequest = takeRequestCompleted();
final String authorization = authenticatedRequest.getHeader(AUTHORIZATION_HEADER);
assertNotNull("Authorization Header not found", authorization);
assertTrue("Digest Realm not found", authorization.contains(realm));
assertTrue("Digest Nonce not found", authorization.contains(nonce));
assertNotNull(authorization, "Authorization Header not found");
assertTrue(authorization.contains(realm), "Digest Realm not found");
assertTrue(authorization.contains(nonce), "Digest Nonce not found");
}
@Test
@ -682,7 +684,7 @@ public class InvokeHTTPTest {
final RecordedRequest request = takeRequestCompleted();
final String contentLength = request.getHeader(CONTENT_LENGTH_HEADER);
assertNull("Content-Length Request Header found", contentLength);
assertNull(contentLength, "Content-Length Request Header found");
final String transferEncoding = request.getHeader(TRANSFER_ENCODING_HEADER);
assertEquals("chunked", transferEncoding);
@ -710,13 +712,13 @@ public class InvokeHTTPTest {
final RecordedRequest request = takeRequestCompleted();
final String contentType = request.getHeader(CONTENT_TYPE_HEADER);
assertNotNull("Content Type not found", contentType);
assertNotNull(contentType, "Content Type not found");
final Pattern multipartPattern = Pattern.compile("^multipart/form-data.+$");
assertTrue("Content Type not matched", multipartPattern.matcher(contentType).matches());
assertTrue(multipartPattern.matcher(contentType).matches(), "Content Type not matched");
final String body = request.getBody().readUtf8();
assertTrue("Form Data Parameter not found", body.contains(formDataParameter));
assertTrue(body.contains(formDataParameter), "Form Data Parameter not found");
}
@Test
@ -725,6 +727,53 @@ public class InvokeHTTPTest {
assertRequestMethodSuccess(PUT_METHOD);
}
@ParameterizedTest(name = "{index} => When {0} http://baseUrl/{1}, filename of the response FlowFile should be {2}")
@MethodSource
public void testResponseFlowFileFilenameExtractedFromRemoteUrl(String httpMethod, String inputUrl, String expectedFileName) throws MalformedURLException {
URL baseUrl = new URL(getMockWebServerUrl());
URL targetUrl = new URL(baseUrl, inputUrl);
runner.setProperty(InvokeHTTP.PROP_METHOD, httpMethod);
runner.setProperty(InvokeHTTP.PROP_URL, targetUrl.toString());
runner.setProperty(InvokeHTTP.FLOW_FILE_NAMING_STRATEGY, FlowFileNamingStrategy.URL_PATH.name());
Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put(CoreAttributes.FILENAME.key(), FLOW_FILE_INITIAL_FILENAME);
runner.enqueue(FLOW_FILE_CONTENT, ffAttributes);
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
runner.run();
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).iterator().next();
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), expectedFileName);
}
private static Stream<Arguments> testResponseFlowFileFilenameExtractedFromRemoteUrl() {
return Stream.of(
Arguments.of(GET_METHOD, "file", "file"),
Arguments.of(GET_METHOD, "file/", "file"),
Arguments.of(GET_METHOD, "file.txt", "file.txt"),
Arguments.of(GET_METHOD, "file.txt/", "file.txt"),
Arguments.of(GET_METHOD, "file.txt/?qp=v", "file.txt"),
Arguments.of(GET_METHOD, "f%69%6Cle.txt", "f%69%6Cle.txt"),
Arguments.of(GET_METHOD, "path/to/file.txt", "file.txt"),
Arguments.of(GET_METHOD, "", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(POST_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(POST_METHOD, "", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(PUT_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(PUT_METHOD, "", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(PATCH_METHOD, "", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(PATCH_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(DELETE_METHOD, "", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(DELETE_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(HEAD_METHOD, "", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(HEAD_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(OPTIONS_METHOD, "", FLOW_FILE_INITIAL_FILENAME),
Arguments.of(OPTIONS_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME)
);
}
private void setUrlProperty() {
runner.setProperty(InvokeHTTP.PROP_URL, getMockWebServerUrl());
}
@ -742,7 +791,7 @@ public class InvokeHTTPTest {
private RecordedRequest takeRequestCompleted() throws InterruptedException {
final RecordedRequest request = mockWebServer.takeRequest(TAKE_REQUEST_COMPLETED_TIMEOUT, TimeUnit.SECONDS);
assertNotNull("Request not found", request);
assertNotNull(request, "Request not found");
return request;
}
@ -771,7 +820,7 @@ public class InvokeHTTPTest {
private void assertRelationshipStatusCodeEquals(final Relationship relationship, final int statusCode) {
final List<MockFlowFile> responseFlowFiles = runner.getFlowFilesForRelationship(relationship);
final String message = String.format("FlowFiles not found for Relationship [%s]", relationship);
assertFalse(message, responseFlowFiles.isEmpty());
assertFalse(responseFlowFiles.isEmpty(), message);
final MockFlowFile responseFlowFile = responseFlowFiles.iterator().next();
assertStatusCodeEquals(responseFlowFile, statusCode);
}
@ -790,7 +839,7 @@ public class InvokeHTTPTest {
final Optional<LogMessage> errorMessage = errorMessages.stream().findFirst();
if (errorMessage.isPresent()) {
final String message = String.format("Error Message Logged: %s", errorMessage.get().getMsg());
assertFalse(message, errorMessages.isEmpty());
assertFalse(errorMessages.isEmpty(), message);
}
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);

View File

@ -503,6 +503,11 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>