NIFI-12785 Corrected InvokeHTTP URL handling to avoid double encoding

This closes #8458

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2024-02-28 19:39:05 +00:00 committed by exceptionfactory
parent df524b18b1
commit 942d13c118
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 45 additions and 11 deletions

View File

@ -203,7 +203,8 @@ public class InvokeHTTP extends AbstractProcessor {
public static final PropertyDescriptor HTTP_URL = new PropertyDescriptor.Builder() public static final PropertyDescriptor HTTP_URL = new PropertyDescriptor.Builder()
.name("HTTP URL") .name("HTTP URL")
.description("HTTP remote URL including a scheme of http or https, as well as a hostname or IP address with optional port and path elements.") .description("HTTP remote URL including a scheme of http or https, as well as a hostname or IP address with optional port and path elements." +
" Any encoding of the URL must be done by the user.")
.required(true) .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.URL_VALIDATOR) .addValidator(StandardValidators.URL_VALIDATOR)
@ -852,19 +853,18 @@ public class InvokeHTTP extends AbstractProcessor {
FlowFile responseFlowFile = null; FlowFile responseFlowFile = null;
try { try {
final String urlProperty = trimToEmpty(context.getProperty(HTTP_URL).evaluateAttributeExpressions(requestFlowFile).getValue()); final String urlProperty = trimToEmpty(context.getProperty(HTTP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
final URL url = URLValidator.createURL(urlProperty);
Request httpRequest = configureRequest(context, session, requestFlowFile, url); Request httpRequest = configureRequest(context, session, requestFlowFile, urlProperty);
logRequest(logger, httpRequest); logRequest(logger, httpRequest);
if (httpRequest.body() != null) { if (httpRequest.body() != null) {
session.getProvenanceReporter().send(requestFlowFile, url.toExternalForm(), true); session.getProvenanceReporter().send(requestFlowFile, urlProperty, true);
} }
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
try (Response responseHttp = okHttpClient.newCall(httpRequest).execute()) { try (Response responseHttp = okHttpClient.newCall(httpRequest).execute()) {
logResponse(logger, url, responseHttp); logResponse(logger, urlProperty, responseHttp);
// store the status code and message // store the status code and message
int statusCode = responseHttp.code(); int statusCode = responseHttp.code();
@ -874,7 +874,7 @@ public class InvokeHTTP extends AbstractProcessor {
Map<String, String> statusAttributes = new HashMap<>(); Map<String, String> statusAttributes = new HashMap<>();
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode)); statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
statusAttributes.put(STATUS_MESSAGE, statusMessage); statusAttributes.put(STATUS_MESSAGE, statusMessage);
statusAttributes.put(REQUEST_URL, url.toExternalForm()); statusAttributes.put(REQUEST_URL, urlProperty);
statusAttributes.put(REQUEST_DURATION, Long.toString(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos))); statusAttributes.put(REQUEST_DURATION, Long.toString(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)));
statusAttributes.put(RESPONSE_URL, responseHttp.request().url().toString()); statusAttributes.put(RESPONSE_URL, responseHttp.request().url().toString());
statusAttributes.put(TRANSACTION_ID, txId.toString()); statusAttributes.put(TRANSACTION_ID, txId.toString());
@ -927,6 +927,7 @@ public class InvokeHTTP extends AbstractProcessor {
// update FlowFile's filename attribute with an extracted value from the remote URL // update FlowFile's filename attribute with an extracted value from the remote URL
if (FlowFileNamingStrategy.URL_PATH.equals(getFlowFileNamingStrategy(context)) && HttpMethod.GET.name().equals(httpRequest.method())) { if (FlowFileNamingStrategy.URL_PATH.equals(getFlowFileNamingStrategy(context)) && HttpMethod.GET.name().equals(httpRequest.method())) {
final URL url = URLValidator.createURL(urlProperty);
String fileName = getFileNameFromUrl(url); String fileName = getFileNameFromUrl(url);
if (fileName != null) { if (fileName != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.FILENAME.key(), fileName); responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.FILENAME.key(), fileName);
@ -950,9 +951,9 @@ public class InvokeHTTP extends AbstractProcessor {
// emit provenance event // emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (requestFlowFile != null) { if (requestFlowFile != null) {
session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis); session.getProvenanceReporter().fetch(responseFlowFile, urlProperty, millis);
} else { } else {
session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis); session.getProvenanceReporter().receive(responseFlowFile, urlProperty, millis);
} }
} }
} }
@ -1012,7 +1013,7 @@ public class InvokeHTTP extends AbstractProcessor {
} }
} }
private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) { private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, String url) {
final Request.Builder requestBuilder = new Request.Builder(); final Request.Builder requestBuilder = new Request.Builder();
requestBuilder.url(url); requestBuilder.url(url);
@ -1231,10 +1232,10 @@ public class InvokeHTTP extends AbstractProcessor {
} }
} }
private void logResponse(ComponentLog logger, URL url, Response response) { private void logResponse(ComponentLog logger, String url, Response response) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("\nResponse from remote service:\n\t{}\n{}", logger.debug("\nResponse from remote service:\n\t{}\n{}",
url.toExternalForm(), getLogString(response.headers().toMultimap())); url, getLogString(response.headers().toMultimap()));
} }
} }

