NIFI-11940 Added SSLContextService to GetSplunk

- Included ClassLoader isolation to work around Splunk Service static configuration limitations

This closes #7668

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2023-09-05 01:00:30 +02:00 committed by exceptionfactory
parent 130c8e9903
commit 87b8f230d2
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 33 additions and 2 deletions

View File

@ -24,6 +24,7 @@ import com.splunk.ServiceArgs;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -35,11 +36,13 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -49,6 +52,8 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.ssl.SSLContextService;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
@ -68,7 +73,6 @@ import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.scheduling.SchedulingStrategy;
@TriggerSerially @TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@ -79,11 +83,12 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
@WritesAttribute(attribute="splunk.earliest.time", description = "The value of the earliest time that was used when performing the query."), @WritesAttribute(attribute="splunk.earliest.time", description = "The value of the earliest time that was used when performing the query."),
@WritesAttribute(attribute="splunk.latest.time", description = "The value of the latest time that was used when performing the query.") @WritesAttribute(attribute="splunk.latest.time", description = "The value of the latest time that was used when performing the query.")
}) })
@RequiresInstanceClassLoading(cloneAncestorResources = true)
@Stateful(scopes = Scope.CLUSTER, description = "If using one of the managed Time Range Strategies, this processor will " + @Stateful(scopes = Scope.CLUSTER, description = "If using one of the managed Time Range Strategies, this processor will " +
"store the values of the latest and earliest times from the previous execution so that the next execution of the " + "store the values of the latest and earliest times from the previous execution so that the next execution of the " +
"can pick up where the last execution left off. The state will be cleared and start over if the query is changed.") "can pick up where the last execution left off. The state will be cleared and start over if the query is changed.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class GetSplunk extends AbstractProcessor { public class GetSplunk extends AbstractProcessor implements ClassloaderIsolationKeyProvider {
public static final String HTTP_SCHEME = "http"; public static final String HTTP_SCHEME = "http";
public static final String HTTPS_SCHEME = "https"; public static final String HTTPS_SCHEME = "https";
@ -247,6 +252,13 @@ public class GetSplunk extends AbstractProcessor {
.defaultValue(TLS_1_2_VALUE.getValue()) .defaultValue(TLS_1_2_VALUE.getValue())
.build(); .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("Results retrieved from Splunk are sent out this relationship.") .description("Results retrieved from Splunk are sent out this relationship.")
@ -289,6 +301,7 @@ public class GetSplunk extends AbstractProcessor {
descriptors.add(PASSWORD); descriptors.add(PASSWORD);
descriptors.add(SECURITY_PROTOCOL); descriptors.add(SECURITY_PROTOCOL);
descriptors.add(OUTPUT_MODE); descriptors.add(OUTPUT_MODE);
descriptors.add(SSL_CONTEXT_SERVICE);
this.descriptors = Collections.unmodifiableList(descriptors); this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();
@ -572,6 +585,11 @@ public class GetSplunk extends AbstractProcessor {
serviceArgs.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(secProtocol)); serviceArgs.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(secProtocol));
} }
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
Service.setSSLSocketFactory(sslContextService.createContext().getSocketFactory());
}
return Service.connect(serviceArgs); return Service.connect(serviceArgs);
} }
@ -606,6 +624,19 @@ public class GetSplunk extends AbstractProcessor {
} }
} }
@Override
public String getClassloaderIsolationKey(PropertyContext context) {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
// Class loader isolation is only necessary when SSL is enabled, as Service.setSSLSocketFactory
// changes the Socket Factory for all instances.
return sslContextService.getIdentifier();
} else {
// This workaround ensures that instances don't unnecessarily use an isolated classloader.
return getClass().getName();
}
}
static class TimeRange { static class TimeRange {
final String earliestTime; final String earliestTime;