NIFI-819 - Extend GetHTTP to use dynamically add HTTP headers to a request

This closes #1462.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Andre F de Miranda 2017-02-02 10:23:23 +11:00 committed by Aldrin Piri
parent af2861f105
commit ea3c294f94
No known key found for this signature in database
GPG Key ID: 531AEBAA4CFE5D00
2 changed files with 76 additions and 2 deletions

View File

@ -62,6 +62,8 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContextBuilder;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
@ -72,8 +74,10 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
@ -102,6 +106,11 @@ import org.apache.nifi.util.Tuple;
+ "once the content has been fetched from the given URL, it will not be fetched again until the content on the remote server changes. Note that due to limitations on state " + "once the content has been fetched from the given URL, it will not be fetched again until the content on the remote server changes. Note that due to limitations on state "
+ "management, stored \"last modified\" and etag fields never expire. If the URL in GetHttp uses Expression Language that is unbounded, there " + "management, stored \"last modified\" and etag fields never expire. If the URL in GetHttp uses Expression Language that is unbounded, there "
+ "is the potential for Out of Memory Errors to occur.") + "is the potential for Out of Memory Errors to occur.")
@DynamicProperties({
@DynamicProperty(name = "Header Name", value = "The Expression Language to be used to populate the header value", description = "The additional headers to be sent by the processor " +
"whenever making a new HTTP request. \n " +
"Setting a dynamic property name to XYZ and value to ${attribute} will result in the header 'XYZ: attribute_value' being sent to the HTTP endpoint"),
})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"), @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header") @WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header")
@ -235,6 +244,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
private Set<Relationship> relationships; private Set<Relationship> relationships;
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private volatile List<PropertyDescriptor> customHeaders = new ArrayList<>();
private final AtomicBoolean clearState = new AtomicBoolean(false); private final AtomicBoolean clearState = new AtomicBoolean(false);
@ -281,6 +291,14 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
if (clearState.getAndSet(false)) { if (clearState.getAndSet(false)) {
context.getStateManager().clear(Scope.LOCAL); context.getStateManager().clear(Scope.LOCAL);
} }
if (customHeaders.size() == 0) {
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
// only add the custom defined Headers (i.e. dynamic properties)
if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
customHeaders.add(property.getKey());
}
}
}
} }
@Override @Override
@ -306,6 +324,17 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
return results; return results;
} }
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.required(false)
.dynamic(true)
.build();
}
private SSLContext createSSLContext(final SSLContextService service) private SSLContext createSSLContext(final SSLContextService service)
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException, UnrecoverableKeyException { throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException, UnrecoverableKeyException {
@ -467,6 +496,18 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
if (accept != null) { if (accept != null) {
get.addHeader(HEADER_ACCEPT, accept); get.addHeader(HEADER_ACCEPT, accept);
} }
// Add dynamic headers
PropertyValue customHeaderValue;
for (PropertyDescriptor customProperty : customHeaders) {
customHeaderValue = context.getProperty(customProperty).evaluateAttributeExpressions();
if (StringUtils.isNotBlank(customHeaderValue.getValue())) {
get.addHeader(customProperty.getName(), customHeaderValue.getValue());
}
}
// create the http client // create the http client
try ( final CloseableHttpClient client = clientBuilder.build() ) { try ( final CloseableHttpClient client = clientBuilder.build() ) {
// NOTE: including this inner try in order to swallow exceptions on close // NOTE: including this inner try in order to swallow exceptions on close

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
@ -31,6 +29,7 @@ import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import javax.servlet.http.HttpServletResponse;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -245,6 +244,40 @@ public class TestGetHTTP {
} }
} }
@Test
public final void testDynamicHeaders() throws Exception {
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
// create the service
TestServer server = new TestServer();
server.addHandler(handler);
try {
server.startServer();
String destination = server.getUrl();
// set up NiFi mock controller
controller = TestRunners.newTestRunner(GetHTTP.class);
controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(GetHTTP.URL, destination);
controller.setProperty(GetHTTP.FILENAME, "testFile");
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.setProperty(GetHTTP.USER_AGENT, "testUserAgent");
controller.setProperty("Static-Header", "StaticHeaderValue");
controller.setProperty("EL-Header", "${now()}");
controller.run();
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
// shutdown web service
} finally {
server.shutdownServer();
}
}
@Test @Test
public final void testExpressionLanguage() throws Exception { public final void testExpressionLanguage() throws Exception {
// set up web service // set up web service