View File

@ -25,11 +25,14 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.URLValidator;
import org.apache.nifi.processors.standard.http.ContentEncodingStrategy; import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy; import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
import org.apache.nifi.processors.standard.http.CookieStrategy; import org.apache.nifi.processors.standard.http.CookieStrategy;
import org.apache.nifi.processors.standard.http.HttpHeader; import org.apache.nifi.processors.standard.http.HttpHeader;
import org.apache.nifi.processors.standard.http.HttpMethod; import org.apache.nifi.processors.standard.http.HttpMethod;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
@ -79,6 +82,7 @@ import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -548,6 +552,35 @@ public class InvokeHTTPTest {
assertEquals(userAgent, userAgentHeader); assertEquals(userAgent, userAgentHeader);
} }
@Test
void testRunGetHttp200SuccessWithEncodableUrl() throws Exception {
    // The path already contains percent-encoded slashes (%2F); the processor is expected to report it unchanged.
    final String preEncodedPath = "/gitlab/ftp%2Fstage%2F15m%2FsomeFile.yaml/raw?ref=main";
    final String configuredUrl = mockWebServer.url(preEncodedPath).newBuilder().host(LOCALHOST).build().toString();
    // Form of the same URL after passing through URLValidator.createURL; the assertions below
    // require that the processor does NOT report this form.
    final String validatorEncodedUrl = URLValidator.createURL(configuredUrl).toExternalForm();

    runner.setProperty(InvokeHTTP.HTTP_URL, configuredUrl);
    mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
    runner.enqueue(FLOW_FILE_CONTENT);
    runner.run();

    assertResponseSuccessRelationships();
    assertRelationshipStatusCodeEquals(InvokeHTTP.RESPONSE, HTTP_OK);

    // The request URL attribute on the response FlowFile must keep the user-supplied encoding.
    final MockFlowFile responseFlowFile = getResponseFlowFile();
    final String requestUrlAttribute = responseFlowFile.getAttribute(InvokeHTTP.REQUEST_URL);
    assertNotEquals(validatorEncodedUrl, requestUrlAttribute);
    assertTrue(requestUrlAttribute.endsWith(preEncodedPath));

    // Locate the first FETCH provenance event, if any was emitted.
    ProvenanceEventRecord fetchEvent = null;
    for (final ProvenanceEventRecord candidate : runner.getProvenanceEvents()) {
        if (ProvenanceEventType.FETCH == candidate.getEventType()) {
            fetchEvent = candidate;
            break;
        }
    }
    assertNotNull(fetchEvent);

    // The provenance transit URI must likewise keep the user-supplied encoding.
    final String fetchTransitUri = fetchEvent.getTransitUri();
    assertNotEquals(validatorEncodedUrl, fetchTransitUri);
    assertTrue(fetchTransitUri.endsWith(preEncodedPath));
}
@Test @Test
public void testRunGetHttp302NoRetryResponseRedirectsDefaultEnabled() { public void testRunGetHttp302NoRetryResponseRedirectsDefaultEnabled() {
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_MOVED_TEMP).setHeader(LOCATION_HEADER, getMockWebServerUrl())); mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_MOVED_TEMP).setHeader(LOCATION_HEADER, getMockWebServerUrl()));