diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 6900d3cbab..f9977bcf62 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -81,6 +81,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -96,6 +97,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy; import org.apache.nifi.processors.standard.util.ProxyAuthenticator; import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; import org.apache.nifi.proxy.ProxyConfiguration; @@ -166,6 +168,14 @@ public class InvokeHTTP extends AbstractProcessor { public static final String HTTP = "http"; public static final String HTTPS = "https"; + public static final String GET_METHOD = "GET"; + public static final String POST_METHOD = "POST"; + public static final String PUT_METHOD = "PUT"; + public static final String PATCH_METHOD = "PATCH"; + public static final String DELETE_METHOD = "DELETE"; + public static final 
String HEAD_METHOD = "HEAD"; + public static final String OPTIONS_METHOD = "OPTIONS"; + private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post:form:(?<formDataName>.*)$"); private static final String FORM_DATA_NAME_GROUP = "formDataName"; @@ -175,7 +185,7 @@ public class InvokeHTTP extends AbstractProcessor { .description("HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported. " + "Methods other than POST, PUT and PATCH will be sent without a message body.") .required(true) - .defaultValue("GET") + .defaultValue(GET_METHOD) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) .build(); @@ -484,6 +494,19 @@ public class InvokeHTTP extends AbstractProcessor { .allowableValues("True", "False") .build(); + public static final PropertyDescriptor FLOW_FILE_NAMING_STRATEGY = new PropertyDescriptor.Builder() + .name("flow-file-naming-strategy") + .description("Determines the strategy used for setting the filename attribute of the FlowFile.") + .displayName("FlowFile Naming Strategy") + .required(true) + .defaultValue(FlowFileNamingStrategy.RANDOM.name()) + .allowableValues( + Arrays.stream(FlowFileNamingStrategy.values()).map(strategy -> + new AllowableValue(strategy.name(), strategy.name(), strategy.getDescription()) + ).toArray(AllowableValue[]::new) + ) + .build(); + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS}; public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS); @@ -499,6 +522,7 @@ public class InvokeHTTP extends AbstractProcessor { PROP_DATE_HEADER, PROP_FOLLOW_REDIRECTS, DISABLE_HTTP2_PROTOCOL, + FLOW_FILE_NAMING_STRATEGY, PROP_ATTRIBUTES_TO_SEND, PROP_USERAGENT, PROP_BASIC_AUTH_USERNAME, @@ -817,7 +841,7 @@ public class InvokeHTTP extends
AbstractProcessor { } String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase(); - if ("POST".equals(request) || "PUT".equals(request) || "PATCH".equals(request)) { + if (POST_METHOD.equals(request) || PUT_METHOD.equals(request) || PATCH_METHOD.equals(request)) { return; } else if (putToAttribute) { requestFlowFile = session.create(); @@ -912,6 +936,14 @@ public class InvokeHTTP extends AbstractProcessor { // this will overwrite any existing flowfile attributes responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp)); + // update FlowFile's filename attribute with an extracted value from the remote URL + if (FlowFileNamingStrategy.URL_PATH.equals(getFlowFileNamingStrategy(context)) && GET_METHOD.equals(httpRequest.method())) { + String fileName = getFileNameFromUrl(url); + if (fileName != null) { + responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.FILENAME.key(), fileName); + } + } + // transfer the message body to the payload // can potentially be null in edge cases if (bodyExists) { @@ -995,7 +1027,6 @@ public class InvokeHTTP extends AbstractProcessor { } } - private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) { final Request.Builder requestBuilder = new Request.Builder(); @@ -1013,25 +1044,25 @@ public class InvokeHTTP extends AbstractProcessor { // set the request method String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase(); switch (method) { - case "GET": + case GET_METHOD: requestBuilder.get(); break; - case "POST": + case POST_METHOD: RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile); requestBuilder.post(requestBody); break; - case "PUT": + case PUT_METHOD: requestBody = getRequestBodyToSend(session, context, requestFlowFile); 
requestBuilder.put(requestBody); break; - case "PATCH": + case PATCH_METHOD: requestBody = getRequestBodyToSend(session, context, requestFlowFile); requestBuilder.patch(requestBody); break; - case "HEAD": + case HEAD_METHOD: requestBuilder.head(); break; - case "DELETE": + case DELETE_METHOD: requestBuilder.delete(); break; default: @@ -1151,7 +1182,6 @@ public class InvokeHTTP extends AbstractProcessor { } } - private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode) { // check if we should yield the processor if (!isSuccess(statusCode) && request == null) { @@ -1272,4 +1302,30 @@ public class InvokeHTTP extends AbstractProcessor { private static File getETagCacheDir() throws IOException { return Files.createTempDirectory(InvokeHTTP.class.getSimpleName()).toFile(); } + + /** + * Reads the configured FlowFile Naming Strategy property and converts it to the matching enum constant. + * + * @param context process context providing the property value + * @return naming strategy selected in the processor configuration + */ + private FlowFileNamingStrategy getFlowFileNamingStrategy(final ProcessContext context) { + final String strategy = context.getProperty(FLOW_FILE_NAMING_STRATEGY).getValue(); + return FlowFileNamingStrategy.valueOf(strategy); + } + + /** + * Extracts the last segment of the URL path for use as a filename, without URL decoding. + * Every trailing slash is ignored so that a path such as "/dir//" yields "dir" instead of an empty name. + * + * @param url remote URL of the request + * @return last path segment, or null when the path contains no segment + */ + private String getFileNameFromUrl(final URL url) { + final String path = StringUtils.stripEnd(url.getPath(), "/"); + if (StringUtils.isEmpty(path)) { + return null; + } + return path.substring(path.lastIndexOf('/') + 1); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/FlowFileNamingStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/FlowFileNamingStrategy.java new file mode 100644 index 0000000000..249db9bcb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/FlowFileNamingStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.http; + +/** + * Strategies for setting the filename attribute of FlowFiles produced by InvokeHTTP. + */ +public enum FlowFileNamingStrategy { + RANDOM("FlowFile filename attribute will be a random value."), + URL_PATH("FlowFile filename attribute will be extracted from the remote URL path when the HTTP Method is GET. " + + "The attribute may contain URL encoded characters. " + + "If the path has no file segment or the HTTP Method is not GET, the attribute will not be changed."); + + private final String description; + + FlowFileNamingStrategy(final String description) { + this.description = description; + } + + /** + * @return description of the strategy shown as the allowable value description + */ + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java index ab4156d14f..469ba88000 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java @@ -22,6 +22,7 @@ import okhttp3.mockwebserver.RecordedRequest; import org.apache.commons.lang3.StringUtils; import
org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.security.util.StandardTlsConfiguration; import org.apache.nifi.security.util.TemporaryKeyStoreBuilder; @@ -33,16 +34,13 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.util.ssl.SslContextUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; +import java.net.URL; import java.security.GeneralSecurityException; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; @@ -53,17 +51,33 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.stream.Stream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_MOVED_TEMP; import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED; import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static 
org.apache.nifi.processors.standard.InvokeHTTP.GET_METHOD; +import static org.apache.nifi.processors.standard.InvokeHTTP.POST_METHOD; +import static org.apache.nifi.processors.standard.InvokeHTTP.PUT_METHOD; +import static org.apache.nifi.processors.standard.InvokeHTTP.PATCH_METHOD; +import static org.apache.nifi.processors.standard.InvokeHTTP.DELETE_METHOD; +import static org.apache.nifi.processors.standard.InvokeHTTP.HEAD_METHOD; +import static org.apache.nifi.processors.standard.InvokeHTTP.OPTIONS_METHOD; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -96,24 +110,12 @@ public class InvokeHTTPTest { private static final String REPEATED_HEADER = "Repeated"; - private static final String GET_METHOD = "GET"; - - private static final String DELETE_METHOD = "DELETE"; - - private static final String HEAD_METHOD = "HEAD"; - - private static final String OPTIONS_METHOD = "OPTIONS"; - - private static final String POST_METHOD = "POST"; - - private static final String PATCH_METHOD = "PATCH"; - - private static final String PUT_METHOD = "PUT"; - private static final String TEXT_PLAIN = "text/plain"; private static final String FLOW_FILE_CONTENT = String.class.getName(); + private static final String FLOW_FILE_INITIAL_FILENAME = Double.class.getName(); + private static final int TAKE_REQUEST_COMPLETED_TIMEOUT = 1; private static final String TLS_CONNECTION_TIMEOUT = "60 s"; @@ -126,7 +128,7 @@ public class InvokeHTTPTest { private TestRunner runner; - @BeforeClass + @BeforeAll public static void setStores() { generatedTlsConfiguration = new TemporaryKeyStoreBuilder().build(); truststoreTlsConfiguration = new StandardTlsConfiguration( @@ 
-139,7 +141,7 @@ public class InvokeHTTPTest { ); } - @Before + @BeforeEach public void setRunner() { mockWebServer = new MockWebServer(); runner = TestRunners.newTestRunner(new InvokeHTTP()); @@ -147,7 +149,7 @@ public class InvokeHTTPTest { runner.setProperty(InvokeHTTP.PROP_MAX_IDLE_CONNECTIONS, Integer.toString(0)); } - @After + @AfterEach public void shutdownServer() throws IOException { mockWebServer.shutdown(); } @@ -418,13 +420,13 @@ public class InvokeHTTPTest { final RecordedRequest request = takeRequestCompleted(); final String dateHeader = request.getHeader(DATE_HEADER); - assertNotNull("Request Date not found", dateHeader); + assertNotNull(dateHeader, "Request Date not found"); final Pattern rfcDatePattern = Pattern.compile("^.+? \\d{4} \\d{2}:\\d{2}:\\d{2} GMT$"); - assertTrue("Request Date RFC 2616 not matched", rfcDatePattern.matcher(dateHeader).matches()); + assertTrue(rfcDatePattern.matcher(dateHeader).matches(), "Request Date RFC 2616 not matched"); final ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateHeader, DateTimeFormatter.RFC_1123_DATE_TIME); - assertNotNull("Request Date Parsing Failed", zonedDateTime); + assertNotNull(zonedDateTime, "Request Date Parsing Failed"); } @Test @@ -457,8 +459,8 @@ public class InvokeHTTPTest { runner.run(); final RecordedRequest secondRequest = takeRequestCompleted(); - assertNull("Accept Header found", secondRequest.getHeader(ACCEPT_HEADER)); - assertNull("Default-Content-Type Header found", secondRequest.getHeader(defaultContentTypeHeader)); + assertNull(secondRequest.getHeader(ACCEPT_HEADER), "Accept Header found"); + assertNull(secondRequest.getHeader(defaultContentTypeHeader), "Default-Content-Type Header found"); } @Test @@ -506,10 +508,10 @@ public class InvokeHTTPTest { final RecordedRequest request = takeRequestCompleted(); final String authorization = request.getHeader(AUTHORIZATION_HEADER); - assertNotNull("Authorization Header not found", authorization); + assertNotNull(authorization, 
"Authorization Header not found"); final Pattern basicAuthPattern = Pattern.compile("^Basic [^\\s]+$"); - assertTrue("Basic Authentication not matched", basicAuthPattern.matcher(authorization).matches()); + assertTrue(basicAuthPattern.matcher(authorization).matches(), "Basic Authentication not matched"); } @Test @@ -529,13 +531,13 @@ public class InvokeHTTPTest { assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK); final RecordedRequest request = takeRequestCompleted(); - assertNull("Authorization Header not found", request.getHeader(AUTHORIZATION_HEADER)); + assertNull(request.getHeader(AUTHORIZATION_HEADER), "Authorization Header found"); final RecordedRequest authenticatedRequest = takeRequestCompleted(); final String authorization = authenticatedRequest.getHeader(AUTHORIZATION_HEADER); - assertNotNull("Authorization Header not found", authorization); - assertTrue("Digest Realm not found", authorization.contains(realm)); - assertTrue("Digest Nonce not found", authorization.contains(nonce)); + assertNotNull(authorization, "Authorization Header not found"); + assertTrue(authorization.contains(realm), "Digest Realm not found"); + assertTrue(authorization.contains(nonce), "Digest Nonce not found"); } @Test @@ -682,7 +684,7 @@ public class InvokeHTTPTest { final RecordedRequest request = takeRequestCompleted(); final String contentLength = request.getHeader(CONTENT_LENGTH_HEADER); - assertNull("Content-Length Request Header found", contentLength); + assertNull(contentLength, "Content-Length Request Header found"); final String transferEncoding = request.getHeader(TRANSFER_ENCODING_HEADER); assertEquals("chunked", transferEncoding); @@ -710,13 +712,13 @@ public class InvokeHTTPTest { final RecordedRequest request = takeRequestCompleted(); final String contentType = request.getHeader(CONTENT_TYPE_HEADER); - assertNotNull("Content Type not found", contentType); + assertNotNull(contentType, "Content Type not found"); final Pattern multipartPattern = 
Pattern.compile("^multipart/form-data.+$"); - assertTrue("Content Type not matched", multipartPattern.matcher(contentType).matches()); + assertTrue(multipartPattern.matcher(contentType).matches(), "Content Type not matched"); final String body = request.getBody().readUtf8(); - assertTrue("Form Data Parameter not found", body.contains(formDataParameter)); + assertTrue(body.contains(formDataParameter), "Form Data Parameter not found"); } @Test @@ -725,6 +727,53 @@ public class InvokeHTTPTest { assertRequestMethodSuccess(PUT_METHOD); } + @ParameterizedTest(name = "{index} => When {0} http://baseUrl/{1}, filename of the response FlowFile should be {2}") + @MethodSource + public void testResponseFlowFileFilenameExtractedFromRemoteUrl(String httpMethod, String inputUrl, String expectedFileName) throws MalformedURLException { + URL baseUrl = new URL(getMockWebServerUrl()); + URL targetUrl = new URL(baseUrl, inputUrl); + + runner.setProperty(InvokeHTTP.PROP_METHOD, httpMethod); + runner.setProperty(InvokeHTTP.PROP_URL, targetUrl.toString()); + runner.setProperty(InvokeHTTP.FLOW_FILE_NAMING_STRATEGY, FlowFileNamingStrategy.URL_PATH.name()); + + Map<String, String> ffAttributes = new HashMap<>(); + ffAttributes.put(CoreAttributes.FILENAME.key(), FLOW_FILE_INITIAL_FILENAME); + runner.enqueue(FLOW_FILE_CONTENT, ffAttributes); + + mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK)); + + runner.run(); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).iterator().next(); + flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), expectedFileName); + } + + private static Stream<Arguments> testResponseFlowFileFilenameExtractedFromRemoteUrl() { + return Stream.of( + Arguments.of(GET_METHOD, "file", "file"), + Arguments.of(GET_METHOD, "file/", "file"), + Arguments.of(GET_METHOD, "file.txt", "file.txt"), + Arguments.of(GET_METHOD, "file.txt/", "file.txt"), + Arguments.of(GET_METHOD, "file.txt/?qp=v", "file.txt"), + Arguments.of(GET_METHOD,
"f%69%6Cle.txt", "f%69%6Cle.txt"), + Arguments.of(GET_METHOD, "path/to/file.txt", "file.txt"), + Arguments.of(GET_METHOD, "", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(POST_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(POST_METHOD, "", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(PUT_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(PUT_METHOD, "", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(PATCH_METHOD, "", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(PATCH_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(DELETE_METHOD, "", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(DELETE_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(HEAD_METHOD, "", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(HEAD_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(OPTIONS_METHOD, "", FLOW_FILE_INITIAL_FILENAME), + Arguments.of(OPTIONS_METHOD, "has/path", FLOW_FILE_INITIAL_FILENAME) + ); + } + private void setUrlProperty() { runner.setProperty(InvokeHTTP.PROP_URL, getMockWebServerUrl()); } @@ -742,7 +791,7 @@ public class InvokeHTTPTest { private RecordedRequest takeRequestCompleted() throws InterruptedException { final RecordedRequest request = mockWebServer.takeRequest(TAKE_REQUEST_COMPLETED_TIMEOUT, TimeUnit.SECONDS); - assertNotNull("Request not found", request); + assertNotNull(request, "Request not found"); return request; } @@ -771,7 +820,7 @@ public class InvokeHTTPTest { private void assertRelationshipStatusCodeEquals(final Relationship relationship, final int statusCode) { final List responseFlowFiles = runner.getFlowFilesForRelationship(relationship); final String message = String.format("FlowFiles not found for Relationship [%s]", relationship); - assertFalse(message, responseFlowFiles.isEmpty()); + assertFalse(responseFlowFiles.isEmpty(), message); final MockFlowFile responseFlowFile = responseFlowFiles.iterator().next(); assertStatusCodeEquals(responseFlowFile, statusCode); } @@ -790,7 +839,7 @@ public 
class InvokeHTTPTest { final Optional errorMessage = errorMessages.stream().findFirst(); if (errorMessage.isPresent()) { final String message = String.format("Error Message Logged: %s", errorMessage.get().getMsg()); - assertFalse(message, errorMessages.isEmpty()); + assertTrue(errorMessages.isEmpty(), message); } runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); diff --git a/pom.xml b/pom.xml index b4362ad926..6427220654 100644 --- a/pom.xml +++ b/pom.xml @@ -503,6 +503,11 @@ junit-vintage-engine test + + org.junit.jupiter + junit-jupiter-params + test + org.mockito mockito-core