diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index 7d1956695e..9914ad704b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -62,6 +62,8 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.ssl.SSLContextBuilder; +import org.apache.nifi.annotation.behavior.DynamicProperties; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -72,8 +74,10 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; @@ -102,6 +106,11 @@ import org.apache.nifi.util.Tuple; + "once the content has been fetched from the given URL, it will not be fetched again until the content on the remote server changes. Note that due to limitations on state " + "management, stored \"last modified\" and etag fields never expire. If the URL in GetHttp uses Expression Language that is unbounded, there " + "is the potential for Out of Memory Errors to occur.") +@DynamicProperties({ + @DynamicProperty(name = "Header Name", value = "The Expression Language to be used to populate the header value", description = "The additional headers to be sent by the processor " + + "whenever making a new HTTP request. \n " + + "Setting a dynamic property name to XYZ and value to ${attribute} will result in the header 'XYZ: attribute_value' being sent to the HTTP endpoint"), +}) @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"), @WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header") @@ -235,6 +244,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { private Set relationships; private List properties; + private volatile List customHeaders = new ArrayList<>(); private final AtomicBoolean clearState = new AtomicBoolean(false); @@ -281,6 +291,14 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { if (clearState.getAndSet(false)) { context.getStateManager().clear(Scope.LOCAL); } + if (customHeaders.size() == 0) { + for (Map.Entry property : context.getProperties().entrySet()) { + // only add the custom defined Headers (i.e. dynamic properties) + if (!getSupportedPropertyDescriptors().contains(property.getKey())) { + customHeaders.add(property.getKey()); + } + } + } } @Override @@ -306,6 +324,17 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { return results; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .required(false) + .dynamic(true) + .build(); + } + private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException, UnrecoverableKeyException { @@ -467,6 +496,18 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { if (accept != null) { get.addHeader(HEADER_ACCEPT, accept); } + + // Add dynamic headers + + PropertyValue customHeaderValue; + for (PropertyDescriptor customProperty : customHeaders) { + customHeaderValue = context.getProperty(customProperty).evaluateAttributeExpressions(); + if (StringUtils.isNotBlank(customHeaderValue.getValue())) { + get.addHeader(customProperty.getName(), customHeaderValue.getValue()); + } + } + + // create the http client try ( final CloseableHttpClient client = clientBuilder.build() ) { // NOTE: including this inner try in order to swallow exceptions on close diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java index d07baa58f5..8ec7a0260d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processors.standard; -import javax.servlet.http.HttpServletResponse; - import org.apache.nifi.components.state.Scope; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.reporting.InitializationException; @@ -31,6 +29,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import javax.servlet.http.HttpServletResponse; import java.net.URLEncoder; import java.util.HashMap; import java.util.Map; @@ -245,6 +244,40 @@ public class TestGetHTTP { } } + @Test + public final void testDynamicHeaders() throws Exception { + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(UserAgentTestingServlet.class, "/*"); + + // create the service + TestServer server = new TestServer(); + server.addHandler(handler); + + try { + server.startServer(); + + String destination = server.getUrl(); + + // set up NiFi mock controller + controller = TestRunners.newTestRunner(GetHTTP.class); + controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); + controller.setProperty(GetHTTP.URL, destination); + controller.setProperty(GetHTTP.FILENAME, "testFile"); + controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); + controller.setProperty(GetHTTP.USER_AGENT, "testUserAgent"); + controller.setProperty("Static-Header", "StaticHeaderValue"); + controller.setProperty("EL-Header", "${now()}"); + + controller.run(); + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + + // shutdown web service + } finally { + server.shutdownServer(); + } + } + @Test public final void testExpressionLanguage() throws Exception { // set up web service