NIFI-825: Use new method of accessing controller services and rather than caching an SSLContext, obtain one from the service each time

This commit is contained in:
Mark Payne 2015-08-06 11:55:08 -04:00
parent e59ee5dda1
commit 4b9ee460a8
1 changed files with 34 additions and 64 deletions

View File

@ -39,7 +39,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -52,10 +51,10 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@ -84,10 +83,8 @@ import org.joda.time.format.DateTimeFormatter;
+ "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
public final class InvokeHTTP extends AbstractProcessor {
//-- properties --//
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Config.PROPERTIES;
}
@ -99,70 +96,47 @@ public final class InvokeHTTP extends AbstractProcessor {
return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
}
//-- relationships --//
@Override
public Set<Relationship> getRelationships() {
return Config.RELATIONSHIPS;
}
//-- class properties --//
final AtomicReference<SSLContext> sslContextRef = new AtomicReference<>();
final AtomicReference<Pattern> attributesToSendRef = new AtomicReference<>();
private volatile Pattern attributesToSend = null;
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
newValue = StringUtils.trimToEmpty(newValue);
// listen for the SSL Context Service property and retrieve the SSLContext from the controller service.
if (Config.PROP_SSL_CONTEXT_SERVICE.getName().equalsIgnoreCase(descriptor.getName())) {
if (newValue.isEmpty()) {
sslContextRef.set(null);
} else {
SSLContextService svc = (SSLContextService) getControllerServiceLookup().getControllerService(newValue);
sslContextRef.set(svc.createSSLContext(ClientAuth.NONE)); // ClientAuth is only useful for servers, not clients.
getLogger().info("Loading SSL configuration from keystore={} and truststore={}",
new Object[]{svc.getKeyStoreFile(), svc.getTrustStoreFile()});
}
}
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
final String trimmedValue = StringUtils.trimToEmpty(newValue);
// compile the attributes-to-send filter pattern
if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
if (newValue.isEmpty()) {
attributesToSendRef.set(null);
attributesToSend = null;
} else {
attributesToSendRef.set(Pattern.compile(newValue));
attributesToSend = Pattern.compile(trimmedValue);
}
}
}
//-- processing --//
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
List<FlowFile> flowfiles = session.get(Config.MAX_RESULTS_PER_THREAD);
if (flowfiles.isEmpty()) {
context.yield();
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
for (FlowFile flowfile : flowfiles) {
Transaction transaction = new Transaction(getLogger(), sslContextRef, attributesToSendRef, context, session, flowfile);
final SSLContextService sslService = context.getProperty(Config.PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE);
Transaction transaction = new Transaction(getLogger(), sslContext, attributesToSend, context, session, flowFile);
transaction.process();
}
}
/**
*
* Stores properties, relationships, configuration values, hard coded strings, magic numbers, etc.
*
*
*/
public interface Config {
//-- magic numbers --//
int MAX_RESULTS_PER_THREAD = 50;
//-- flowfile attribute keys returned after reading the response --//
// flowfile attribute keys returned after reading the response
String STATUS_CODE = "invokehttp.status.code";
String STATUS_MESSAGE = "invokehttp.status.message";
String RESPONSE_BODY = "invokehttp.response.body";
@ -179,7 +153,7 @@ public final class InvokeHTTP extends AbstractProcessor {
"uuid", "filename", "path"
)));
//-- properties --//
// properties
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
.name("HTTP Method")
.description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
@ -297,7 +271,7 @@ public final class InvokeHTTP extends AbstractProcessor {
.dynamic(true)
.build();
//-- relationships --//
// relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
.name("Original")
.description("Original FlowFile will be routed upon success (2xx status codes).")
@ -351,8 +325,8 @@ public final class InvokeHTTP extends AbstractProcessor {
private static final Charset utf8 = Charset.forName("UTF-8");
private final ProcessorLog logger;
private final AtomicReference<SSLContext> sslContextRef;
private final AtomicReference<Pattern> attributesToSendRef;
private final SSLContext sslContext;
private final Pattern attributesToSend;
private final ProcessContext context;
private final ProcessSession session;
@ -366,32 +340,31 @@ public final class InvokeHTTP extends AbstractProcessor {
private String statusMessage;
public Transaction(
ProcessorLog logger,
AtomicReference<SSLContext> sslContextRef,
AtomicReference<Pattern> attributesToSendRef,
ProcessContext context,
ProcessSession session,
FlowFile request) {
final ProcessorLog logger,
final SSLContext sslContext,
final Pattern attributesToSend,
final ProcessContext context,
final ProcessSession session,
final FlowFile request) {
this.logger = logger;
this.sslContextRef = sslContextRef;
this.attributesToSendRef = attributesToSendRef;
this.sslContext = sslContext;
this.attributesToSend = attributesToSend;
this.context = context;
this.session = session;
this.request = request;
}
public void process() {
public void process() {
try {
openConnection();
sendRequest();
readResponse();
transfer();
} catch (Throwable t) {
} catch (final Exception e) {
// log exception
logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), t}, t);
logger.error("Routing to {} due to exception: {}", new Object[] { REL_FAILURE.getName(), e }, e);
// penalize
request = session.penalize(request);
@ -404,10 +377,9 @@ public final class InvokeHTTP extends AbstractProcessor {
if (response != null) {
session.remove(response);
}
} catch (Throwable t1) {
logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{t1}, t1);
} catch (final Exception e1) {
logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] { e1 }, e1);
}
}
}
@ -447,7 +419,6 @@ public final class InvokeHTTP extends AbstractProcessor {
HttpsURLConnection sconn = (HttpsURLConnection) conn;
// check if the ssl context is set
SSLContext sslContext = sslContextRef.get();
if (sslContext != null) {
sconn.setSSLSocketFactory(sslContext.getSocketFactory());
}
@ -573,10 +544,9 @@ public final class InvokeHTTP extends AbstractProcessor {
// iterate through the flowfile attributes, adding any attribute that
// matches the attributes-to-send pattern. if the pattern is not set
// (it's an optional property), ignore that attribute entirely
Pattern p = attributesToSendRef.get();
if (p != null) {
if (attributesToSend != null) {
Map<String, String> attributes = request.getAttributes();
Matcher m = p.matcher("");
Matcher m = attributesToSend.matcher("");
for (Map.Entry<String, String> entry : attributes.entrySet()) {
String key = trimToEmpty(entry.getKey());
String val = trimToEmpty(entry.getValue());