NIFI-7605 Removed user-agent default value so no header will be sent by default.

Added and updated unit tests.

This closes #4428.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
Signed-off-by: Joe Witt <joe.witt@gmail.com>
This commit is contained in:
Mike Thomsen 2020-07-24 07:52:27 -04:00 committed by Andy LoPresto
parent c980b64bf5
commit 0861b2f632
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
2 changed files with 129 additions and 100 deletions

View File

@ -124,7 +124,7 @@ import org.joda.time.format.DateTimeFormatter;
@WritesAttribute(attribute = "invokehttp.java.exception.message", description = "The Java exception message raised when the processor fails"), @WritesAttribute(attribute = "invokehttp.java.exception.message", description = "The Java exception message raised when the processor fails"),
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to " @WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to "
+ "will become the attribute key and the value would be the body of the HTTP response.")}) + "will become the attribute key and the value would be the body of the HTTP response.")})
@DynamicProperties ({ @DynamicProperties({
@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, @DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = description =
"Send request header with a key matching the Dynamic Property Key and a value created by evaluating " "Send request header with a key matching the Dynamic Property Key and a value created by evaluating "
@ -149,7 +149,7 @@ public class InvokeHTTP extends AbstractProcessor {
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
public static final String FORM_BASE= "post:form"; public static final String FORM_BASE = "post:form";
// Set of flowfile attributes which we generally always ignore during // Set of flowfile attributes which we generally always ignore during
// processing, including when converting http headers, copying attributes, etc. // processing, including when converting http headers, copying attributes, etc.
@ -254,7 +254,6 @@ public class InvokeHTTP extends AbstractProcessor {
.displayName("Useragent") .displayName("Useragent")
.description("The Useragent identifier sent along with each request") .description("The Useragent identifier sent along with each request")
.required(false) .required(false)
.defaultValue("Apache Nifi/${nifi.version} (git:${nifi.build.git.commit.id.describe}; https://nifi.apache.org/)")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -351,7 +350,7 @@ public class InvokeHTTP extends AbstractProcessor {
+ "will be set as the filename property of the form data.") + "will be set as the filename property of the form data.")
.required(false) .required(false)
.defaultValue("true") .defaultValue("true")
.allowableValues("true","false") .allowableValues("true", "false")
.build(); .build();
// Per RFC 7235, 2617, and 2616. // Per RFC 7235, 2617, and 2616.
@ -739,7 +738,7 @@ public class InvokeHTTP extends AbstractProcessor {
// configure ETag cache if enabled // configure ETag cache if enabled
final boolean etagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean(); final boolean etagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean();
if(etagEnabled) { if (etagEnabled) {
final int maxCacheSizeBytes = context.getProperty(PROP_ETAG_MAX_CACHE_SIZE).asDataSize(DataUnit.B).intValue(); final int maxCacheSizeBytes = context.getProperty(PROP_ETAG_MAX_CACHE_SIZE).asDataSize(DataUnit.B).intValue();
okHttpClientBuilder.cache(new Cache(getETagCacheDir(), maxCacheSizeBytes)); okHttpClientBuilder.cache(new Cache(getETagCacheDir(), maxCacheSizeBytes));
} }
@ -803,7 +802,7 @@ public class InvokeHTTP extends AbstractProcessor {
// Checking to see if the property to put the body of the response in an attribute was set // Checking to see if the property to put the body of the response in an attribute was set
boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet(); boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet();
if (requestFlowFile == null) { if (requestFlowFile == null) {
if(context.hasNonLoopConnection()){ if (context.hasNonLoopConnection()) {
return; return;
} }
@ -821,10 +820,10 @@ public class InvokeHTTP extends AbstractProcessor {
// log ETag cache metrics // log ETag cache metrics
final boolean eTagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean(); final boolean eTagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean();
if(eTagEnabled && logger.isDebugEnabled()) { if (eTagEnabled && logger.isDebugEnabled()) {
final Cache cache = okHttpClient.cache(); final Cache cache = okHttpClient.cache();
logger.debug("OkHttp ETag cache metrics :: Request Count: {} | Network Count: {} | Hit Count: {}", logger.debug("OkHttp ETag cache metrics :: Request Count: {} | Network Count: {} | Hit Count: {}",
new Object[] {cache.requestCount(), cache.networkCount(), cache.hitCount()}); new Object[]{cache.requestCount(), cache.networkCount(), cache.hitCount()});
} }
// Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute. // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
@ -928,7 +927,7 @@ public class InvokeHTTP extends AbstractProcessor {
// emit provenance event // emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if(requestFlowFile != null) { if (requestFlowFile != null) {
session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis); session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
} else { } else {
session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis); session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis);
@ -960,14 +959,14 @@ public class InvokeHTTP extends AbstractProcessor {
+ url.toExternalForm() + ". It took " + millis + "millis,"); + url.toExternalForm() + ". It took " + millis + "millis,");
} }
} finally { } finally {
if(outputStreamToRequestAttribute != null){ if (outputStreamToRequestAttribute != null) {
outputStreamToRequestAttribute.close(); outputStreamToRequestAttribute.close();
outputStreamToRequestAttribute = null; outputStreamToRequestAttribute = null;
} }
if(teeInputStream != null){ if (teeInputStream != null) {
teeInputStream.close(); teeInputStream.close();
teeInputStream = null; teeInputStream = null;
} else if(responseBodyStream != null){ } else if (responseBodyStream != null) {
responseBodyStream.close(); responseBodyStream.close();
responseBodyStream = null; responseBodyStream = null;
} }
@ -1046,9 +1045,7 @@ public class InvokeHTTP extends AbstractProcessor {
} }
String userAgent = trimToEmpty(context.getProperty(PROP_USERAGENT).evaluateAttributeExpressions(requestFlowFile).getValue()); String userAgent = trimToEmpty(context.getProperty(PROP_USERAGENT).evaluateAttributeExpressions(requestFlowFile).getValue());
if (!userAgent.isEmpty()) {
requestBuilder.addHeader("User-Agent", userAgent); requestBuilder.addHeader("User-Agent", userAgent);
}
requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile); requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile);
@ -1134,7 +1131,7 @@ public class InvokeHTTP extends AbstractProcessor {
} }
// don't include dynamic form data properties // don't include dynamic form data properties
if ( DYNAMIC_FORM_PARAMETER_NAME.matcher(headerKey).matches()) { if (DYNAMIC_FORM_PARAMETER_NAME.matcher(headerKey).matches()) {
continue; continue;
} }
@ -1168,7 +1165,7 @@ public class InvokeHTTP extends AbstractProcessor {
} }
private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode){ private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode) {
// check if we should yield the processor // check if we should yield the processor
if (!isSuccess(statusCode) && request == null) { if (!isSuccess(statusCode) && request == null) {
context.yield(); context.yield();
@ -1275,10 +1272,10 @@ public class InvokeHTTP extends AbstractProcessor {
/** /**
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings. * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
*/ */
private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp){ private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp) {
// create a new hashmap to store the values from the connection // create a new hashmap to store the values from the connection
Map<String, String> map = new HashMap<>(); Map<String, String> map = new HashMap<>();
responseHttp.headers().names().forEach( (key) -> { responseHttp.headers().names().forEach((key) -> {
if (key == null) { if (key == null) {
return; return;
} }
@ -1316,7 +1313,7 @@ public class InvokeHTTP extends AbstractProcessor {
* Retrieve the directory in which OkHttp should cache responses. This method opts * Retrieve the directory in which OkHttp should cache responses. This method opts
* to use a temp directory to write the cache, which means that the cache will be written * to use a temp directory to write the cache, which means that the cache will be written
* to a new location each time this processor is scheduled. * to a new location each time this processor is scheduled.
* * <p>
* Ref: https://github.com/square/okhttp/wiki/Recipes#response-caching * Ref: https://github.com/square/okhttp/wiki/Recipes#response-caching
* *
* @return the directory in which the ETag cache should be written * @return the directory in which the ETag cache should be written

View File

@ -16,10 +16,12 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -46,11 +48,11 @@ import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import static java.nio.charset.StandardCharsets.UTF_8; import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
public class TestInvokeHTTP extends TestInvokeHttpCommon { public class TestInvokeHTTP extends TestInvokeHttpCommon {
private static final Logger logger = LoggerFactory.getLogger(TestInvokeHTTP.class);
@BeforeClass @BeforeClass
public static void beforeClass() throws Exception { public static void beforeClass() throws Exception {
@ -345,13 +347,43 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
} }
@Test @Test
public void testUserAgent() throws Exception { public void testShouldNotSendUserAgentByDefault() throws Exception {
addHandler(new EchoUseragentHandler()); // Arrange
addHandler(new EchoUserAgentHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url); runner.setProperty(InvokeHTTP.PROP_URL, url);
createFlowFiles(runner); createFlowFiles(runner);
// Act
runner.run();
// Assert
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
String content = new String(response.toByteArray(), UTF_8);
logger.info("Returned flowfile content: " + content);
assertTrue(content.isEmpty());
response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
}
@Test
public void testShouldSetUserAgentExplicitly() throws Exception {
addHandler(new EchoUserAgentHandler());
runner.setProperty(InvokeHTTP.PROP_USERAGENT, "Apache NiFi/${nifi.version} (git:${nifi.build.git.commit.id.describe}; https://nifi.apache.org/)");
runner.setProperty(InvokeHTTP.PROP_URL, url);
createFlowFiles(runner);
runner.run(); runner.run();
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
@ -363,7 +395,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
String content = new String(response.toByteArray(), UTF_8); String content = new String(response.toByteArray(), UTF_8);
assertTrue(content.startsWith("Apache Nifi/" + NifiBuildProperties.NIFI_VERSION + " (")); assertTrue(content.startsWith("Apache NiFi/" + NifiBuildProperties.NIFI_VERSION + " ("));
assertFalse("Missing expression language variables: " + content, content.contains("; ;")); assertFalse("Missing expression language variables: " + content, content.contains("; ;"));
response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
@ -371,8 +403,8 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
} }
@Test @Test
public void testUserAgentChanged() throws Exception { public void testShouldSetUserAgentWithExpressionLanguage() throws Exception {
addHandler(new EchoUseragentHandler()); addHandler(new EchoUserAgentHandler());
runner.setProperty(InvokeHTTP.PROP_URL, url); runner.setProperty(InvokeHTTP.PROP_URL, url);
runner.setProperty(InvokeHTTP.PROP_USERAGENT, "${literal('And now for something completely different...')}"); runner.setProperty(InvokeHTTP.PROP_USERAGENT, "${literal('And now for something completely different...')}");
@ -396,7 +428,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
} }
public static class EchoUseragentHandler extends AbstractHandler { public static class EchoUserAgentHandler extends AbstractHandler {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {